@@ -95,64 +95,73 @@ void Platform::WorkerBody(void* arg) {
95
95
}
96
96
97
97
98
- TaskQueue::TaskQueue () {
99
- int err;
100
-
101
- for (size_t i = 0 ; i < ARRAY_SIZE (ring_); i += 1 )
102
- ring_[i] = nullptr ;
103
-
104
- read_off_ = 0 ;
105
- write_off_ = 0 ;
106
-
107
- err = uv_sem_init (&sem_, 0 );
108
- CHECK_EQ (err, 0 );
109
-
110
- err = uv_cond_init (&cond_);
111
- CHECK_EQ (err, 0 );
112
-
113
- err = uv_mutex_init (&mutex_);
114
- CHECK_EQ (err, 0 );
98
+ TaskQueue::TaskQueue () : read_off_(0 ), write_off_(0 ) {
99
+ CHECK_EQ (0 , uv_cond_init (&read_cond_));
100
+ CHECK_EQ (0 , uv_cond_init (&write_cond_));
101
+ CHECK_EQ (0 , uv_mutex_init (&mutex_));
115
102
}
116
103
117
104
118
105
TaskQueue::~TaskQueue () {
106
+ uv_mutex_lock (&mutex_);
119
107
CHECK_EQ (read_off_, write_off_);
120
- uv_sem_destroy (&sem_);
121
- uv_cond_destroy (&cond_);
108
+ uv_mutex_unlock (&mutex_);
109
+ uv_cond_destroy (&read_cond_);
110
+ uv_cond_destroy (&write_cond_);
122
111
uv_mutex_destroy (&mutex_);
123
112
}
124
113
125
114
126
115
void TaskQueue::Push (Task* task) {
127
116
uv_mutex_lock (&mutex_);
128
117
129
- // Wait for empty cell
130
- while (ring_[write_off_] != nullptr )
131
- uv_cond_wait (&cond_, &mutex_);
118
+ while (can_write () == false )
119
+ uv_cond_wait (&write_cond_, &mutex_); // Wait until there is a free slot.
132
120
133
121
ring_[write_off_] = task;
134
- write_off_++ ;
135
- write_off_ &= kRingMask ;
122
+ write_off_ = next (write_off_) ;
123
+ uv_cond_signal (&read_cond_) ;
136
124
uv_mutex_unlock (&mutex_);
137
-
138
- uv_sem_post (&sem_);
139
125
}
140
126
141
127
142
128
Task* TaskQueue::Shift () {
143
- uv_sem_wait (&sem_);
144
-
145
129
uv_mutex_lock (&mutex_);
146
- Task* task = ring_[read_off_];
147
- ring_[read_off_] = nullptr ;
148
- uv_cond_signal (&cond_);
149
130
150
- read_off_++;
151
- read_off_ &= kRingMask ;
131
+ while (can_read () == false )
132
+ uv_cond_wait (&read_cond_, &mutex_);
133
+
134
+ Task* task = ring_[read_off_];
135
+ if (can_write () == false )
136
+ uv_cond_signal (&write_cond_); // Signal waiters that we freed up a slot.
137
+ read_off_ = next (read_off_);
152
138
uv_mutex_unlock (&mutex_);
153
139
154
140
return task;
155
141
}
156
142
157
143
144
+ unsigned int TaskQueue::next (unsigned int n) {
145
+ return (n + 1 ) % ARRAY_SIZE (TaskQueue::ring_);
146
+ }
147
+
148
+
149
+ bool TaskQueue::can_read () const {
150
+ return read_off_ != write_off_;
151
+ }
152
+
153
+
154
+ // The read pointer chases the write pointer in the circular queue.
155
+ // This method checks that the write pointer hasn't advanced so much
156
+ // that it has gone full circle and caught up with the read pointer.
157
+ //
158
+ // can_write() returns false when there is an empty slot but the read pointer
159
+ // points to the first element and the write pointer to the last element.
160
+ // That should be rare enough that it is not worth the extra bookkeeping
161
+ // to work around that. It's not harmful either, just mildly inefficient.
162
+ bool TaskQueue::can_write () const {
163
+ return next (write_off_) != read_off_;
164
+ }
165
+
166
+
158
167
} // namespace node
0 commit comments