Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/solid-boats-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"nostream": minor
---

added wot graph service and unit test
3 changes: 2 additions & 1 deletion .knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"lzma-native"
],
"ignore": [
".nostr/**"
".nostr/**",
"src/services/wot-service.ts"
],
"commitlint": false,
"eslint": false,
Expand Down
4 changes: 4 additions & 0 deletions resources/default-settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ wot:
minimumFollowers: 1
# Hours between full trust graph rebuilds.
refreshIntervalHours: 24
# Relay URLs to fetch follow lists from when building the trust graph.
seedRelays:
- "wss://relay.damus.io"
- "wss://relay.nostr.band"
network:
maxPayloadSize: 524288
# Uncomment only when using a trusted reverse proxy and configuring trustedProxies.
Expand Down
8 changes: 8 additions & 0 deletions src/@types/services.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Invoice } from './invoice'
import { Pubkey } from './base'
import { WoTSettings } from './settings'

export interface IMaintenanceService {
clearOldEvents(): Promise<void>
Expand All @@ -14,3 +15,10 @@ export interface IPaymentsService {
sendInvoiceUpdateNotification(invoice: Invoice): Promise<void>
getPendingInvoices(): Promise<Invoice[]>
}

/**
 * Contract of the Web-of-Trust service: build a trust graph from settings and
 * answer trust lookups against the most recently built graph.
 */
export interface IWotService {
/** Builds (or rebuilds) the trust graph described by `settings`; resolves when the build is over. */
buildGraph(settings: WoTSettings): Promise<void>
/** Returns whether `pubkey` is trusted according to the current graph. */
isTrusted(pubkey: string): boolean
/** Returns whether a graph is available for lookups. */
isReady(): boolean
/** Discards the current graph and returns the service to its initial state. */
reset(): void
}
5 changes: 5 additions & 0 deletions src/@types/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ export interface WoTSettings {
* Defaults to 24.
*/
refreshIntervalHours: number
/**
* Relay URLs to fetch follow lists from when building the trust graph.
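* Intended to fall back to the relay's own URL if empty; note that `WotService.buildGraph` passes this list to its fetcher as-is and does not apply that fallback itself.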
*/
seedRelays: string[]
Comment on lines +286 to +290
}

export interface Settings {
Expand Down
257 changes: 257 additions & 0 deletions src/services/wot-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
import { RawData, WebSocket } from 'ws'
import { randomUUID } from 'crypto'

import { createLogger } from '../factories/logger-factory'
import { IWotService } from '../@types/services'
import { WoTSettings } from '../@types/settings'

// Module-scoped logger used by every function in this file.
const logger = createLogger('wot-service')

// Timeout (ms) handed to the fetcher for the phase-1 request (the seed pubkey's own follow list).
const PHASE1_TIMEOUT_MS = 5_000
// Maximum number of authors placed in a single phase-2 filter.
const PHASE2_BATCH_SIZE = 500
// Maximum number of phase-2 batches being fetched at the same time.
const PHASE2_CONCURRENCY = 5
// Timeout (ms) handed to the fetcher for each phase-2 batch.
const PHASE2_BATCH_TIMEOUT_MS = 30_000

// Kind 3 — contact / follow list
const KIND_FOLLOW_LIST = 3

/**
 * Fetch the stored events matching `filter` from a single relay.
 *
 * Opens a WebSocket to `relayUrl`, sends one REQ under a random subscription
 * id and collects every EVENT payload received for that subscription. The
 * returned promise never rejects: it resolves with whatever has been collected
 * as soon as the relay answers EOSE or CLOSED for the subscription, the socket
 * errors or closes, or `timeoutMs` elapses — whichever comes first.
 *
 * @param relayUrl - WebSocket URL of the relay to query.
 * @param filter - Filter object sent unchanged as the REQ filter.
 * @param timeoutMs - Upper bound for the opening handshake and for the fetch as a whole.
 * @returns The collected event payloads. They are only checked to be plain
 *   objects; ids and signatures are NOT verified here.
 */
function fetchEvents(
  relayUrl: string,
  filter: Record<string, unknown>,
  timeoutMs: number,
): Promise<any[]> {
  return new Promise((resolve) => {
    const subId = `wot-${randomUUID().slice(0, 8)}`
    const events: any[] = []
    let ws: WebSocket | undefined
    let settled = false

    // Single exit point: idempotent, stops the timer, closes the socket and resolves.
    const finish = () => {
      if (settled) {
        return
      }
      settled = true
      clearTimeout(timer)
      try { ws?.close() } catch { /* ignore */ }
      resolve(events)
    }

    const timer = setTimeout(finish, timeoutMs)

    try {
      // `handshakeTimeout` is the option ws documents for bounding the opening handshake.
      ws = new WebSocket(relayUrl, { handshakeTimeout: timeoutMs })
    } catch (err) {
      logger.warn('wot-service: could not create WebSocket to %s: %o', relayUrl, err)
      finish()
      return
    }

    const socket = ws

    socket.on('open', () => {
      try {
        socket.send(JSON.stringify(['REQ', subId, filter]))
      } catch (err) {
        // An exception escaping an event handler would be uncaught; give up on this relay instead.
        logger.warn('wot-service: could not send REQ to %s: %o', relayUrl, err)
        finish()
      }
    })

    socket.on('message', (raw: RawData) => {
      let msg: unknown
      try {
        msg = JSON.parse(raw.toString('utf8'))
      } catch {
        return // malformed message — ignore
      }

      // Only messages addressed to our subscription are of interest.
      if (!Array.isArray(msg) || msg[1] !== subId) {
        return
      }

      const kind = msg[0]
      const payload = msg[2]

      if (kind === 'EVENT') {
        if (typeof payload === 'object' && payload !== null && !Array.isArray(payload)) {
          events.push(payload)
        }
      } else if (kind === 'EOSE' || kind === 'CLOSED') {
        // EOSE: stored events are exhausted. CLOSED: the relay ended/refused the
        // subscription, so nothing more will arrive — no point waiting for the timeout.
        finish()
      }
    })

    socket.on('error', (err) => {
      // Once settled, the only expected error is the one ws raises because we closed
      // a socket that was still connecting; that is not a relay failure worth logging.
      if (!settled) {
        logger.warn('wot-service: WebSocket error for %s: %o', relayUrl, err)
      }
      finish()
    })

    socket.on('close', finish)
  })
}

/**
 * Query every relay in `relayUrls` at the same time with the same filter and
 * merge the answers, keeping the first occurrence of each event id.
 * Events whose `id` is not a string are dropped.
 *
 * Exported so tests can inject a replacement.
 *
 * NOTE(review): all results and the id index are held in memory at once, so
 * memory grows with the number of events returned. Reviewers of this change
 * flagged this as an OOM risk for large follow graphs and suggested a bloom
 * filter with streaming, or moving the work into PostgreSQL.
 *
 * @param relayUrls - Relays to query.
 * @param filter - Filter sent to every relay.
 * @param timeoutMs - Per-relay time budget.
 * @returns The merged events, in relay order then arrival order.
 */
export async function fetchFromRelays(
  relayUrls: string[],
  filter: Record<string, unknown>,
  timeoutMs: number,
): Promise<any[]> {
  const perRelay = await Promise.all(
    relayUrls.map((relayUrl) => fetchEvents(relayUrl, filter, timeoutMs)),
  )

  // A Map keeps insertion order, so it serves as both the "seen" index and the result list.
  const uniqueById = new Map<string, any>()

  for (const relayEvents of perRelay) {
    for (const candidate of relayEvents) {
      if (typeof candidate.id !== 'string' || uniqueById.has(candidate.id)) {
        continue
      }
      uniqueById.set(candidate.id, candidate)
    }
  }

  return Array.from(uniqueById.values())
}

/**
 * Run `task` once for every element of `items`, with at most `concurrency`
 * invocations in flight at any time.
 *
 * `concurrency` is floored and clamped to at least 1, so a zero, negative or
 * NaN value degrades to sequential execution instead of silently skipping
 * every item. Every element is processed, including `undefined` ones.
 *
 * If a task rejects, the returned promise rejects with that error; the
 * remaining workers keep draining the queue.
 *
 * @param items - Work items; the array is snapshotted, later mutations are ignored.
 * @param concurrency - Maximum number of tasks running simultaneously.
 * @param task - Async function applied to each item.
 */
async function runConcurrent<T>(
  items: T[],
  concurrency: number,
  task: (item: T) => Promise<void>,
): Promise<void> {
  const requested = Math.floor(concurrency) || 1
  const workerCount = Math.min(items.length, Math.max(1, requested))

  // One iterator shared by all workers: each `next()` hands out the next
  // unclaimed item, so no item is processed twice and none is lost.
  const pending = items.slice()[Symbol.iterator]()

  const worker = async (): Promise<void> => {
    for (const item of pending) {
      await task(item)
    }
  }

  await Promise.all(Array.from({ length: workerCount }, () => worker()))
}

// static service

/**
 * Signature of the function `WotService` calls to obtain events from relays.
 * It has the same parameter list as `fetchFromRelays`, which is the default
 * passed to the `WotService` constructor; tests can supply a stub instead.
 *
 * Parameters, in order: the relay URLs to query, the filter to send, and the
 * time budget in milliseconds. Resolves with the fetched event payloads.
 */
export type RelayFetcher = (
relayUrls: string[],
filter: Record<string, unknown>,
timeoutMs: number,
) => Promise<any[]>

/**
 * In-memory Web-of-Trust service.
 *
 * `buildGraph` fetches the seed pubkey's follow list (1-hop), then the follow
 * lists of everyone on it (2-hop), counts how many of those lists mention each
 * pubkey and trusts every pubkey whose count reaches `minimumFollowers`. The
 * seed pubkey is always trusted. The live trust map is only replaced once a
 * build has fully succeeded.
 *
 * Fetched events are taken at face value: neither ids nor signatures are
 * verified here, so the configured seed relays must be trusted.
 *
 * NOTE(review): the whole graph is accumulated in process memory. Reviewers of
 * this change flagged the resulting footprint as an OOM risk and asked for the
 * graph to be built in PostgreSQL (optionally reusing the existing mirroring
 * code to ingest kind-3 events from seed relays). That redesign is not part of
 * this class.
 */
export class WotService implements IWotService {
  private trustMap: Map<string, boolean> = new Map()
  private booted = false
  private building = false
  // Bumped by reset(); lets a build detect that it was invalidated while running.
  private generation = 0

  /**
   * @param fetcher - relay fetch function. Defaults to the real WebSocket
   * implementation. Pass a stub in tests.
   */
  public constructor(private readonly fetcher: RelayFetcher = fetchFromRelays) {}

  /**
   * Build the trust graph and, on success, make it the live one.
   *
   * Returns immediately if another build is already running. If `reset()` is
   * called while the build is in flight, the result is discarded rather than
   * published.
   *
   * @param settings - Supplies `seedPubkey`, `seedRelays` and `minimumFollowers`.
   * @throws Re-throws any error raised by the phase-1 fetch or while assembling
   *   the graph. Failures of individual phase-2 batches are logged and skipped.
   */
  public async buildGraph(settings: WoTSettings): Promise<void> {
    if (this.building) {
      logger('build already in progress — skipping')
      return
    }

    const generation = this.generation
    // Tolerate configurations that lack the list entirely.
    const seedRelays = settings.seedRelays ?? []

    this.building = true
    logger.info('starting WoT graph build, seed=%s relays=%o', settings.seedPubkey, seedRelays)
    if (seedRelays.length === 0) {
      logger.warn('wot-service: no seed relays configured')
    }

    try {
      // local accumulators — never touch live state during the build
      const followerCount = new Map<string, number>()
      const oneHopSet = new Set<string>()
      const countFollower = (pubkey: string): void => {
        followerCount.set(pubkey, (followerCount.get(pubkey) ?? 0) + 1)
      }

      // ── Phase 1: fetch owner's follow list (1-hop) ──────────────────────────
      const phase1Events = await this.fetcher(
        seedRelays,
        { authors: [settings.seedPubkey], kinds: [KIND_FOLLOW_LIST] },
        PHASE1_TIMEOUT_MS,
      )

      for (const follows of WotService.followsPerAuthor(phase1Events)) {
        for (const pubkey of follows) {
          oneHopSet.add(pubkey)
          countFollower(pubkey)
        }
      }

      logger.info('phase 1 complete: %d 1-hop pubkeys', oneHopSet.size)

      // ── Phase 2: fetch 2-hop follow lists (batched + concurrent) ───────────
      const oneHopList = Array.from(oneHopSet)
      const batches: string[][] = []

      for (let i = 0; i < oneHopList.length; i += PHASE2_BATCH_SIZE) {
        batches.push(oneHopList.slice(i, i + PHASE2_BATCH_SIZE))
      }

      await runConcurrent(batches, PHASE2_CONCURRENCY, async (batch) => {
        try {
          const events = await this.fetcher(
            seedRelays,
            { authors: batch, kinds: [KIND_FOLLOW_LIST] },
            PHASE2_BATCH_TIMEOUT_MS,
          )

          for (const follows of WotService.followsPerAuthor(events)) {
            for (const pubkey of follows) {
              countFollower(pubkey)
            }
          }
        } catch (err) {
          // Best effort: one failing batch must not abort the whole build.
          logger.warn('wot-service: phase 2 batch failed: %o', err)
        }
      })

      logger.info('phase 2 complete: %d unique pubkeys in follower map', followerCount.size)

      // ── Phase 3: build new trust map and swap atomically ───────────────────
      const newTrustMap = new Map<string, boolean>()

      // owner is always trusted
      newTrustMap.set(settings.seedPubkey, true)

      for (const [pubkey, count] of followerCount) {
        if (count >= settings.minimumFollowers) {
          newTrustMap.set(pubkey, true)
        }
      }

      // reset() ran while we were fetching: publishing now would undo it.
      if (generation !== this.generation) {
        logger.info('WoT graph build superseded by reset — discarding result')
        return
      }

      // atomic swap
      this.trustMap = newTrustMap
      this.booted = true

      logger.info('WoT graph build complete: %d trusted pubkeys', newTrustMap.size)
    } catch (err) {
      logger.error('wot-service: graph build failed: %o', err)
      throw err
    } finally {
      // After a reset() the flag no longer belongs to this build (a newer build
      // may have claimed it), so only release it when nothing intervened.
      if (generation === this.generation) {
        this.building = false
      }
    }
  }

  /** Returns true when `pubkey` is present in the live trust map. */
  public isTrusted(pubkey: string): boolean {
    return this.trustMap.get(pubkey) === true
  }

  /** Returns true once a build has completed and has not been reset since. */
  public isReady(): boolean {
    return this.booted
  }

  /** Drops the live trust map and invalidates any build currently in flight. */
  public reset(): void {
    this.generation += 1
    this.trustMap = new Map()
    this.booted = false
    this.building = false
  }

  /**
   * Reduce raw follow-list events to one set of followed pubkeys per author.
   *
   * Follow lists are replaceable: when several versions by the same author are
   * present (e.g. different relays hold different versions) only the newest is
   * kept, so one author can never count as several followers. Using a set per
   * author likewise makes a repeated `p` tag count once. Events without a
   * string `pubkey` cannot be attributed and are each treated as a separate
   * author. Events without a `tags` array are ignored.
   */
  private static followsPerAuthor(events: any[]): Set<string>[] {
    const newestByAuthor = new Map<string, any>()
    const unattributed: any[] = []

    for (const event of events) {
      if (!event || !Array.isArray(event.tags)) {
        continue
      }
      if (typeof event.pubkey !== 'string') {
        unattributed.push(event)
        continue
      }
      const known = newestByAuthor.get(event.pubkey)
      if (known === undefined || WotService.supersedes(event, known)) {
        newestByAuthor.set(event.pubkey, event)
      }
    }

    return [...newestByAuthor.values(), ...unattributed].map((event) => {
      const follows = new Set<string>()
      for (const tag of event.tags) {
        if (Array.isArray(tag) && tag[0] === 'p' && typeof tag[1] === 'string' && tag[1].length === 64) {
          follows.add(tag[1])
        }
      }
      return follows
    })
  }

  /**
   * Returns true when `candidate` should replace `current` as an author's
   * follow list: the later `created_at` wins (a missing timestamp counts as 0)
   * and, on a tie, the lexically lower `id`.
   */
  private static supersedes(candidate: any, current: any): boolean {
    const candidateTime = typeof candidate.created_at === 'number' ? candidate.created_at : 0
    const currentTime = typeof current.created_at === 'number' ? current.created_at : 0

    if (candidateTime !== currentTime) {
      return candidateTime > currentTime
    }

    return typeof candidate.id === 'string'
      && typeof current.id === 'string'
      && candidate.id < current.id
  }
}
Loading
Loading