-
-
Notifications
You must be signed in to change notification settings - Fork 768
/
Copy pathdb-lock.ts
54 lines (49 loc) · 1.61 KB
/
db-lock.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import { Client } from 'pg';
import type { IDBOption } from '../types';
import type { Logger } from '../logger';
/** Advisory-lock key that `withDbLock` uses when no other key is configured. */
export const defaultLockKey = 479341;
/** Default time budget for the locked function, in milliseconds (30 minutes). */
export const defaultTimeout = 30 * 60 * 1000;
/** Settings that control how `withDbLock` acquires and holds its lock. */
interface IDbLockOptions {
    /** Key passed to `pg_advisory_lock` / `pg_advisory_unlock`. */
    lockKey: number;
    /** Milliseconds the wrapped function may run before the call rejects. */
    timeout: number;
    /** Receives the error message when a locked invocation fails. */
    logger: Logger;
}
/**
 * Options applied when `withDbLock` is called without an explicit config:
 * the module-level default key and timeout, and a logger backed by `console`.
 */
const defaultOptions: IDbLockOptions = {
    lockKey: defaultLockKey,
    timeout: defaultTimeout,
    // `console` has no `fatal` method, so route it to `console.error`.
    logger: { ...console, fatal: console.error },
};
/**
 * Builds a decorator that runs an async function while holding a PostgreSQL
 * advisory lock, so that concurrent callers using the same key are serialized.
 *
 * Each invocation of the wrapped function opens its own `pg` client, waits for
 * `pg_advisory_lock(config.lockKey)`, runs `fn`, and then unlocks and closes
 * the connection.
 *
 * @param dbConfig - Connection settings forwarded to the `pg` `Client`.
 * @param config - Lock key, timeout (milliseconds) and logger; defaults to
 *   `defaultOptions`.
 * @returns A function that takes `fn` and returns a wrapper with the same
 *   arguments and result type as `fn`.
 * @throws Rejects with `Error('Query read timeout')` when `fn` has not settled
 *   within `config.timeout` ms. Note that `fn` itself is not cancelled; it
 *   keeps running after the lock has been released. Any error from connecting,
 *   locking, `fn`, unlocking or closing the client is propagated to the caller.
 */
export const withDbLock =
    (dbConfig: IDBOption, config = defaultOptions) =>
    <A extends any[], R>(fn: (...args: A) => Promise<R>) =>
    async (...args: A): Promise<R> => {
        const client = new Client({
            ...dbConfig,
        });
        let lockAcquired = false;
        // Handle of the timeout timer, kept so it can be cancelled on exit.
        let timer: ReturnType<typeof setTimeout> | undefined;
        try {
            await client.connect();
            // Blocks until the advisory lock is granted to this connection.
            await client.query('SELECT pg_advisory_lock($1)', [config.lockKey]);
            lockAcquired = true;

            const promise = fn(...args);
            const timeoutPromise = new Promise<never>((_, reject) => {
                timer = setTimeout(
                    () => reject(new Error('Query read timeout')),
                    config.timeout,
                );
            });
            const result = (await Promise.race([promise, timeoutPromise])) as R;
            return result;
        } catch (e) {
            // Anything can be thrown, so only read `.message` from real Errors.
            const message = e instanceof Error ? e.message : String(e);
            config.logger.error(`Locking error: ${message}`);
            throw e;
        } finally {
            // Cancel the pending timer; otherwise it keeps the event loop
            // alive for up to `config.timeout` ms after `fn` has finished.
            clearTimeout(timer);
            try {
                if (lockAcquired) {
                    await client.query('SELECT pg_advisory_unlock($1)', [
                        config.lockKey,
                    ]);
                }
            } finally {
                // Always close the connection, even when the unlock query fails.
                await client.end();
            }
        }
    };