Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/admin-metrics-handler-instrumentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"nostream": minor
---

feat: instrument event and websocket handlers with OpenTelemetry metrics
3 changes: 3 additions & 0 deletions src/adapters/web-socket-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants
import { attemptValidation } from '../utils/validation'
import { ContextMetadataKey } from '../constants/base'
import { createLogger } from '../factories/logger-factory'
import { recordWebsocketConnectionClosed, recordWebsocketConnectionOpened } from '../telemetry/event-metrics'
import { Event } from '../@types/event'
import { getRemoteAddress } from '../utils/http'
import { IRateLimiter } from '../@types/utils'
Expand Down Expand Up @@ -82,6 +83,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
.on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this))

logger('client %s connected from %s', this.clientId, this.clientAddress.address)
recordWebsocketConnectionOpened()

// NIP-42
this.challenge = randomBytes(32).toString('base64url')
Expand Down Expand Up @@ -260,6 +262,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}

private onClientClose() {
recordWebsocketConnectionClosed()
this.alive = false
this.subscriptions.clear()
this.authenticatedPubkeys.clear()
Expand Down
26 changes: 14 additions & 12 deletions src/handlers/event-message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
import { IEventRepository, INip05VerificationRepository, IUserRepository } from '../@types/repositories'
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
import { CacheAdmissionState } from '../constants/caching'
import { createCommandResult } from '../utils/messages'
import { createEventCommandResult } from '../telemetry/event-metrics'
import { createLogger } from '../factories/logger-factory'
import { Factory } from '../@types/base'
import { ICacheAdapter } from '../@types/adapters'
Expand Down Expand Up @@ -63,13 +63,13 @@ export class EventMessageHandler implements IMessageHandler {
let reason = await this.isEventValid(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

if (isExpiredEvent(event)) {
logger('event %s rejected: expired')
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'event is expired'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, 'event is expired'))
return
}

Expand All @@ -79,43 +79,43 @@ export class EventMessageHandler implements IMessageHandler {
logger('event %s rejected: rate-limited')
this.webSocket.emit(
WebSocketAdapterEvent.Message,
createCommandResult(event.id, false, 'rate-limited: slow down'),
createEventCommandResult(event.id, false, 'rate-limited: slow down'),
)
return
}

reason = this.canAcceptEvent(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

reason = await this.isProtectedEventBlocked(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

reason = await this.isBlockedByRequestToVanish(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

reason = await this.isUserAdmitted(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

reason = await this.checkNip05Verification(event)
if (reason) {
logger('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, reason))
return
}

Expand All @@ -124,7 +124,7 @@ export class EventMessageHandler implements IMessageHandler {
if (typeof strategy?.execute !== 'function') {
this.webSocket.emit(
WebSocketAdapterEvent.Message,
createCommandResult(event.id, false, 'error: event not supported'),
createEventCommandResult(event.id, false, 'error: event not supported'),
)
return
}
Expand All @@ -134,7 +134,7 @@ export class EventMessageHandler implements IMessageHandler {
this.processNip05Metadata(event)
} catch (error) {
logger.error('error handling message', message, error)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, 'error: unable to process event'))
}
}

Expand Down Expand Up @@ -242,7 +242,9 @@ export class EventMessageHandler implements IMessageHandler {
}

const checkEmbedded = async (evt: Event, depth = 0): Promise<boolean> => {
if (depth > 10) return false // Prevent infinite loops or excessive recursion
if (depth > 10) {
return false // Prevent infinite loops or excessive recursion
}
if ((evt.kind === EventKinds.REPOST || evt.kind === EventKinds.GENERIC_REPOST) && evt.content.length > 0) {
try {
const embedded = attemptValidation(eventSchema)(JSON.parse(evt.content)) as Event
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/event-strategies/default-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories'
Expand All @@ -17,7 +17,7 @@ export class DefaultEventStrategy implements IEventStrategy<Event, Promise<void>
public async execute(event: Event): Promise<void> {
logger('received event: %o', event)
const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/event-strategies/delete-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { EventTags } from '../../constants/base'
Expand Down Expand Up @@ -31,7 +31,7 @@ export class DeleteEventStrategy implements IEventStrategy<Event, Promise<void>>
}

const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/event-strategies/ephemeral-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { IEventStrategy } from '../../@types/message-handlers'
Expand All @@ -12,7 +12,7 @@ export class EphemeralEventStrategy implements IEventStrategy<Event, Promise<voi

public async execute(event: Event): Promise<void> {
logger('received ephemeral event: %o', event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, ''))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, ''))
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
}
6 changes: 3 additions & 3 deletions src/handlers/event-strategies/gift-wrap-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { EventTags } from '../../constants/base'
Expand All @@ -21,12 +21,12 @@ export class GiftWrapEventStrategy implements IEventStrategy<Event, Promise<void

const reason = this.validateGiftWrap(event)
if (reason) {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, `invalid: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, `invalid: ${reason}`))
return
}

const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/event-strategies/group-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { EventTags } from '../../constants/base'
Expand All @@ -20,12 +20,12 @@ export class GroupEventStrategy implements IEventStrategy<Event, Promise<void>>

const reason = this.validateGroupEvent(event)
if (reason) {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, `invalid: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, `invalid: ${reason}`))
return
}

const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Event, ParameterizedReplaceableEvent } from '../../@types/event'
import { EventDeduplicationMetadataKey, EventTags } from '../../constants/base'
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { IEventRepository } from '../../@types/repositories'
import { IEventStrategy } from '../../@types/message-handlers'
Expand Down Expand Up @@ -29,7 +29,7 @@ export class ParameterizedReplaceableEventStrategy implements IEventStrategy<Eve
}

const count = await this.eventRepository.upsert(parameterizedReplaceableEvent)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
8 changes: 4 additions & 4 deletions src/handlers/event-strategies/replaceable-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { IEventRepository } from '../../@types/repositories'
Expand All @@ -18,7 +18,7 @@ export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<v
logger('received replaceable event: %o', event)
try {
const count = await this.eventRepository.upsert(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))
if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
}
Expand All @@ -27,12 +27,12 @@ export class ReplaceableEventStrategy implements IEventStrategy<Event, Promise<v
if (error.message.endsWith('duplicate key value violates unique constraint "events_event_id_unique"')) {
this.webSocket.emit(
WebSocketAdapterEvent.Message,
createCommandResult(event.id, false, 'rejected: event already exists'),
createEventCommandResult(event.id, false, 'rejected: event already exists'),
)
return
}

this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: '))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, 'error: '))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/event-strategies/timestamp-event-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IEventRepository } from '../../@types/repositories'
import { WebSocketAdapterEvent } from '../../constants/adapter'
import { EventTags } from '../../constants/base'
import { createLogger } from '../../factories/logger-factory'
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { validateOtsProof } from '../../utils/nip03'

const debug = createLogger('timestamp-event-strategy')
Expand Down Expand Up @@ -36,12 +36,12 @@ export class TimestampEventStrategy implements IEventStrategy<Event, Promise<voi

const reason = this.validate(event)
if (reason) {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, `invalid: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, false, `invalid: ${reason}`))
return
}

const count = await this.eventRepository.create(event)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))

if (count) {
this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/event-strategies/vanish-event-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IEventRepository, IUserRepository } from '../../@types/repositories'
import { createCommandResult } from '../../utils/messages'
import { createEventCommandResult } from '../../telemetry/event-metrics'
import { createLogger } from '../../factories/logger-factory'
import { Event } from '../../@types/event'
import { EventKinds } from '../../constants/base'
Expand All @@ -25,6 +25,6 @@ export class VanishEventStrategy implements IEventStrategy<Event, Promise<void>>

await this.userRepository.setVanished(event.pubkey, true)

this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, true, count ? '' : 'duplicate:'))
this.webSocket.emit(WebSocketAdapterEvent.Message, createEventCommandResult(event.id, true, count ? '' : 'duplicate:'))
}
}
23 changes: 23 additions & 0 deletions src/telemetry/event-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { EventId } from '../@types/base'
import { createCommandResult } from '../utils/messages'
import { getRelayMetricInstruments } from './metrics'

export const createEventCommandResult = (eventId: EventId, successful: boolean, message: string) => {
const instruments = getRelayMetricInstruments()

if (successful) {
instruments.eventsAcceptedTotal.add(1)
} else {
instruments.eventsRejectedTotal.add(1)
}

return createCommandResult(eventId, successful, message)
}

export const recordWebsocketConnectionOpened = (): void => {
getRelayMetricInstruments().websocketConnections.add(1)
}

export const recordWebsocketConnectionClosed = (): void => {
getRelayMetricInstruments().websocketConnections.add(-1)
}
Loading