Skip to content

Commit 995e18a

Browse files
committed
parallel
1 parent b75de9e commit 995e18a

File tree

7 files changed

+193
-12
lines changed

7 files changed

+193
-12
lines changed

lib/functions/parallel.d.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import { Tasks } from "../types";
2+
export declare function parallel(tasks: Tasks, maxThreads?: number): Promise<any>;
3+
export declare function queue(tasks: Tasks): Promise<any>;

lib/functions/parallel.js

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,46 @@
1-
/*import {
2-
Tasks
3-
} from "../types";
4-
5-
export function parallel(tasks: Tasks, maxThreads: number = tasks.length): Promise<any> {
1+
"use strict";
2+
function promiseHandler(index, data) {
3+
this.results[index] = data;
4+
this.processes--;
5+
if (this.processes === 0 && index === this.tasks.length - 1) {
6+
this.resolve(this.results);
7+
}
8+
else {
9+
execute(this);
10+
}
11+
}
12+
function execute(scope) {
13+
if (scope.processes < scope.maxThreads && scope.pointer < scope.tasks.length) {
14+
var handler = promiseHandler.bind(scope, scope.pointer);
15+
scope.tasks[scope.pointer]().then(handler, handler);
16+
scope.pointer++;
17+
scope.processes++;
18+
}
19+
}
20+
function parallel(tasks, maxThreads) {
21+
if (maxThreads === void 0) { maxThreads = tasks.length; }
622
if (tasks.length === 0) {
723
return Promise.resolve();
824
}
9-
25+
var scope = {
26+
processes: 0,
27+
pointer: 0,
28+
tasks: tasks,
29+
maxThreads: maxThreads,
30+
results: [],
31+
resolve: null,
32+
reject: null
33+
};
1034
return new Promise(function (resolve, reject) {
11-
35+
for (var i = 0; i < maxThreads && i < tasks.length; i++) {
36+
scope.resolve = resolve;
37+
scope.reject = reject;
38+
execute(scope);
39+
}
1240
});
13-
}*/
41+
}
42+
exports.parallel = parallel;
43+
function queue(tasks) {
44+
return parallel(tasks, 1);
45+
}
46+
exports.queue = queue;

lib/prow.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export { timeout } from "./functions/timeout";
44
export { waterfall } from "./functions/waterfall";
55
export { retry } from "./functions/retry";
66
export { times } from "./functions/times";
7+
export { parallel, queue } from "./functions/parallel";

lib/prow.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ var retry_1 = require("./functions/retry");
1111
exports.retry = retry_1.retry;
1212
var times_1 = require("./functions/times");
1313
exports.times = times_1.times;
14+
var parallel_1 = require("./functions/parallel");
15+
exports.parallel = parallel_1.parallel;
16+
exports.queue = parallel_1.queue;

src/functions/parallel.ts

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,60 @@
1-
/*import {
1+
import {
22
Tasks
33
} from "../types";
44

5+
interface IScope {
6+
processes: number;
7+
pointer: number;
8+
tasks: Tasks;
9+
maxThreads: number;
10+
results: any[];
11+
resolve: any;
12+
reject: any;
13+
}
14+
15+
function promiseHandler (index: number, data: any) {
16+
this.results[index] = data;
17+
this.processes--;
18+
if (this.processes === 0 && index === this.tasks.length - 1) {
19+
this.resolve(this.results);
20+
} else {
21+
execute(this);
22+
}
23+
}
24+
25+
function execute(scope: IScope) {
26+
if (scope.processes < scope.maxThreads && scope.pointer < scope.tasks.length) {
27+
const handler = promiseHandler.bind(scope, scope.pointer);
28+
scope.tasks[scope.pointer]().then(handler, handler);
29+
scope.pointer++;
30+
scope.processes++;
31+
}
32+
}
33+
534
export function parallel(tasks: Tasks, maxThreads: number = tasks.length): Promise<any> {
635
if (tasks.length === 0) {
736
return Promise.resolve();
837
}
938

10-
return new Promise(function (resolve, reject) {
39+
const scope: IScope = {
40+
processes: 0,
41+
pointer: 0,
42+
tasks: tasks,
43+
maxThreads: maxThreads,
44+
results: [],
45+
resolve: null,
46+
reject: null
47+
};
1148

49+
return new Promise(function (resolve, reject) {
50+
for (let i = 0; i < maxThreads && i < tasks.length; i++) {
51+
scope.resolve = resolve;
52+
scope.reject = reject;
53+
execute(scope);
54+
}
1255
});
13-
}*/
56+
}
57+
58+
export function queue(tasks: Tasks): Promise<any> {
59+
return parallel(tasks, 1);
60+
}

src/prow.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ export {delay} from "./functions/delay";
66
export {timeout} from "./functions/timeout";
77
export {waterfall} from "./functions/waterfall";
88
export {retry} from "./functions/retry";
9-
export {times} from "./functions/times";
9+
export {times} from "./functions/times";
10+
export {parallel, queue} from "./functions/parallel";

test/parallel.js

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
const _ = require("lodash");
2+
const chai = require("chai");
3+
const chaiAsPromised = require("chai-as-promised");
4+
chai.use(chaiAsPromised);
5+
const {assert, expect} = chai;
6+
const prow = require("../lib/prow");
7+
8+
describe("Parallel", function () {
9+
it("queue", function () {
10+
const tasks = [];
11+
let counter = 0;
12+
for (let i = 0; i < 10; i++) {
13+
tasks.push(() => prow.delay(1).then(() => counter++));
14+
}
15+
16+
return assert.becomes(prow.queue(tasks), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
17+
});
18+
19+
it("parallel 1 thread", function () {
20+
const tasks = [];
21+
let counter = 0;
22+
for (let i = 0; i < 10; i++) {
23+
tasks.push(() => prow.delay(1).then(() => counter++));
24+
}
25+
26+
return assert.becomes(prow.parallel(tasks, 1), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
27+
});
28+
29+
it("parallel 2 threads", function () {
30+
const tasks = [];
31+
let counter = 0;
32+
for (let i = 0; i < 10; i++) {
33+
tasks.push(() => {
34+
counter++;
35+
return Promise.resolve().then(() => counter--)
36+
});
37+
}
38+
39+
return assert.becomes(prow.parallel(tasks, 2), [2, 1, 2, 1, 2, 1, 2, 1, 2, 1]);
40+
});
41+
42+
it("parallel 5 threads", function () {
43+
const tasks = [];
44+
let counter = 0;
45+
for (let i = 0; i < 10; i++) {
46+
tasks.push(() => {
47+
counter++;
48+
return Promise.resolve().then(() => counter--)
49+
});
50+
}
51+
52+
return assert.becomes(prow.parallel(tasks, 5), [5, 4, 3, 2, 1, 5, 4, 3, 2, 1]);
53+
});
54+
55+
it("parallel 7 threads", function () {
56+
const tasks = [];
57+
let counter = 0;
58+
for (let i = 0; i < 10; i++) {
59+
tasks.push(() => {
60+
counter++;
61+
return Promise.resolve().then(() => counter--)
62+
});
63+
}
64+
65+
return assert.becomes(prow.parallel(tasks, 7), [7, 6, 5, 4, 3, 2, 1, 3, 2, 1]);
66+
});
67+
68+
it("parallel 10 threads", function () {
69+
const tasks = [];
70+
let counter = 0;
71+
for (let i = 0; i < 10; i++) {
72+
tasks.push(() => {
73+
counter++;
74+
return Promise.resolve().then(() => counter--)
75+
});
76+
}
77+
78+
return assert.becomes(prow.parallel(tasks), [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]);
79+
});
80+
81+
it("parallel 2 threads wit rejects", function () {
82+
const tasks = [];
83+
let counter = 0;
84+
for (let i = 0; i < 10; i++) {
85+
tasks.push(() => {
86+
counter++;
87+
return Promise.reject().catch(() => counter--)
88+
});
89+
}
90+
91+
return assert.becomes(prow.parallel(tasks, 2), [2, 1, 2, 1, 2, 1, 2, 1, 2, 1]);
92+
});
93+
});

0 commit comments

Comments
 (0)