Skip to content

Commit 3543c55

Browse files
committed
src: simplify v8 thread pool implementation
This commit drops the semaphore in exchange for a second condition variable and makes the task ring an array member instead of allocating it on the heap. That in turn makes size calculations a little easier because of the array's fixed size. PR-URL: node-forward/node#34 Reviewed-By: Fedor Indutny <fedor@indutny.com>
1 parent 78e38f5 commit 3543c55

File tree

2 files changed

+48
-42
lines changed

2 files changed

+48
-42
lines changed

src/node_v8_platform.cc

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -95,64 +95,73 @@ void Platform::WorkerBody(void* arg) {
9595
}
9696

9797

98-
TaskQueue::TaskQueue() {
99-
int err;
100-
101-
for (size_t i = 0; i < ARRAY_SIZE(ring_); i += 1)
102-
ring_[i] = nullptr;
103-
104-
read_off_ = 0;
105-
write_off_ = 0;
106-
107-
err = uv_sem_init(&sem_, 0);
108-
CHECK_EQ(err, 0);
109-
110-
err = uv_cond_init(&cond_);
111-
CHECK_EQ(err, 0);
112-
113-
err = uv_mutex_init(&mutex_);
114-
CHECK_EQ(err, 0);
98+
TaskQueue::TaskQueue() : read_off_(0), write_off_(0) {
99+
CHECK_EQ(0, uv_cond_init(&read_cond_));
100+
CHECK_EQ(0, uv_cond_init(&write_cond_));
101+
CHECK_EQ(0, uv_mutex_init(&mutex_));
115102
}
116103

117104

118105
TaskQueue::~TaskQueue() {
106+
uv_mutex_lock(&mutex_);
119107
CHECK_EQ(read_off_, write_off_);
120-
uv_sem_destroy(&sem_);
121-
uv_cond_destroy(&cond_);
108+
uv_mutex_unlock(&mutex_);
109+
uv_cond_destroy(&read_cond_);
110+
uv_cond_destroy(&write_cond_);
122111
uv_mutex_destroy(&mutex_);
123112
}
124113

125114

126115
void TaskQueue::Push(Task* task) {
127116
uv_mutex_lock(&mutex_);
128117

129-
// Wait for empty cell
130-
while (ring_[write_off_] != nullptr)
131-
uv_cond_wait(&cond_, &mutex_);
118+
while (can_write() == false)
119+
uv_cond_wait(&write_cond_, &mutex_); // Wait until there is a free slot.
132120

133121
ring_[write_off_] = task;
134-
write_off_++;
135-
write_off_ &= kRingMask;
122+
write_off_ = next(write_off_);
123+
uv_cond_signal(&read_cond_);
136124
uv_mutex_unlock(&mutex_);
137-
138-
uv_sem_post(&sem_);
139125
}
140126

141127

142128
Task* TaskQueue::Shift() {
143-
uv_sem_wait(&sem_);
144-
145129
uv_mutex_lock(&mutex_);
146-
Task* task = ring_[read_off_];
147-
ring_[read_off_] = nullptr;
148-
uv_cond_signal(&cond_);
149130

150-
read_off_++;
151-
read_off_ &= kRingMask;
131+
while (can_read() == false)
132+
uv_cond_wait(&read_cond_, &mutex_);
133+
134+
Task* task = ring_[read_off_];
135+
if (can_write() == false)
136+
uv_cond_signal(&write_cond_); // Signal waiters that we freed up a slot.
137+
read_off_ = next(read_off_);
152138
uv_mutex_unlock(&mutex_);
153139

154140
return task;
155141
}
156142

157143

144+
unsigned int TaskQueue::next(unsigned int n) {
145+
return (n + 1) % ARRAY_SIZE(TaskQueue::ring_);
146+
}
147+
148+
149+
bool TaskQueue::can_read() const {
150+
return read_off_ != write_off_;
151+
}
152+
153+
154+
// The read pointer chases the write pointer in the circular queue.
155+
// This method checks that the write pointer hasn't advanced so much
156+
// that it has gone full circle and caught up with the read pointer.
157+
//
158+
// can_write() returns false when there is an empty slot but the read pointer
159+
// points to the first element and the write pointer to the last element.
160+
// That should be rare enough that it is not worth the extra bookkeeping
161+
// to work around that. It's not harmful either, just mildly inefficient.
162+
bool TaskQueue::can_write() const {
163+
return next(write_off_) != read_off_;
164+
}
165+
166+
158167
} // namespace node

src/node_v8_platform.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,15 @@ class TaskQueue {
3636
v8::Task* Shift();
3737

3838
private:
39-
static const unsigned int kRingSize = 1024;
40-
static const unsigned int kRingMask = kRingSize - 1;
41-
42-
static_assert(kRingSize == (kRingSize & ~kRingMask),
43-
"kRingSize is not a power of two");
44-
45-
uv_sem_t sem_;
46-
uv_cond_t cond_;
39+
static unsigned int next(unsigned int n);
40+
bool can_read() const;
41+
bool can_write() const;
42+
uv_cond_t read_cond_;
43+
uv_cond_t write_cond_;
4744
uv_mutex_t mutex_;
4845
unsigned int read_off_;
4946
unsigned int write_off_;
50-
v8::Task* ring_[kRingSize];
47+
v8::Task* ring_[1024];
5148
};
5249

5350
class Platform : public v8::Platform {

0 commit comments

Comments
 (0)