diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 614ad25a0d48..39732279bb05 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -653,6 +653,9 @@ struct DistributedQueryInstanciator final std::pair ExecutionEngine::initializeCursor( SharedAqlItemBlockPtr&& items, size_t pos) { + // TODO (Tobias) I'm not sure this lock is really necessary here, I put it + // here to keep similar behavior during a refactoring. + auto guard = getQuery().acquireLockGuard(); if (_query.killed()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED); } @@ -667,6 +670,23 @@ std::pair ExecutionEngine::initializeCursor( return res; } +auto ExecutionEngine::executeRemoteCall(AqlCallStack const& executeCall, + std::string const& shardId) + -> std::tuple { + auto const rootNodeType = root()->getPlanNode()->getType(); + + // shardId is set IFF the root node is scatter or distribute + TRI_ASSERT(shardId.empty() != (rootNodeType == ExecutionNode::SCATTER || + rootNodeType == ExecutionNode::DISTRIBUTE)); + + auto guard = getQuery().acquireLockGuard(); + if (shardId.empty()) { + return execute(executeCall); + } else { + return executeForClient(executeCall, shardId); + } +} + auto ExecutionEngine::execute(AqlCallStack const& stack) -> std::tuple { if (_query.killed()) { diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index b6e0a88bb963..01c6681af265 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -100,12 +100,17 @@ class ExecutionEngine { std::pair initializeCursor( SharedAqlItemBlockPtr&& items, size_t pos); + auto executeRemoteCall(AqlCallStack const& stack, std::string const& shardId) + -> std::tuple; + auto execute(AqlCallStack const& stack) -> std::tuple; + private: auto executeForClient(AqlCallStack const& stack, std::string const& clientId) -> std::tuple; + public: /// @brief whether or not initializeCursor was called bool initializeCursorCalled() const; diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 7303e864c7cd..fa3e050f7450 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -2001,7 +2001,7 @@ std::shared_ptr Query::newTrxContext() const { } velocypack::Options const& Query::vpackOptions() const { - return *(_transactionContext->getVPackOptions()); + return *_transactionContext->getVPackOptions(); } transaction::Methods& Query::trxForOptimization() { diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 0ff68259c760..0da49a66e309 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -68,12 +68,6 @@ RestAqlHandler::RestAqlHandler(ArangodServer& server, GeneralRequest* request, TRI_ASSERT(_queryRegistry != nullptr); } -RestAqlHandler::~RestAqlHandler() { - if (_logContextQueryIdEntry) { - LogContext::Current::popEntry(_logContextQueryIdEntry); - } -} - // POST method for /_api/aql/setup (internal) // Only available on DBServers in the Cluster. // This route sets-up all the query engines required @@ -154,13 +148,10 @@ futures::Future RestAqlHandler::setupClusterQuery() { TRI_ASSERT(clusterQueryId > 0); } - TRI_ASSERT(_logContextQueryIdValue == nullptr); - _logContextQueryIdValue = LogContext::makeValue() - .with(clusterQueryId) - .share(); - TRI_ASSERT(_logContextQueryIdEntry == nullptr); - _logContextQueryIdEntry = - LogContext::Current::pushValues(_logContextQueryIdValue); + auto queryIdValue = LogContext::makeValue() + .with(clusterQueryId) + .share(); + auto const logContextScopeGuard = ScopedValue{std::move(queryIdValue)}; VPackSlice lockInfoSlice = querySlice.get("lockInfo"); @@ -405,38 +396,29 @@ futures::Future RestAqlHandler::setupClusterQuery() { // PUT method for /_api/aql//, (internal) // see comment in header for details -RestStatus RestAqlHandler::useQuery(std::string const& operation, - std::string const& idString) { +auto RestAqlHandler::useQuery(std::string const& operation, + std::string const& idString) -> async { bool success = false; VPackSlice querySlice = this->parseVPackBody(success); if (!success) { - return RestStatus::DONE; + co_return; } - if (_logContextQueryIdValue == nullptr) { - _logContextQueryIdValue = LogContext::makeValue() - .with(idString) - .share(); - TRI_ASSERT(_logContextQueryIdEntry == nullptr); - _logContextQueryIdEntry = - LogContext::Current::pushValues(_logContextQueryIdValue); - } + auto queryIdValue = + LogContext::makeValue().with(idString).share(); + auto const logContextScopeGuard = ScopedValue{std::move(queryIdValue)}; if (!_engine) { // the PUT verb TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE || this->state() == RestHandler::HandlerState::CONTINUED); - auto res = findEngine(idString); + auto res = co_await findEngine(idString); + TRI_ASSERT(!res.is(TRI_ERROR_LOCKED)); if (res.fail()) { - if (res.is(TRI_ERROR_LOCKED)) { - // engine is still in use, but we have enqueued a callback to be woken - // up once it is free again - return RestStatus::WAITING; - } TRI_ASSERT(res.is(TRI_ERROR_QUERY_NOT_FOUND)); generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_QUERY_NOT_FOUND, absl::StrCat("query ID ", idString, " not found")); - return RestStatus::DONE; + co_return; } std::shared_ptr ss = _engine->sharedState(); ss->setWakeupHandler(withLogContext( @@ -445,7 +427,6 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, TRI_ASSERT(_engine != nullptr); TRI_ASSERT(std::to_string(_engine->engineId()) == idString); - auto guard = _engine->getQuery().acquireLockGuard(); if (_engine->getQuery().queryOptions().profile >= ProfileLevel::TraceOne) { LOG_TOPIC("1bf67", INFO, Logger::QUERIES) @@ -455,7 +436,7 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, } try { - return handleUseQuery(operation, querySlice); + co_await handleUseQuery(operation, querySlice); } catch (arangodb::basics::Exception const& ex) { generateError(rest::ResponseCode::SERVER_ERROR, ex.code(), ex.what()); } catch (std::exception const& ex) { @@ -471,26 +452,15 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "an unknown exception occurred"); } - - return RestStatus::DONE; -} - -void RestAqlHandler::prepareExecute(bool isContinue) { - RestVocbaseBaseHandler::prepareExecute(isContinue); - if (_logContextQueryIdValue != nullptr) { - TRI_ASSERT(_logContextQueryIdEntry == nullptr); - _logContextQueryIdEntry = - LogContext::Current::pushValues(_logContextQueryIdValue); - } } // executes the handler -RestStatus RestAqlHandler::execute() { +auto RestAqlHandler::executeAsync() -> futures::Future { if (ServerState::instance()->isSingleServer()) { generateError(rest::ResponseCode::NOT_IMPLEMENTED, TRI_ERROR_HTTP_NOT_IMPLEMENTED, "this endpoint is only available in clusters"); - return RestStatus::DONE; + co_return; } std::vector const& suffixes = _request->suffixes(); @@ -504,7 +474,8 @@ RestStatus RestAqlHandler::execute() { if (suffixes.size() != 1) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); } else if (suffixes[0] == "setup") { - return waitForFuture(setupClusterQuery()); + co_await setupClusterQuery(); + co_return; } else { auto msg = absl::StrCat("Unknown POST API: ", basics::StringUtils::join(suffixes, '/')); @@ -522,10 +493,7 @@ RestStatus RestAqlHandler::execute() { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND, std::move(msg)); } else { - auto status = useQuery(suffixes[0], suffixes[1]); - if (status == RestStatus::WAITING) { - return status; - } + co_await useQuery(suffixes[0], suffixes[1]); } break; } @@ -536,10 +504,10 @@ RestStatus RestAqlHandler::execute() { LOG_TOPIC("f1993", ERR, arangodb::Logger::AQL) << msg; generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND, std::move(msg)); - return RestStatus::DONE; + co_return; } if (suffixes[0] == "finish") { - return handleFinishQuery(suffixes[1]); + co_return co_await handleFinishQuery(suffixes[1]); } generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_QUERY_NOT_FOUND, @@ -554,39 +522,7 @@ RestStatus RestAqlHandler::execute() { } } - return RestStatus::DONE; -} - -RestStatus RestAqlHandler::continueExecute() { - std::vector const& suffixes = _request->suffixes(); - - // extract the sub-request type - rest::RequestType type = _request->requestType(); - - if (type == rest::RequestType::POST) { - // we can get here when the future produced in setupClusterQuery() - // completes. in this case we can simply declare success - TRI_ASSERT(suffixes.size() == 1 && suffixes[0] == "setup"); - return RestStatus::DONE; - } - if (type == rest::RequestType::PUT) { - TRI_ASSERT(suffixes.size() == 2); - return useQuery(suffixes[0], suffixes[1]); - } - if (type == rest::RequestType::DELETE_REQ) { - // we can get here when the future produced in handleFinishQuery() - // completes. in this case we can simply declare success - TRI_ASSERT(suffixes.size() == 2 && suffixes[0] == "finish"); - return RestStatus::DONE; - } - - generateError( - rest::ResponseCode::SERVER_ERROR, TRI_ERROR_INTERNAL, - absl::StrCat("continued non-continuable method for ", - GeneralRequest::translateMethod(type), " /_api/aql/", - basics::StringUtils::join(suffixes, "/"))); - - return RestStatus::DONE; + co_return; } void RestAqlHandler::shutdownExecute(bool isFinalized) noexcept { @@ -607,14 +543,11 @@ void RestAqlHandler::shutdownExecute(bool isFinalized) noexcept { << "Ignoring exception during rest handler shutdown: " << ex.what(); } - if (_logContextQueryIdEntry) { - LogContext::Current::popEntry(_logContextQueryIdEntry); - } RestVocbaseBaseHandler::shutdownExecute(isFinalized); } // dig out the query from ID, handle errors -Result RestAqlHandler::findEngine(std::string const& idString) { +auto RestAqlHandler::findEngine(std::string const& idString) -> async { TRI_ASSERT(_engine == nullptr); uint64_t qId = arangodb::basics::StringUtils::uint64(idString); @@ -670,16 +603,26 @@ Result RestAqlHandler::findEngine(std::string const& idString) { // Here Engine could be gone } } - auto res = _queryRegistry->openExecutionEngine( - qId, [self = shared_from_this()]() { self->wakeupHandler(); }); + + auto res = co_await waitingFunToCoro([&] { + auto res = _queryRegistry->openExecutionEngine( + qId, [self = shared_from_this()]() { self->wakeupHandler(); }); + if (res.is(TRI_ERROR_LOCKED)) { + // engine is still in use, but we have enqueued a callback to be woken + // up once it is free again + return std::optional{std::nullopt}; + } + return std::optional{std::move(res)}; + }); + if (res.fail()) { - return std::move(res).result(); + co_return std::move(res).result(); } _engine = res.get(); TRI_ASSERT(_engine != nullptr || _engine->engineId() == qId); - return Result{}; + co_return Result{}; } class AqlExecuteCall { @@ -760,8 +703,8 @@ auto AqlExecuteCall::fromVelocyPack(VPackSlice const slice) } // handle for useQuery -RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, - VPackSlice querySlice) { +auto RestAqlHandler::handleUseQuery(std::string const& operation, + VPackSlice querySlice) -> async { VPackOptions const* opts = &VPackOptions::Defaults; if (_engine) { // might be destroyed on shutdown opts = &_engine->getQuery().vpackOptions(); @@ -775,41 +718,31 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, auto maybeExecuteCall = AqlExecuteCall::fromVelocyPack(querySlice); if (maybeExecuteCall.fail()) { generateError(std::move(maybeExecuteCall).result()); - return RestStatus::DONE; + co_return; } TRI_IF_FAILURE("RestAqlHandler::getSome") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } auto& executeCall = maybeExecuteCall.get(); - auto items = SharedAqlItemBlockPtr{}; - auto skipped = SkipResult{}; - auto state = ExecutionState::HASMORE; - std::string const& shardId = _request->header(StaticStrings::AqlShardIdHeader); - auto const rootNodeType = _engine->root()->getPlanNode()->getType(); - - // shardId is set IFF the root node is scatter or distribute - TRI_ASSERT(shardId.empty() != (rootNodeType == ExecutionNode::SCATTER || - rootNodeType == ExecutionNode::DISTRIBUTE)); - - if (shardId.empty()) { - std::tie(state, skipped, items) = - _engine->execute(executeCall.callStack()); - } else { - std::tie(state, skipped, items) = - _engine->executeForClient(executeCall.callStack(), shardId); - } + auto [state, skipped, items] = co_await waitingFunToCoro( + [&]() -> std::optional> { + auto res = + _engine->executeRemoteCall(executeCall.callStack(), shardId); + if (std::get<0>(res) == ExecutionState::WAITING) { + TRI_IF_FAILURE("RestAqlHandler::killWhileWaiting") { + _queryRegistry->destroyQuery(_engine->engineId(), + TRI_ERROR_QUERY_KILLED); + } + return std::nullopt; + } + return res; + }); - if (state == ExecutionState::WAITING) { - TRI_IF_FAILURE("RestAqlHandler::killWhileWaiting") { - _queryRegistry->destroyQuery(_engine->engineId(), - TRI_ERROR_QUERY_KILLED); - } - return RestStatus::WAITING; - } TRI_IF_FAILURE("RestAqlHandler::killWhileWritingResult") { _queryRegistry->destroyQuery(_engine->engineId(), TRI_ERROR_QUERY_KILLED); } @@ -821,27 +754,31 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, } else if (operation == "initializeCursor") { auto items = _engine->itemBlockManager().requestAndInitBlock( querySlice.get("items")); - auto tmpRes = _engine->initializeCursor(std::move(items), /*pos*/ 0); - if (tmpRes.first == ExecutionState::WAITING) { - return RestStatus::WAITING; - } - answerBuilder.add(StaticStrings::Error, VPackValue(tmpRes.second.fail())); - answerBuilder.add(StaticStrings::Code, - VPackValue(tmpRes.second.errorNumber())); + auto const res = co_await waitingFunToCoro([&]() -> std::optional { + auto tmpRes = _engine->initializeCursor(std::move(items), /*pos*/ 0); + if (tmpRes.first == ExecutionState::WAITING) { + return std::nullopt; + } else { + return std::move(tmpRes.second); + } + }); + answerBuilder.add(StaticStrings::Error, VPackValue(res.fail())); + answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber())); } else { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); - return RestStatus::DONE; + co_return; } answerBuilder.close(); generateResult(rest::ResponseCode::OK, std::move(answerBuffer), opts); - return RestStatus::DONE; + co_return; } // handle query finalization for all engines -RestStatus RestAqlHandler::handleFinishQuery(std::string const& idString) { +auto RestAqlHandler::handleFinishQuery(std::string const& idString) + -> async { TRI_IF_FAILURE("Query::finishTimeout") { // intentionally delay the request std::this_thread::sleep_for( @@ -852,7 +789,7 @@ RestStatus RestAqlHandler::handleFinishQuery(std::string const& idString) { bool success = false; VPackSlice querySlice = this->parseVPackBody(success); if (!success) { - return RestStatus::DONE; + co_return; } auto errorCode = @@ -860,49 +797,36 @@ RestStatus RestAqlHandler::handleFinishQuery(std::string const& idString) { ErrorCode::ValueType>( querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL); - auto f = - _queryRegistry->finishQuery(qid, errorCode) - .thenValue([self = shared_from_this(), this, - errorCode](std::shared_ptr query) mutable - -> futures::Future { - if (query == nullptr) { - // this may be a race between query garbage collection and - // the client shutting down the query. it is debatable - // whether this is an actual error if we only want to abort - // the query... - generateError(rest::ResponseCode::NOT_FOUND, - TRI_ERROR_HTTP_NOT_FOUND); - return futures::Unit{}; - } - // we must be the only user of this query - TRI_ASSERT(query.use_count() == 1) - << "Finalizing query with use_count " << query.use_count(); - return query->finalizeClusterQuery(errorCode).thenValue( - [self = std::move(self), this, - q = std::move(query)](Result res) { - VPackBufferUInt8 buffer; - VPackBuilder answerBuilder(buffer); - answerBuilder.openObject(/*unindexed*/ true); - answerBuilder.add(VPackValue("stats")); - - q->executionStatsGuard().doUnderLock( - [&](auto& executionStats) { - executionStats.toVelocyPack( - answerBuilder, q->queryOptions().fullCount); - }); - - q->warnings().toVelocyPack(answerBuilder); - answerBuilder.add(StaticStrings::Error, - VPackValue(res.fail())); - answerBuilder.add(StaticStrings::Code, - VPackValue(res.errorNumber())); - answerBuilder.close(); - - generateResult(rest::ResponseCode::OK, std::move(buffer)); - }); - }); - - return waitForFuture(std::move(f)); + auto query = co_await _queryRegistry->finishQuery(qid, errorCode); + + if (query == nullptr) { + // this may be a race between query garbage collection and + // the client shutting down the query. it is debatable + // whether this is an actual error if we only want to abort + // the query... + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); + co_return; + } + // we must be the only user of this query + TRI_ASSERT(query.use_count() == 1) + << "Finalizing query with use_count " << query.use_count(); + auto res = co_await query->finalizeClusterQuery(errorCode); + + VPackBufferUInt8 buffer; + VPackBuilder answerBuilder(buffer); + answerBuilder.openObject(/*unindexed*/ true); + answerBuilder.add(VPackValue("stats")); + + query->executionStatsGuard().doUnderLock([&](auto& executionStats) { + executionStats.toVelocyPack(answerBuilder, query->queryOptions().fullCount); + }); + + query->warnings().toVelocyPack(answerBuilder); + answerBuilder.add(StaticStrings::Error, VPackValue(res.fail())); + answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber())); + answerBuilder.close(); + + generateResult(rest::ResponseCode::OK, std::move(buffer)); } RequestLane RestAqlHandler::lane() const { diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index 91bedf286dd2..e1c79d38e666 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -46,13 +46,11 @@ class RestAqlHandler : public RestVocbaseBaseHandler { public: RestAqlHandler(ArangodServer&, GeneralRequest*, GeneralResponse*, QueryRegistry*); - ~RestAqlHandler(); + ~RestAqlHandler() override = default; char const* name() const override final { return "RestAqlHandler"; } RequestLane lane() const override final; - RestStatus execute() override; - RestStatus continueExecute() override; - void prepareExecute(bool isContinue) override; + auto executeAsync() -> futures::Future override; void shutdownExecute(bool isFinalized) noexcept override; class Route { @@ -102,8 +100,8 @@ class RestAqlHandler : public RestVocbaseBaseHandler { // set, then the root block of the stored query must be a ScatterBlock // and the shard ID is given as an additional argument to the ScatterBlock's // special API. - RestStatus useQuery(std::string const& operation, - std::string const& idString); + auto useQuery(std::string const& operation, std::string const& idString) + -> async; // POST method for /_api/aql/setup (internal) // Only available on DBServers in the Cluster. @@ -131,22 +129,19 @@ class RestAqlHandler : public RestVocbaseBaseHandler { [[nodiscard]] futures::Future setupClusterQuery(); // handle for useQuery - RestStatus handleUseQuery(std::string const&, - arangodb::velocypack::Slice querySlice); + auto handleUseQuery(std::string const& operation, + velocypack::Slice querySlice) -> async; // handle query finalization for all engines - RestStatus handleFinishQuery(std::string const& idString); + auto handleFinishQuery(std::string const& idString) -> async; // dig out vocbase from context and query from ID, handle errors - Result findEngine(std::string const& idString); + auto findEngine(std::string const& idString) -> async; // our query registry QueryRegistry* _queryRegistry; ExecutionEngine* _engine; - - std::shared_ptr _logContextQueryIdValue; - LogContext::EntryPtr _logContextQueryIdEntry; }; } // namespace arangodb::aql diff --git a/arangod/GeneralServer/RestHandler.cpp b/arangod/GeneralServer/RestHandler.cpp index c836e6632e01..4ab15e5768cc 100644 --- a/arangod/GeneralServer/RestHandler.cpp +++ b/arangod/GeneralServer/RestHandler.cpp @@ -25,9 +25,8 @@ #include "ApplicationFeatures/ApplicationServer.h" #include "Auth/TokenCache.h" -#include "Basics/RecursiveLocker.h" -#include "Basics/debugging.h" #include "Basics/dtrace-wrapper.h" +#include "Basics/error.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" @@ -47,6 +46,8 @@ #include "VocBase/Identifiers/TransactionId.h" #include "VocBase/ticks.h" +#include +#include #include #include #include @@ -412,77 +413,50 @@ void RestHandler::handleExceptionPtr(std::exception_ptr eptr) noexcept try { // can do here to signal this problem. } -void RestHandler::runHandlerStateMachine() { - // _executionMutex has to be locked here - TRI_ASSERT(_sendResponseCallback); - - while (true) { - switch (_state) { - case HandlerState::PREPARE: - prepareEngine(); - break; - - case HandlerState::EXECUTE: { - executeEngine(/*isContinue*/ false); - if (_state == HandlerState::PAUSED) { - shutdownExecute(false); - LOG_TOPIC("23a33", DEBUG, Logger::COMMUNICATION) - << "Pausing rest handler execution " << this; - return; // stop state machine - } - break; - } - - case HandlerState::CONTINUED: { - executeEngine(/*isContinue*/ true); - if (_state == HandlerState::PAUSED) { - shutdownExecute(/*isFinalized*/ false); - LOG_TOPIC("23727", DEBUG, Logger::COMMUNICATION) - << "Pausing rest handler execution " << this; - return; // stop state machine - } - break; - } +auto RestHandler::runHandlerStateMachine() -> futures::Future { + auto fail = [&]() { + TRI_ASSERT(_state == HandlerState::FAILED); + _statistics.SET_REQUEST_END(); + // Callback may stealStatistics! + _sendResponseCallback(this); - case HandlerState::PAUSED: - LOG_TOPIC("ae26f", DEBUG, Logger::COMMUNICATION) - << "Resuming rest handler execution " << this; - _state = HandlerState::CONTINUED; - break; - - case HandlerState::FINALIZE: - _statistics.SET_REQUEST_END(); + shutdownExecute(false); + }; - // shutdownExecute is noexcept - shutdownExecute(true); // may not be moved down + TRI_ASSERT(_state == HandlerState::PREPARE); + auto logContextValues = prepareEngine(); + auto const logScopeGuard = + LogContext::Accessor::ScopedValue(std::move(logContextValues)); - _state = HandlerState::DONE; + if (_state == HandlerState::FAILED) { + co_return fail(); + } + TRI_ASSERT(_state == HandlerState::EXECUTE); + co_await executeEngine(); + if (_state == HandlerState::FAILED) { + co_return fail(); + } - // compress response if required - compressResponse(); - // Callback may stealStatistics! - _sendResponseCallback(this); - break; + TRI_ASSERT(_state == HandlerState::FINALIZE); + _statistics.SET_REQUEST_END(); - case HandlerState::FAILED: - _statistics.SET_REQUEST_END(); - // Callback may stealStatistics! - _sendResponseCallback(this); + // shutdownExecute is noexcept + shutdownExecute(true); // may not be moved down - shutdownExecute(false); - return; + _state = HandlerState::DONE; - case HandlerState::DONE: - return; - } - } + // compress response if required + compressResponse(); + // Callback may stealStatistics! + _sendResponseCallback(this); } // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- -void RestHandler::prepareEngine() { +auto RestHandler::prepareEngine() + -> std::vector> { // set end immediately so we do not get negative statistics _statistics.SET_REQUEST_START_END(); @@ -491,13 +465,13 @@ void RestHandler::prepareEngine() { Exception err(TRI_ERROR_REQUEST_CANCELED); handleError(err); - return; + return {}; } try { - prepareExecute(false); + auto logScope = prepareExecute(false); _state = HandlerState::EXECUTE; - return; + return logScope; } catch (Exception const& ex) { handleError(ex); } catch (std::exception const& ex) { @@ -509,47 +483,27 @@ void RestHandler::prepareEngine() { } _state = HandlerState::FAILED; + return {}; } -void RestHandler::prepareExecute(bool isContinue) { - _logContextEntry = LogContext::Current::pushValues(_logContextScopeValues); +auto RestHandler::prepareExecute(bool isContinue) + -> std::vector> { + return {_logContextScopeValues}; } -void RestHandler::shutdownExecute(bool isFinalized) noexcept { - LogContext::Current::popEntry(_logContextEntry); -} +void RestHandler::shutdownExecute(bool isFinalized) noexcept {} -/// Execute the rest handler state machine. Retry the wakeup, -/// returns true if _state == PAUSED, false otherwise -bool RestHandler::wakeupHandler() { - std::lock_guard lock{_executionMutex}; - if (_state == HandlerState::PAUSED) { - runHandlerStateMachine(); - } - return _state == HandlerState::PAUSED; -} +// Compatability function for old-style code that uses the +// WAITING/wakeupHandler scheme for async execution. +bool RestHandler::wakeupHandler() { return _suspensionCounter.notify(); } -void RestHandler::executeEngine(bool isContinue) { +auto RestHandler::executeEngine() -> async { DTRACE_PROBE1(arangod, RestHandlerExecuteEngine, this); ExecContextScope scope( basics::downCast(_request->requestContext())); try { - RestStatus result = RestStatus::DONE; - if (isContinue) { - // only need to run prepareExecute() again when we are continuing - // otherwise prepareExecute() was already run in the PREPARE phase - prepareExecute(true); - result = continueExecute(); - } else { - result = execute(); - } - - if (result == RestStatus::WAITING) { - _state = HandlerState::PAUSED; // wait for someone to continue the state - // machine - return; - } + co_await executeAsync(); if (_response == nullptr) { Exception err(TRI_ERROR_INTERNAL, "no response received from handler"); @@ -557,7 +511,7 @@ void RestHandler::executeEngine(bool isContinue) { } _state = HandlerState::FINALIZE; - return; + co_return; } catch (Exception const& ex) { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE LOG_TOPIC("11928", WARN, arangodb::Logger::FIXME) @@ -738,50 +692,6 @@ void RestHandler::generateError(arangodb::Result const& r) { generateError(code, r.errorNumber(), r.errorMessage()); } -RestStatus RestHandler::waitForFuture(futures::Future&& f) { - if (f.isReady()) { // fast-path out - f.result().throwIfFailed(); // just throw the error upwards - return RestStatus::DONE; - } - TRI_ASSERT(_executionCounter == 0); - _executionCounter = 2; - std::move(f).thenFinal(withLogContext( - [self = shared_from_this()](futures::Try&& t) -> void { - if (t.hasException()) { - self->handleExceptionPtr(std::move(t).exception()); - } - if (--self->_executionCounter == 0) { - self->wakeupHandler(); - } - })); - return --_executionCounter == 0 ? RestStatus::DONE : RestStatus::WAITING; -} - -RestStatus RestHandler::waitForFuture(futures::Future&& f) { - if (f.isReady()) { // fast-path out - f.result().throwIfFailed(); // just throw the error upwards - return f.waitAndGet(); - } - TRI_ASSERT(_executionCounter == 0); - _executionCounter = 2; - std::move(f).thenFinal(withLogContext( - [self = shared_from_this()](futures::Try&& t) -> void { - if (t.hasException()) { - self->handleExceptionPtr(std::move(t).exception()); - self->_followupRestStatus = RestStatus::DONE; - } else { - self->_followupRestStatus = t.get(); - if (t.get() == RestStatus::WAITING) { - return; // rest handler will be woken up externally - } - } - if (--self->_executionCounter == 0) { - self->wakeupHandler(); - } - })); - return --_executionCounter == 0 ? _followupRestStatus : RestStatus::WAITING; -} - // ----------------------------------------------------------------------------- // --SECTION-- protected methods // ----------------------------------------------------------------------------- @@ -791,16 +701,55 @@ void RestHandler::resetResponse(rest::ResponseCode code) { _response->reset(code); } +// Fallback implementation for old RestHandlers that implement execute() and +// continueExecute() instead of executeAsync(). futures::Future RestHandler::executeAsync() { - THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); + auto state = execute(); + + // After ensuring that no execute() implementation still returns WAITING, + // this can be removed. + if (state == RestStatus::WAITING) { + co_await waitingFunToCoro( + std::bind(&std::decay_t::continueExecute)); + } } -RestStatus RestHandler::execute() { return waitForFuture(executeAsync()); } +RestStatus RestHandler::execute() { + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); +} void RestHandler::runHandler( std::function responseCallback) { - TRI_ASSERT(_state == HandlerState::PREPARE); _sendResponseCallback = std::move(responseCallback); - std::lock_guard guard(_executionMutex); - runHandlerStateMachine(); + + runHandlerStateMachine(). + // Swallow all exceptions. It would be desirable to guarantee no unhandled + // exceptions reach this point; so let's at least die in maintainer mode + // for now. + thenFinal([self = shared_from_this()](auto&& tryResult) noexcept { + try { + std::move(tryResult).throwIfFailed(); + } catch (basics::Exception const& exception) { + LOG_TOPIC("e0b25", ERR, Logger::FIXME) + << "Uncaught exception in RestHandler " << self->name() << ": " + << "[" << exception.code() << "] " << exception.message() + << " (at " << exception.location() << ")"; + TRI_ASSERT(false) + << "Uncaught exception in RestHandler " << self->name() << ": " + << "[" << exception.code() << "] " << exception.message() + << " (at " << exception.location() << ")"; + } catch (std::exception const& exception) { + LOG_TOPIC("989d1", ERR, Logger::FIXME) + << "Uncaught exception in RestHandler " << self->name() << ": " + << exception.what(); + TRI_ASSERT(false) << "Uncaught exception in RestHandler " + << self->name() << ": " << exception.what(); + } catch (...) { + LOG_TOPIC("99c0c", ERR, Logger::FIXME) + << "Uncaught exception in RestHandler " << self->name() << "."; + TRI_ASSERT(false) + << "Uncaught exception in RestHandler " << self->name() << "."; + } + }); } diff --git a/arangod/GeneralServer/RestHandler.h b/arangod/GeneralServer/RestHandler.h index 50be414212c4..2507250a0690 100644 --- a/arangod/GeneralServer/RestHandler.h +++ b/arangod/GeneralServer/RestHandler.h @@ -23,6 +23,8 @@ #pragma once +#include "Async/SuspensionCounter.h" +#include "Async/async.h" #include "Basics/ResultT.h" #include "Futures/Unit.h" #include "GeneralServer/RequestLane.h" @@ -32,10 +34,10 @@ #include "Statistics/RequestStatistics.h" #include +#include #include #include #include -#include namespace arangodb { namespace application_features { @@ -57,7 +59,7 @@ class GeneralRequest; class RequestStatistics; class Result; -enum class RestStatus { DONE, WAITING, FAIL }; +enum class RestStatus { DONE, WAITING }; namespace rest { class RestHandler : public std::enable_shared_from_this { @@ -107,9 +109,8 @@ class RestHandler : public std::enable_shared_from_this { /// Execute the rest handler state machine void runHandler(std::function responseCallback); - /// Execute the rest handler state machine. Retry the wakeup, - /// returns true if _state == PAUSED, false otherwise - bool wakeupHandler(); + /// Continue execution of a suspended (via WAITING) rest handler state machine + [[deprecated]] bool wakeupHandler(); /// @brief forwards the request to the appropriate server futures::Future forwardRequest(bool& forwarded); @@ -125,13 +126,17 @@ class RestHandler : public std::enable_shared_from_this { RequestLane determineRequestLane(); - virtual void prepareExecute(bool isContinue); + [[nodiscard]] virtual auto prepareExecute(bool isContinue) + -> std::vector>; virtual RestStatus execute(); virtual futures::Future executeAsync(); - virtual RestStatus continueExecute() { return RestStatus::DONE; } + // No longer used + [[deprecated]] static RestStatus continueExecute() { + return RestStatus::DONE; + } virtual void shutdownExecute(bool isFinalized) noexcept; - // you might need to implment this in your handler + // you might need to implement this in your handler // if it will be executed in an async job virtual void cancel() { _canceled.store(true); } @@ -162,9 +167,6 @@ class RestHandler : public std::enable_shared_from_this { // generates an error void generateError(arangodb::Result const&); - [[nodiscard]] RestStatus waitForFuture(futures::Future&& f); - [[nodiscard]] RestStatus waitForFuture(futures::Future&& f); - enum class HandlerState : uint8_t { PREPARE = 0, EXECUTE, @@ -178,15 +180,12 @@ class RestHandler : public std::enable_shared_from_this { /// handler state machine HandlerState state() const { return _state; } - private: - void runHandlerStateMachine(); + auto runHandlerStateMachine() -> futures::Future; - void prepareEngine(); + [[nodiscard]] auto prepareEngine() + -> std::vector>; /// @brief Executes the RestHandler - /// May set the state to PAUSED, FINALIZE or FAILED - /// If isContinue == true it will call continueExecute() - /// otherwise execute() will be called - void executeEngine(bool isContinue); + auto executeEngine() -> async; void compressResponse(); protected: @@ -205,8 +204,52 @@ class RestHandler : public std::enable_shared_from_this { private: mutable std::mutex _executionMutex; - mutable std::atomic_uint8_t _executionCounter{0}; - mutable RestStatus _followupRestStatus; + + protected: + // TODO Move this in a separate header, side-by-side with SuspensionCounter? + // Note: _suspensionCounter.notify() must be called for this to resume. + // RestHandler::wakeupHandler() does that, and can be called e.g. by the + // SharedQueryState's wakeup handler (for AQL-related code). + template + requires requires(F f) { + { f() } -> std::same_as; + } + [[nodiscard]] auto waitingFunToCoro(F&& funArg) -> async { + auto fun = std::forward(funArg); + auto state = fun(); + + while (state == RestStatus::WAITING) { + // Get the number of wakeups. We call fun() up to that many + // times before suspending again. + auto n = co_await _suspensionCounter.await(); + for (auto i = 0; i < n && state == RestStatus::WAITING; ++i) { + state = fun(); + } + } + co_return; + } + + template::value_type> + requires requires(F f) { + { f() } -> std::same_as>; + } + [[nodiscard]] auto waitingFunToCoro(F&& funArg) -> async { + auto fun = std::forward(funArg); + auto res = fun(); + + while (!res.has_value()) { + // Get the number of wakeups. We call fun() up to that many + // times before suspending again. + auto n = co_await _suspensionCounter.await(); + for (auto i = 0; i < n && !res.has_value(); ++i) { + res = fun(); + } + } + co_return std::move(res).value(); + } + + private: + SuspensionCounter _suspensionCounter; std::function _sendResponseCallback; @@ -224,8 +267,8 @@ class RestHandler : public std::enable_shared_from_this { RequestLane _lane; + // TODO we don't need the member any longer std::shared_ptr _logContextScopeValues; - LogContext::EntryPtr _logContextEntry; protected: metrics::GaugeCounterGuard _currentRequestsSizeTracker; diff --git a/arangod/RestHandler/RestCursorHandler.cpp b/arangod/RestHandler/RestCursorHandler.cpp index 7f18c621234c..a5459243c818 100644 --- a/arangod/RestHandler/RestCursorHandler.cpp +++ b/arangod/RestHandler/RestCursorHandler.cpp @@ -27,6 +27,7 @@ #include "Aql/Query.h" #include "Aql/QueryRegistry.h" #include "Aql/SharedQueryState.h" +#include "Async/async.h" #include "Basics/Exceptions.h" #include "Basics/ScopeGuard.h" #include "Basics/StaticStrings.h" @@ -73,69 +74,37 @@ RequestLane RestCursorHandler::lane() const { return RequestLane::CLIENT_AQL; } -RestStatus RestCursorHandler::execute() { +futures::Future RestCursorHandler::executeAsync() { // extract the sub-request type rest::RequestType const type = _request->requestType(); - if (type == rest::RequestType::POST) { if (_request->suffixes().size() == 0) { // POST /_api/cursor - return createQueryCursor(); + + co_await createQueryCursor(); + co_return; } else if (_request->suffixes().size() == 1) { // POST /_api/cursor/cursor-id - return modifyQueryCursor(); + + co_await modifyQueryCursor(); + co_return; } // POST /_api/cursor/cursor-id/batch-id - return showLatestBatch(); + co_await showLatestBatch(); + co_return; } else if (type == rest::RequestType::PUT) { - return modifyQueryCursor(); + co_await modifyQueryCursor(); + co_return; } else if (type == rest::RequestType::DELETE_REQ) { - return deleteQueryCursor(); + // TODO if this does not wait, it does not need to return RestStatus - + // and otherwise should be a coroutine. + auto status = deleteQueryCursor(); + TRI_ASSERT(status == RestStatus::DONE); + co_return; } generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); - return RestStatus::DONE; -} - -RestStatus RestCursorHandler::continueExecute() { - if (wasCanceled()) { - generateError(rest::ResponseCode::GONE, TRI_ERROR_QUERY_KILLED); - return RestStatus::DONE; - } - - if (!_response->isResponseEmpty()) { - // an exception occurred in one of the suspension points - return RestStatus::DONE; - } - - // extract the sub-request type - rest::RequestType const type = _request->requestType(); - - if (_query != nullptr) { // non-stream query - if (type == rest::RequestType::POST || type == rest::RequestType::PUT) { - return processQuery(); - } - } else if (_cursor) { // stream cursor query - if (type == rest::RequestType::POST) { - if (_request->suffixes().size() == 0) { - // POST /_api/cursor - return generateCursorResult(rest::ResponseCode::CREATED); - } - // POST /_api/cursor/cursor-id - return generateCursorResult(ResponseCode::OK); - } else if (type == rest::RequestType::PUT) { - if (_request->requestPath() == SIMPLE_QUERY_ALL_PATH) { - // RestSimpleQueryHandler::allDocuments uses PUT for cursor creation - return generateCursorResult(ResponseCode::CREATED); - } - return generateCursorResult(ResponseCode::OK); - } - } - - // Other parts of the query cannot be paused - TRI_ASSERT(false) << requestToString(type) << " " << _request->fullUrl() - << " " << _request->parameters(); - return RestStatus::DONE; + co_return; } void RestCursorHandler::shutdownExecute(bool isFinalized) noexcept { @@ -172,18 +141,18 @@ void RestCursorHandler::cancel() { /// /// return If true, we need to continue processing, /// If false we are done (error or stream) -futures::Future RestCursorHandler::registerQueryOrCursor( +async RestCursorHandler::registerQueryOrCursor( velocypack::Slice slice, transaction::OperationOrigin operationOrigin) { TRI_ASSERT(_query == nullptr); if (!slice.isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_QUERY_EMPTY); - co_return RestStatus::DONE; + co_return; } VPackSlice querySlice = slice.get("query"); if (!querySlice.isString() || querySlice.getStringLength() == 0) { generateError(rest::ResponseCode::BAD, TRI_ERROR_QUERY_EMPTY); - co_return RestStatus::DONE; + co_return; } VPackSlice bindVars = slice.get("bindVars"); @@ -191,7 +160,7 @@ futures::Future RestCursorHandler::registerQueryOrCursor( if (!bindVars.isObject() && !bindVars.isNull()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting object for "); - co_return RestStatus::DONE; + co_return; } } @@ -229,7 +198,7 @@ futures::Future RestCursorHandler::registerQueryOrCursor( if (count) { generateError(Result(TRI_ERROR_BAD_PARAMETER, "cannot use 'count' option for a streaming query")); - co_return RestStatus::DONE; + co_return; } CursorRepository* cursors = _vocbase.cursorRepository(); @@ -240,7 +209,8 @@ futures::Future RestCursorHandler::registerQueryOrCursor( _cursor->setWakeupHandler(withLogContext( [self = shared_from_this()]() { return self->wakeupHandler(); })); - co_return generateCursorResult(rest::ResponseCode::CREATED); + co_await generateCursorResult(rest::ResponseCode::CREATED); + co_return; } // non-stream case. Execute query, then build a cursor @@ -250,7 +220,7 @@ futures::Future RestCursorHandler::registerQueryOrCursor( TRI_ASSERT(ss != nullptr); if (ss == nullptr) { generateError(Result(TRI_ERROR_INTERNAL, "invalid query state")); - co_return RestStatus::DONE; + co_return; } ss->setWakeupHandler(withLogContext( @@ -258,13 +228,16 @@ futures::Future RestCursorHandler::registerQueryOrCursor( } registerQuery(std::move(query)); - co_return processQuery(); + + co_await processQuery(); + + co_return; } /// @brief Process the query registered in _query. /// The function is repeatable, so whenever we need to WAIT /// in AQL we can post a handler calling this function again. -RestStatus RestCursorHandler::processQuery() { +async RestCursorHandler::processQuery() { auto query = [this]() { std::unique_lock mutexLocker{_queryLock}; @@ -280,22 +253,24 @@ RestStatus RestCursorHandler::processQuery() { // always clean up auto guard = scopeGuard([this]() noexcept { unregisterQuery(); }); - // continue handler is registered earlier - auto state = query->execute(_queryResult); + co_await waitingFunToCoro([&] { + auto state = query->execute(_queryResult); - if (state == aql::ExecutionState::WAITING) { - guard.cancel(); - return RestStatus::WAITING; - } - TRI_ASSERT(state == aql::ExecutionState::DONE); + if (state == aql::ExecutionState::WAITING) { + return RestStatus::WAITING; + } + TRI_ASSERT(state == aql::ExecutionState::DONE); + return RestStatus::DONE; + }); } // We cannot get into HASMORE here, or we would lose results. - return handleQueryResult(); + co_await handleQueryResult(); + co_return; } // non stream case, result is complete -RestStatus RestCursorHandler::handleQueryResult() { +async RestCursorHandler::handleQueryResult() { TRI_ASSERT(_query == nullptr); if (_queryResult.result.fail()) { if (_queryResult.result.is(TRI_ERROR_REQUEST_CANCELED) || @@ -391,7 +366,7 @@ RestStatus RestCursorHandler::handleQueryResult() { // trx.commit()) without the server code for freeing the resources and the // client code racing for who's first - return RestStatus::DONE; + co_return; } else { // result is bigger than batchSize, and a cursor will be created CursorRepository* cursors = _vocbase.cursorRepository(); @@ -402,7 +377,8 @@ RestStatus RestCursorHandler::handleQueryResult() { ttl, count, retriable); // throws if a coordinator soft shutdown is ongoing - return generateCursorResult(rest::ResponseCode::CREATED); + co_await generateCursorResult(rest::ResponseCode::CREATED); + co_return; } } @@ -474,12 +450,6 @@ void RestCursorHandler::cancelQuery() { std::lock_guard mutexLocker{_queryLock}; if (_query != nullptr) { - // cursor is canceled. now remove the continue handler we may have - // registered in the query - if (_query->sharedState()) { - _query->sharedState()->resetWakeupHandler(); - } - _query->setKillFlag(); } _queryKilled = true; @@ -588,7 +558,7 @@ void RestCursorHandler::buildOptions(velocypack::Slice slice) { /// @brief append the contents of the cursor into the response body /// this function will also take care of the cursor and return it to the /// registry if required -RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code) { +async RestCursorHandler::generateCursorResult(rest::ResponseCode code) { TRI_ASSERT(_cursor != nullptr); // dump might delete the cursor @@ -597,13 +567,20 @@ RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code) { VPackBuilder builder; builder.openObject(/*unindexed*/ true); - auto const [state, r] = _cursor->dump(builder); + auto r = Result(); - if (state == aql::ExecutionState::WAITING) { - builder.clear(); - TRI_ASSERT(r.ok()); - return RestStatus::WAITING; - } + co_await waitingFunToCoro([&]() { + auto const [state, result] = _cursor->dump(builder); + + if (state == aql::ExecutionState::WAITING) { + TRI_ASSERT(r.ok()); + return RestStatus::WAITING; + } + + r = result; + + return RestStatus::DONE; + }); if (_cursor->allowDirtyReads()) { setOutgoingDirtyReadsHeader(true); @@ -618,7 +595,7 @@ RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code) { if (_cursor->isRetriable()) { _cursor->setLastQueryBatchObject(builder.steal()); } - return RestStatus::FAIL; + co_return; } generateResult(code, builder.slice(), std::move(ctx)); @@ -640,16 +617,16 @@ RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code) { generateError(r); } - return RestStatus::DONE; + co_return; } -RestStatus RestCursorHandler::createQueryCursor() { +async RestCursorHandler::createQueryCursor() { std::vector const& suffixes = _request->suffixes(); if (!suffixes.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting POST /_api/cursor"); - return RestStatus::DONE; + co_return; } bool parseSuccess = false; @@ -657,28 +634,28 @@ RestStatus RestCursorHandler::createQueryCursor() { if (!parseSuccess) { // error message generated in parseVPackBody - return RestStatus::DONE; + co_return; } if (body.isEmptyObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_CORRUPTED_JSON); - return RestStatus::DONE; + co_return; } TRI_ASSERT(_query == nullptr); - return waitForFuture(registerQueryOrCursor( - body, transaction::OperationOriginAQL{"running AQL query"})); + co_await registerQueryOrCursor( + body, transaction::OperationOriginAQL{"running AQL query"}); + co_return; } - /// @brief shows the batch given by if it's the last cached batch /// response on a retry, and does't advance cursor -RestStatus RestCursorHandler::showLatestBatch() { +async RestCursorHandler::showLatestBatch() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting POST /_api/cursor//"); - return RestStatus::DONE; + co_return; } uint64_t batchId = basics::StringUtils::uint64(suffixes[1]); @@ -687,7 +664,7 @@ RestStatus RestCursorHandler::showLatestBatch() { if (_cursor == nullptr) { // error response already built here - return RestStatus::DONE; + co_return; } _cursor->setWakeupHandler(withLogContext( @@ -698,13 +675,14 @@ RestStatus RestCursorHandler::showLatestBatch() { // if x == y + 1, advance the cursor and return the new batch // otherwise return error if (_cursor->isNextBatchId(batchId)) { - return generateCursorResult(rest::ResponseCode::OK); + co_await generateCursorResult(rest::ResponseCode::OK); + co_return; } if (!_cursor->isCurrentBatchId(batchId)) { generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND, "batch id not found"); - return RestStatus::DONE; + co_return; } auto buffer = _cursor->getLastBatch(); @@ -714,16 +692,16 @@ RestStatus RestCursorHandler::showLatestBatch() { generateResult(rest::ResponseCode::OK, VPackSlice(buffer->data()), _cursor->context()); - return RestStatus::DONE; + co_return; } -RestStatus RestCursorHandler::modifyQueryCursor() { +async RestCursorHandler::modifyQueryCursor() { std::vector const& suffixes = _request->suffixes(); if (suffixes.size() != 1) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting POST /_api/cursor/"); - return RestStatus::DONE; + co_return; } // the call to lookupCursor will populate _cursor if the cursor can be @@ -732,13 +710,14 @@ RestStatus RestCursorHandler::modifyQueryCursor() { lookupCursor(suffixes[0]); if (_cursor == nullptr) { - return RestStatus::DONE; + co_return; } _cursor->setWakeupHandler(withLogContext( [self = shared_from_this()]() { return self->wakeupHandler(); })); - return generateCursorResult(rest::ResponseCode::OK); + co_await generateCursorResult(rest::ResponseCode::OK); + co_return; } RestStatus RestCursorHandler::deleteQueryCursor() { diff --git a/arangod/RestHandler/RestCursorHandler.h b/arangod/RestHandler/RestCursorHandler.h index ac6e256add7d..a0890186717f 100644 --- a/arangod/RestHandler/RestCursorHandler.h +++ b/arangod/RestHandler/RestCursorHandler.h @@ -45,6 +45,9 @@ class QueryRegistry; struct QueryResult; } // namespace aql +template +struct async; + class Cursor; /// @brief cursor request handler @@ -58,8 +61,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler { char const* name() const override { return "RestCursorHandler"; } RequestLane lane() const override final; - virtual RestStatus execute() override; - virtual RestStatus continueExecute() override; + auto executeAsync() -> futures::Future override; void shutdownExecute(bool isFinalized) noexcept override; void cancel() override final; @@ -68,13 +70,13 @@ class RestCursorHandler : public RestVocbaseBaseHandler { /// @brief register the query either as streaming cursor or in _query /// the query is not executed here. /// this method is also used by derived classes - [[nodiscard]] futures::Future registerQueryOrCursor( + async registerQueryOrCursor( velocypack::Slice body, transaction::OperationOrigin operationOrigin); /// @brief Process the query registered in _query. /// The function is repeatable, so whenever we need to WAIT /// in AQL we can post a handler calling this function again. - RestStatus processQuery(); + async processQuery(); /// @brief returns the short id of the server which should handle this request ResultT> forwardingTarget() override; @@ -86,7 +88,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler { /// guaranteed /// to not be interrupted and is guaranteed to get a complete /// queryResult. - virtual RestStatus handleQueryResult(); + virtual async handleQueryResult(); private: /// @brief register the currently running query @@ -102,20 +104,20 @@ class RestCursorHandler : public RestVocbaseBaseHandler { /// @brief append the contents of the cursor into the response body /// this function will also take care of the cursor and return it to the /// registry if required - RestStatus generateCursorResult(rest::ResponseCode code); + async generateCursorResult(rest::ResponseCode code); /// @brief create a cursor and return the first results - RestStatus createQueryCursor(); + async createQueryCursor(); /// @brief return the next results from an existing cursor - RestStatus modifyQueryCursor(); + async modifyQueryCursor(); /// @brief dispose an existing cursor RestStatus deleteQueryCursor(); /// @brief show last batch on retry if `allowRetry` flag is true, doesn't /// advance cursor - RestStatus showLatestBatch(); + async showLatestBatch(); /// @brief look up cursor by id. side-effect: populates _cursor in case cursor /// was found. in case cursor was not found, writes an error into the response diff --git a/arangod/RestHandler/RestSimpleHandler.cpp b/arangod/RestHandler/RestSimpleHandler.cpp index 25d8ebcd0911..d591e7ed39e0 100644 --- a/arangod/RestHandler/RestSimpleHandler.cpp +++ b/arangod/RestHandler/RestSimpleHandler.cpp @@ -45,7 +45,7 @@ RestSimpleHandler::RestSimpleHandler( : RestCursorHandler(server, request, response, queryRegistry), _silent(true) {} -RestStatus RestSimpleHandler::execute() { +auto RestSimpleHandler::executeAsync() -> futures::Future { // extract the request type auto const type = _request->requestType(); @@ -53,36 +53,37 @@ RestStatus RestSimpleHandler::execute() { bool parsingSuccess = false; VPackSlice body = this->parseVPackBody(parsingSuccess); if (!parsingSuccess) { - return RestStatus::DONE; + co_return; } if (!body.isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting JSON object body"); - return RestStatus::DONE; + co_return; } std::string const& prefix = _request->requestPath(); if (prefix == RestVocbaseBaseHandler::SIMPLE_REMOVE_PATH) { - return waitForFuture(removeByKeys(body)); + co_await removeByKeys(body); + co_return; } else if (prefix == RestVocbaseBaseHandler::SIMPLE_LOOKUP_PATH) { - return waitForFuture(lookupByKeys(body)); + co_await lookupByKeys(body); + co_return; } else { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "unsupported value for "); } - return RestStatus::DONE; + co_return; } generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); - return RestStatus::DONE; + co_return; } -futures::Future RestSimpleHandler::removeByKeys( - VPackSlice const& slice) { +async RestSimpleHandler::removeByKeys(VPackSlice const& slice) { TRI_ASSERT(slice.isObject()); std::string collectionName; { @@ -91,7 +92,7 @@ futures::Future RestSimpleHandler::removeByKeys( if (!value.isString()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting string for "); - co_return RestStatus::DONE; + co_return; } collectionName = value.copyString(); @@ -110,7 +111,7 @@ futures::Future RestSimpleHandler::removeByKeys( if (!keys.isArray()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting array for "); - co_return RestStatus::DONE; + co_return; } bool waitForSync = false; @@ -162,7 +163,7 @@ futures::Future RestSimpleHandler::removeByKeys( transaction::OperationOriginREST{"removing documents by keys"}); } -RestStatus RestSimpleHandler::handleQueryResult() { +async RestSimpleHandler::handleQueryResult() { if (_queryResult.result.fail()) { if (_queryResult.result.is(TRI_ERROR_REQUEST_CANCELED) || (_queryResult.result.is(TRI_ERROR_QUERY_KILLED) && wasCanceled())) { @@ -171,7 +172,7 @@ RestStatus RestSimpleHandler::handleQueryResult() { } else { generateError(_queryResult.result); } - return RestStatus::DONE; + co_return; } // extract the request type @@ -181,10 +182,10 @@ RestStatus RestSimpleHandler::handleQueryResult() { if (type == rest::RequestType::PUT) { if (prefix == RestVocbaseBaseHandler::SIMPLE_REMOVE_PATH) { handleQueryResultRemoveByKeys(); - return RestStatus::DONE; + co_return; } else if (prefix == RestVocbaseBaseHandler::SIMPLE_LOOKUP_PATH) { handleQueryResultLookupByKeys(); - return RestStatus::DONE; + co_return; } } @@ -193,7 +194,7 @@ RestStatus RestSimpleHandler::handleQueryResult() { TRI_ASSERT(false); generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); - return RestStatus::DONE; + co_return; } void RestSimpleHandler::handleQueryResultRemoveByKeys() { @@ -248,8 +249,7 @@ void RestSimpleHandler::handleQueryResultLookupByKeys() { _queryResult.context); } -futures::Future RestSimpleHandler::lookupByKeys( - VPackSlice const& slice) { +async RestSimpleHandler::lookupByKeys(VPackSlice const& slice) { if (response() == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response"); } @@ -261,7 +261,7 @@ futures::Future RestSimpleHandler::lookupByKeys( if (!value.isString()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting string for "); - co_return RestStatus::DONE; + co_return; } collectionName = value.copyString(); @@ -282,7 +282,7 @@ futures::Future RestSimpleHandler::lookupByKeys( if (!keys.isArray()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting array for "); - co_return RestStatus::DONE; + co_return; } std::string const aql( diff --git a/arangod/RestHandler/RestSimpleHandler.h b/arangod/RestHandler/RestSimpleHandler.h index e8715e2f5c05..df874ea22d1a 100644 --- a/arangod/RestHandler/RestSimpleHandler.h +++ b/arangod/RestHandler/RestSimpleHandler.h @@ -36,7 +36,7 @@ class RestSimpleHandler : public RestCursorHandler { aql::QueryRegistry*); public: - RestStatus execute() override final; + auto executeAsync() -> futures::Future override final; char const* name() const override final { return "RestSimpleHandler"; } private: @@ -47,7 +47,7 @@ class RestSimpleHandler : public RestCursorHandler { /// queryResult. ////////////////////////////////////////////////////////////////////////////// - RestStatus handleQueryResult() override final; + async handleQueryResult() override final; ////////////////////////////////////////////////////////////////////////////// /// @brief handle result of a remove-by-keys query @@ -65,13 +65,13 @@ class RestSimpleHandler : public RestCursorHandler { /// @brief execute a batch remove operation ////////////////////////////////////////////////////////////////////////////// - futures::Future removeByKeys(VPackSlice const&); + async removeByKeys(VPackSlice const&); ////////////////////////////////////////////////////////////////////////////// /// @brief execute a batch lookup operation ////////////////////////////////////////////////////////////////////////////// - futures::Future lookupByKeys(VPackSlice const&); + async lookupByKeys(VPackSlice const&); private: ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestSimpleQueryHandler.cpp b/arangod/RestHandler/RestSimpleQueryHandler.cpp index 6885c6faeee2..a489db6e9dac 100644 --- a/arangod/RestHandler/RestSimpleQueryHandler.cpp +++ b/arangod/RestHandler/RestSimpleQueryHandler.cpp @@ -42,7 +42,7 @@ RestSimpleQueryHandler::RestSimpleQueryHandler( arangodb::aql::QueryRegistry* queryRegistry) : RestCursorHandler(server, request, response, queryRegistry) {} -RestStatus RestSimpleQueryHandler::execute() { +auto RestSimpleQueryHandler::executeAsync() -> futures::Future { // extract the sub-request type auto const type = _request->requestType(); @@ -50,27 +50,27 @@ RestStatus RestSimpleQueryHandler::execute() { if (type == rest::RequestType::PUT) { if (prefix == RestVocbaseBaseHandler::SIMPLE_QUERY_ALL_PATH) { // all query - return waitForFuture(allDocuments()); + co_return co_await allDocuments(); } else if (prefix == RestVocbaseBaseHandler::SIMPLE_QUERY_ALL_KEYS_PATH) { // all-keys query - return waitForFuture(allDocumentKeys()); + co_return co_await allDocumentKeys(); } else if (prefix == RestVocbaseBaseHandler::SIMPLE_QUERY_BY_EXAMPLE) { // by-example query - return waitForFuture(byExample()); + co_return co_await byExample(); } } generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); - return RestStatus::DONE; + co_return; } -futures::Future RestSimpleQueryHandler::allDocuments() { +async RestSimpleQueryHandler::allDocuments() { bool parseSuccess = false; VPackSlice const body = this->parseVPackBody(parseSuccess); if (!parseSuccess) { // error message generated in parseVPackBody - co_return RestStatus::DONE; + co_return; } std::string collectionName; @@ -86,7 +86,7 @@ futures::Future RestSimpleQueryHandler::allDocuments() { if (collectionName.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting string for "); - co_return RestStatus::DONE; + co_return; } auto col = _vocbase.lookupCollection(collectionName); @@ -156,12 +156,12 @@ futures::Future RestSimpleQueryHandler::allDocuments() { /// @brief return a cursor with all document keys from the collection ////////////////////////////////////////////////////////////////////////////// -futures::Future RestSimpleQueryHandler::allDocumentKeys() { +async RestSimpleQueryHandler::allDocumentKeys() { bool parseSuccess = false; VPackSlice const body = this->parseVPackBody(parseSuccess); if (!parseSuccess) { // error message generated in parseVPackBody - co_return RestStatus::DONE; + co_return; } std::string collectionName; @@ -177,7 +177,7 @@ futures::Future RestSimpleQueryHandler::allDocumentKeys() { if (collectionName.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting string for "); - co_return RestStatus::DONE; + co_return; } auto col = _vocbase.lookupCollection(collectionName); @@ -260,17 +260,17 @@ static void buildExampleQuery(VPackBuilder& result, std::string const& cname, /// @brief return a cursor with all documents matching the example ////////////////////////////////////////////////////////////////////////////// -futures::Future RestSimpleQueryHandler::byExample() { +async RestSimpleQueryHandler::byExample() { bool parseSuccess = false; VPackSlice body = this->parseVPackBody(parseSuccess); if (!parseSuccess) { // error message generated in parseVPackBody - co_return RestStatus::DONE; + co_return; } if (!body.isObject() || !body.get("example").isObject()) { generateError(ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER); - co_return RestStatus::DONE; + co_return; } // velocypack will throw an exception for negative numbers @@ -293,7 +293,7 @@ futures::Future RestSimpleQueryHandler::byExample() { if (cname.empty()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR, "expecting string for "); - co_return RestStatus::DONE; + co_return; } auto col = _vocbase.lookupCollection(cname); diff --git a/arangod/RestHandler/RestSimpleQueryHandler.h b/arangod/RestHandler/RestSimpleQueryHandler.h index 61922e08f94a..2a79be0a384a 100644 --- a/arangod/RestHandler/RestSimpleQueryHandler.h +++ b/arangod/RestHandler/RestSimpleQueryHandler.h @@ -36,12 +36,12 @@ class RestSimpleQueryHandler : public RestCursorHandler { arangodb::aql::QueryRegistry*); public: - RestStatus execute() override final; + auto executeAsync() -> futures::Future override final; char const* name() const override final { return "RestSimpleQueryHandler"; } private: - futures::Future allDocuments(); - futures::Future allDocumentKeys(); - futures::Future byExample(); + async allDocuments(); + async allDocumentKeys(); + async byExample(); }; } // namespace arangodb diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.cpp b/arangod/RestHandler/RestVocbaseBaseHandler.cpp index 78f0304241c1..3bc2b2d477a5 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.cpp +++ b/arangod/RestHandler/RestVocbaseBaseHandler.cpp @@ -153,15 +153,11 @@ RestVocbaseBaseHandler::RestVocbaseBaseHandler(ArangodServer& server, TRI_ASSERT(request->requestContext()); } -void RestVocbaseBaseHandler::prepareExecute(bool isContinue) { - RestHandler::prepareExecute(isContinue); - _logContextVocbaseEntry = - LogContext::Current::pushValues(_scopeVocbaseValues); -} - -void RestVocbaseBaseHandler::shutdownExecute(bool isFinalized) noexcept { - LogContext::Current::popEntry(_logContextVocbaseEntry); - RestHandler::shutdownExecute(isFinalized); +auto RestVocbaseBaseHandler::prepareExecute(bool isContinue) + -> std::vector> { + auto values = RestHandler::prepareExecute(isContinue); + values.emplace_back(_scopeVocbaseValues); + return values; } RestVocbaseBaseHandler::~RestVocbaseBaseHandler() = default; diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.h b/arangod/RestHandler/RestVocbaseBaseHandler.h index da5f2c0609a6..75efcc2c1164 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.h +++ b/arangod/RestHandler/RestVocbaseBaseHandler.h @@ -132,9 +132,8 @@ class RestVocbaseBaseHandler : public RestBaseHandler { _context.cancel(); } - void prepareExecute(bool isContinue) override; - - void shutdownExecute(bool isFinalized) noexcept override; + [[nodiscard]] auto prepareExecute(bool isContinue) + -> std::vector> override; protected: // returns the short id of the server which should handle this request @@ -207,8 +206,8 @@ class RestVocbaseBaseHandler : public RestBaseHandler { TRI_vocbase_t& _vocbase; private: + // TODO we don't need the member any longer std::shared_ptr _scopeVocbaseValues; - LogContext::EntryPtr _logContextVocbaseEntry; }; } // namespace arangodb diff --git a/arangod/Transaction/Context.cpp b/arangod/Transaction/Context.cpp index 38baf2fc19fe..3736a593341c 100644 --- a/arangod/Transaction/Context.cpp +++ b/arangod/Transaction/Context.cpp @@ -68,9 +68,12 @@ struct CustomTypeHandler final : public VPackCustomTypeHandler { transaction::Context::Context(TRI_vocbase_t& vocbase, OperationOrigin operationOrigin) : _vocbase(vocbase), - _customTypeHandler(), + _resolver(std::make_unique(_vocbase)), + _customTypeHandler(createCustomTypeHandler(_vocbase, *_resolver)), _options(velocypack::Options::Defaults), - _operationOrigin(operationOrigin) {} + _operationOrigin(operationOrigin) { + _options.customTypeHandler = _customTypeHandler.get(); +} /// @brief destroy the context transaction::Context::~Context() { @@ -162,20 +165,19 @@ void transaction::Context::returnBuilder(VPackBuilder* builder) noexcept { } /// @brief get velocypack options with a custom type handler -VPackOptions* transaction::Context::getVPackOptions() { - if (_customTypeHandler == nullptr) { - // this modifies options! - orderCustomTypeHandler(); - } - +VPackOptions const* transaction::Context::getVPackOptions() const noexcept { return &_options; } +velocypack::CustomTypeHandler* transaction::Context::getCustomTypeHandler() + const noexcept { + TRI_ASSERT(_customTypeHandler != nullptr); + return _customTypeHandler.get(); +} + /// @brief create a resolver -CollectionNameResolver const& transaction::Context::resolver() { - if (_resolver == nullptr) { - _resolver = std::make_unique(_vocbase); - } +CollectionNameResolver const& transaction::Context::resolver() const noexcept { + TRI_ASSERT(_resolver != nullptr); return *_resolver; } diff --git a/arangod/Transaction/Context.h b/arangod/Transaction/Context.h index 0bc7b0a49e3d..10e3fcb89ffc 100644 --- a/arangod/Transaction/Context.h +++ b/arangod/Transaction/Context.h @@ -88,10 +88,10 @@ class Context { TEST_VIRTUAL void returnBuilder(arangodb::velocypack::Builder*) noexcept; /// @brief get velocypack options with a custom type handler - TEST_VIRTUAL velocypack::Options* getVPackOptions(); + TEST_VIRTUAL velocypack::Options const* getVPackOptions() const noexcept; /// @brief get a custom type handler - virtual arangodb::velocypack::CustomTypeHandler* orderCustomTypeHandler() = 0; + velocypack::CustomTypeHandler* getCustomTypeHandler() const noexcept; /// @brief get transaction state, determine commit responsiblity virtual std::shared_ptr acquireState( @@ -130,7 +130,7 @@ class Context { /// @brief whether or not the transaction is embeddable virtual bool isEmbeddable() const = 0; - CollectionNameResolver const& resolver(); + CollectionNameResolver const& resolver() const noexcept; /// @brief unregister the transaction virtual void unregisterTransaction() noexcept = 0; @@ -153,6 +153,11 @@ class Context { transaction::Options const& options); TRI_vocbase_t& _vocbase; + + private: + std::unique_ptr _resolver; + + protected: std::unique_ptr _customTypeHandler; containers::SmallVector _builders; @@ -163,8 +168,6 @@ class Context { OperationOrigin _operationOrigin; private: - std::unique_ptr _resolver; - std::shared_ptr _counterGuard; struct { diff --git a/arangod/Transaction/SmartContext.cpp b/arangod/Transaction/SmartContext.cpp index 43ff4479786a..e82aa8a8a4fc 100644 --- a/arangod/Transaction/SmartContext.cpp +++ b/arangod/Transaction/SmartContext.cpp @@ -44,19 +44,6 @@ SmartContext::SmartContext(TRI_vocbase_t& vocbase, TransactionId globalId, SmartContext::~SmartContext() = default; -/// @brief order a custom type handler for the collection -velocypack::CustomTypeHandler* -transaction::SmartContext::orderCustomTypeHandler() { - if (_customTypeHandler == nullptr) { - _customTypeHandler = - transaction::Context::createCustomTypeHandler(_vocbase, resolver()); - _options.customTypeHandler = _customTypeHandler.get(); - } - - TRI_ASSERT(_customTypeHandler != nullptr); - return _customTypeHandler.get(); -} - TransactionId transaction::SmartContext::generateId() const { return _globalId; } diff --git a/arangod/Transaction/SmartContext.h b/arangod/Transaction/SmartContext.h index c456c382b816..f7157606fbf3 100644 --- a/arangod/Transaction/SmartContext.h +++ b/arangod/Transaction/SmartContext.h @@ -51,9 +51,6 @@ class SmartContext : public Context { /// @brief destroy the context ~SmartContext(); - /// @brief order a custom type handler - velocypack::CustomTypeHandler* orderCustomTypeHandler() override final; - /// @brief whether or not the transaction is embeddable bool isEmbeddable() const override final { return true; } diff --git a/arangod/Transaction/V8Context.cpp b/arangod/Transaction/V8Context.cpp index 2f285e3bbe81..9a35cf0b6290 100644 --- a/arangod/Transaction/V8Context.cpp +++ b/arangod/Transaction/V8Context.cpp @@ -53,20 +53,6 @@ transaction::V8Context::~V8Context() noexcept { } } -/// @brief order a custom type handler for the collection -VPackCustomTypeHandler* transaction::V8Context::orderCustomTypeHandler() { - if (_customTypeHandler == nullptr) { - _customTypeHandler = - transaction::Context::createCustomTypeHandler(_vocbase, resolver()); - _options.customTypeHandler = _customTypeHandler.get(); - } - - TRI_ASSERT(_customTypeHandler != nullptr); - TRI_ASSERT(_options.customTypeHandler != nullptr); - - return _customTypeHandler.get(); -} - /// @brief get transaction state, determine commit responsibility /*virtual*/ std::shared_ptr transaction::V8Context::acquireState(transaction::Options const& options, diff --git a/arangod/Transaction/V8Context.h b/arangod/Transaction/V8Context.h index 5e826d21ba24..4fe09bc6ac77 100644 --- a/arangod/Transaction/V8Context.h +++ b/arangod/Transaction/V8Context.h @@ -49,9 +49,6 @@ class V8Context final : public Context { /// @brief destroy the context ~V8Context() noexcept; - /// @brief order a custom type handler - velocypack::CustomTypeHandler* orderCustomTypeHandler() override final; - /// @brief get transaction state, determine commit responsiblity std::shared_ptr acquireState( transaction::Options const& options, bool& responsibleForCommit) override; diff --git a/arangod/V8Server/v8-query.cpp b/arangod/V8Server/v8-query.cpp index b449fc9b9e5d..96adc521b268 100644 --- a/arangod/V8Server/v8-query.cpp +++ b/arangod/V8Server/v8-query.cpp @@ -233,8 +233,7 @@ static void JS_AllQuery(v8::FunctionCallbackInfo const& args) { // copy default options VPackOptions resultOptions = VPackOptions::Defaults; - resultOptions.customTypeHandler = - transactionContext->orderCustomTypeHandler(); + resultOptions.customTypeHandler = transactionContext->getCustomTypeHandler(); VPackBuilder resultBuilder; resultBuilder.openArray(); @@ -343,8 +342,7 @@ static void JS_AnyQuery(v8::FunctionCallbackInfo const& args) { // copy default options VPackOptions resultOptions = VPackOptions::Defaults; - resultOptions.customTypeHandler = - transactionContext->orderCustomTypeHandler(); + resultOptions.customTypeHandler = transactionContext->getCustomTypeHandler(); TRI_V8_RETURN(TRI_VPackToV8(isolate, doc.at(0), &resultOptions)); TRI_V8_TRY_CATCH_END } diff --git a/arangod/V8Server/v8-voccursor.cpp b/arangod/V8Server/v8-voccursor.cpp index e4a583572946..ac67f7e7f91b 100644 --- a/arangod/V8Server/v8-voccursor.cpp +++ b/arangod/V8Server/v8-voccursor.cpp @@ -155,7 +155,7 @@ static void JS_JsonCursor(v8::FunctionCallbackInfo const& args) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_CURSOR_NOT_FOUND); } - VPackOptions* opts = cursor->context()->getVPackOptions(); + VPackOptions const* opts = cursor->context()->getVPackOptions(); VPackBuilder builder(opts); builder.openObject(true); // conversion uses sequential iterator, no indexing Result r = cursor->dumpSync(builder); diff --git a/js/client/modules/@arangodb/testutils/san-file-handler.js b/js/client/modules/@arangodb/testutils/san-file-handler.js index 6b20bbb60ec1..5b210a735ebb 100644 --- a/js/client/modules/@arangodb/testutils/san-file-handler.js +++ b/js/client/modules/@arangodb/testutils/san-file-handler.js @@ -156,9 +156,9 @@ exports.registerOptions = function(optionsDefaults, optionsDocumentation, option tu.CopyIntoList(optionsDocumentation, [ ' SUT instrumented binaries', ' - `sanitizer`: if set the programs are run with enabled sanitizer', - ' - `isSan`: doubles oneTestTimeot value if set to true (for ASAN-related builds)', + ' - `isSan`: doubles oneTestTimeout value if set to true (for ASAN-related builds)', ' and need longer timeouts', - ' - `isCov`: doubles oneTestTimeot value if set to true', + ' - `isCov`: doubles oneTestTimeout value if set to true', '' ]); optionHandlers.push(function(options) { diff --git a/lib/Async/CMakeLists.txt b/lib/Async/CMakeLists.txt index 47b325d4e202..2a09df228830 100644 --- a/lib/Async/CMakeLists.txt +++ b/lib/Async/CMakeLists.txt @@ -5,8 +5,15 @@ target_include_directories(arango_async_interface INTERFACE target_link_libraries(arango_async_interface INTERFACE arango_task_registry) -add_library(arango_async INTERFACE) -target_include_directories(arango_async INTERFACE +add_library(arango_async STATIC + include/Async/async.h + include/Async/coro-utils.h + include/Async/expected.h + include/Async/SuspensionCounter.h + src/SuspensionCounter.cpp +) + +target_include_directories(arango_async PUBLIC include) target_link_libraries(arango_async INTERFACE diff --git a/lib/Async/include/Async/SuspensionCounter.h b/lib/Async/include/Async/SuspensionCounter.h new file mode 100644 index 000000000000..f25f223b3fff --- /dev/null +++ b/lib/Async/include/Async/SuspensionCounter.h @@ -0,0 +1,89 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2024-2024 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include +#include + +namespace arangodb { + +class Scheduler; + +// SuspensionCounter::await() returns an awaitable that suspends until +// notify() is called, and which in turn returns the number of notifies that +// happened while it was suspended. +// This is useful to connect a callee using WAITING for asynchronous execution +// with a caller that is a coroutine. +// The callee can be instructed to call notify() when it is done, and the caller +// can await the result of notify(). +struct SuspensionCounter { + // returns true if still suspended + bool notify() { + auto counter = _counter.load(); + do { + if (counter == -1) { + auto res = _counter.compare_exchange_weak(counter, 1); + if (res) { + // NOTE This doesn't need to be posted on the scheduler, as notify() + // will be called by SharedQueryState::queueHandler(), which already + // posted it before calling. + _c.resume(); + return false; + } + } else { + auto res = _counter.compare_exchange_weak(counter, counter + 1); + if (res) { + return true; + } + } + } while (true); + } + + auto await() { + struct Awaitable { + bool await_ready() const noexcept { + return _suspensionCounter->_counter.load(std::memory_order_relaxed) > 0; + } + [[nodiscard]] std::int64_t await_resume() const noexcept { + return _suspensionCounter->_counter.exchange(0); + } + bool await_suspend(std::coroutine_handle<> c) { + _suspensionCounter->_c = c; + auto counter = std::int64_t{}; + return _suspensionCounter->_counter.compare_exchange_strong(counter, + -1); + } + + SuspensionCounter* _suspensionCounter; + }; + + return Awaitable{this}; + } + + std::atomic _counter; + std::coroutine_handle<> _c; + Scheduler* _scheduler; +}; +} // namespace arangodb diff --git a/lib/Async/include/Async/context.h b/lib/Async/include/Async/context.h index cb5bb8f67545..9360f61befb3 100644 --- a/lib/Async/include/Async/context.h +++ b/lib/Async/include/Async/context.h @@ -23,6 +23,7 @@ #pragma once #include "Async/Registry/promise.h" +#include "Logger/LogContext.h" #include "TaskMonitoring/task.h" #include "Utils/ExecContext.h" @@ -39,16 +40,33 @@ struct Context { std::shared_ptr _execContext; async_registry::Requester _requester; task_monitoring::Task* _task; + // Note that this is optional just because LogContext::clear is private, + // and operator= needs the LHS to be cleared. A simple change to LogContext + // could make this optional unnecessary. + std::optional _logContext; Context() : _execContext{ExecContext::currentAsShared()}, _requester{*async_registry::get_current_coroutine()}, - _task{*task_monitoring::get_current_task()} {} + _task{*task_monitoring::get_current_task()}, + _logContext{LogContext::current()} {} - auto set() -> void { + auto operator=(Context&& other) noexcept -> Context& { + _execContext = std::move(other._execContext); + _requester = other._requester; + _task = other._task; + _logContext.reset(); + _logContext = std::move(other._logContext); + other._logContext.reset(); + + return *this; + } + + auto set() noexcept -> void { ExecContext::set(_execContext); *async_registry::get_current_coroutine() = _requester; *task_monitoring::get_current_task() = _task; + LogContext::setCurrent(*_logContext); } }; diff --git a/lib/Async/src/SuspensionCounter.cpp b/lib/Async/src/SuspensionCounter.cpp new file mode 100644 index 000000000000..0b79cfdad593 --- /dev/null +++ b/lib/Async/src/SuspensionCounter.cpp @@ -0,0 +1,23 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2024-2024 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#include "Async/SuspensionCounter.h" diff --git a/lib/Logger/LogContext.cpp b/lib/Logger/LogContext.cpp index ae0c3d5d87ac..6ef802e395cc 100644 --- a/lib/Logger/LogContext.cpp +++ b/lib/Logger/LogContext.cpp @@ -76,9 +76,16 @@ void LogContext::doVisit(Visitor const& visitor, Entry const* entry) const { } void LogContext::setCurrent(LogContext ctx) noexcept { + _threadControlBlock._logContext.clear(_threadControlBlock._entryCache); _threadControlBlock._logContext = std::move(ctx); } +void LogContext::ValueBag::visit(Visitor const& visitor) const { + for (auto& v : _values) { + v->visit(visitor); + } +} + LogContext::ThreadControlBlock::~ThreadControlBlock() noexcept { // The LogContext destructor will possibly release remaining entries to the // thread-local _entryCache. _entryCache is destroyed before _logContext. diff --git a/lib/Logger/LogContext.h b/lib/Logger/LogContext.h index 61652579885f..ca0542e9dd24 100644 --- a/lib/Logger/LogContext.h +++ b/lib/Logger/LogContext.h @@ -35,6 +35,7 @@ #include #include #include +#include #include "Basics/debugging.h" @@ -95,6 +96,7 @@ class LogContext { struct EntryPtr; struct Values; + struct ValueBag; template struct KeyValue { @@ -360,6 +362,16 @@ struct LogContext::Values { virtual void visit(Visitor const&) const = 0; }; +struct LogContext::ValueBag : Values { + template + explicit ValueBag(V&&... v) : _values(std::forward(v)...) {} + virtual ~ValueBag() = default; + void visit(Visitor const&) const override; + + private: + std::vector> _values; +}; + template struct LogContext::ValuesImpl> final : Values { @@ -571,6 +583,10 @@ struct LogContext::Accessor::ScopedValue { explicit ScopedValue(std::shared_ptr v) { appendEntry>(std::move(v)); } + // TODO Maybe we should remove this constructor and have users create a + // ValueBag themselves. + explicit ScopedValue(std::vector>&& vs) + : ScopedValue(std::make_shared(std::move(vs))) {} template explicit ScopedValue(ValueBuilder&& v) { diff --git a/tests/Aql/Executor/EnumerateCollectionExecutorTest.cpp b/tests/Aql/Executor/EnumerateCollectionExecutorTest.cpp index 7a0fd81a1d2e..2320d725c850 100644 --- a/tests/Aql/Executor/EnumerateCollectionExecutorTest.cpp +++ b/tests/Aql/Executor/EnumerateCollectionExecutorTest.cpp @@ -332,8 +332,8 @@ class EnumerateCollectionExecutorTestProduce } // insert amount of documents into the vocbase - VPackOptions* insertDocuments(size_t amount, - std::vector& queryResults) { + VPackOptions const* insertDocuments(size_t amount, + std::vector& queryResults) { // TODO: Can be optimized to not use AQL INSERT (trx object directly // instead) std::string insertQuery = diff --git a/tests/Aql/QueryCursorTest.cpp b/tests/Aql/QueryCursorTest.cpp index 91af64cd7ca5..0f6133beb12f 100644 --- a/tests/Aql/QueryCursorTest.cpp +++ b/tests/Aql/QueryCursorTest.cpp @@ -76,7 +76,7 @@ TEST_F(QueryCursorTest, resultCursorResultArrayIndexSingleBatch) { server->server(), fakeRequest.release(), fakeResponse.release(), registry); - testee->execute(); + testee->executeAsync().wait(); fakeResponse.reset( dynamic_cast(testee->stealResponse().release())); @@ -110,7 +110,7 @@ TEST_F(QueryCursorTest, resultCursorResultArrayIndexTwoBatches) { server->server(), fakeRequest.release(), fakeResponse.release(), registry); - testee->execute(); + testee->executeAsync().wait(); fakeResponse.reset( dynamic_cast(testee->stealResponse().release())); @@ -145,7 +145,7 @@ TEST_F(QueryCursorTest, streamingCursorResultArrayIndexSingleBatch) { server->server(), fakeRequest.release(), fakeResponse.release(), registry); - testee->execute(); + testee->executeAsync().wait(); fakeResponse.reset( dynamic_cast(testee->stealResponse().release())); @@ -180,7 +180,7 @@ TEST_F(QueryCursorTest, streamingCursorResultArrayIndexTwoBatches) { server->server(), fakeRequest.release(), fakeResponse.release(), registry); - testee->execute(); + testee->executeAsync().wait(); fakeResponse.reset( dynamic_cast(testee->stealResponse().release())); @@ -201,7 +201,7 @@ TEST_F(QueryCursorTest, streamingCursorResultArrayIndexTwoBatches) { auto restHandler = std::make_shared( server->server(), fakeRequest.release(), fakeResponse.release(), registry); - restHandler->execute(); + restHandler->executeAsync().wait(); fakeResponse.reset( dynamic_cast(testee->stealResponse().release())); } diff --git a/tests/Mocks/MockGraph.cpp b/tests/Mocks/MockGraph.cpp index 77c0cf90e49f..5429986b5fe2 100644 --- a/tests/Mocks/MockGraph.cpp +++ b/tests/Mocks/MockGraph.cpp @@ -293,7 +293,7 @@ MockGraph::simulateApi( server.server(), fakeRequest.release(), fakeResponse.release(), &queryRegistry}; - aqlHandler.execute(); + aqlHandler.executeAsync().wait(); auto response = aqlHandler.stealResponse(); // Read: (EngineId eid) auto resBody = static_cast(response.get())->_payload.slice(); diff --git a/tests/Transaction/RestTransactionHandlerTest.cpp b/tests/Transaction/RestTransactionHandlerTest.cpp index a0243ea5f23c..19707c422834 100644 --- a/tests/Transaction/RestTransactionHandlerTest.cpp +++ b/tests/Transaction/RestTransactionHandlerTest.cpp @@ -85,8 +85,7 @@ TEST_F(RestTransactionHandlerTest, parsing_errors) { request.addSuffix("begin"); parser.parse("{ \"write\": [33] }"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::BAD, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -111,8 +110,7 @@ TEST_F(RestTransactionHandlerTest, collection_not_found_ro) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"read\": [\"33\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::NOT_FOUND, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -137,8 +135,7 @@ TEST_F(RestTransactionHandlerTest, collection_not_found_write) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"write\": [\"33\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::NOT_FOUND, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -163,8 +160,7 @@ TEST_F(RestTransactionHandlerTest, collection_not_found_exclusive) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"exclusive\": [\"33\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::NOT_FOUND, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -197,8 +193,7 @@ TEST_F(RestTransactionHandlerTest, simple_transaction_abort) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"read\": [\"42\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::CREATED, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -222,8 +217,7 @@ TEST_F(RestTransactionHandlerTest, simple_transaction_abort) { request.clearSuffixes(); request.addSuffix(tid); - status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::OK, responce.responseCode()); slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -245,8 +239,7 @@ TEST_F(RestTransactionHandlerTest, simple_transaction_abort) { request.setRequestType(arangodb::rest::RequestType::DELETE_REQ); - status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::OK, responce.responseCode()); slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -278,8 +271,7 @@ TEST_F(RestTransactionHandlerTest, simple_transaction_and_commit) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"read\": [\"42\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::CREATED, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -303,8 +295,7 @@ TEST_F(RestTransactionHandlerTest, simple_transaction_and_commit) { request.clearSuffixes(); request.addSuffix(tid); - status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::OK, responce.responseCode()); slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -346,8 +337,7 @@ TEST_F(RestTransactionHandlerTest, permission_denied_read_only) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"write\": [\"42\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::FORBIDDEN, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); @@ -390,8 +380,7 @@ TEST_F(RestTransactionHandlerTest, permission_denied_forbidden) { request.addSuffix("begin"); parser.parse("{ \"collections\":{\"write\": [\"42\"]}}"); - arangodb::RestStatus status = handler.execute(); - EXPECT_EQ(arangodb::RestStatus::DONE, status); + handler.executeAsync().wait(); EXPECT_EQ(arangodb::rest::ResponseCode::FORBIDDEN, responce.responseCode()); VPackSlice slice = responce._payload.slice(); EXPECT_TRUE(slice.isObject()); diff --git a/tests/js/client/shell/multi/shell-aql-kill.js b/tests/js/client/shell/multi/shell-aql-kill.js index d57e197c8b07..296971ee4b14 100644 --- a/tests/js/client/shell/multi/shell-aql-kill.js +++ b/tests/js/client/shell/multi/shell-aql-kill.js @@ -56,7 +56,8 @@ function aqlKillSuite () { } } console.warn(`Giving up after ${stopAfter}s in ` + JSON.stringify(new Error().stack)); - return { code: 500, error: true, errorMessage: `Giving up after ${stopAfter}s in ${JSON.stringify(new Error().stack)}`}; + + return undefined; } function queryGone (queryId) { @@ -116,9 +117,6 @@ function aqlKillSuite () { assertEqual(killResult.code, 200, { httpres: JSON.stringify(killResult), sleepForMs }); const putResult = tryForUntil({until: jobGone(jobId)}); - if (isCov && putResult.code === 404 && putResult.errorNum === 1591) { - return; - } assertEqual(410, putResult.code, JSON.stringify(putResult)); }