-
-
Notifications
You must be signed in to change notification settings - Fork 768
/
Copy pathmetrics-gauge.ts
116 lines (106 loc) · 3.72 KB
/
metrics-gauge.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import type { Logger } from './logger';
import type { IUnleashConfig } from './types';
import { createGauge, type Gauge } from './util/metrics';
type Query<R> = () => Promise<R | undefined | null>;
type MetricValue<L extends string> = {
value: number;
labels?: Record<L, string | number>;
};
type MapResult<R, L extends string> = (
result: R,
) => MetricValue<L> | MetricValue<L>[];
type GaugeDefinition<T, L extends string> = {
name: string;
help: string;
labelNames?: L[];
query: Query<T>;
map: MapResult<T, L>;
};
type Task = () => Promise<void>;
interface GaugeUpdater {
target: Gauge<string>;
task: Task;
}
export class DbMetricsMonitor {
private updaters: Map<string, GaugeUpdater> = new Map();
private log: Logger;
constructor({ getLogger }: Pick<IUnleashConfig, 'getLogger'>) {
this.log = getLogger('gauge-metrics');
}
private asArray<T>(value: T | T[]): T[] {
return Array.isArray(value) ? value : [value];
}
private async fetch<T, L extends string>(
definition: GaugeDefinition<T, L>,
): Promise<MetricValue<L>[]> {
const result = await definition.query();
if (
result !== undefined &&
result !== null &&
(!Array.isArray(result) || result.length > 0)
) {
const resultArray = this.asArray(definition.map(result));
resultArray
.filter((r) => typeof r.value !== 'number')
.forEach((r) => {
this.log.debug(
`Invalid value for ${definition.name}: ${r.value}. Value must be an number.`,
);
});
return resultArray.filter((r) => typeof r.value === 'number');
}
return [];
}
registerGaugeDbMetric<T, L extends string>(
definition: GaugeDefinition<T, L>,
): Task {
const gauge = createGauge(definition);
const task = async () => {
try {
const results = await this.fetch(definition);
if (results.length > 0) {
gauge.reset();
for (const r of results) {
// when r.value is zero, we are writing a zero value to the gauge which might not be what we want in some cases
if (r.labels) {
gauge.labels(r.labels).set(r.value);
} else {
gauge.set(r.value);
}
}
}
} catch (e) {
this.log.warn(`Failed to refresh ${definition.name}`, e);
}
};
this.updaters.set(definition.name, { target: gauge, task });
return task;
}
refreshMetrics = async () => {
const tasks = Array.from(this.updaters.entries()).map(
([name, updater]) => ({ name, task: updater.task }),
);
for (const { name, task } of tasks) {
this.log.debug(`Refreshing metric ${name}`);
await task();
}
};
async findValue(
name: string,
labels?: Record<string, string | number>,
): Promise<number | undefined> {
const gauge = await this.updaters.get(name)?.target.gauge?.get();
if (gauge && gauge.values.length > 0) {
const values = labels
? gauge.values.filter(({ labels: l }) => {
return Object.entries(labels).every(
([key, value]) => l[key] === value,
);
})
: gauge.values;
// return first value
return values.map(({ value }) => value).shift();
}
return undefined;
}
}