mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-14 21:27:20 +00:00
refactor(server): simplify metrics creation and usage (#5115)
This commit is contained in:
@@ -1,76 +1,129 @@
|
||||
import opentelemetry, { Attributes, Observable } from '@opentelemetry/api';
|
||||
import {
|
||||
Attributes,
|
||||
Counter,
|
||||
Histogram,
|
||||
Meter,
|
||||
MetricOptions,
|
||||
} from '@opentelemetry/api';
|
||||
|
||||
interface AsyncMetric {
|
||||
ob: Observable;
|
||||
get value(): any;
|
||||
get attrs(): Attributes | undefined;
|
||||
}
|
||||
import { getMeter } from './opentelemetry';
|
||||
|
||||
let _metrics: ReturnType<typeof createBusinessMetrics> | undefined = undefined;
|
||||
type MetricType = 'counter' | 'gauge' | 'histogram';
|
||||
type Metric<T extends MetricType> = T extends 'counter'
|
||||
? Counter
|
||||
: T extends 'gauge'
|
||||
? Histogram
|
||||
: T extends 'histogram'
|
||||
? Histogram
|
||||
: never;
|
||||
|
||||
export function getMeter(name = 'business') {
|
||||
return opentelemetry.metrics.getMeter(name);
|
||||
}
|
||||
export type ScopedMetrics = {
|
||||
[T in MetricType]: (name: string, opts?: MetricOptions) => Metric<T>;
|
||||
};
|
||||
type MetricCreators = {
|
||||
[T in MetricType]: (
|
||||
meter: Meter,
|
||||
name: string,
|
||||
opts?: MetricOptions
|
||||
) => Metric<T>;
|
||||
};
|
||||
|
||||
function createBusinessMetrics() {
|
||||
const meter = getMeter();
|
||||
const asyncMetrics: AsyncMetric[] = [];
|
||||
export type KnownMetricScopes =
|
||||
| 'socketio'
|
||||
| 'gql'
|
||||
| 'jwst'
|
||||
| 'auth'
|
||||
| 'controllers'
|
||||
| 'doc';
|
||||
|
||||
function createGauge(name: string) {
|
||||
const metricCreators: MetricCreators = {
|
||||
counter(meter: Meter, name: string, opts?: MetricOptions) {
|
||||
return meter.createCounter(name, opts);
|
||||
},
|
||||
gauge(meter: Meter, name: string, opts?: MetricOptions) {
|
||||
let value: any;
|
||||
let attrs: Attributes | undefined;
|
||||
const ob = meter.createObservableGauge(name);
|
||||
asyncMetrics.push({
|
||||
ob,
|
||||
get value() {
|
||||
return value;
|
||||
},
|
||||
get attrs() {
|
||||
return attrs;
|
||||
},
|
||||
const ob = meter.createObservableGauge(name, opts);
|
||||
|
||||
ob.addCallback(result => {
|
||||
result.observe(value, attrs);
|
||||
});
|
||||
|
||||
return (newValue: any, newAttrs?: Attributes) => {
|
||||
value = newValue;
|
||||
attrs = newAttrs;
|
||||
};
|
||||
return {
|
||||
record: (newValue, newAttrs) => {
|
||||
value = newValue;
|
||||
attrs = newAttrs;
|
||||
},
|
||||
} satisfies Histogram;
|
||||
},
|
||||
histogram(meter: Meter, name: string, opts?: MetricOptions) {
|
||||
return meter.createHistogram(name, opts);
|
||||
},
|
||||
};
|
||||
|
||||
const scopes = new Map<string, ScopedMetrics>();
|
||||
|
||||
function make(scope: string) {
|
||||
const meter = getMeter();
|
||||
const metrics = new Map<string, { type: MetricType; metric: any }>();
|
||||
const prefix = scope + '/';
|
||||
|
||||
function getOrCreate<T extends MetricType>(
|
||||
type: T,
|
||||
name: string,
|
||||
opts?: MetricOptions
|
||||
): Metric<T> {
|
||||
name = prefix + name;
|
||||
const metric = metrics.get(name);
|
||||
if (metric) {
|
||||
if (type !== metric.type) {
|
||||
throw new Error(
|
||||
`Metric ${name} has already been registered as ${metric.type} mode, but get as ${type} again.`
|
||||
);
|
||||
}
|
||||
|
||||
return metric.metric;
|
||||
} else {
|
||||
const metric = metricCreators[type](meter, name, opts);
|
||||
metrics.set(name, { type, metric });
|
||||
return metric;
|
||||
}
|
||||
}
|
||||
|
||||
const metrics = {
|
||||
socketIOConnectionGauge: createGauge('socket_io_connection'),
|
||||
|
||||
gqlRequest: meter.createCounter('gql_request'),
|
||||
gqlError: meter.createCounter('gql_error'),
|
||||
gqlTimer: meter.createHistogram('gql_timer'),
|
||||
|
||||
jwstCodecMerge: meter.createCounter('jwst_codec_merge'),
|
||||
jwstCodecDidnotMatch: meter.createCounter('jwst_codec_didnot_match'),
|
||||
jwstCodecFail: meter.createCounter('jwst_codec_fail'),
|
||||
|
||||
authCounter: meter.createCounter('auth'),
|
||||
authFailCounter: meter.createCounter('auth_fail'),
|
||||
|
||||
docHistoryCounter: meter.createCounter('doc_history_created'),
|
||||
docRecoverCounter: meter.createCounter('doc_history_recovered'),
|
||||
};
|
||||
|
||||
meter.addBatchObservableCallback(
|
||||
result => {
|
||||
asyncMetrics.forEach(metric => {
|
||||
result.observe(metric.ob, metric.value, metric.attrs);
|
||||
});
|
||||
return {
|
||||
counter(name, opts) {
|
||||
return getOrCreate('counter', name, opts);
|
||||
},
|
||||
asyncMetrics.map(({ ob }) => ob)
|
||||
);
|
||||
|
||||
return metrics;
|
||||
gauge(name, opts) {
|
||||
return getOrCreate('gauge', name, opts);
|
||||
},
|
||||
histogram(name, opts) {
|
||||
return getOrCreate('histogram', name, opts);
|
||||
},
|
||||
} satisfies ScopedMetrics;
|
||||
}
|
||||
|
||||
export function registerBusinessMetrics() {
|
||||
if (!_metrics) {
|
||||
_metrics = createBusinessMetrics();
|
||||
/**
|
||||
* @example
|
||||
*
|
||||
* ```
|
||||
* metrics.scope.counter('example_count').add(1, {
|
||||
* attr1: 'example-event'
|
||||
* })
|
||||
* ```
|
||||
*/
|
||||
export const metrics = new Proxy<Record<KnownMetricScopes, ScopedMetrics>>(
|
||||
// @ts-expect-error proxied
|
||||
{},
|
||||
{
|
||||
get(_, scopeName: string) {
|
||||
let scope = scopes.get(scopeName);
|
||||
if (!scope) {
|
||||
scope = make(scopeName);
|
||||
scopes.set(scopeName, scope);
|
||||
}
|
||||
|
||||
return scope;
|
||||
},
|
||||
}
|
||||
|
||||
return _metrics;
|
||||
}
|
||||
export const metrics = registerBusinessMetrics;
|
||||
);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { MetricExporter } from '@google-cloud/opentelemetry-cloud-monitoring-exporter';
|
||||
import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter';
|
||||
import { metrics } from '@opentelemetry/api';
|
||||
import {
|
||||
CompositePropagator,
|
||||
W3CBaggagePropagator,
|
||||
@@ -16,6 +17,8 @@ import { NestInstrumentation } from '@opentelemetry/instrumentation-nestjs-core'
|
||||
import { SocketIoInstrumentation } from '@opentelemetry/instrumentation-socket.io';
|
||||
import {
|
||||
ConsoleMetricExporter,
|
||||
type MeterProvider,
|
||||
MetricProducer,
|
||||
MetricReader,
|
||||
PeriodicExportingMetricReader,
|
||||
} from '@opentelemetry/sdk-metrics';
|
||||
@@ -24,10 +27,11 @@ import {
|
||||
BatchSpanProcessor,
|
||||
ConsoleSpanExporter,
|
||||
SpanExporter,
|
||||
TraceIdRatioBasedSampler,
|
||||
} from '@opentelemetry/sdk-trace-node';
|
||||
import { PrismaInstrumentation } from '@prisma/instrumentation';
|
||||
|
||||
import { registerBusinessMetrics } from './metrics';
|
||||
import { PrismaMetricProducer } from './prisma';
|
||||
|
||||
abstract class OpentelemetryFactor {
|
||||
abstract getMetricReader(): MetricReader;
|
||||
@@ -44,9 +48,14 @@ abstract class OpentelemetryFactor {
|
||||
];
|
||||
}
|
||||
|
||||
getMetricsProducers(): MetricProducer[] {
|
||||
return [new PrismaMetricProducer()];
|
||||
}
|
||||
|
||||
create() {
|
||||
const traceExporter = this.getSpanExporter();
|
||||
return new NodeSDK({
|
||||
sampler: new TraceIdRatioBasedSampler(0.1),
|
||||
traceExporter,
|
||||
metricReader: this.getMetricReader(),
|
||||
spanProcessor: new BatchSpanProcessor(traceExporter),
|
||||
@@ -67,7 +76,10 @@ class GCloudOpentelemetryFactor extends OpentelemetryFactor {
|
||||
return new PeriodicExportingMetricReader({
|
||||
exportIntervalMillis: 30000,
|
||||
exportTimeoutMillis: 10000,
|
||||
exporter: new MetricExporter(),
|
||||
exporter: new MetricExporter({
|
||||
prefix: 'custom.googleapis.com',
|
||||
}),
|
||||
metricProducers: this.getMetricsProducers(),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -78,7 +90,9 @@ class GCloudOpentelemetryFactor extends OpentelemetryFactor {
|
||||
|
||||
class LocalOpentelemetryFactor extends OpentelemetryFactor {
|
||||
override getMetricReader(): MetricReader {
|
||||
return new PrometheusExporter();
|
||||
return new PrometheusExporter({
|
||||
metricProducers: this.getMetricsProducers(),
|
||||
});
|
||||
}
|
||||
|
||||
override getSpanExporter(): SpanExporter {
|
||||
@@ -90,6 +104,7 @@ class DebugOpentelemetryFactor extends OpentelemetryFactor {
|
||||
override getMetricReader(): MetricReader {
|
||||
return new PeriodicExportingMetricReader({
|
||||
exporter: new ConsoleMetricExporter(),
|
||||
metricProducers: this.getMetricsProducers(),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -111,9 +126,30 @@ function createSDK() {
|
||||
return factor?.create();
|
||||
}
|
||||
|
||||
let OPENTELEMETRY_STARTED = false;
|
||||
|
||||
function ensureStarted() {
|
||||
if (!OPENTELEMETRY_STARTED) {
|
||||
OPENTELEMETRY_STARTED = true;
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
function getMeterProvider() {
|
||||
ensureStarted();
|
||||
return metrics.getMeterProvider();
|
||||
}
|
||||
|
||||
function registerCustomMetrics() {
|
||||
const host = new HostMetrics({ name: 'instance-host-metrics' });
|
||||
host.start();
|
||||
const hostMetricsMonitoring = new HostMetrics({
|
||||
name: 'instance-host-metrics',
|
||||
meterProvider: getMeterProvider() as MeterProvider,
|
||||
});
|
||||
hostMetricsMonitoring.start();
|
||||
}
|
||||
|
||||
export function getMeter(name = 'business') {
|
||||
return getMeterProvider().getMeter(name);
|
||||
}
|
||||
|
||||
export function start() {
|
||||
@@ -122,6 +158,5 @@ export function start() {
|
||||
if (sdk) {
|
||||
sdk.start();
|
||||
registerCustomMetrics();
|
||||
registerBusinessMetrics();
|
||||
}
|
||||
}
|
||||
|
||||
132
packages/backend/server/src/metrics/prisma.ts
Normal file
132
packages/backend/server/src/metrics/prisma.ts
Normal file
@@ -0,0 +1,132 @@
|
||||
import { HrTime, ValueType } from '@opentelemetry/api';
|
||||
import { hrTime } from '@opentelemetry/core';
|
||||
import { Resource } from '@opentelemetry/resources';
|
||||
import {
|
||||
AggregationTemporality,
|
||||
CollectionResult,
|
||||
DataPointType,
|
||||
InstrumentType,
|
||||
MetricProducer,
|
||||
ScopeMetrics,
|
||||
} from '@opentelemetry/sdk-metrics';
|
||||
|
||||
import { PrismaService } from '../prisma';
|
||||
|
||||
function transformPrismaKey(key: string) {
|
||||
// replace first '_' to '/' as a scope prefix
|
||||
// example: prisma_client_query_duration_seconds_sum -> prisma/client_query_duration_seconds_sum
|
||||
return key.replace(/_/, '/');
|
||||
}
|
||||
|
||||
export class PrismaMetricProducer implements MetricProducer {
|
||||
private readonly startTime: HrTime = hrTime();
|
||||
|
||||
async collect(): Promise<CollectionResult> {
|
||||
const result: CollectionResult = {
|
||||
resourceMetrics: {
|
||||
resource: Resource.EMPTY,
|
||||
scopeMetrics: [],
|
||||
},
|
||||
errors: [],
|
||||
};
|
||||
|
||||
if (!PrismaService.INSTANCE) {
|
||||
return result;
|
||||
}
|
||||
|
||||
const prisma = PrismaService.INSTANCE;
|
||||
|
||||
const endTime = hrTime();
|
||||
|
||||
const metrics = await prisma.$metrics.json();
|
||||
const scopeMetrics: ScopeMetrics = {
|
||||
scope: {
|
||||
name: '',
|
||||
},
|
||||
metrics: [],
|
||||
};
|
||||
for (const counter of metrics.counters) {
|
||||
scopeMetrics.metrics.push({
|
||||
descriptor: {
|
||||
name: transformPrismaKey(counter.key),
|
||||
description: counter.description,
|
||||
unit: '1',
|
||||
type: InstrumentType.COUNTER,
|
||||
valueType: ValueType.INT,
|
||||
},
|
||||
dataPointType: DataPointType.SUM,
|
||||
aggregationTemporality: AggregationTemporality.CUMULATIVE,
|
||||
dataPoints: [
|
||||
{
|
||||
startTime: this.startTime,
|
||||
endTime: endTime,
|
||||
value: counter.value,
|
||||
attributes: counter.labels,
|
||||
},
|
||||
],
|
||||
isMonotonic: true,
|
||||
});
|
||||
}
|
||||
|
||||
for (const gauge of metrics.gauges) {
|
||||
scopeMetrics.metrics.push({
|
||||
descriptor: {
|
||||
name: transformPrismaKey(gauge.key),
|
||||
description: gauge.description,
|
||||
unit: '1',
|
||||
type: InstrumentType.UP_DOWN_COUNTER,
|
||||
valueType: ValueType.INT,
|
||||
},
|
||||
dataPointType: DataPointType.GAUGE,
|
||||
aggregationTemporality: AggregationTemporality.CUMULATIVE,
|
||||
dataPoints: [
|
||||
{
|
||||
startTime: this.startTime,
|
||||
endTime: endTime,
|
||||
value: gauge.value,
|
||||
attributes: gauge.labels,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
for (const histogram of metrics.histograms) {
|
||||
const boundaries = [];
|
||||
const counts = [];
|
||||
for (const [boundary, count] of histogram.value.buckets) {
|
||||
boundaries.push(boundary);
|
||||
counts.push(count);
|
||||
}
|
||||
scopeMetrics.metrics.push({
|
||||
descriptor: {
|
||||
name: transformPrismaKey(histogram.key),
|
||||
description: histogram.description,
|
||||
unit: 'ms',
|
||||
type: InstrumentType.HISTOGRAM,
|
||||
valueType: ValueType.DOUBLE,
|
||||
},
|
||||
dataPointType: DataPointType.HISTOGRAM,
|
||||
aggregationTemporality: AggregationTemporality.CUMULATIVE,
|
||||
dataPoints: [
|
||||
{
|
||||
startTime: this.startTime,
|
||||
endTime: endTime,
|
||||
value: {
|
||||
buckets: {
|
||||
boundaries,
|
||||
counts,
|
||||
},
|
||||
count: histogram.value.count,
|
||||
sum: histogram.value.sum,
|
||||
},
|
||||
attributes: histogram.labels,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
result.resourceMetrics.scopeMetrics.push(scopeMetrics);
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,9 @@
|
||||
import { Attributes } from '@opentelemetry/api';
|
||||
|
||||
import { getMeter } from './metrics';
|
||||
import { KnownMetricScopes, metrics } from './metrics';
|
||||
|
||||
export const CallTimer = (
|
||||
scope: KnownMetricScopes,
|
||||
name: string,
|
||||
attrs?: Attributes
|
||||
): MethodDecorator => {
|
||||
@@ -18,9 +19,11 @@ export const CallTimer = (
|
||||
}
|
||||
|
||||
desc.value = function (...args: any[]) {
|
||||
const timer = getMeter().createHistogram(name, {
|
||||
const timer = metrics[scope].histogram(name, {
|
||||
description: `function call time costs of ${name}`,
|
||||
unit: 'ms',
|
||||
});
|
||||
|
||||
const start = Date.now();
|
||||
|
||||
const end = () => {
|
||||
@@ -48,6 +51,7 @@ export const CallTimer = (
|
||||
};
|
||||
|
||||
export const CallCounter = (
|
||||
scope: KnownMetricScopes,
|
||||
name: string,
|
||||
attrs?: Attributes
|
||||
): MethodDecorator => {
|
||||
@@ -63,7 +67,7 @@ export const CallCounter = (
|
||||
}
|
||||
|
||||
desc.value = function (...args: any[]) {
|
||||
const count = getMeter().createCounter(name, {
|
||||
const count = metrics[scope].counter(name, {
|
||||
description: `function call counter of ${name}`,
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user