From ff92051f599c198c6fc3e38e9b94ba7d01867c2c Mon Sep 17 00:00:00 2001 From: ABHAY PANDEY Date: Tue, 30 Jun 2026 09:33:03 +0530 Subject: [PATCH] feat(observability): instrument event and websocket handlers with OTEL metrics Signed-off-by: ABHAY PANDEY --- .../admin-metrics-handler-instrumentation.md | 5 ++++ src/adapters/web-socket-adapter.ts | 3 +++ src/handlers/event-message-handler.ts | 26 ++++++++++--------- .../default-event-strategy.ts | 4 +-- .../event-strategies/delete-event-strategy.ts | 4 +-- .../ephemeral-event-strategy.ts | 4 +-- .../gift-wrap-event-strategy.ts | 6 ++--- .../event-strategies/group-event-strategy.ts | 6 ++--- ...arameterized-replaceable-event-strategy.ts | 4 +-- .../replaceable-event-strategy.ts | 8 +++--- .../timestamp-event-strategy.ts | 6 ++--- .../event-strategies/vanish-event-strategy.ts | 4 +-- src/telemetry/event-metrics.ts | 23 ++++++++++++++++ 13 files changed, 68 insertions(+), 35 deletions(-) create mode 100644 .changeset/admin-metrics-handler-instrumentation.md create mode 100644 src/telemetry/event-metrics.ts diff --git a/.changeset/admin-metrics-handler-instrumentation.md b/.changeset/admin-metrics-handler-instrumentation.md new file mode 100644 index 00000000..45424dfa --- /dev/null +++ b/.changeset/admin-metrics-handler-instrumentation.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +feat: instrument event and websocket handlers with OpenTelemetry metrics diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index 570a50b2..38655dfb 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -15,6 +15,7 @@ import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants import { attemptValidation } from '../utils/validation' import { ContextMetadataKey } from '../constants/base' import { createLogger } from '../factories/logger-factory' +import { recordWebsocketConnectionClosed, recordWebsocketConnectionOpened } from '../telemetry/event-metrics' import { Event } from '../@types/event' import { getRemoteAddress } from '../utils/http' import { IRateLimiter } from '../@types/utils' @@ -82,6 +83,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter .on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this)) logger('client %s connected from %s', this.clientId, this.clientAddress.address) + recordWebsocketConnectionOpened() // NIP-42 this.challenge = randomBytes(32).toString('base64url') @@ -260,6 +262,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter } private onClientClose() { + recordWebsocketConnectionClosed() this.alive = false this.subscriptions.clear() this.authenticatedPubkeys.clear() diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index 67b6b146..5ac88b71 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -31,7 +31,7 @@ import { import { IEventRepository, INip05VerificationRepository, IUserRepository } from '../@types/repositories' import { IEventStrategy, IMessageHandler } from '../@types/message-handlers' import { CacheAdmissionState } from '../constants/caching' -import { createCommandResult } from '../utils/messages' +import { createEventCommandResult } from '../telemetry/event-metrics' import { createLogger } from '../factories/logger-factory' import { Factory } from '../@types/base' import { ICacheAdapter } from '../@types/adapters' @@ -63,13 +63,13 @@ export class EventMessageHandler implements IMessageHandler { let reason = await this.isEventValid(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } if (isExpiredEvent(event)) { logger('event %s rejected: expired') - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'event is expired')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, 'event is expired')) return } @@ -79,7 +79,7 @@ export class EventMessageHandler implements IMessageHandler { logger('event %s rejected: rate-limited') this.webSocket.emit( WebSocketAdapterEvent.Message, - createCommandResult(event.id, false, 'rate-limited: slow down'), + createEventCommandResult(event.id, false, 'rate-limited: slow down'), ) return } @@ -87,35 +87,35 @@ export class EventMessageHandler implements IMessageHandler { reason = this.canAcceptEvent(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } reason = await this.isProtectedEventBlocked(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } reason = await this.isBlockedByRequestToVanish(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } reason = await this.isUserAdmitted(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } reason = await this.checkNip05Verification(event) if (reason) { logger('event %s rejected: %s', event.id, reason) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason)) return } @@ -124,7 +124,7 @@ export class EventMessageHandler implements IMessageHandler { if (typeof strategy?.execute !== 'function') { this.webSocket.emit( WebSocketAdapterEvent.Message, - createCommandResult(event.id, false, 'error: event not supported'), + createEventCommandResult(event.id, false, 'error: event not supported'), ) return } @@ -134,7 +134,7 @@ export class EventMessageHandler implements IMessageHandler { this.processNip05Metadata(event) } catch (error) { logger.error('error handling message', message, error) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, 'error: unable to process event')) } } @@ -242,7 +242,9 @@ export class EventMessageHandler implements IMessageHandler { } const checkEmbedded = async (evt: Event, depth = 0): Promise => { - if (depth > 10) return false // Prevent infinite loops or excessive recursion + if (depth > 10) { + return false // Prevent infinite loops or excessive recursion + } if ((evt.kind === EventKinds.REPOST || evt.kind === EventKinds.GENERIC_REPOST) && evt.content.length > 0) { try { const embedded = attemptValidation(eventSchema)(JSON.parse(evt.content)) as Event diff --git a/src/handlers/event-strategies/default-event-strategy.ts b/src/handlers/event-strategies/default-event-strategy.ts index e38cbe3e..b70ce43f 100644 --- a/src/handlers/event-strategies/default-event-strategy.ts +++ b/src/handlers/event-strategies/default-event-strategy.ts @@ -1,4 +1,4 @@ -import { createCommandResult } from '../../utils/messages' +import { createEventCommandResult } from '../../telemetry/event-metrics' import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { IEventRepository } from '../../@types/repositories' @@ -17,7 +17,7 @@ export class DefaultEventStrategy implements IEventStrategy public async execute(event: Event): Promise { logger('received event: %o', event) const count = await this.eventRepository.create(event) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:')) if (count) { this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) diff --git a/src/handlers/event-strategies/delete-event-strategy.ts b/src/handlers/event-strategies/delete-event-strategy.ts index 0be6aa13..562c4a26 100644 --- a/src/handlers/event-strategies/delete-event-strategy.ts +++ b/src/handlers/event-strategies/delete-event-strategy.ts @@ -1,4 +1,4 @@ -import { createCommandResult } from '../../utils/messages' +import { createEventCommandResult } from '../../telemetry/event-metrics' import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { EventTags } from '../../constants/base' @@ -31,7 +31,7 @@ export class DeleteEventStrategy implements IEventStrategy> } const count = await this.eventRepository.create(event) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:')) if (count) { this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) diff --git a/src/handlers/event-strategies/ephemeral-event-strategy.ts b/src/handlers/event-strategies/ephemeral-event-strategy.ts index 252457ce..3641938d 100644 --- a/src/handlers/event-strategies/ephemeral-event-strategy.ts +++ b/src/handlers/event-strategies/ephemeral-event-strategy.ts @@ -1,4 +1,4 @@ -import { createCommandResult } from '../../utils/messages' +import { createEventCommandResult } from '../../telemetry/event-metrics' import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { IEventStrategy } from '../../@types/message-handlers' @@ -12,7 +12,7 @@ export class EphemeralEventStrategy implements IEventStrategy { logger('received ephemeral event: %o', event) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, '')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, '')) this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) } } diff --git a/src/handlers/event-strategies/gift-wrap-event-strategy.ts b/src/handlers/event-strategies/gift-wrap-event-strategy.ts index 0414e461..5fa1b651 100644 --- a/src/handlers/event-strategies/gift-wrap-event-strategy.ts +++ b/src/handlers/event-strategies/gift-wrap-event-strategy.ts @@ -1,4 +1,4 @@ -import { createCommandResult } from '../../utils/messages' +import { createEventCommandResult } from '../../telemetry/event-metrics' import { createLogger } from '../../factories/logger-factory' import { Event } from '../../@types/event' import { EventTags } from '../../constants/base' @@ -21,12 +21,12 @@ export class GiftWrapEventStrategy implements IEventStrategy> const reason = this.validateGroupEvent(event) if (reason) { - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, `invalid: ${reason}`)) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, `invalid: ${reason}`)) return } const count = await this.eventRepository.create(event) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:')) if (count) { this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) diff --git a/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts b/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts index 14e9aa5e..56ca182a 100644 --- a/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts +++ b/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts @@ -1,6 +1,6 @@ import { Event, ParameterizedReplaceableEvent } from '../../@types/event' import { EventDeduplicationMetadataKey, EventTags } from '../../constants/base' -import { createCommandResult } from '../../utils/messages' +import { createEventCommandResult } from '../../telemetry/event-metrics' import { createLogger } from '../../factories/logger-factory' import { IEventRepository } from '../../@types/repositories' import { IEventStrategy } from '../../@types/message-handlers' @@ -29,7 +29,7 @@ export class ParameterizedReplaceableEventStrategy implements IEventStrategy> await this.userRepository.setVanished(event.pubkey, true) - this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:')) + this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:')) } } diff --git a/src/telemetry/event-metrics.ts b/src/telemetry/event-metrics.ts new file mode 100644 index 00000000..6ba9c3c9 --- /dev/null +++ b/src/telemetry/event-metrics.ts @@ -0,0 +1,23 @@ +import type { EventId } from '../@types/base' +import { createCommandResult } from '../utils/messages' +import { getRelayMetricInstruments } from './metrics' + +export const createEventCommandResult = (eventId: EventId, successful: boolean, message: string) => { + const instruments = getRelayMetricInstruments() + + if (successful) { + instruments.eventsAcceptedTotal.add(1) + } else { + instruments.eventsRejectedTotal.add(1) + } + + return createCommandResult(eventId, successful, message) +} + +export const recordWebsocketConnectionOpened = (): void => { + getRelayMetricInstruments().websocketConnections.add(1) +} + +export const recordWebsocketConnectionClosed = (): void => { + getRelayMetricInstruments().websocketConnections.add(-1) +}