From df985200d37c6f7e33f550a6ce291188ff312a40 Mon Sep 17 00:00:00 2001 From: Johann Listunov Date: Mon, 4 Aug 2025 11:31:27 +0200 Subject: [PATCH] Introduces an empty/no-op acceptance queue as a preparation for further steps. Moves relevant metrics from the generic Scheduler to the new queue. --- arangod/GeneralServer/RestHandler.cpp | 20 +++--- .../AcceptanceQueue/AcceptanceQueue.cpp | 57 +++++++++++++++++ .../AcceptanceQueue/AcceptanceQueue.h | 64 +++++++++++++++++++ arangod/Scheduler/CMakeLists.txt | 1 + arangod/Scheduler/Scheduler.h | 2 - arangod/Scheduler/SchedulerFeature.cpp | 19 +++++- arangod/Scheduler/SchedulerFeature.h | 3 + arangod/Scheduler/SupervisedScheduler.cpp | 8 --- arangod/Scheduler/SupervisedScheduler.h | 2 - arangod/Scheduler/ThreadPoolScheduler.cpp | 6 -- arangod/Scheduler/ThreadPoolScheduler.h | 2 - tests/Mocks/FakeScheduler.cpp | 6 -- tests/Mocks/FakeScheduler.h | 2 - 13 files changed, 154 insertions(+), 38 deletions(-) create mode 100644 arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.cpp create mode 100644 arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.h diff --git a/arangod/GeneralServer/RestHandler.cpp b/arangod/GeneralServer/RestHandler.cpp index 991c77e4f160..10606081d980 100644 --- a/arangod/GeneralServer/RestHandler.cpp +++ b/arangod/GeneralServer/RestHandler.cpp @@ -41,6 +41,7 @@ #include "Rest/GeneralRequest.h" #include "Rest/HttpResponse.h" #include "Scheduler/SchedulerFeature.h" +#include "Scheduler/AcceptanceQueue/AcceptanceQueue.h" #include "Statistics/RequestStatistics.h" #include "Utils/ExecContext.h" #include "VocBase/Identifiers/TransactionId.h" @@ -80,8 +81,8 @@ RestHandler::~RestHandler() { // someone forgot to call trackTaskEnd 🤔 TRI_ASSERT(PriorityRequestLane(determineRequestLane()) == RequestPriority::LOW); - TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); - SchedulerFeature::SCHEDULER->trackEndOngoingLowPriorityTask(); + TRI_ASSERT(SchedulerFeature::ACCEPTANCE_QUEUE != nullptr); + SchedulerFeature::ACCEPTANCE_QUEUE->trackEndOngoingLowPriorityTask(); } } @@ -162,9 +163,9 @@ RequestLane RestHandler::determineRequestLane() { } void RestHandler::trackQueueStart() noexcept { - TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); + TRI_ASSERT(SchedulerFeature::ACCEPTANCE_QUEUE != nullptr); _statistics.SET_QUEUE_START( - SchedulerFeature::SCHEDULER->queueStatistics()._queued); + SchedulerFeature::ACCEPTANCE_QUEUE->queueStatistics()._queued); } void RestHandler::trackQueueEnd() noexcept { _statistics.SET_QUEUE_END(); } @@ -173,8 +174,8 @@ void RestHandler::trackTaskStart() noexcept { TRI_ASSERT(!_trackedAsOngoingLowPrio); if (PriorityRequestLane(determineRequestLane()) == RequestPriority::LOW) { - TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); - SchedulerFeature::SCHEDULER->trackBeginOngoingLowPriorityTask(); + TRI_ASSERT(SchedulerFeature::ACCEPTANCE_QUEUE != nullptr); + SchedulerFeature::ACCEPTANCE_QUEUE->trackBeginOngoingLowPriorityTask(); _trackedAsOngoingLowPrio = true; } } @@ -186,15 +187,16 @@ void RestHandler::trackTaskEnd() noexcept { if (_trackedAsOngoingLowPrio) { TRI_ASSERT(PriorityRequestLane(determineRequestLane()) == RequestPriority::LOW); - TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); - SchedulerFeature::SCHEDULER->trackEndOngoingLowPriorityTask(); + TRI_ASSERT(SchedulerFeature::ACCEPTANCE_QUEUE != nullptr); + SchedulerFeature::ACCEPTANCE_QUEUE->trackEndOngoingLowPriorityTask(); _trackedAsOngoingLowPrio = false; // update the time the last low priority item spent waiting in the queue. // the queueing time in ms uint64_t queueTimeMs = static_cast(queueTime * 1000.0); - SchedulerFeature::SCHEDULER->setLastLowPriorityDequeueTime(queueTimeMs); + SchedulerFeature::ACCEPTANCE_QUEUE->setLastLowPriorityDequeueTime( + queueTimeMs); } if (queueTime >= 30.0) { diff --git a/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.cpp b/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.cpp new file mode 100644 index 000000000000..c5e6bd77e3a8 --- /dev/null +++ b/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.cpp @@ -0,0 +1,57 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2025 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Business Source License 1.1 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// https://github.com/arangodb/arangodb/blob/devel/LICENSE +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Johann Listunov +//////////////////////////////////////////////////////////////////////////////// + +#include "AcceptanceQueue.h" + +#include "Metrics/Gauge.h" + +namespace arangodb { +AcceptanceQueue::AcceptanceQueue( + Scheduler* scheduler, const std::shared_ptr& metrics) + : _scheduler(scheduler), _metrics(metrics) {} + +bool AcceptanceQueue::start() { + // the scheduler starts first + return true; +} + +void AcceptanceQueue::shutdown() { + // the scheduler will shutdown after this +} + +void AcceptanceQueue::setLastLowPriorityDequeueTime(uint64_t time) { + _scheduler->setLastLowPriorityDequeueTime(time); +} + +Scheduler::QueueStatistics AcceptanceQueue::queueStatistics() const { + return _scheduler->queueStatistics(); +} + +void AcceptanceQueue::trackBeginOngoingLowPriorityTask() noexcept { + _metrics->_ongoingLowPriorityGauge += 1; +} +void AcceptanceQueue::trackEndOngoingLowPriorityTask() noexcept { + _metrics->_ongoingLowPriorityGauge -= 1; +} + +} // namespace arangodb \ No newline at end of file diff --git a/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.h b/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.h new file mode 100644 index 000000000000..f0555e1977de --- /dev/null +++ b/arangod/Scheduler/AcceptanceQueue/AcceptanceQueue.h @@ -0,0 +1,64 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2025 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Business Source License 1.1 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// https://github.com/arangodb/arangodb/blob/devel/LICENSE +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Johann Listunov +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include + +#include "GeneralServer/RequestLane.h" +#include "Scheduler/Scheduler.h" + +#include "../SchedulerMetrics.h" + +namespace arangodb { + +class AcceptanceQueue { + public: + explicit AcceptanceQueue(Scheduler* scheduler, + const std::shared_ptr& metrics); + + template>, int> = 0> + [[nodiscard]] bool tryBoundedQueue(RequestLane lane, F&& fn) noexcept { + if (_scheduler == nullptr) { + return false; + } + return _scheduler->tryBoundedQueue(lane, fn); + } + + void setLastLowPriorityDequeueTime(uint64_t time); + [[nodiscard]] Scheduler::QueueStatistics queueStatistics() const; + + bool start(); + void shutdown(); + + void trackBeginOngoingLowPriorityTask() noexcept; + void trackEndOngoingLowPriorityTask() noexcept; + + private: + Scheduler* _scheduler; + std::shared_ptr _metrics; +}; + +} // namespace arangodb diff --git a/arangod/Scheduler/CMakeLists.txt b/arangod/Scheduler/CMakeLists.txt index 6629c96c5500..f1e645b53c7e 100644 --- a/arangod/Scheduler/CMakeLists.txt +++ b/arangod/Scheduler/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(arango_scheduler STATIC SupervisedScheduler.cpp ThreadPoolScheduler.cpp WorkStealingThreadPool.cpp + AcceptanceQueue/AcceptanceQueue.cpp ) target_link_libraries(arango_scheduler diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index 91359a64ab57..8721c3a43dc7 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -316,8 +316,6 @@ class Scheduler { virtual QueueStatistics queueStatistics() const = 0; virtual void trackCreateHandlerTask() noexcept = 0; - virtual void trackBeginOngoingLowPriorityTask() noexcept = 0; - virtual void trackEndOngoingLowPriorityTask() noexcept = 0; virtual void trackQueueTimeViolation() = 0; virtual void trackQueueItemSize(std::int64_t) noexcept = 0; diff --git a/arangod/Scheduler/SchedulerFeature.cpp b/arangod/Scheduler/SchedulerFeature.cpp index 10eedafadd9e..f7dd93052522 100644 --- a/arangod/Scheduler/SchedulerFeature.cpp +++ b/arangod/Scheduler/SchedulerFeature.cpp @@ -46,6 +46,8 @@ #include "Scheduler/Scheduler.h" #include "Scheduler/SupervisedScheduler.h" #include "Scheduler/ThreadPoolScheduler.h" +#include "Scheduler/AcceptanceQueue/AcceptanceQueue.h" + #ifdef USE_V8 #include "VocBase/Methods/Tasks.h" #endif @@ -83,6 +85,7 @@ std::atomic processIdRequestingLogRotate{processIdUnspecified}; namespace arangodb { Scheduler* SchedulerFeature::SCHEDULER = nullptr; +AcceptanceQueue* SchedulerFeature::ACCEPTANCE_QUEUE = nullptr; struct SchedulerFeature::AsioHandler { std::shared_ptr _exitSignals; @@ -93,6 +96,7 @@ SchedulerFeature::SchedulerFeature(Server& server, metrics::MetricsFeature& metrics) : ArangodFeature{server, *this}, _scheduler(nullptr), + _acceptanceQueue(nullptr), _metricsFeature(metrics), _asioHandler(std::make_unique()) { setOptional(false); @@ -347,11 +351,15 @@ void SchedulerFeature::prepare() { } else { TRI_ASSERT(_schedulerType == "threadpools"); return std::make_unique(server(), _nrMaximalThreads, - std::move(metrics)); + metrics); } }); + _acceptanceQueue = + std::make_unique(_scheduler.get(), metrics); + SCHEDULER = _scheduler.get(); + ACCEPTANCE_QUEUE = _acceptanceQueue.get(); } void SchedulerFeature::start() { @@ -364,6 +372,14 @@ void SchedulerFeature::start() { FATAL_ERROR_EXIT(); } LOG_TOPIC("14e6f", DEBUG, Logger::STARTUP) << "scheduler has started"; + + ok = _acceptanceQueue->start(); + if (!ok) { + LOG_TOPIC("7f498", FATAL, arangodb::Logger::FIXME) + << "the AcceptanceQueue cannot be started"; + FATAL_ERROR_EXIT(); + } + LOG_TOPIC("14e70", DEBUG, Logger::STARTUP) << "AcceptanceQueue has started"; } void SchedulerFeature::stop() { @@ -372,6 +388,7 @@ void SchedulerFeature::stop() { arangodb::Task::shutdownTasks(); #endif signalStuffDeinit(); + _acceptanceQueue->shutdown(); _scheduler->shutdown(); } diff --git a/arangod/Scheduler/SchedulerFeature.h b/arangod/Scheduler/SchedulerFeature.h index e9556d15b75c..ad55df9fb10e 100644 --- a/arangod/Scheduler/SchedulerFeature.h +++ b/arangod/Scheduler/SchedulerFeature.h @@ -31,12 +31,14 @@ namespace arangodb { class Scheduler; +class AcceptanceQueue; class SchedulerFeature final : public ArangodFeature { public: static constexpr std::string_view name() noexcept { return "Scheduler"; } static Scheduler* SCHEDULER; + static AcceptanceQueue* ACCEPTANCE_QUEUE; SchedulerFeature(Server& server, metrics::MetricsFeature& metrics); ~SchedulerFeature(); @@ -71,6 +73,7 @@ class SchedulerFeature final : public ArangodFeature { std::string _schedulerType = "supervised"; std::unique_ptr _scheduler; + std::unique_ptr _acceptanceQueue; metrics::MetricsFeature& _metricsFeature; struct AsioHandler; diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index debdb2cea3d6..73a6340595d3 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -989,14 +989,6 @@ void SupervisedScheduler::trackCreateHandlerTask() noexcept { ++_metrics->_metricsHandlerTasksCreated; } -void SupervisedScheduler::trackBeginOngoingLowPriorityTask() noexcept { - ++_metrics->_ongoingLowPriorityGauge; -} - -void SupervisedScheduler::trackEndOngoingLowPriorityTask() noexcept { - --_metrics->_ongoingLowPriorityGauge; -} - void SupervisedScheduler::trackQueueTimeViolation() noexcept { ++_metrics->_metricsQueueTimeViolations; } diff --git a/arangod/Scheduler/SupervisedScheduler.h b/arangod/Scheduler/SupervisedScheduler.h index 4efd11f0074e..7707d0f786ed 100644 --- a/arangod/Scheduler/SupervisedScheduler.h +++ b/arangod/Scheduler/SupervisedScheduler.h @@ -59,8 +59,6 @@ class SupervisedScheduler final : public Scheduler { Scheduler::QueueStatistics queueStatistics() const override; void trackCreateHandlerTask() noexcept override; - void trackBeginOngoingLowPriorityTask() noexcept override; - void trackEndOngoingLowPriorityTask() noexcept override; void trackQueueTimeViolation() noexcept override; void trackQueueItemSize(std::int64_t) noexcept override; diff --git a/arangod/Scheduler/ThreadPoolScheduler.cpp b/arangod/Scheduler/ThreadPoolScheduler.cpp index ebffec622b9c..4f34a7610840 100644 --- a/arangod/Scheduler/ThreadPoolScheduler.cpp +++ b/arangod/Scheduler/ThreadPoolScheduler.cpp @@ -42,12 +42,6 @@ Scheduler::QueueStatistics ThreadPoolScheduler::queueStatistics() const { void ThreadPoolScheduler::trackCreateHandlerTask() noexcept { ++_metrics->_metricsHandlerTasksCreated; } -void ThreadPoolScheduler::trackBeginOngoingLowPriorityTask() noexcept { - _metrics->_ongoingLowPriorityGauge += 1; -} -void ThreadPoolScheduler::trackEndOngoingLowPriorityTask() noexcept { - _metrics->_ongoingLowPriorityGauge -= 1; -} void ThreadPoolScheduler::trackQueueTimeViolation() { ++_metrics->_metricsQueueTimeViolations; } diff --git a/arangod/Scheduler/ThreadPoolScheduler.h b/arangod/Scheduler/ThreadPoolScheduler.h index a4cf53a91316..19f89ea786b9 100644 --- a/arangod/Scheduler/ThreadPoolScheduler.h +++ b/arangod/Scheduler/ThreadPoolScheduler.h @@ -33,8 +33,6 @@ struct ThreadPoolScheduler final : Scheduler { void toVelocyPack(velocypack::Builder& builder) const override; QueueStatistics queueStatistics() const override; void trackCreateHandlerTask() noexcept override; - void trackBeginOngoingLowPriorityTask() noexcept override; - void trackEndOngoingLowPriorityTask() noexcept override; void trackQueueTimeViolation() override; void trackQueueItemSize(std::int64_t x) noexcept override; uint64_t getLastLowPriorityDequeueTime() const noexcept override; diff --git a/tests/Mocks/FakeScheduler.cpp b/tests/Mocks/FakeScheduler.cpp index b6fb789025c6..9fb2b89b380f 100644 --- a/tests/Mocks/FakeScheduler.cpp +++ b/tests/Mocks/FakeScheduler.cpp @@ -50,12 +50,6 @@ Scheduler::QueueStatistics FakeScheduler::queueStatistics() const { void FakeScheduler::trackCreateHandlerTask() noexcept { ADB_PROD_ASSERT(false) << "not implemented"; } -void FakeScheduler::trackBeginOngoingLowPriorityTask() noexcept { - ADB_PROD_ASSERT(false) << "not implemented"; -} -void FakeScheduler::trackEndOngoingLowPriorityTask() noexcept { - ADB_PROD_ASSERT(false) << "not implemented"; -} void FakeScheduler::trackQueueTimeViolation() { ADB_PROD_ASSERT(false) << "not implemented"; diff --git a/tests/Mocks/FakeScheduler.h b/tests/Mocks/FakeScheduler.h index 4ae73a1e8bc0..479be508940c 100644 --- a/tests/Mocks/FakeScheduler.h +++ b/tests/Mocks/FakeScheduler.h @@ -35,8 +35,6 @@ struct FakeScheduler : Scheduler { QueueStatistics queueStatistics() const override; void trackCreateHandlerTask() noexcept override; - void trackBeginOngoingLowPriorityTask() noexcept override; - void trackEndOngoingLowPriorityTask() noexcept override; void trackQueueTimeViolation() override; void trackQueueItemSize(std::int64_t) noexcept override; uint64_t getLastLowPriorityDequeueTime() const noexcept override;