Skip to content

Commit 1372364

Browse files
authored
Merge pull request swoole#2226 from windrunner414/master
fix channel->push timeout bug
2 parents 2ed933c + 58c6f98 commit 1372364

File tree

2 files changed

+23
-8
lines changed

2 files changed

+23
-8
lines changed

include/channel.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct notify_msg_t
2828
struct timeout_msg_t
2929
{
3030
Channel *chan;
31+
enum channel_op type;
3132
Coroutine *co;
3233
bool error;
3334
swTimer_node *timer;
@@ -68,11 +69,16 @@ class Channel
6869
return producer_queue.size();
6970
}
7071

71-
inline void remove(Coroutine *co)
72+
inline void consumer_remove(Coroutine *co)
7273
{
7374
consumer_queue.remove(co);
7475
}
7576

77+
inline void producer_remove(Coroutine *co)
78+
{
79+
producer_queue.remove(co);
80+
}
81+
7682
/**
7783
* No coroutine scheduling
7884
*/

src/coroutine/channel.cc

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@ static void channel_operation_timeout(swTimer *timer, swTimer_node *tnode)
2525
timeout_msg_t *msg = (timeout_msg_t *) tnode->data;
2626
msg->error = true;
2727
msg->timer = nullptr;
28-
msg->chan->remove(msg->co);
28+
if (msg->type == CONSUMER)
29+
{
30+
msg->chan->consumer_remove(msg->co);
31+
}
32+
else
33+
{
34+
msg->chan->producer_remove(msg->co);
35+
}
2936
msg->co->resume();
3037
}
3138

@@ -61,7 +68,7 @@ void* Channel::pop(double timeout)
6168
{
6269
return nullptr;
6370
}
64-
if (is_empty() || consumer_queue.size() > 0)
71+
if (is_empty() || !consumer_queue.empty())
6572
{
6673
timeout_msg_t msg;
6774
msg.error = false;
@@ -70,6 +77,7 @@ void* Channel::pop(double timeout)
7077
{
7178
int msec = (int) (timeout * 1000);
7279
msg.chan = this;
80+
msg.type = CONSUMER;
7381
msg.co = coroutine_get_current();
7482
msg.timer = swTimer_add(&SwooleG.timer, msec, 0, &msg, channel_operation_timeout);
7583
}
@@ -93,7 +101,7 @@ void* Channel::pop(double timeout)
93101
/**
94102
* notify producer
95103
*/
96-
if (producer_queue.size() > 0)
104+
if (!producer_queue.empty())
97105
{
98106
Coroutine *co = pop_coroutine(PRODUCER);
99107
co->resume();
@@ -107,7 +115,7 @@ bool Channel::push(void *data, double timeout)
107115
{
108116
return false;
109117
}
110-
if (is_full() || producer_queue.size() > 0)
118+
if (is_full() || !producer_queue.empty())
111119
{
112120
timeout_msg_t msg;
113121
msg.error = false;
@@ -116,6 +124,7 @@ bool Channel::push(void *data, double timeout)
116124
{
117125
int msec = (int) (timeout * 1000);
118126
msg.chan = this;
127+
msg.type = PRODUCER;
119128
msg.co = coroutine_get_current();
120129
msg.timer = swTimer_add(&SwooleG.timer, msec, 0, &msg, channel_operation_timeout);
121130
}
@@ -139,7 +148,7 @@ bool Channel::push(void *data, double timeout)
139148
/**
140149
* notify consumer
141150
*/
142-
if (consumer_queue.size() > 0)
151+
if (!consumer_queue.empty())
143152
{
144153
Coroutine *co = pop_coroutine(CONSUMER);
145154
co->resume();
@@ -155,12 +164,12 @@ bool Channel::close()
155164
}
156165
swTraceLog(SW_TRACE_CHANNEL, "channel closed");
157166
closed = true;
158-
while (producer_queue.size() > 0)
167+
while (!producer_queue.empty())
159168
{
160169
Coroutine *co = pop_coroutine(PRODUCER);
161170
co->resume();
162171
}
163-
while (consumer_queue.size() > 0)
172+
while (!consumer_queue.empty())
164173
{
165174
Coroutine *co = pop_coroutine(CONSUMER);
166175
co->resume();

0 commit comments

Comments
 (0)