Yijie Ma | f99b8b5 | 2022-12-23 07:01:53 | [diff] [blame] | 1 | // |
| 2 | // Copyright 2018 gRPC authors. |
| 3 | // |
| 4 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | // you may not use this file except in compliance with the License. |
| 6 | // You may obtain a copy of the License at |
| 7 | // |
| 8 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | // |
| 10 | // Unless required by applicable law or agreed to in writing, software |
| 11 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | // See the License for the specific language governing permissions and |
| 14 | // limitations under the License. |
| 15 | // |
| 16 | // |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 17 | |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 18 | #include <grpc/event_engine/event_engine.h> |
Craig Tiller | 49a3035 | 2022-12-19 16:18:39 | [diff] [blame] | 19 | #include <grpc/grpc.h> |
Craig Tiller | 0581eb1 | 2024-04-11 03:14:20 | [diff] [blame] | 20 | #include <grpc/support/port_platform.h> |
Craig Tiller | 0f9d024 | 2022-05-19 14:34:48 | [diff] [blame] | 21 | #include <grpc/support/sync.h> |
Cheng-Yu Chung | 0356ff3 | 2022-12-09 18:31:44 | [diff] [blame] | 22 | #include <grpc/support/time.h> |
Craig Tiller | ea389c0 | 2021-09-08 19:14:44 | [diff] [blame] | 23 | #include <grpcpp/alarm.h> |
Muxi Yan | 9614395 | 2018-02-21 00:11:48 | [diff] [blame] | 24 | #include <grpcpp/completion_queue.h> |
Cheng-Yu Chung | c91c2fb | 2022-11-17 18:52:14 | [diff] [blame] | 25 | #include <grpcpp/impl/completion_queue_tag.h> |
Craig Tiller | ea389c0 | 2021-09-08 19:14:44 | [diff] [blame] | 26 | |
Craig Tiller | dbb5164 | 2024-10-04 16:41:56 | [diff] [blame] | 27 | #include <atomic> |
| 28 | #include <functional> |
| 29 | #include <memory> |
| 30 | #include <utility> |
| 31 | |
| 32 | #include "absl/log/check.h" |
| 33 | #include "absl/status/status.h" |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 34 | #include "src/core/lib/event_engine/default_event_engine.h" |
Craig Tiller | 0f9d024 | 2022-05-19 14:34:48 | [diff] [blame] | 35 | #include "src/core/lib/iomgr/error.h" |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 36 | #include "src/core/lib/iomgr/exec_ctx.h" |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 37 | #include "src/core/lib/surface/completion_queue.h" |
Mark D. Roth | f6c57b6 | 2024-09-20 20:15:58 | [diff] [blame] | 38 | #include "src/core/util/time.h" |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 39 | |
Karthik Ravi Shankar | e549843 | 2020-10-06 17:13:02 | [diff] [blame] | 40 | namespace grpc { |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 41 | |
| 42 | namespace internal { |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 43 | |
| 44 | namespace { |
| 45 | using grpc_event_engine::experimental::EventEngine; |
| 46 | } // namespace |
| 47 | |
AJ Heller | 85189b2 | 2022-02-19 00:18:54 | [diff] [blame] | 48 | class AlarmImpl : public grpc::internal::CompletionQueueTag { |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 49 | public: |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 50 | AlarmImpl() |
| 51 | : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()), |
| 52 | cq_(nullptr), |
| 53 | tag_(nullptr) { |
Craig Tiller | 0fc0384 | 2022-06-29 22:35:31 | [diff] [blame] | 54 | gpr_ref_init(&refs_, 1); |
Craig Tiller | 0fc0384 | 2022-06-29 22:35:31 | [diff] [blame] | 55 | } |
Esun Kim | e7434d3 | 2020-10-16 20:57:21 | [diff] [blame] | 56 | ~AlarmImpl() override {} |
Vijay Pai | ab72f44 | 2019-10-21 16:16:34 | [diff] [blame] | 57 | bool FinalizeResult(void** tag, bool* /*status*/) override { |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 58 | *tag = tag_; |
| 59 | Unref(); |
| 60 | return true; |
| 61 | } |
AJ Heller | 85189b2 | 2022-02-19 00:18:54 | [diff] [blame] | 62 | void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) { |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 63 | grpc_core::ExecCtx exec_ctx; |
| 64 | GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm"); |
| 65 | cq_ = cq->cq(); |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 66 | tag_ = tag; |
Tanvi Jagtap | b72d318 | 2024-05-03 06:33:48 | [diff] [blame] | 67 | CHECK(grpc_cq_begin_op(cq_, this)); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 68 | Ref(); |
Tanvi Jagtap | b72d318 | 2024-05-03 06:33:48 | [diff] [blame] | 69 | CHECK(cq_armed_.exchange(true) == false); |
| 70 | CHECK(!callback_armed_.load()); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 71 | cq_timer_handle_ = event_engine_->RunAfter( |
| 72 | grpc_core::Timestamp::FromTimespecRoundUp(deadline) - |
| 73 | grpc_core::ExecCtx::Get()->Now(), |
| 74 | [this] { OnCQAlarm(absl::OkStatus()); }); |
Vijay Pai | db01bf7 | 2018-09-12 00:01:19 | [diff] [blame] | 75 | } |
| 76 | void Set(gpr_timespec deadline, std::function<void(bool)> f) { |
Vijay Pai | db01bf7 | 2018-09-12 00:01:19 | [diff] [blame] | 77 | grpc_core::ExecCtx exec_ctx; |
| 78 | // Don't use any CQ at all. Instead just use the timer to fire the function |
| 79 | callback_ = std::move(f); |
| 80 | Ref(); |
Tanvi Jagtap | b72d318 | 2024-05-03 06:33:48 | [diff] [blame] | 81 | CHECK(callback_armed_.exchange(true) == false); |
| 82 | CHECK(!cq_armed_.load()); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 83 | callback_timer_handle_ = event_engine_->RunAfter( |
| 84 | grpc_core::Timestamp::FromTimespecRoundUp(deadline) - |
| 85 | grpc_core::ExecCtx::Get()->Now(), |
| 86 | [this] { OnCallbackAlarm(true); }); |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 87 | } |
| 88 | void Cancel() { |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 89 | grpc_core::ExecCtx exec_ctx; |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 90 | if (callback_armed_.load() && |
| 91 | event_engine_->Cancel(callback_timer_handle_)) { |
| 92 | event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); }); |
| 93 | } |
| 94 | if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) { |
Craig Tiller | adeb465 | 2024-12-03 18:05:51 | [diff] [blame] | 95 | event_engine_->Run( |
| 96 | [this] { OnCQAlarm(absl::CancelledError("cancelled")); }); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 97 | } |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 98 | } |
| 99 | void Destroy() { |
| 100 | Cancel(); |
| 101 | Unref(); |
| 102 | } |
Vijay Pai | fe75ef5 | 2018-01-14 07:54:59 | [diff] [blame] | 103 | |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 104 | private: |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 105 | void OnCQAlarm(grpc_error_handle error) { |
| 106 | cq_armed_.store(false); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 107 | grpc_core::ExecCtx exec_ctx; |
| 108 | // Preserve the cq and reset the cq_ so that the alarm |
| 109 | // can be reset when the alarm tag is delivered. |
| 110 | grpc_completion_queue* cq = cq_; |
| 111 | cq_ = nullptr; |
| 112 | grpc_cq_end_op( |
| 113 | cq, this, error, |
Craig Tiller | adeb465 | 2024-12-03 18:05:51 | [diff] [blame] | 114 | [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr, |
| 115 | &completion_); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 116 | GRPC_CQ_INTERNAL_UNREF(cq, "alarm"); |
| 117 | } |
| 118 | |
| 119 | void OnCallbackAlarm(bool is_ok) { |
| 120 | callback_armed_.store(false); |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 121 | grpc_core::ExecCtx exec_ctx; |
| 122 | callback_(is_ok); |
| 123 | Unref(); |
| 124 | } |
| 125 | |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 126 | void Ref() { gpr_ref(&refs_); } |
| 127 | void Unref() { |
| 128 | if (gpr_unref(&refs_)) { |
| 129 | delete this; |
| 130 | } |
| 131 | } |
| 132 | |
AJ Heller | 0d5dc5c | 2023-08-16 18:54:04 | [diff] [blame] | 133 | std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; |
| 134 | std::atomic<bool> cq_armed_{false}; |
| 135 | EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid; |
| 136 | std::atomic<bool> callback_armed_{false}; |
| 137 | EventEngine::TaskHandle callback_timer_handle_ = |
| 138 | EventEngine::TaskHandle::kInvalid; |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 139 | gpr_refcount refs_; |
Craig Tiller | adeb465 | 2024-12-03 18:05:51 | [diff] [blame] | 140 | grpc_cq_completion completion_; |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 141 | // completion queue where events about this alarm will be posted |
| 142 | grpc_completion_queue* cq_; |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 143 | void* tag_; |
Vijay Pai | db01bf7 | 2018-09-12 00:01:19 | [diff] [blame] | 144 | std::function<void(bool)> callback_; |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 145 | }; |
| 146 | } // namespace internal |
| 147 | |
Cheng-Yu Chung | 1b05576 | 2022-12-16 17:08:35 | [diff] [blame] | 148 | Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {} |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 149 | |
AJ Heller | 85189b2 | 2022-02-19 00:18:54 | [diff] [blame] | 150 | void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline, |
Nicolas "Pixel" Noble | d550af3 | 2018-12-12 03:09:42 | [diff] [blame] | 151 | void* tag) { |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 152 | // Note that we know that alarm_ is actually an internal::AlarmImpl |
| 153 | // but we declared it as the base pointer to avoid a forward declaration |
| 154 | // or exposing core data structures in the C++ public headers. |
| 155 | // Thus it is safe to use a static_cast to the subclass here, and the |
| 156 | // C++ style guide allows us to do so in this case |
| 157 | static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag); |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 158 | } |
| 159 | |
Vijay Pai | db01bf7 | 2018-09-12 00:01:19 | [diff] [blame] | 160 | void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) { |
| 161 | // Note that we know that alarm_ is actually an internal::AlarmImpl |
| 162 | // but we declared it as the base pointer to avoid a forward declaration |
| 163 | // or exposing core data structures in the C++ public headers. |
| 164 | // Thus it is safe to use a static_cast to the subclass here, and the |
| 165 | // C++ style guide allows us to do so in this case |
| 166 | static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f)); |
| 167 | } |
| 168 | |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 169 | Alarm::~Alarm() { |
| 170 | if (alarm_ != nullptr) { |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 171 | static_cast<internal::AlarmImpl*>(alarm_)->Destroy(); |
Vijay Pai | 2b226ad | 2018-01-12 00:11:35 | [diff] [blame] | 172 | } |
| 173 | } |
| 174 | |
Vijay Pai | e1e7042 | 2018-01-14 07:53:01 | [diff] [blame] | 175 | void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); } |
Karthik Ravi Shankar | e549843 | 2020-10-06 17:13:02 | [diff] [blame] | 176 | } // namespace grpc |