blob: 5536e923faae4fd8b58a9546a5e60a4b548ab7ad [file] [log] [blame]
Yijie Maf99b8b52022-12-23 07:01:531//
2// Copyright 2018 gRPC authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15//
16//
Vijay Pai2b226ad2018-01-12 00:11:3517
AJ Heller0d5dc5c2023-08-16 18:54:0418#include <grpc/event_engine/event_engine.h>
Craig Tiller49a30352022-12-19 16:18:3919#include <grpc/grpc.h>
Craig Tiller0581eb12024-04-11 03:14:2020#include <grpc/support/port_platform.h>
Craig Tiller0f9d0242022-05-19 14:34:4821#include <grpc/support/sync.h>
Cheng-Yu Chung0356ff32022-12-09 18:31:4422#include <grpc/support/time.h>
Craig Tillerea389c02021-09-08 19:14:4423#include <grpcpp/alarm.h>
Muxi Yan96143952018-02-21 00:11:4824#include <grpcpp/completion_queue.h>
Cheng-Yu Chungc91c2fb2022-11-17 18:52:1425#include <grpcpp/impl/completion_queue_tag.h>
Craig Tillerea389c02021-09-08 19:14:4426
Craig Tillerdbb51642024-10-04 16:41:5627#include <atomic>
28#include <functional>
29#include <memory>
30#include <utility>
31
32#include "absl/log/check.h"
33#include "absl/status/status.h"
AJ Heller0d5dc5c2023-08-16 18:54:0434#include "src/core/lib/event_engine/default_event_engine.h"
Craig Tiller0f9d0242022-05-19 14:34:4835#include "src/core/lib/iomgr/error.h"
Vijay Pai2b226ad2018-01-12 00:11:3536#include "src/core/lib/iomgr/exec_ctx.h"
Vijay Pai2b226ad2018-01-12 00:11:3537#include "src/core/lib/surface/completion_queue.h"
Mark D. Rothf6c57b62024-09-20 20:15:5838#include "src/core/util/time.h"
Vijay Pai2b226ad2018-01-12 00:11:3539
Karthik Ravi Shankare5498432020-10-06 17:13:0240namespace grpc {
Vijay Pai2b226ad2018-01-12 00:11:3541
42namespace internal {
AJ Heller0d5dc5c2023-08-16 18:54:0443
44namespace {
45using grpc_event_engine::experimental::EventEngine;
46} // namespace
47
AJ Heller85189b22022-02-19 00:18:5448class AlarmImpl : public grpc::internal::CompletionQueueTag {
Vijay Pai2b226ad2018-01-12 00:11:3549 public:
AJ Heller0d5dc5c2023-08-16 18:54:0450 AlarmImpl()
51 : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()),
52 cq_(nullptr),
53 tag_(nullptr) {
Craig Tiller0fc03842022-06-29 22:35:3154 gpr_ref_init(&refs_, 1);
Craig Tiller0fc03842022-06-29 22:35:3155 }
Esun Kime7434d32020-10-16 20:57:2156 ~AlarmImpl() override {}
Vijay Paiab72f442019-10-21 16:16:3457 bool FinalizeResult(void** tag, bool* /*status*/) override {
Vijay Paie1e70422018-01-14 07:53:0158 *tag = tag_;
59 Unref();
60 return true;
61 }
AJ Heller85189b22022-02-19 00:18:5462 void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
Vijay Pai2b226ad2018-01-12 00:11:3563 grpc_core::ExecCtx exec_ctx;
64 GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
65 cq_ = cq->cq();
Vijay Paie1e70422018-01-14 07:53:0166 tag_ = tag;
Tanvi Jagtapb72d3182024-05-03 06:33:4867 CHECK(grpc_cq_begin_op(cq_, this));
AJ Heller0d5dc5c2023-08-16 18:54:0468 Ref();
Tanvi Jagtapb72d3182024-05-03 06:33:4869 CHECK(cq_armed_.exchange(true) == false);
70 CHECK(!callback_armed_.load());
AJ Heller0d5dc5c2023-08-16 18:54:0471 cq_timer_handle_ = event_engine_->RunAfter(
72 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
73 grpc_core::ExecCtx::Get()->Now(),
74 [this] { OnCQAlarm(absl::OkStatus()); });
Vijay Paidb01bf72018-09-12 00:01:1975 }
76 void Set(gpr_timespec deadline, std::function<void(bool)> f) {
Vijay Paidb01bf72018-09-12 00:01:1977 grpc_core::ExecCtx exec_ctx;
78 // Don't use any CQ at all. Instead just use the timer to fire the function
79 callback_ = std::move(f);
80 Ref();
Tanvi Jagtapb72d3182024-05-03 06:33:4881 CHECK(callback_armed_.exchange(true) == false);
82 CHECK(!cq_armed_.load());
AJ Heller0d5dc5c2023-08-16 18:54:0483 callback_timer_handle_ = event_engine_->RunAfter(
84 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
85 grpc_core::ExecCtx::Get()->Now(),
86 [this] { OnCallbackAlarm(true); });
Vijay Pai2b226ad2018-01-12 00:11:3587 }
88 void Cancel() {
Vijay Pai2b226ad2018-01-12 00:11:3589 grpc_core::ExecCtx exec_ctx;
AJ Heller0d5dc5c2023-08-16 18:54:0490 if (callback_armed_.load() &&
91 event_engine_->Cancel(callback_timer_handle_)) {
92 event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); });
93 }
94 if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) {
Craig Tilleradeb4652024-12-03 18:05:5195 event_engine_->Run(
96 [this] { OnCQAlarm(absl::CancelledError("cancelled")); });
AJ Heller0d5dc5c2023-08-16 18:54:0497 }
Vijay Pai2b226ad2018-01-12 00:11:3598 }
99 void Destroy() {
100 Cancel();
101 Unref();
102 }
Vijay Paife75ef52018-01-14 07:54:59103
Vijay Pai2b226ad2018-01-12 00:11:35104 private:
AJ Heller0d5dc5c2023-08-16 18:54:04105 void OnCQAlarm(grpc_error_handle error) {
106 cq_armed_.store(false);
AJ Heller0d5dc5c2023-08-16 18:54:04107 grpc_core::ExecCtx exec_ctx;
108 // Preserve the cq and reset the cq_ so that the alarm
109 // can be reset when the alarm tag is delivered.
110 grpc_completion_queue* cq = cq_;
111 cq_ = nullptr;
112 grpc_cq_end_op(
113 cq, this, error,
Craig Tilleradeb4652024-12-03 18:05:51114 [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
115 &completion_);
AJ Heller0d5dc5c2023-08-16 18:54:04116 GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
117 }
118
119 void OnCallbackAlarm(bool is_ok) {
120 callback_armed_.store(false);
AJ Heller0d5dc5c2023-08-16 18:54:04121 grpc_core::ExecCtx exec_ctx;
122 callback_(is_ok);
123 Unref();
124 }
125
Vijay Pai2b226ad2018-01-12 00:11:35126 void Ref() { gpr_ref(&refs_); }
127 void Unref() {
128 if (gpr_unref(&refs_)) {
129 delete this;
130 }
131 }
132
AJ Heller0d5dc5c2023-08-16 18:54:04133 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
134 std::atomic<bool> cq_armed_{false};
135 EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid;
136 std::atomic<bool> callback_armed_{false};
137 EventEngine::TaskHandle callback_timer_handle_ =
138 EventEngine::TaskHandle::kInvalid;
Vijay Pai2b226ad2018-01-12 00:11:35139 gpr_refcount refs_;
Craig Tilleradeb4652024-12-03 18:05:51140 grpc_cq_completion completion_;
Vijay Pai2b226ad2018-01-12 00:11:35141 // completion queue where events about this alarm will be posted
142 grpc_completion_queue* cq_;
Vijay Paie1e70422018-01-14 07:53:01143 void* tag_;
Vijay Paidb01bf72018-09-12 00:01:19144 std::function<void(bool)> callback_;
Vijay Pai2b226ad2018-01-12 00:11:35145};
146} // namespace internal
147
Cheng-Yu Chung1b055762022-12-16 17:08:35148Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
Vijay Pai2b226ad2018-01-12 00:11:35149
AJ Heller85189b22022-02-19 00:18:54150void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
Nicolas "Pixel" Nobled550af32018-12-12 03:09:42151 void* tag) {
Vijay Paie1e70422018-01-14 07:53:01152 // Note that we know that alarm_ is actually an internal::AlarmImpl
153 // but we declared it as the base pointer to avoid a forward declaration
154 // or exposing core data structures in the C++ public headers.
155 // Thus it is safe to use a static_cast to the subclass here, and the
156 // C++ style guide allows us to do so in this case
157 static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
Vijay Pai2b226ad2018-01-12 00:11:35158}
159
Vijay Paidb01bf72018-09-12 00:01:19160void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
161 // Note that we know that alarm_ is actually an internal::AlarmImpl
162 // but we declared it as the base pointer to avoid a forward declaration
163 // or exposing core data structures in the C++ public headers.
164 // Thus it is safe to use a static_cast to the subclass here, and the
165 // C++ style guide allows us to do so in this case
166 static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
167}
168
Vijay Pai2b226ad2018-01-12 00:11:35169Alarm::~Alarm() {
170 if (alarm_ != nullptr) {
Vijay Paie1e70422018-01-14 07:53:01171 static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
Vijay Pai2b226ad2018-01-12 00:11:35172 }
173}
174
Vijay Paie1e70422018-01-14 07:53:01175void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
Karthik Ravi Shankare5498432020-10-06 17:13:02176} // namespace grpc