Skip to content

Commit f1b1db6

Browse files
authored
Support cancel channel and curl operation (swoole#4249)
* Support cancel channel/curl * fix tests * fix tests * optimize code, fix curl_error * add core tests
1 parent f7ef45b commit f1b1db6

23 files changed

+545
-83
lines changed

core-tests/src/coroutine/base.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "test_coroutine.h"
22

33
using namespace swoole;
4+
using swoole::coroutine::System;
45

56
TEST(coroutine_base, create) {
67
long _cid;
@@ -178,3 +179,23 @@ TEST(coroutine_base, run) {
178179
});
179180
ASSERT_GE(cid, 1);
180181
}
182+
183+
TEST(coroutine_base, cancel) {
184+
coroutine::run([](void *arg) {
185+
auto co = Coroutine::get_current_safe();
186+
Coroutine::create([co](void *){
187+
System::sleep(0.002);
188+
co->cancel();
189+
});
190+
ASSERT_EQ(co->yield_ex(-1), false);
191+
ASSERT_EQ(co->is_canceled(), true);
192+
});
193+
}
194+
195+
TEST(coroutine_base, timeout) {
196+
coroutine::run([](void *arg) {
197+
auto co = Coroutine::get_current_safe();
198+
ASSERT_EQ(co->yield_ex(0.005), false);
199+
ASSERT_EQ(co->is_timedout(), true);
200+
});
201+
}

ext-src/php_swoole.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ PHP_MINIT_FUNCTION(swoole) {
660660
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_CO_CANNOT_CANCEL", SW_ERROR_CO_CANNOT_CANCEL);
661661
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_CO_NOT_EXISTS", SW_ERROR_CO_NOT_EXISTS);
662662
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_CO_CANCELED", SW_ERROR_CO_CANCELED);
663+
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_CO_TIMEDOUT", SW_ERROR_CO_TIMEDOUT);
663664

664665
/**
665666
* trace log

ext-src/php_swoole_curl.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,18 @@ struct Handle {
4646

4747
struct Selector {
4848
bool defer_callback = false;
49+
bool timer_callback = false;
4950
std::set<Handle *> active_handles;
5051
};
5152

5253
class Multi {
5354
CURLM *multi_handle_;
5455
TimerNode *timer = nullptr;
55-
bool timedout = false;
5656
long timeout_ms_ = 0;
5757
Coroutine *co = nullptr;
5858
int running_handles_ = 0;
5959
int last_sockfd;
60+
int event_count_ = 0;
6061
std::unique_ptr<Selector> selector;
6162

6263
CURLcode read_info();
@@ -144,7 +145,7 @@ class Multi {
144145
}
145146

146147
CURLcode exec(php_curl *ch);
147-
long select(php_curlm *mh);
148+
long select(php_curlm *mh, double timeout = -1);
148149
void callback(Handle *handle, int event_bitmask);
149150

150151
static int cb_readable(Reactor *reactor, Event *event);

ext-src/swoole_channel_coro.cc

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ static const zend_function_entry swoole_channel_coro_methods[] =
7373
};
7474
// clang-format on
7575

76-
enum swChannelErrorCode {
77-
SW_CHANNEL_OK = 0,
78-
SW_CHANNEL_TIMEOUT = -1,
79-
SW_CHANNEL_CLOSED = -2,
80-
};
81-
8276
static sw_inline ChannelObject *php_swoole_channel_coro_fetch_object(zend_object *obj) {
8377
return (ChannelObject *) ((char *) obj - swoole_channel_coro_handlers.offset);
8478
}
@@ -142,9 +136,10 @@ void php_swoole_channel_coro_minit(int module_number) {
142136
zend_declare_property_long(swoole_channel_coro_ce, ZEND_STRL("capacity"), 0, ZEND_ACC_PUBLIC);
143137
zend_declare_property_long(swoole_channel_coro_ce, ZEND_STRL("errCode"), 0, ZEND_ACC_PUBLIC);
144138

145-
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_OK", SW_CHANNEL_OK);
146-
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_TIMEOUT", SW_CHANNEL_TIMEOUT);
147-
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_CLOSED", SW_CHANNEL_CLOSED);
139+
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_OK", Channel::ERROR_OK);
140+
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_TIMEOUT", Channel::ERROR_TIMEOUT);
141+
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_CLOSED", Channel::ERROR_CLOSED);
142+
SW_REGISTER_LONG_CONSTANT("SWOOLE_CHANNEL_CANCELED", Channel::ERROR_CANCELED);
148143
}
149144

150145
static PHP_METHOD(swoole_channel_coro, __construct) {
@@ -178,13 +173,12 @@ static PHP_METHOD(swoole_channel_coro, push) {
178173
Z_TRY_ADDREF_P(zdata);
179174
zdata = sw_zval_dup(zdata);
180175
if (chan->push(zdata, timeout)) {
181-
zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), SW_CHANNEL_OK);
176+
zend_update_property_long(
177+
swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
182178
RETURN_TRUE;
183179
} else {
184-
zend_update_property_long(swoole_channel_coro_ce,
185-
SW_Z8_OBJ_P(ZEND_THIS),
186-
ZEND_STRL("errCode"),
187-
chan->is_closed() ? SW_CHANNEL_CLOSED : SW_CHANNEL_TIMEOUT);
180+
zend_update_property_long(
181+
swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
188182
Z_TRY_DELREF_P(zdata);
189183
efree(zdata);
190184
RETURN_FALSE;
@@ -204,12 +198,11 @@ static PHP_METHOD(swoole_channel_coro, pop) {
204198
if (zdata) {
205199
RETVAL_ZVAL(zdata, 0, 0);
206200
efree(zdata);
207-
zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), SW_CHANNEL_OK);
201+
zend_update_property_long(
202+
swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
208203
} else {
209-
zend_update_property_long(swoole_channel_coro_ce,
210-
SW_Z8_OBJ_P(ZEND_THIS),
211-
ZEND_STRL("errCode"),
212-
chan->is_closed() ? SW_CHANNEL_CLOSED : SW_CHANNEL_TIMEOUT);
204+
zend_update_property_long(
205+
swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
213206
RETURN_FALSE;
214207
}
215208
}
@@ -241,4 +234,3 @@ static PHP_METHOD(swoole_channel_coro, stats) {
241234
add_assoc_long_ex(return_value, ZEND_STRL("producer_num"), chan->producer_num());
242235
add_assoc_long_ex(return_value, ZEND_STRL("queue_num"), chan->length());
243236
}
244-

ext-src/swoole_curl.cc

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ Socket *Multi::create_socket(CURL *cp, curl_socket_t sockfd) {
8484
void Multi::del_event(CURL *cp, void *socket_ptr, curl_socket_t sockfd) {
8585
Socket *socket = (Socket *) socket_ptr;
8686
socket->silent_remove = 1;
87-
if (socket->events && swoole_event_is_available()) {
88-
swoole_event_del(socket);
87+
if (socket->events && swoole_event_is_available() && swoole_event_del(socket) == SW_OK) {
88+
event_count_--;
8989
}
9090
socket->fd = -1;
9191
socket->free();
@@ -111,12 +111,15 @@ void Multi::set_event(CURL *cp, void *socket_ptr, curl_socket_t sockfd, int acti
111111
if (socket->events) {
112112
swoole_event_set(socket, events);
113113
} else {
114-
swoole_event_add(socket, events);
114+
if (swoole_event_add(socket, events) == SW_OK) {
115+
event_count_++;
116+
}
115117
}
116118
Handle *handle = get_handle(cp);
117119
handle->action = action;
118120

119-
swTraceLog(SW_TRACE_CO_CURL, SW_ECHO_GREEN " handle=%p, curl=%p, fd=%d, events=%d", "[ADD]", handle, cp, sockfd, events);
121+
swTraceLog(
122+
SW_TRACE_CO_CURL, SW_ECHO_GREEN " handle=%p, curl=%p, fd=%d, events=%d", "[ADD]", handle, cp, sockfd, events);
120123
}
121124

122125
CURLMcode Multi::add_handle(CURL *cp) {
@@ -156,14 +159,23 @@ CURLcode Multi::exec(php_curl *ch) {
156159

157160
Handle *handle = get_handle(ch->cp);
158161

162+
Coroutine::CancelFunc cancel_fn = [](Coroutine *co) {
163+
co->resume();
164+
return true;
165+
};
166+
159167
SW_LOOP {
160-
co->yield();
168+
co->yield(&cancel_fn);
169+
if (co->is_canceled()) {
170+
swoole_set_last_error(SW_ERROR_CO_CANCELED);
171+
break;
172+
}
161173
int sockfd = last_sockfd;
162174
int bitmask = 0;
163175
if (sockfd >= 0) {
164176
bitmask = handle->event_bitmask;
165-
if (handle->socket && !handle->socket->removed) {
166-
swoole_event_del(handle->socket);
177+
if (handle->socket && !handle->socket->removed && swoole_event_del(handle->socket) == SW_OK) {
178+
event_count_--;
167179
}
168180
}
169181
del_timer();
@@ -173,7 +185,9 @@ CURLcode Multi::exec(php_curl *ch) {
173185
}
174186
set_timer();
175187
if (sockfd >= 0 && handle->socket && handle->socket->removed) {
176-
swoole_event_add(handle->socket, get_event(handle->action));
188+
if (swoole_event_add(handle->socket, get_event(handle->action)) == SW_OK) {
189+
event_count_++;
190+
}
177191
}
178192
if (!timer && handle->socket->removed) {
179193
break;
@@ -182,7 +196,7 @@ CURLcode Multi::exec(php_curl *ch) {
182196

183197
CURLcode retval = read_info();
184198
remove_handle(ch->cp);
185-
return retval;
199+
return co->is_canceled() ? CURLE_ABORTED_BY_CALLBACK : retval;
186200
}
187201

188202
CURLcode Multi::read_info() {
@@ -222,7 +236,7 @@ int Multi::handle_timeout(CURLM *mh, long timeout_ms, void *userp) {
222236
return 0;
223237
}
224238

225-
long Multi::select(php_curlm *mh) {
239+
long Multi::select(php_curlm *mh, double timeout) {
226240
co = check_bound_co();
227241
if (zend_llist_count(&mh->easyh) == 0) {
228242
return 0;
@@ -231,7 +245,6 @@ long Multi::select(php_curlm *mh) {
231245
co = nullptr;
232246
};
233247

234-
int event_count = 0;
235248
for (zend_llist_element *element = mh->easyh.head; element; element = element->next) {
236249
zval *z_ch = (zval *) element->data;
237250
php_curl *ch;
@@ -241,18 +254,20 @@ long Multi::select(php_curlm *mh) {
241254
Handle *handle = get_handle(ch->cp);
242255
if (handle && handle->socket && handle->socket->removed) {
243256
if (swoole_event_add(handle->socket, get_event(handle->action)) == SW_OK) {
244-
event_count++;
257+
event_count_++;
245258
}
246259
swTraceLog(SW_TRACE_CO_CURL, "resume, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
247260
}
248261
}
249262
set_timer();
250263

251264
// no events and timers, should not be suspended
252-
if (!timer && event_count == 0) {
265+
if (!timer && event_count_ == 0) {
253266
return 0;
254267
}
255-
co->yield();
268+
269+
co->yield_ex(timeout);
270+
256271
auto count = selector->active_handles.size();
257272

258273
for (zend_llist_element *element = mh->easyh.head; element; element = element->next) {
@@ -262,46 +277,56 @@ long Multi::select(php_curlm *mh) {
262277
continue;
263278
}
264279
Handle *handle = get_handle(ch->cp);
265-
if (handle && handle->socket && !handle->socket->removed) {
266-
swTraceLog(SW_TRACE_CO_CURL, "suspend, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
267-
swoole_event_del(handle->socket);
280+
if (handle && handle->socket && !handle->socket->removed && swoole_event_del(handle->socket) == SW_OK) {
281+
swTraceLog(
282+
SW_TRACE_CO_CURL, "suspend, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
283+
event_count_--;
268284
}
269285
}
270286
del_timer();
271287

288+
if (selector->timer_callback) {
289+
selector->timer_callback = false;
290+
curl_multi_socket_action(multi_handle_, -1, 0, &running_handles_);
291+
swTraceLog(SW_TRACE_CO_CURL, "socket_action[timer], running_handles=%d", running_handles_);
292+
}
293+
272294
for (auto iter = selector->active_handles.begin(); iter != selector->active_handles.end(); iter++) {
273295
Handle *handle = *iter;
274-
int bitmask = 0;
275-
int sockfd = -1;
276-
if (handle) {
277-
bitmask = handle->event_bitmask;
278-
sockfd = handle->event_fd;
279-
}
280-
curl_multi_socket_action(multi_handle_, sockfd, bitmask, &running_handles_);
281-
swTraceLog(SW_TRACE_CO_CURL, "socket_action, running_handles=%d", running_handles_);
296+
curl_multi_socket_action(multi_handle_, handle->event_fd, handle->event_bitmask, &running_handles_);
297+
swTraceLog(SW_TRACE_CO_CURL, "socket_action[socket], running_handles=%d", running_handles_);
282298
}
299+
283300
selector->active_handles.clear();
284301

285302
return count;
286303
}
287304

288305
void Multi::callback(Handle *handle, int event_bitmask) {
306+
swTraceLog(SW_TRACE_CO_CURL, "callback, handle=%p, event_bitmask=%d", handle, event_bitmask);
289307
if (handle) {
290308
last_sockfd = handle->event_fd;
291309
} else {
292310
last_sockfd = -1;
293311
}
294312
// for curl_multi_select
295313
if (selector.get()) {
314+
if (!handle) {
315+
selector->timer_callback = true;
316+
}
296317
if (!co) {
297318
if (handle) {
298-
swoole_event_del(handle->socket);
319+
if (swoole_event_del(handle->socket) == SW_OK) {
320+
event_count_--;
321+
}
299322
} else {
300323
del_timer();
301324
}
302325
return;
303326
}
304-
selector->active_handles.insert(handle);
327+
if (handle) {
328+
selector->active_handles.insert(handle);
329+
}
305330
if (selector->defer_callback) {
306331
return;
307332
}

include/swoole_coroutine.h

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,6 @@
3535
namespace swoole {
3636
class Coroutine {
3737
public:
38-
void resume();
39-
void yield();
40-
bool cancel();
41-
42-
void resume_naked();
43-
void yield_naked();
44-
4538
constexpr static int STACK_ALIGNED_SIZE = (4 * 1024);
4639
constexpr static int MIN_STACK_SIZE = (64 * 1024);
4740
constexpr static int MAX_STACK_SIZE = (16 * 1024 * 1024);
@@ -60,16 +53,24 @@ class Coroutine {
6053
ERR_INVALID = -2,
6154
};
6255

56+
enum ResumeCode {
57+
RC_OK = 0,
58+
RC_TIMEDOUT = -1,
59+
RC_CANCELED = -2,
60+
};
61+
6362
typedef void (*SwapCallback)(void *);
6463
typedef void (*BailoutCallback)();
6564
typedef std::function<bool(swoole::Coroutine*)> CancelFunc;
6665

67-
void yield(CancelFunc *cancel_fn) {
68-
set_cancel_fn(cancel_fn);
69-
canceled_ = false;
70-
yield();
71-
set_cancel_fn(nullptr);
72-
}
66+
void resume();
67+
void yield();
68+
void yield(CancelFunc *cancel_fn);
69+
bool cancel();
70+
71+
void resume_naked();
72+
void yield_naked();
73+
bool yield_ex(double timeout = -1);
7374

7475
inline enum State get_state() {
7576
return state;
@@ -100,7 +101,11 @@ class Coroutine {
100101
}
101102

102103
bool is_canceled() {
103-
return canceled_;
104+
return resume_code_ == RC_CANCELED;
105+
}
106+
107+
bool is_timedout() {
108+
return resume_code_ == RC_TIMEDOUT;
104109
}
105110

106111
inline void set_task(void *_task) {
@@ -203,13 +208,13 @@ class Coroutine {
203208
static bool activated;
204209

205210
enum State state = STATE_INIT;
211+
enum ResumeCode resume_code_ = RC_OK;
206212
long cid;
207213
long init_msec = Timer::get_absolute_msec();
208214
void *task = nullptr;
209215
coroutine::Context ctx;
210216
Coroutine *origin = nullptr;
211217
CancelFunc *cancel_fn_ = nullptr;
212-
bool canceled_ = false;
213218

214219
Coroutine(const CoroutineFunc &fn, void *private_data) : ctx(stack_size, fn, private_data) {
215220
cid = ++last_cid;

0 commit comments

Comments
 (0)