Skip to content

Commit 01e1526

Browse files
removing async dep in server
1 parent 9d58ba2 commit 01e1526

File tree

3 files changed

+83
-85
lines changed

3 files changed

+83
-85
lines changed

src/server.ts

+31-33
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,39 @@ import config from './config';
88
import cluster = require('cluster');
99
import app from './routes';
1010
import parseArgv from './utils/parseArgv';
11-
const async = require('async');
1211
let args = parseArgv([], ['DEBUG']);
1312

14-
async.series(
15-
[
16-
Storage.start.bind(Storage),
17-
Worker.start.bind(Worker),
18-
async (cb: CallbackType) => {
19-
let p2pServices = [] as Array<P2pService>;
20-
for (let chain of Object.keys(config.chains)) {
21-
for (let network of Object.keys(config.chains[chain])) {
22-
const chainConfig = config.chains[chain][network];
23-
const hasChainSource = chainConfig.chainSource !== undefined;
24-
if (!hasChainSource || chainConfig.chainSource === 'p2p') {
25-
let p2pServiceConfig = Object.assign(
26-
config.chains[chain][network],
27-
{ chain, network }
28-
);
29-
p2pServices.push(new P2pService(p2pServiceConfig));
30-
}
31-
}
13+
const startServices = async () => {
14+
await Storage.start({});
15+
await Worker.start();
16+
17+
// TODO this needs to move to a static p2pService method
18+
let p2pServices = [] as Array<P2pService>;
19+
for (let chain of Object.keys(config.chains)) {
20+
for (let network of Object.keys(config.chains[chain])) {
21+
const chainConfig = config.chains[chain][network];
22+
const hasChainSource = chainConfig.chainSource !== undefined;
23+
if (!hasChainSource || chainConfig.chainSource === 'p2p') {
24+
let p2pServiceConfig = Object.assign(
25+
config.chains[chain][network],
26+
{ chain, network }
27+
);
28+
p2pServices.push(new P2pService(p2pServiceConfig));
3229
}
33-
await Promise.all(p2pServices.map(p2pService => p2pService.start())).then(
34-
cb
35-
);
36-
}
37-
],
38-
function () {
39-
const shouldRunDebug = cluster.isMaster && args.DEBUG;
40-
const shouldRunCluster = cluster.isWorker && !args.DEBUG;
41-
if (shouldRunDebug || shouldRunCluster) {
42-
const server = app.listen(config.port, function () {
43-
logger.info(`API server started on port ${config.port}`);
44-
});
45-
server.timeout = 600000;
4630
}
4731
}
48-
);
32+
await Promise.all(p2pServices.map(p2pService => p2pService.start()));
33+
34+
// TODO this needs to move to an api service
35+
const shouldRunDebug = cluster.isMaster && args.DEBUG;
36+
const shouldRunCluster = cluster.isWorker && !args.DEBUG;
37+
if (shouldRunDebug || shouldRunCluster) {
38+
const server = app.listen(config.port, function () {
39+
logger.info(`API server started on port ${config.port}`);
40+
});
41+
// TODO this should be config driven
42+
server.timeout = 600000;
43+
}
44+
};
45+
46+
startServices();

src/services/storage.ts

+28-26
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,36 @@ import "../models"
1010

1111
@LoggifyClass
1212
export class StorageService {
13-
start(ready: CallbackType, args: any) {
14-
let options = Object.assign({}, config, args);
15-
let { dbName, dbHost } = options;
16-
const connectUrl = `mongodb://${dbHost}/${dbName}?socketTimeoutMS=3600000&noDelay=true`;
17-
let attemptConnect = async () => {
18-
return mongoose.connect(connectUrl, {
19-
keepAlive: 1,
20-
poolSize: config.maxPoolSize,
21-
/*
22-
*nativeParser: true
23-
*/
24-
});
25-
};
26-
let attempted = 0;
27-
let attemptConnectId = setInterval(async () => {
28-
try {
29-
let data = await attemptConnect();
30-
clearInterval(attemptConnectId);
31-
ready(null, data);
32-
} catch (err) {
33-
logger.error(err);
34-
attempted++;
35-
if (attempted > 5) {
13+
start(args: any): Promise<any> {
14+
return new Promise((resolve, reject) => {
15+
let options = Object.assign({}, config, args);
16+
let { dbName, dbHost } = options;
17+
const connectUrl = `mongodb://${dbHost}/${dbName}?socketTimeoutMS=3600000&noDelay=true`;
18+
let attemptConnect = async () => {
19+
return mongoose.connect(connectUrl, {
20+
keepAlive: 1,
21+
poolSize: config.maxPoolSize,
22+
/*
23+
*nativeParser: true
24+
*/
25+
});
26+
};
27+
let attempted = 0;
28+
let attemptConnectId = setInterval(async () => {
29+
try {
30+
let data = await attemptConnect();
3631
clearInterval(attemptConnectId);
37-
ready(err);
32+
resolve(data);
33+
} catch (err) {
34+
logger.error(err);
35+
attempted++;
36+
if (attempted > 5) {
37+
clearInterval(attemptConnectId);
38+
reject(err);
39+
}
3840
}
39-
}
40-
}, 5000);
41+
}, 5000);
42+
});
4143
}
4244

4345
stop() {}

src/services/worker.ts

+24-26
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,36 @@ import config from '../config';
55
import { LoggifyClass } from "../decorators/Loggify";
66
const cluster = require('cluster');
77
const { EventEmitter } = require('events');
8-
const async = require('async');
98

109
@LoggifyClass
1110
export class WorkerService extends EventEmitter {
12-
workers = new Array<{ worker: WorkerType; active: boolean }>();
11+
workers = new Array<{ worker: WorkerType; active: boolean, started: Promise<any> }>();
1312

14-
start(ready: CallbackType) {
15-
var self = this;
16-
if (cluster.isMaster) {
17-
logger.verbose(`Master ${process.pid} is running`);
18-
cluster.on('exit', function(worker: WorkerType) {
19-
logger.error(`worker ${worker.process.pid} died`);
20-
});
21-
async.times(
22-
config.numWorkers,
23-
function(n: any, cb: CallbackType) {
24-
var newWorker = cluster.fork();
25-
newWorker.on('message', function(msg: any) {
26-
self.emit(msg.id, msg);
13+
async start() {
14+
return new Promise(async resolve => {
15+
if (cluster.isMaster) {
16+
logger.verbose(`Master ${process.pid} is running`);
17+
cluster.on('exit', (worker: WorkerType) => {
18+
logger.error(`worker ${worker.process.pid} died`);
19+
});
20+
for (let worker = 0; worker < config.numWorkers; worker++) {
21+
let newWorker = cluster.fork();
22+
newWorker.on('message', (msg: any) => {
23+
this.emit(msg.id, msg);
24+
});
25+
let started = new Promise(resolve => {
26+
newWorker.on('listening', resolve);
2727
});
28-
self.workers.push({ worker: newWorker, active: false });
29-
setTimeout(cb, 3000);
30-
},
31-
function() {
32-
ready();
28+
this.workers.push({ worker: newWorker, active: false, started });
3329
}
34-
);
35-
}
36-
if (cluster.isWorker) {
37-
logger.verbose(`Worker ${process.pid} started`);
38-
setImmediate(ready);
39-
}
30+
await Promise.all(this.workers.map(worker => worker.started));
31+
resolve();
32+
}
33+
if (cluster.isWorker) {
34+
logger.verbose(`Worker ${process.pid} started`);
35+
resolve();
36+
}
37+
});
4038
}
4139

4240
stop() {}

0 commit comments

Comments
 (0)