@@ -25,7 +25,14 @@ static void channel_operation_timeout(swTimer *timer, swTimer_node *tnode)
25
25
timeout_msg_t *msg = (timeout_msg_t *) tnode->data ;
26
26
msg->error = true ;
27
27
msg->timer = nullptr ;
28
- msg->chan ->remove (msg->co );
28
+ if (msg->type == CONSUMER)
29
+ {
30
+ msg->chan ->consumer_remove (msg->co );
31
+ }
32
+ else
33
+ {
34
+ msg->chan ->producer_remove (msg->co );
35
+ }
29
36
msg->co ->resume ();
30
37
}
31
38
@@ -61,7 +68,7 @@ void* Channel::pop(double timeout)
61
68
{
62
69
return nullptr ;
63
70
}
64
- if (is_empty () || consumer_queue.size () > 0 )
71
+ if (is_empty () || ! consumer_queue.empty () )
65
72
{
66
73
timeout_msg_t msg;
67
74
msg.error = false ;
@@ -70,6 +77,7 @@ void* Channel::pop(double timeout)
70
77
{
71
78
int msec = (int ) (timeout * 1000 );
72
79
msg.chan = this ;
80
+ msg.type = CONSUMER;
73
81
msg.co = coroutine_get_current ();
74
82
msg.timer = swTimer_add (&SwooleG.timer , msec, 0 , &msg, channel_operation_timeout);
75
83
}
@@ -93,7 +101,7 @@ void* Channel::pop(double timeout)
93
101
/* *
94
102
* notify producer
95
103
*/
96
- if (producer_queue.size () > 0 )
104
+ if (! producer_queue.empty () )
97
105
{
98
106
Coroutine *co = pop_coroutine (PRODUCER);
99
107
co->resume ();
@@ -107,7 +115,7 @@ bool Channel::push(void *data, double timeout)
107
115
{
108
116
return false ;
109
117
}
110
- if (is_full () || producer_queue.size () > 0 )
118
+ if (is_full () || ! producer_queue.empty () )
111
119
{
112
120
timeout_msg_t msg;
113
121
msg.error = false ;
@@ -116,6 +124,7 @@ bool Channel::push(void *data, double timeout)
116
124
{
117
125
int msec = (int ) (timeout * 1000 );
118
126
msg.chan = this ;
127
+ msg.type = PRODUCER;
119
128
msg.co = coroutine_get_current ();
120
129
msg.timer = swTimer_add (&SwooleG.timer , msec, 0 , &msg, channel_operation_timeout);
121
130
}
@@ -139,7 +148,7 @@ bool Channel::push(void *data, double timeout)
139
148
/* *
140
149
* notify consumer
141
150
*/
142
- if (consumer_queue.size () > 0 )
151
+ if (! consumer_queue.empty () )
143
152
{
144
153
Coroutine *co = pop_coroutine (CONSUMER);
145
154
co->resume ();
@@ -155,12 +164,12 @@ bool Channel::close()
155
164
}
156
165
swTraceLog (SW_TRACE_CHANNEL, " channel closed" );
157
166
closed = true ;
158
- while (producer_queue.size () > 0 )
167
+ while (! producer_queue.empty () )
159
168
{
160
169
Coroutine *co = pop_coroutine (PRODUCER);
161
170
co->resume ();
162
171
}
163
- while (consumer_queue.size () > 0 )
172
+ while (! consumer_queue.empty () )
164
173
{
165
174
Coroutine *co = pop_coroutine (CONSUMER);
166
175
co->resume ();
0 commit comments