fix(server): use updowncounter by default (#10482)

This commit is contained in:
forehalo
2025-02-27 10:15:13 +00:00
parent b995b4f965
commit d50860eee2
6 changed files with 109 additions and 36 deletions

View File

@@ -0,0 +1,88 @@
# Snapshot report for `src/base/job/queue/__tests__/queue.spec.ts`
The actual snapshot is saved in `queue.spec.ts.snap`.
Generated by [AVA](https://avajs.dev).
## should be able to record job metrics
> [+1 active jobs, job handler, -1 active jobs]
[
[
1,
{
queue: 'nightly',
},
],
[
1,
{
error: false,
handler: 'JobHandlers.handleJob',
job: 'nightly.__test__job',
name: 'job_handler',
namespace: 'nightly',
},
],
[
-1,
{
queue: 'nightly',
},
],
]
> [+1 active jobs, job handler, -1 active jobs]
[
[
1,
{
queue: 'nightly',
},
],
[
1,
{
error: false,
handler: 'JobHandlers.handleJob',
job: 'nightly.__test__job2',
name: 'job_handler',
namespace: 'nightly',
},
],
[
-1,
{
queue: 'nightly',
},
],
]
> [+1 active jobs, job handler errored, -1 active jobs]
[
[
1,
{
queue: 'nightly',
},
],
[
1,
{
error: true,
handler: 'JobHandlers.throwJob',
job: 'nightly.__test__throw',
name: 'job_handler',
namespace: 'nightly',
},
],
[
-1,
{
queue: 'nightly',
},
],
]

View File

@@ -148,22 +148,18 @@ test('should dispatch job handler', async t => {
}); });
test('should be able to record job metrics', async t => { test('should be able to record job metrics', async t => {
const counterStub = Sinon.stub(metrics.job.counter('function_calls'), 'add'); const counterStub = Sinon.stub(
metrics.queue.counter('function_calls'),
'add'
);
const timerStub = Sinon.stub( const timerStub = Sinon.stub(
metrics.job.histogram('function_timer'), metrics.queue.histogram('function_timer'),
'record' 'record'
); );
await executor.run('nightly.__test__job', { name: 'test executor' }); await executor.run('nightly.__test__job', { name: 'test executor' });
t.deepEqual(counterStub.firstCall.args[1], { t.snapshot(counterStub.args, '[+1 active jobs, job handler, -1 active jobs]');
name: 'job_handler',
job: 'nightly.__test__job',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
t.deepEqual(timerStub.firstCall.args[1], { t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler', name: 'job_handler',
job: 'nightly.__test__job', job: 'nightly.__test__job',
@@ -177,14 +173,7 @@ test('should be able to record job metrics', async t => {
await executor.run('nightly.__test__job2', { name: 'test executor' }); await executor.run('nightly.__test__job2', { name: 'test executor' });
t.deepEqual(counterStub.firstCall.args[1], { t.snapshot(counterStub.args, '[+1 active jobs, job handler, -1 active jobs]');
name: 'job_handler',
job: 'nightly.__test__job2',
namespace: 'nightly',
handler: 'JobHandlers.handleJob',
error: false,
});
t.deepEqual(timerStub.firstCall.args[1], { t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler', name: 'job_handler',
job: 'nightly.__test__job2', job: 'nightly.__test__job2',
@@ -203,14 +192,10 @@ test('should be able to record job metrics', async t => {
} }
); );
t.deepEqual(counterStub.firstCall.args[1], { t.snapshot(
name: 'job_handler', counterStub.args,
job: 'nightly.__test__throw', '[+1 active jobs, job handler errored, -1 active jobs]'
namespace: 'nightly', );
handler: 'JobHandlers.throwJob',
error: true,
});
t.deepEqual(timerStub.firstCall.args[1], { t.deepEqual(timerStub.firstCall.args[1], {
name: 'job_handler', name: 'job_handler',
job: 'nightly.__test__throw', job: 'nightly.__test__throw',

View File

@@ -77,7 +77,7 @@ export class JobExecutor
} }
}); });
}, },
'job', 'queue',
'job_handler', 'job_handler',
{ {
job: name, job: name,
@@ -85,12 +85,12 @@ export class JobExecutor
handler: handler.name, handler: handler.name,
} }
); );
const activeJobs = metrics.job.gauge('queue_active_jobs'); const activeJobs = metrics.queue.counter('active_jobs');
activeJobs.record(1, { queue: ns }); activeJobs.add(1, { queue: ns });
try { try {
return await fn(); return await fn();
} finally { } finally {
activeJobs.record(-1, { queue: ns }); activeJobs.add(-1, { queue: ns });
} }
} }

View File

@@ -1,16 +1,16 @@
import { import {
Counter,
Gauge, Gauge,
Histogram, Histogram,
Meter, Meter,
MetricOptions, MetricOptions,
UpDownCounter,
} from '@opentelemetry/api'; } from '@opentelemetry/api';
import { getMeter } from './opentelemetry'; import { getMeter } from './opentelemetry';
type MetricType = 'counter' | 'gauge' | 'histogram'; type MetricType = 'counter' | 'gauge' | 'histogram';
type Metric<T extends MetricType> = T extends 'counter' type Metric<T extends MetricType> = T extends 'counter'
? Counter ? UpDownCounter
: T extends 'gauge' : T extends 'gauge'
? Gauge ? Gauge
: T extends 'histogram' : T extends 'histogram'
@@ -18,7 +18,7 @@ type Metric<T extends MetricType> = T extends 'counter'
: never; : never;
export type ScopedMetrics = { export type ScopedMetrics = {
counter: (name: string, opts?: MetricOptions) => Counter; counter: (name: string, opts?: MetricOptions) => UpDownCounter;
gauge: (name: string, opts?: MetricOptions) => Gauge; gauge: (name: string, opts?: MetricOptions) => Gauge;
histogram: (name: string, opts?: MetricOptions) => Histogram; histogram: (name: string, opts?: MetricOptions) => Histogram;
}; };
@@ -42,7 +42,7 @@ export type KnownMetricScopes =
| 'mail' | 'mail'
| 'ai' | 'ai'
| 'event' | 'event'
| 'job'; | 'queue';
const metricCreators: MetricCreators = { const metricCreators: MetricCreators = {
counter(meter: Meter, name: string, opts?: MetricOptions) { counter(meter: Meter, name: string, opts?: MetricOptions) {

View File

@@ -153,13 +153,13 @@ export class SpaceSyncGateway
handleConnection() { handleConnection() {
this.connectionCount++; this.connectionCount++;
this.logger.log(`New connection, total: ${this.connectionCount}`); this.logger.log(`New connection, total: ${this.connectionCount}`);
metrics.socketio.gauge('connections').record(1); metrics.socketio.gauge('connections').record(this.connectionCount);
} }
handleDisconnect() { handleDisconnect() {
this.connectionCount--; this.connectionCount--;
this.logger.log(`Connection disconnected, total: ${this.connectionCount}`); this.logger.log(`Connection disconnected, total: ${this.connectionCount}`);
metrics.socketio.gauge('connections').record(-1); metrics.socketio.gauge('connections').record(this.connectionCount);
} }
selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter { selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter {