Skip to content

[BTS-2171] Wait immediately after sending network requests with skipScheduler #21869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
devel
-----

* Fix BTS-2171: Reduce the amount of code executed on dedicated network threads,
which might have reduced their throughput or delayed the processing of other
network responses.

* Updated ArangoDB Starter to v0.19.13.

* Delete the Coordinator-V8 wrappers for the agency.
Expand Down
229 changes: 166 additions & 63 deletions arangod/Cluster/ClusterMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,16 @@ futures::Future<OperationResult> countOnCoordinator(
};
return handleResponsesFromAllShards(options, results, handler, pre, post);
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(std::move(cb));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1642,13 +1651,29 @@ futures::Future<OperationResult> insertDocumentOnCoordinator(
res.response().stealPayload(),
std::move(options), {});
};
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
futures[0].wait();
}
return std::move(futures[0]).thenValue(cb);
}

return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](
std::vector<Try<network::Response>>&& results)
-> OperationResult {
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[opCtx(std::move(opCtx))](
std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultInsert,
opCtx, results);
});
Expand Down Expand Up @@ -1781,13 +1806,30 @@ futures::Future<OperationResult> removeDocumentOnCoordinator(
res.response().stealPayload(),
std::move(options), {});
};
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on
// the network thread. Waiting here will ensure that the current
// thread continues execution after the following co_await, and
// not the network thread resolving the promise.
futures[0].wait();
}
return std::move(futures[0]).thenValue(cb);
}

return futures::collectAll(std::move(futures))
.thenValue([opCtx = std::move(opCtx)](
std::vector<Try<network::Response>>&& results)
-> OperationResult {
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on
// the network thread. Waiting here will ensure that the current
// thread continues execution after the following co_await, and not
// the network thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[opCtx = std::move(opCtx)](
std::vector<Try<network::Response>>&& results)
-> OperationResult {
return handleCRUDShardResponsesFast(
network::clusterResultRemove, opCtx, results);
});
Expand Down Expand Up @@ -1839,14 +1881,22 @@ futures::Future<OperationResult> removeDocumentOnCoordinator(
/*cannot move*/ buffer, reqOpts, std::move(headers)));
}

return futures::collectAll(std::move(futures))
.thenValue(
[=](std::vector<Try<network::Response>>&& responses) mutable
-> OperationResult {
return ::handleCRUDShardResponsesSlow(
network::clusterResultRemove, expectedLen,
std::move(options), responses);
});
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the
// network thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[=](std::vector<Try<network::Response>>&& responses) mutable
-> OperationResult {
return ::handleCRUDShardResponsesSlow(
network::clusterResultRemove, expectedLen, std::move(options),
responses);
});
});
}

Expand Down Expand Up @@ -1924,7 +1974,16 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(
options, results,
[](Result&, VPackBuilder&, ShardID const&, VPackSlice) -> void {});
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(std::move(cb));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -2062,6 +2121,14 @@ Future<OperationResult> getDocumentOnCoordinator(
// Now compute the result
if (!useMultiple) { // single-shard fast track
TRI_ASSERT(futures.size() == 1);
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the
// network thread resolving the promise.
futures[0].wait();
}
return std::move(futures[0])
.thenValue([options](network::Response res) -> OperationResult {
if (res.error != fuerte::Error::NoError) {
Expand All @@ -2074,9 +2141,18 @@ Future<OperationResult> getDocumentOnCoordinator(
});
}

return futures::collectAll(std::move(futures))
.thenValue([opCtx = std::move(opCtx)](
std::vector<Try<network::Response>>&& results) {
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[opCtx = std::move(opCtx)](
std::vector<Try<network::Response>>&& results) {
return handleCRUDShardResponsesFast(network::clusterResultDocument,
opCtx, results);
});
Expand Down Expand Up @@ -2647,13 +2723,30 @@ futures::Future<OperationResult> modifyDocumentOnCoordinator(
res.response().stealPayload(),
std::move(options), {});
};
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the
// network thread resolving the promise.
futures[0].wait();
}
return std::move(futures[0]).thenValue(cb);
}

return futures::collectAll(std::move(futures))
.thenValue([opCtx = std::move(opCtx)](
std::vector<Try<network::Response>>&& results)
-> OperationResult {
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the network
// thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[opCtx =
std::move(opCtx)](std::vector<Try<network::Response>>&& results)
-> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultModify,
opCtx, results);
});
Expand All @@ -2668,48 +2761,58 @@ futures::Future<OperationResult> modifyDocumentOnCoordinator(
f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
}

return std::move(f).thenValue([=, &trx](Result&&) mutable
-> Future<OperationResult> {
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
return std::move(f).thenValue(
[=, &trx](Result&&) mutable -> Future<OperationResult> {
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());

size_t expectedLen = useMultiple ? slice.length() : 0;
VPackBuffer<uint8_t> buffer;
buffer.append(slice.begin(), slice.byteSize());
size_t expectedLen = useMultiple ? slice.length() : 0;
VPackBuffer<uint8_t> buffer;
buffer.append(slice.begin(), slice.byteSize());

for (auto const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
// Just make sure that no dirty read flag makes it here, since we
// are writing and then `addTransactionHeaderForShard` might
// misbehave!
TRI_ASSERT(!trx.state()->options().allowDirtyReads);
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
for (auto const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
// Just make sure that no dirty read flag makes it here, since we
// are writing and then `addTransactionHeaderForShard` might
// misbehave!
TRI_ASSERT(!trx.state()->options().allowDirtyReads);
addTransactionHeaderForShard(trx, *shardIds, shard, headers);

std::string url;
if (!useMultiple) {
// send to single document API
std::string_view key(slice.get(StaticStrings::KeyString).stringView());
url = absl::StrCat("/_api/document/", std::string{shard}, "/",
StringUtils::urlEncode(key.data(), key.size()));
} else {
// send to batch API
url = absl::StrCat("/_api/document/", std::string{shard});
}
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + shard, restVerb, std::move(url),
/*cannot move*/ buffer, reqOpts, std::move(headers)));
}
std::string url;
if (!useMultiple) {
// send to single document API
std::string_view key(
slice.get(StaticStrings::KeyString).stringView());
url = absl::StrCat("/_api/document/", std::string{shard}, "/",
StringUtils::urlEncode(key.data(), key.size()));
} else {
// send to batch API
url = absl::StrCat("/_api/document/", std::string{shard});
}
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + shard, restVerb, std::move(url),
/*cannot move*/ buffer, reqOpts, std::move(headers)));
}

return futures::collectAll(std::move(futures))
.thenValue([=](std::vector<Try<network::Response>>&& responses) mutable
-> OperationResult {
return ::handleCRUDShardResponsesSlow(network::clusterResultModify,
expectedLen, std::move(options),
responses);
});
});
auto fut = futures::collectAll(std::move(futures));
if (api == transaction::MethodsApi::Synchronous) {
// Wait here if the caller is synchronous, because in this case,
// skipScheduler is set and we must not execute arbitrary code on the
// network thread. Waiting here will ensure that the current thread
// continues execution after the following co_await, and not the
// network thread resolving the promise.
fut.wait();
}
return std::move(fut).thenValue(
[=](std::vector<Try<network::Response>>&& responses) mutable
-> OperationResult {
return ::handleCRUDShardResponsesSlow(
network::clusterResultModify, expectedLen, std::move(options),
responses);
});
});
}

/// @brief flush WAL on all DBservers
Expand Down
Loading