diff --git a/CHANGELOG b/CHANGELOG
index 7d7cc6153ac6..52d011343a8a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,10 @@
 devel
 -----
 
+* Fix BTS-2171: Reduce the amount of code executed on dedicated network
+  threads, which could reduce their throughput and delay the processing of
+  other network responses.
+
 * Updated ArangoDB Starter to v0.19.13.
 
 * Delete the Coordinator-V8 wrappers for the agency.
diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp
index 188b939436b5..a20493359019 100644
--- a/arangod/Cluster/ClusterMethods.cpp
+++ b/arangod/Cluster/ClusterMethods.cpp
@@ -1265,7 +1265,16 @@ futures::Future<OperationResult> countOnCoordinator(
     };
     return handleResponsesFromAllShards(options, results, handler, pre, post);
   };
-  return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
+  auto fut = futures::collectAll(std::move(futures));
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the continuation below runs
+    // on the current thread, and not on the network thread resolving the
+    // promise.
+    fut.wait();
+  }
+  return std::move(fut).thenValue(std::move(cb));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1642,13 +1651,29 @@ futures::Future<OperationResult> insertDocumentOnCoordinator(
           res.response().stealPayload(), std::move(options), {});
     };
 
+    if (api == transaction::MethodsApi::Synchronous) {
+      // Wait here if the caller is synchronous, because in this case,
+      // skipScheduler is set and we must not execute arbitrary code on the
+      // network thread. Waiting here ensures that the continuation below
+      // runs on the current thread, and not on the network thread resolving
+      // the promise.
+      futures[0].wait();
+    }
     return std::move(futures[0]).thenValue(cb);
   }
 
-  return futures::collectAll(std::move(futures))
-      .thenValue([opCtx(std::move(opCtx))](
-                     std::vector<Try<network::Response>>&& results)
-                     -> OperationResult {
+  auto fut = futures::collectAll(std::move(futures));
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the continuation below runs
+    // on the current thread, and not on the network thread resolving the
+    // promise.
+    fut.wait();
+  }
+  return std::move(fut).thenValue(
+      [opCtx(std::move(opCtx))](
+          std::vector<Try<network::Response>>&& results) -> OperationResult {
         return handleCRUDShardResponsesFast(network::clusterResultInsert,
                                             opCtx, results);
       });
@@ -1781,13 +1806,30 @@ futures::Future<OperationResult> removeDocumentOnCoordinator(
             res.response().stealPayload(), std::move(options), {});
       };
 
+      if (api == transaction::MethodsApi::Synchronous) {
+        // Wait here if the caller is synchronous, because in this case,
+        // skipScheduler is set and we must not execute arbitrary code on
+        // the network thread. Waiting here ensures that the continuation
+        // below runs on the current thread, and not on the network thread
+        // resolving the promise.
+        futures[0].wait();
+      }
       return std::move(futures[0]).thenValue(cb);
     }
 
-    return futures::collectAll(std::move(futures))
-        .thenValue([opCtx = std::move(opCtx)](
-                       std::vector<Try<network::Response>>&& results)
-                       -> OperationResult {
+    auto fut = futures::collectAll(std::move(futures));
+    if (api == transaction::MethodsApi::Synchronous) {
+      // Wait here if the caller is synchronous, because in this case,
+      // skipScheduler is set and we must not execute arbitrary code on
+      // the network thread. Waiting here ensures that the continuation
+      // below runs on the current thread, and not on the network thread
+      // resolving the promise.
+      fut.wait();
+    }
+    return std::move(fut).thenValue(
+        [opCtx = std::move(opCtx)](
+            std::vector<Try<network::Response>>&& results)
+            -> OperationResult {
          return handleCRUDShardResponsesFast(
              network::clusterResultRemove, opCtx, results);
        });
@@ -1839,14 +1881,22 @@ futures::Future<OperationResult> removeDocumentOnCoordinator(
             /*cannot move*/ buffer, reqOpts, std::move(headers)));
       }
 
-      return futures::collectAll(std::move(futures))
-          .thenValue(
-              [=](std::vector<Try<network::Response>>&& responses) mutable
-                  -> OperationResult {
-                return ::handleCRUDShardResponsesSlow(
-                    network::clusterResultRemove, expectedLen,
-                    std::move(options), responses);
-              });
+      auto fut = futures::collectAll(std::move(futures));
+      if (api == transaction::MethodsApi::Synchronous) {
+        // Wait here if the caller is synchronous, because in this case,
+        // skipScheduler is set and we must not execute arbitrary code on the
+        // network thread. Waiting here ensures that the continuation below
+        // runs on the current thread, and not on the network thread
+        // resolving the promise.
+        fut.wait();
+      }
+      return std::move(fut).thenValue(
+          [=](std::vector<Try<network::Response>>&& responses) mutable
+              -> OperationResult {
+            return ::handleCRUDShardResponsesSlow(
+                network::clusterResultRemove, expectedLen, std::move(options),
+                responses);
+          });
     });
 }
 
@@ -1924,7 +1974,16 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(
         options, results,
         [](Result&, VPackBuilder&, ShardID const&, VPackSlice) -> void {});
   };
-  return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
+  auto fut = futures::collectAll(std::move(futures));
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the continuation below runs
+    // on the current thread, and not on the network thread resolving the
+    // promise.
+    fut.wait();
+  }
+  return std::move(fut).thenValue(std::move(cb));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -2062,6 +2121,14 @@ Future<OperationResult> getDocumentOnCoordinator(
   // Now compute the result
   if (!useMultiple) {  // single-shard fast track
     TRI_ASSERT(futures.size() == 1);
+    if (api == transaction::MethodsApi::Synchronous) {
+      // Wait here if the caller is synchronous, because in this case,
+      // skipScheduler is set and we must not execute arbitrary code on the
+      // network thread. Waiting here ensures that the continuation below
+      // runs on the current thread, and not on the network thread resolving
+      // the promise.
+      futures[0].wait();
+    }
     return std::move(futures[0])
         .thenValue([options](network::Response res) -> OperationResult {
           if (res.error != fuerte::Error::NoError) {
@@ -2074,9 +2141,18 @@ Future<OperationResult> getDocumentOnCoordinator(
         });
   }
 
-  return futures::collectAll(std::move(futures))
-      .thenValue([opCtx = std::move(opCtx)](
-                     std::vector<Try<network::Response>>&& results) {
+  auto fut = futures::collectAll(std::move(futures));
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the continuation below runs
+    // on the current thread, and not on the network thread resolving the
+    // promise.
+    fut.wait();
+  }
+  return std::move(fut).thenValue(
+      [opCtx = std::move(opCtx)](
+          std::vector<Try<network::Response>>&& results) {
        return handleCRUDShardResponsesFast(network::clusterResultDocument,
                                            opCtx, results);
      });
@@ -2647,13 +2723,30 @@ futures::Future<OperationResult> modifyDocumentOnCoordinator(
           res.response().stealPayload(), std::move(options), {});
     };
 
+      if (api == transaction::MethodsApi::Synchronous) {
+        // Wait here if the caller is synchronous, because in this case,
+        // skipScheduler is set and we must not execute arbitrary code on the
+        // network thread. Waiting here ensures that the continuation below
+        // runs on the current thread, and not on the network thread
+        // resolving the promise.
+        futures[0].wait();
+      }
       return std::move(futures[0]).thenValue(cb);
     }
 
-    return futures::collectAll(std::move(futures))
-        .thenValue([opCtx = std::move(opCtx)](
-                       std::vector<Try<network::Response>>&& results)
-                       -> OperationResult {
+    auto fut = futures::collectAll(std::move(futures));
+    if (api == transaction::MethodsApi::Synchronous) {
+      // Wait here if the caller is synchronous, because in this case,
+      // skipScheduler is set and we must not execute arbitrary code on the
+      // network thread. Waiting here ensures that the continuation below
+      // runs on the current thread, and not on the network thread resolving
+      // the promise.
+      fut.wait();
+    }
+    return std::move(fut).thenValue(
+        [opCtx =
+             std::move(opCtx)](std::vector<Try<network::Response>>&& results)
+            -> OperationResult {
          return handleCRUDShardResponsesFast(network::clusterResultModify,
                                              opCtx, results);
        });
@@ -2668,48 +2761,58 @@ futures::Future<OperationResult> modifyDocumentOnCoordinator(
     f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
   }
 
-  return std::move(f).thenValue([=, &trx](Result&&) mutable
-                                    -> Future<OperationResult> {
-    auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
-    std::vector<Future<network::Response>> futures;
-    futures.reserve(shardIds->size());
+  return std::move(f).thenValue(
+      [=, &trx](Result&&) mutable -> Future<OperationResult> {
+        auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
+        std::vector<Future<network::Response>> futures;
+        futures.reserve(shardIds->size());
 
-    size_t expectedLen = useMultiple ? slice.length() : 0;
-    VPackBuffer<uint8_t> buffer;
-    buffer.append(slice.begin(), slice.byteSize());
+        size_t expectedLen = useMultiple ? slice.length() : 0;
+        VPackBuffer<uint8_t> buffer;
+        buffer.append(slice.begin(), slice.byteSize());
 
-    for (auto const& shardServers : *shardIds) {
-      ShardID const& shard = shardServers.first;
-      network::Headers headers;
-      // Just make sure that no dirty read flag makes it here, since we
-      // are writing and then `addTransactionHeaderForShard` might
-      // misbehave!
-      TRI_ASSERT(!trx.state()->options().allowDirtyReads);
-      addTransactionHeaderForShard(trx, *shardIds, shard, headers);
+        for (auto const& shardServers : *shardIds) {
+          ShardID const& shard = shardServers.first;
+          network::Headers headers;
+          // Just make sure that no dirty read flag makes it here, since we
+          // are writing and then `addTransactionHeaderForShard` might
+          // misbehave!
+          TRI_ASSERT(!trx.state()->options().allowDirtyReads);
+          addTransactionHeaderForShard(trx, *shardIds, shard, headers);
 
-      std::string url;
-      if (!useMultiple) {
-        // send to single document API
-        std::string_view key(slice.get(StaticStrings::KeyString).stringView());
-        url = absl::StrCat("/_api/document/", std::string{shard}, "/",
-                           StringUtils::urlEncode(key.data(), key.size()));
-      } else {
-        // send to batch API
-        url = absl::StrCat("/_api/document/", std::string{shard});
-      }
-      futures.emplace_back(network::sendRequestRetry(
-          pool, "shard:" + shard, restVerb, std::move(url),
-          /*cannot move*/ buffer, reqOpts, std::move(headers)));
-    }
+          std::string url;
+          if (!useMultiple) {
+            // send to single document API
+            std::string_view key(
+                slice.get(StaticStrings::KeyString).stringView());
+            url = absl::StrCat("/_api/document/", std::string{shard}, "/",
+                               StringUtils::urlEncode(key.data(), key.size()));
+          } else {
+            // send to batch API
+            url = absl::StrCat("/_api/document/", std::string{shard});
+          }
+          futures.emplace_back(network::sendRequestRetry(
+              pool, "shard:" + shard, restVerb, std::move(url),
+              /*cannot move*/ buffer, reqOpts, std::move(headers)));
+        }
 
-    return futures::collectAll(std::move(futures))
-        .thenValue([=](std::vector<Try<network::Response>>&& responses) mutable
-                       -> OperationResult {
-          return ::handleCRUDShardResponsesSlow(network::clusterResultModify,
-                                                expectedLen, std::move(options),
-                                                responses);
-        });
-  });
+        auto fut = futures::collectAll(std::move(futures));
+        if (api == transaction::MethodsApi::Synchronous) {
+          // Wait here if the caller is synchronous, because in this case,
+          // skipScheduler is set and we must not execute arbitrary code on
+          // the network thread. Waiting here ensures that the continuation
+          // below runs on the current thread, and not on the network thread
+          // resolving the promise.
+          fut.wait();
+        }
+        return std::move(fut).thenValue(
+            [=](std::vector<Try<network::Response>>&& responses) mutable
+                -> OperationResult {
+              return ::handleCRUDShardResponsesSlow(
+                  network::clusterResultModify, expectedLen,
+                  std::move(options), responses);
+            });
+      });
 }
 
 /// @brief flush WAL on all DBservers
diff --git a/arangod/Cluster/ClusterTrxMethods.cpp b/arangod/Cluster/ClusterTrxMethods.cpp
index 7973d9164d3d..f706737d56e3 100644
--- a/arangod/Cluster/ClusterTrxMethods.cpp
+++ b/arangod/Cluster/ClusterTrxMethods.cpp
@@ -285,94 +285,101 @@ Future<Result> commitAbortTransaction(arangodb::TransactionState* state,
                                   VPackBuffer<uint8_t>(), reqOpts, headers));
   }
 
-  return futures::collectAll(requests).thenValue(
-      [=, commitGuard = std::move(commitGuard)](
-          std::vector<Try<network::Response>>&& responses) -> Result {
-        if (state->isCoordinator()) {
-          TRI_ASSERT(state->id().isCoordinatorTransactionId());
-
-          Result error;
-          for (Try<network::Response> const& tryRes : responses) {
-            network::Response const& resp =
-                tryRes.get();  // throws exceptions upwards
-            Result res = ::checkTransactionResult(tidPlus, status, resp);
-            if (res.fail()) {
-              LOG_TOPIC("59bb0", ERR, Logger::TRANSACTIONS)
-                  << " failed to " << stateString << " transaction "
-                  << state->id() << " on server " << resp.destination;
-              error = std::move(res);
-              if (resp.fail()) {
-                // only on communication error
-                state->vocbase()
-                    .metrics()
-                    .transactions_lost_subordinates->count();
-              }
-              break;
-            }
-          }
+  auto fut = futures::collectAll(requests);
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the continuation below runs
+    // on the current thread, and not on the network thread resolving the
+    // promise.
+    fut.wait();
+  }
+  return std::move(fut).thenValue([=, commitGuard = std::move(commitGuard)](
+                                      std::vector<Try<network::Response>>&&
+                                          responses) -> Result {
+    if (state->isCoordinator()) {
+      TRI_ASSERT(state->id().isCoordinatorTransactionId());
 
-          return error;
+      Result error;
+      for (Try<network::Response> const& tryRes : responses) {
+        network::Response const& resp =
+            tryRes.get();  // throws exceptions upwards
+        Result res = ::checkTransactionResult(tidPlus, status, resp);
+        if (res.fail()) {
+          LOG_TOPIC("59bb0", ERR, Logger::TRANSACTIONS)
+              << " failed to " << stateString << " transaction " << state->id()
+              << " on server " << resp.destination;
+          error = std::move(res);
+          if (resp.fail()) {
+            // only on communication error
+            state->vocbase().metrics().transactions_lost_subordinates->count();
+          }
+          break;
         }
-      }
+      }
 
-        TRI_ASSERT(state->isDBServer());
-        TRI_ASSERT(state->id().isLeaderTransactionId());
-
-        // Drop all followers that were not successful:
-        for (Try<network::Response> const& tryRes : responses) {
-          network::Response const& resp =
-              tryRes.get();  // throws exceptions upwards
+      return error;
+    }
 
-          Result res = ::checkTransactionResult(tidPlus, status, resp);
-          if (res.fail()) {  // remove followers for all participating
-                             // collections
-            ServerID follower = resp.serverId();
-            LOG_TOPIC("230c3", INFO, Logger::REPLICATION)
+    TRI_ASSERT(state->isDBServer());
+    TRI_ASSERT(state->id().isLeaderTransactionId());
+
+    // Drop all followers that were not successful:
+    for (Try<network::Response> const& tryRes : responses) {
+      network::Response const& resp =
+          tryRes.get();  // throws exceptions upwards
+
+      Result res = ::checkTransactionResult(tidPlus, status, resp);
+      if (res.fail()) {  // remove followers for all participating
+                         // collections
+        ServerID follower = resp.serverId();
+        LOG_TOPIC("230c3", INFO, Logger::REPLICATION)
+            << "synchronous replication of transaction " << stateString
+            << " operation: "
+            << "dropping follower " << follower
+            << " for all participating shards in"
+            << " transaction " << state->id().id() << " (status "
+            << arangodb::transaction::statusString(status)
+            << "), status code: " << static_cast<int>(resp.statusCode())
+            << ", message: " << resp.combinedResult().errorMessage();
+        state->allCollections([&](TransactionCollection& tc) {
+          auto cc = tc.collection();
+          if (cc) {
+            LOG_TOPIC("709c9", WARN, Logger::REPLICATION)
                 << "synchronous replication of transaction " << stateString
                 << " operation: "
-                << "dropping follower " << follower
-                << " for all participating shards in"
-                << " transaction " << state->id().id() << " (status "
-                << arangodb::transaction::statusString(status)
-                << "), status code: " << static_cast<int>(resp.statusCode())
-                << ", message: " << resp.combinedResult().errorMessage();
-            state->allCollections([&](TransactionCollection& tc) {
-              auto cc = tc.collection();
-              if (cc) {
-                LOG_TOPIC("709c9", WARN, Logger::REPLICATION)
-                    << "synchronous replication of transaction " << stateString
-                    << " operation: "
-                    << "dropping follower " << follower << " for shard "
-                    << cc->vocbase().name() << "/" << tc.collectionName()
-                    << ": " << resp.combinedResult().errorMessage();
-
-                Result r = cc->followers()->remove(follower);
-                if (r.fail()) {
-                  LOG_TOPIC("4971f", ERR, Logger::REPLICATION)
-                      << "synchronous replication: could not drop follower "
-                      << follower << " for shard " << cc->vocbase().name()
-                      << "/" << tc.collectionName() << ": " << r.errorMessage();
-                  if (res.is(TRI_ERROR_CLUSTER_NOT_LEADER)) {
-                    // In this case, we know that we are not or no longer
-                    // the leader for this shard. Therefore we need to
-                    // send a code which let's the coordinator retry.
-                    res.reset(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
-                  } else {
-                    // In this case, some other error occurred and we
-                    // most likely are still the proper leader, so
-                    // the error needs to be reported and the local
-                    // transaction must be rolled back.
-                    res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
-                  }
-                }
+                << "dropping follower " << follower << " for shard "
+                << cc->vocbase().name() << "/" << tc.collectionName() << ": "
+                << resp.combinedResult().errorMessage();
+
+            Result r = cc->followers()->remove(follower);
+            if (r.fail()) {
+              LOG_TOPIC("4971f", ERR, Logger::REPLICATION)
+                  << "synchronous replication: could not drop follower "
+                  << follower << " for shard " << cc->vocbase().name() << "/"
+                  << tc.collectionName() << ": " << r.errorMessage();
+              if (res.is(TRI_ERROR_CLUSTER_NOT_LEADER)) {
+                // In this case, we know that we are not, or no longer,
+                // the leader for this shard. Therefore we need to
+                // send a code which lets the coordinator retry.
+                res.reset(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED);
+              } else {
+                // In this case, some other error occurred and we
+                // most likely are still the proper leader, so
+                // the error needs to be reported and the local
+                // transaction must be rolled back.
+                res.reset(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
               }
-                // continue dropping the follower for all shards in this
-                // transaction
-                return true;
-              });
+            }
           }
-        }
-        return Result();  // succeed even if some followers did not commit
-      });
+          // continue dropping the follower for all shards in this
+          // transaction
+          return true;
+        });
+      }
+    }
+    return Result();  // succeed even if some followers did not commit
+  });
 }
 
 Future<Result> commitAbortTransaction(transaction::Methods& trx,
@@ -438,7 +445,17 @@ Future<Result> beginTransactionOnLeaders(
 
   TransactionId const tid = state->id().child();
 
-  auto responses = co_await futures::collectAll(requests);
+  auto fut = futures::collectAll(requests);
+  if (api == transaction::MethodsApi::Synchronous) {
+    // Wait here if the caller is synchronous, because in this case,
+    // skipScheduler is set and we must not execute arbitrary code on the
+    // network thread. Waiting here ensures that the current thread, and not
+    // the network thread resolving the promise, continues execution after
+    // the following co_await.
+    fut.wait();
+  }
+
+  auto responses = co_await std::move(fut);
 
   // We need to make sure to get() all responses.
   // Otherwise they will eventually resolve and trigger the
@@ -506,8 +523,16 @@ Future<Result> beginTransactionOnLeaders(
     serverBefore = leader;
 #endif
 
-    auto const resolvedResponse =
-        co_await ::beginTransactionRequest(*state, leader, api);
+    auto fut = ::beginTransactionRequest(*state, leader, api);
+    if (api == transaction::MethodsApi::Synchronous) {
+      // Wait here if the caller is synchronous, because in this case,
+      // skipScheduler is set and we must not execute arbitrary code on the
+      // network thread. Waiting here ensures that the current thread, and
+      // not the network thread resolving the promise, continues execution
+      // after the following co_await.
+      fut.wait();
+    }
+    auto const resolvedResponse = co_await std::move(fut);
     if (resolvedResponse.fail()) {
       co_return resolvedResponse.combinedResult();
     }
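
Note on the recurring pattern above (editorial addition, not part of the patch): every call site applies the same fix. When the caller uses transaction::MethodsApi::Synchronous, skipScheduler is set on the request options, so the continuation must not run on the dedicated network (fuerte) thread. Calling wait() on the collected future before attaching the continuation guarantees the future is already fulfilled at attach time; a futures library with inline-continuation semantics then runs the callback immediately on the attaching thread, and the network thread merely fulfils the promise. Below is a minimal, self-contained sketch of that semantics; SimpleFuture and its members are hypothetical stand-ins for illustration, not ArangoDB's actual futures API:

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical future type: whichever side "arrives second" runs the
// continuation. If the value is already present when the continuation is
// attached, the attaching thread runs it inline; otherwise the producer
// (the stand-in for the network thread) runs it when fulfilling.
template <typename T>
class SimpleFuture {
 public:
  // Producer side: fulfil the future; runs an already-attached
  // continuation on the producer thread.
  void fulfil(T value) {
    std::function<void(T const&)> cont;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      value_ = std::move(value);
      cont = std::move(continuation_);
      cv_.notify_all();
    }
    if (cont) cont(*value_);  // continuation runs on the producer thread
  }

  // Consumer side: block until the value is available.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return value_.has_value(); });
  }

  // Attach a continuation (analogous to thenValue). If the value is already
  // there - e.g. because the caller wait()ed first - it runs immediately on
  // the current thread.
  void thenValue(std::function<void(T const&)> cont) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!value_) {
        continuation_ = std::move(cont);  // producer thread will run it
        return;
      }
    }
    cont(*value_);  // value already present: run inline on this thread
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::optional<T> value_;
  std::function<void(T const&)> continuation_;
};

int main() {
  SimpleFuture<int> fut;
  // Stand-in for the network thread resolving the promise.
  std::thread network([&] { fut.fulfil(42); });

  bool synchronous = true;  // stands in for api == MethodsApi::Synchronous
  if (synchronous) {
    fut.wait();  // after this, the continuation below runs on *this* thread
  }
  fut.thenValue(
      [](int v) { std::cout << "continuation ran with " << v << "\n"; });

  network.join();
  return 0;
}

The trade-off made by the patch: the synchronous caller's thread blocks in wait(), which costs nothing extra because that caller would block on the final result anyway, while the network thread stays free to process other responses - which is exactly the throughput and latency problem BTS-2171 describes.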