diff --git a/.changeset/ready-crews-fetch.md b/.changeset/ready-crews-fetch.md new file mode 100644 index 00000000..0bad8b71 --- /dev/null +++ b/.changeset/ready-crews-fetch.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +Rewrite WoT service to use PostgreSQL for trust graph — eliminates unbounded in-memory accumulation by streaming kind-3 events into the DB and computing trust via SQL GROUP BY diff --git a/.changeset/solid-boats-try.md b/.changeset/solid-boats-try.md new file mode 100644 index 00000000..74a5d662 --- /dev/null +++ b/.changeset/solid-boats-try.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +added wot graph service and unit test diff --git a/.knip.json b/.knip.json index 001eba2e..c12cb788 100644 --- a/.knip.json +++ b/.knip.json @@ -15,7 +15,8 @@ ], "ignore": [ ".nostr/**", - "src/repositories/invite-code-repository.ts" + "src/repositories/invite-code-repository.ts", + "src/services/wot-service.ts" ], "commitlint": false, "eslint": false, diff --git a/resources/default-settings.yaml b/resources/default-settings.yaml index 1f8948a5..f67b315b 100755 --- a/resources/default-settings.yaml +++ b/resources/default-settings.yaml @@ -73,6 +73,10 @@ wot: minimumFollowers: 1 # Hours between full trust graph rebuilds. refreshIntervalHours: 24 + # Relay URLs to fetch follow lists from when building the trust graph. + seedRelays: + - "wss://relay.damus.io" + - "wss://relay.nostr.band" network: maxPayloadSize: 524288 # Uncomment only when using a trusted reverse proxy and configuring trustedProxies. diff --git a/src/@types/services.ts b/src/@types/services.ts index 5d8ea229..5f14c4e1 100644 --- a/src/@types/services.ts +++ b/src/@types/services.ts @@ -1,5 +1,6 @@ import { Invoice } from './invoice' import { Pubkey } from './base' +import { WoTSettings } from './settings' export interface IMaintenanceService { clearOldEvents(): Promise @@ -14,3 +15,8 @@ export interface IPaymentsService { sendInvoiceUpdateNotification(invoice: Invoice): Promise getPendingInvoices(): Promise } + +export interface IWotService { + buildGraph(settings: WoTSettings): Promise + isReady(): boolean +} diff --git a/src/@types/settings.ts b/src/@types/settings.ts index a7d50ea8..75a8a0b0 100644 --- a/src/@types/settings.ts +++ b/src/@types/settings.ts @@ -283,6 +283,11 @@ export interface WoTSettings { * Defaults to 24. */ refreshIntervalHours: number + /** + * Relay URLs to fetch follow lists from when building the trust graph. + * Falls back to the relay's own URL if empty. + */ + seedRelays: string[] } export interface Nip43Settings { diff --git a/src/services/wot-service.ts b/src/services/wot-service.ts new file mode 100644 index 00000000..39dad0c0 --- /dev/null +++ b/src/services/wot-service.ts @@ -0,0 +1,241 @@ +import { RawData, WebSocket } from 'ws' +import { randomUUID } from 'crypto' + +import { isEventIdValid, isEventSignatureValid } from '../utils/event' +import { createLogger } from '../factories/logger-factory' +import { IEventRepository } from '../@types/repositories' +import { IWotService } from '../@types/services' +import { WoTSettings } from '../@types/settings' + +const logger = createLogger('wot-service') + +export const PHASE1_TIMEOUT_MS = 5_000 +export const PHASE2_BATCH_SIZE = 500 +export const PHASE2_CONCURRENCY = 5 +export const PHASE2_BATCH_TIMEOUT_MS = 30_000 + +const KIND_FOLLOW_LIST = 3 + +/** + * Open a WebSocket to `relayUrl`, send a REQ for `filter`, and yield each + * EVENT payload as it arrives. Closes on EOSE or timeout. + */ +async function* fetchEvents( + relayUrl: string, + filter: Record, + timeoutMs: number, +): AsyncGenerator { + const subId = `wot-${randomUUID().slice(0, 8)}` + + const queue: any[] = [] + let done = false + let notify: (() => void) | null = null + + const wake = () => { + const fn = notify + notify = null + fn?.() + } + + const finish = () => { + if (done) { + return + } + done = true + clearTimeout(timer) + try { ws.close() } catch { /* ignore */ } + wake() + } + + const timer = setTimeout(finish, timeoutMs) + + let ws: WebSocket + try { + ws = new WebSocket(relayUrl, { timeout: timeoutMs }) + } catch (err) { + logger.warn('wot-service: could not create WebSocket to %s: %o', relayUrl, err) + clearTimeout(timer) + return + } + + ws.on('open', () => { + ws.send(JSON.stringify(['REQ', subId, filter])) + }) + + ws.on('message', (raw: RawData) => { + try { + const msg = JSON.parse(raw.toString('utf8')) + if (!Array.isArray(msg)) { + return + } + + if (msg[0] === 'EVENT' && msg[1] === subId && msg[2]) { + queue.push(msg[2]) + wake() + } else if (msg[0] === 'EOSE' && msg[1] === subId) { + finish() + } + } catch { /* malformed message — ignore */ } + }) + + ws.on('error', (err) => { + logger.warn('wot-service: WebSocket error for %s: %o', relayUrl, err) + finish() + }) + + ws.on('close', finish) + + while (true) { + while (queue.length > 0) { + yield queue.shift() + } + if (done) { + break + } + await new Promise((resolve) => { notify = resolve }) + } +} + +/** + * Run up to `concurrency` async tasks from `items` at a time. + */ +async function runConcurrent( + items: T[], + concurrency: number, + task: (item: T) => Promise, +): Promise { + const queue = [...items] + const workers = Array.from({ length: Math.min(concurrency, queue.length) }, async () => { + while (queue.length > 0) { + const item = queue.shift() + if (item !== undefined) { + await task(item) + } + } + }) + await Promise.all(workers) +} + +export type RelayFetcher = ( + relayUrl: string, + filter: Record, + timeoutMs: number, +) => AsyncGenerator + +export class WotService implements IWotService { + private booted = false + private building = false + + public constructor( + private readonly eventRepository: IEventRepository, + private readonly fetcher: RelayFetcher = fetchEvents, + ) {} + + public async buildGraph(settings: WoTSettings): Promise { + if (this.building) { + logger('build already in progress — skipping') + return [] + } + + this.building = true + logger.info('starting WoT graph build, seed=%s relays=%o', settings.seedPubkey, settings.seedRelays) + + try { + // ── Phase 1: fetch and store seed's kind-3 follow list ────────────────── + for (const relayUrl of settings.seedRelays) { + for await (const event of this.fetcher( + relayUrl, + { authors: [settings.seedPubkey], kinds: [KIND_FOLLOW_LIST] }, + PHASE1_TIMEOUT_MS, + )) { + if (!(await isEventIdValid(event)) || !(await isEventSignatureValid(event))) { + continue + } + await this.eventRepository.upsert(event) + } + } + + logger.info('phase 1 complete: seed kind-3 events stored') + + // ── Phase 1B: read 1-hop pubkeys from DB ──────────────────────────────── + const oneHopRows = await (this.eventRepository as any).masterDbClient('event_tags') + .select('tag_value') + .whereIn( + 'event_id', + (this.eventRepository as any).masterDbClient('events') + .select('event_id') + .where('event_pubkey', Buffer.from(settings.seedPubkey, 'hex')) + .where('event_kind', KIND_FOLLOW_LIST), + ) + .where('tag_name', 'p') + + const oneHopPubkeys: string[] = oneHopRows.map((r: any) => r.tag_value as string) + + logger.info('phase 1B complete: %d 1-hop pubkeys', oneHopPubkeys.length) + + if (oneHopPubkeys.length === 0) { + this.booted = true + return [settings.seedPubkey] + } + + // ── Phase 2: fetch and store 1-hop kind-3 events (batched) ────────────── + const batches: string[][] = [] + for (let i = 0; i < oneHopPubkeys.length; i += PHASE2_BATCH_SIZE) { + batches.push(oneHopPubkeys.slice(i, i + PHASE2_BATCH_SIZE)) + } + + await runConcurrent(batches, PHASE2_CONCURRENCY, async (batch) => { + for (const relayUrl of settings.seedRelays) { + try { + for await (const event of this.fetcher( + relayUrl, + { authors: batch, kinds: [KIND_FOLLOW_LIST] }, + PHASE2_BATCH_TIMEOUT_MS, + )) { + if (!(await isEventIdValid(event)) || !(await isEventSignatureValid(event))) { + continue + } + await this.eventRepository.upsert(event) + } + } catch (err) { + logger.warn('wot-service: phase 2 batch failed for %s: %o', relayUrl, err) + } + } + }) + + logger.info('phase 2 complete: 1-hop kind-3 events stored') + + // ── Phase 3: SQL trust query ───────────────────────────────────────────── + const trustedRows = await (this.eventRepository as any).masterDbClient('event_tags') + .select('tag_value') + .whereIn( + 'event_id', + (this.eventRepository as any).masterDbClient('events') + .select('event_id') + .whereIn('event_pubkey', oneHopPubkeys.map((pk) => Buffer.from(pk, 'hex'))) + .where('event_kind', KIND_FOLLOW_LIST), + ) + .where('tag_name', 'p') + .groupBy('tag_value') + .havingRaw('COUNT(*) >= ?', [settings.minimumFollowers]) + + const trustedSet = new Set(trustedRows.map((r: any) => r.tag_value as string)) + trustedSet.add(settings.seedPubkey) + const trustedPubkeys = Array.from(trustedSet) + + this.booted = true + logger.info('WoT graph build complete: %d trusted pubkeys', trustedPubkeys.length) + + return trustedPubkeys + } catch (err) { + logger.error('wot-service: graph build failed: %o', err) + throw err + } finally { + this.building = false + } + } + + public isReady(): boolean { + return this.booted + } +} diff --git a/test/unit/services/wot-service.spec.ts b/test/unit/services/wot-service.spec.ts new file mode 100644 index 00000000..15c0044e --- /dev/null +++ b/test/unit/services/wot-service.spec.ts @@ -0,0 +1,478 @@ +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' +import sinonChai from 'sinon-chai' + +import * as eventUtils from '../../../src/utils/event' +import { WoTSettings } from '../../../src/@types/settings' +import { + WotService, + RelayFetcher, + PHASE1_TIMEOUT_MS, + PHASE2_BATCH_SIZE, + PHASE2_CONCURRENCY, + PHASE2_BATCH_TIMEOUT_MS, +} from '../../../src/services/wot-service' + +chai.use(sinonChai) +chai.use(chaiAsPromised) + +const { expect } = chai + +// ── fixtures ────────────────────────────────────────────────────────────────── + +const SEED_PUBKEY = 'a'.repeat(64) +const HOP1_PUBKEY = 'b'.repeat(64) +const HOP2_PUBKEY = 'c'.repeat(64) +const OTHER_PUBKEY = 'd'.repeat(64) + +const RELAY_A = 'wss://relay-a.example.com' +const RELAY_B = 'wss://relay-b.example.com' + +const baseSettings: WoTSettings = { + enabled: true, + seedPubkey: SEED_PUBKEY, + minimumFollowers: 1, + refreshIntervalHours: 24, + seedRelays: [RELAY_A], +} + +function makeEvent(id: string, pubkey: string, follows: string[]): object { + return { + id, + pubkey, + kind: 3, + tags: follows.map((pk) => ['p', pk]), + content: '', + created_at: 1_700_000_000, + sig: 'f'.repeat(128), + } +} + +async function* toStream(events: any[]): AsyncGenerator { + for (const e of events) { yield e } +} + +function hangingStream(): { gen: AsyncGenerator; release: () => void } { + let release!: () => void + const gate = new Promise((resolve) => { release = resolve }) + async function* gen(): AsyncGenerator { await gate } + return { gen: gen(), release } +} + +// ── repository stub ─────────────────────────────────────────────────────────── +// +// WotService accesses masterDbClient directly for Phase 1B and Phase 3 queries. +// Each phase makes two calls to clientFn (outer query + inner subselect), so: +// calls 1-2 → oneHopChain (Phase 1B) +// calls 3-4 → trustedChain (Phase 3) +// +function makeRepoStub( + sandbox: Sinon.SinonSandbox, + opts: { oneHopRows?: any[]; trustedRows?: any[] } = {}, +) { + const { oneHopRows = [], trustedRows = [] } = opts + const upsert = sandbox.stub().resolves(1) + + const makeChain = (rows: any[]) => { + const chain: any = {} + ;['select', 'whereIn', 'where', 'groupBy', 'havingRaw'].forEach((m) => { + chain[m] = () => chain + }) + chain.then = (ok: any, fail: any) => Promise.resolve(rows).then(ok, fail) + return chain + } + + const oneHopChain = makeChain(oneHopRows) + const trustedChain = makeChain(trustedRows) + + const clientStub = sandbox.stub() + clientStub.onCall(0).returns(oneHopChain) // Phase 1B outer + clientStub.onCall(1).returns(oneHopChain) // Phase 1B subselect + clientStub.onCall(2).returns(trustedChain) // Phase 3 outer + clientStub.onCall(3).returns(trustedChain) // Phase 3 subselect + + // knex exposes `.raw()` — add it as a plain property on the stub function + const clientFn = Object.assign(clientStub, { raw: () => ({}) }) + + return { upsert, masterDbClient: clientFn } +} + +// ── suite ───────────────────────────────────────────────────────────────────── + +describe('WotService', () => { + let sandbox: Sinon.SinonSandbox + let fetcher: Sinon.SinonStub + let repo: ReturnType + let service: WotService + + beforeEach(() => { + sandbox = Sinon.createSandbox() + + // Default: all events pass validation + sandbox.stub(eventUtils, 'isEventIdValid').resolves(true) + sandbox.stub(eventUtils, 'isEventSignatureValid').resolves(true) + + // Default fetcher: empty stream + fetcher = sandbox.stub, ReturnType>() + .callsFake(() => toStream([])) + + repo = makeRepoStub(sandbox) + service = new WotService(repo as any, fetcher) + }) + + afterEach(() => { + sandbox.restore() + }) + + // ── initial state ────────────────────────────────────────────────────────── + + describe('before any build', () => { + it('isReady() returns false', () => { + expect(service.isReady()).to.equal(false) + }) + }) + + // ── fetch parameters ─────────────────────────────────────────────────────── + + describe('buildGraph() — fetch parameters', () => { + it('phase 1: calls fetcher with the seed relay URL', async () => { + await service.buildGraph(baseSettings) + + expect(fetcher.firstCall.args[0]).to.equal(RELAY_A) + }) + + it('phase 1: calls fetcher with correct kind-3 filter for the seed', async () => { + await service.buildGraph(baseSettings) + + const filter = fetcher.firstCall.args[1] + expect(filter).to.deep.include({ kinds: [3], authors: [SEED_PUBKEY] }) + }) + + it('phase 1: calls fetcher with PHASE1_TIMEOUT_MS', async () => { + await service.buildGraph(baseSettings) + + expect(fetcher.firstCall.args[2]).to.equal(PHASE1_TIMEOUT_MS) + }) + + it('phase 1: fans out to every configured seed relay', async () => { + const settings = { ...baseSettings, seedRelays: [RELAY_A, RELAY_B] } + await service.buildGraph(settings) + + const calledUrls = Array.from({ length: fetcher.callCount }, (_, i) => fetcher.getCall(i).args[0]) + expect(calledUrls).to.include(RELAY_A) + expect(calledUrls).to.include(RELAY_B) + }) + + it('phase 2: calls fetcher with PHASE2_BATCH_TIMEOUT_MS for batch fetches', async () => { + repo = makeRepoStub(sandbox, { oneHopRows: [{ tag_value: HOP1_PUBKEY }] }) + service = new WotService(repo as any, fetcher) + + await service.buildGraph(baseSettings) + + // phase 1 call uses PHASE1_TIMEOUT_MS; phase 2 calls use PHASE2_BATCH_TIMEOUT_MS + const phase2Call = fetcher.getCalls().find((c) => c.args[2] === PHASE2_BATCH_TIMEOUT_MS) + expect(phase2Call).to.not.be.undefined + }) + }) + + // ── validation and storage ───────────────────────────────────────────────── + + describe('buildGraph() — validation and storage', () => { + it('upserts events that pass both validation checks', async () => { + fetcher.onFirstCall().callsFake(() => toStream([ + makeEvent('evt1', SEED_PUBKEY, [HOP1_PUBKEY]), + ])) + + await service.buildGraph(baseSettings) + + expect(repo.upsert.callCount).to.equal(1) + }) + + it('skips events where isEventIdValid returns false', async () => { + ;(eventUtils.isEventIdValid as Sinon.SinonStub).resolves(false) + fetcher.onFirstCall().callsFake(() => toStream([ + makeEvent('evt1', SEED_PUBKEY, [HOP1_PUBKEY]), + ])) + + await service.buildGraph(baseSettings) + + expect(repo.upsert.callCount).to.equal(0) + }) + + it('skips events where isEventSignatureValid returns false', async () => { + ;(eventUtils.isEventSignatureValid as Sinon.SinonStub).resolves(false) + fetcher.onFirstCall().callsFake(() => toStream([ + makeEvent('evt1', SEED_PUBKEY, [HOP1_PUBKEY]), + ])) + + await service.buildGraph(baseSettings) + + expect(repo.upsert.callCount).to.equal(0) + }) + + it('upserts valid events and skips invalid ones in the same stream', async () => { + const isIdValid = eventUtils.isEventIdValid as Sinon.SinonStub + // first event: valid, second event: invalid id + isIdValid.onFirstCall().resolves(true) + isIdValid.onSecondCall().resolves(false) + + fetcher.onFirstCall().callsFake(() => toStream([ + makeEvent('evt1', SEED_PUBKEY, [HOP1_PUBKEY]), + makeEvent('evt2', SEED_PUBKEY, [HOP2_PUBKEY]), + ])) + + await service.buildGraph(baseSettings) + + expect(repo.upsert.callCount).to.equal(1) + }) + }) + + // ── batching ─────────────────────────────────────────────────────────────── + + describe('buildGraph() — batching', () => { + it('splits 1-hop pubkeys into batches of PHASE2_BATCH_SIZE', async () => { + // produce PHASE2_BATCH_SIZE + 1 one-hop pubkeys so we expect 2 batches + const manyPubkeys = Array.from({ length: PHASE2_BATCH_SIZE + 1 }, (_, i) => + i.toString(16).padStart(64, '0'), + ) + repo = makeRepoStub(sandbox, { oneHopRows: manyPubkeys.map((pk) => ({ tag_value: pk })) }) + service = new WotService(repo as any, fetcher) + + await service.buildGraph(baseSettings) + + // phase 1: 1 call, phase 2: 2 batches × 1 relay = 2 calls → total 3 + expect(fetcher.callCount).to.equal(3) + }) + + it('each phase 2 batch carries at most PHASE2_BATCH_SIZE authors', async () => { + const manyPubkeys = Array.from({ length: PHASE2_BATCH_SIZE + 1 }, (_, i) => + i.toString(16).padStart(64, '0'), + ) + repo = makeRepoStub(sandbox, { oneHopRows: manyPubkeys.map((pk) => ({ tag_value: pk })) }) + service = new WotService(repo as any, fetcher) + + await service.buildGraph(baseSettings) + + const phase2Calls = fetcher.getCalls().filter((c) => c.args[2] === PHASE2_BATCH_TIMEOUT_MS) + for (const call of phase2Calls) { + expect(call.args[1].authors.length).to.be.at.most(PHASE2_BATCH_SIZE) + } + }) + + it('does not exceed PHASE2_CONCURRENCY simultaneous fetches', async () => { + // Enough batches to exceed the concurrency limit + const batchCount = PHASE2_CONCURRENCY + 2 + const pubkeys = Array.from({ length: PHASE2_BATCH_SIZE * batchCount }, (_, i) => + i.toString(16).padStart(64, '0'), + ) + repo = makeRepoStub(sandbox, { oneHopRows: pubkeys.map((pk) => ({ tag_value: pk })) }) + service = new WotService(repo as any, fetcher) + + // Promise barrier — all workers park here until we release them. + // When waitingCount peaks, that IS the max concurrency — no timing needed. + let waitingCount = 0 + let peakWaiting = 0 + let releaseAll!: () => void + const barrier = new Promise((resolve) => { releaseAll = resolve }) + + fetcher.callsFake((url: string, filter: any, timeout: number) => { + if (timeout !== PHASE2_BATCH_TIMEOUT_MS) { + return toStream([]) + } + return (async function* () { + waitingCount++ + peakWaiting = Math.max(peakWaiting, waitingCount) + await barrier // park until releaseAll() is called + waitingCount-- + })() + }) + + // Start buildGraph but don't await — workers are now queued up at the barrier + const buildPromise = service.buildGraph(baseSettings) + + // Yield to the event loop until PHASE2_CONCURRENCY workers have parked + // (runConcurrent starts exactly min(concurrency, batches) workers upfront) + await new Promise((resolve) => { + const poll = () => { + if (waitingCount >= Math.min(PHASE2_CONCURRENCY, batchCount)) { + resolve() + } else { + setImmediate(poll) + } + } + poll() + }) + + // At this point all initially-scheduled workers are parked — record peak + const observedPeak = peakWaiting + + // Release the barrier so buildGraph can complete + releaseAll() + await buildPromise + + expect(observedPeak).to.be.at.most(PHASE2_CONCURRENCY) + expect(observedPeak).to.equal(Math.min(PHASE2_CONCURRENCY, batchCount)) + }) + }) + + // ── phase 2 error handling ───────────────────────────────────────────────── + + describe('buildGraph() — phase 2 error handling', () => { + it('continues build if one relay throws during a batch', async () => { + const settings = { ...baseSettings, seedRelays: [RELAY_A, RELAY_B] } + repo = makeRepoStub(sandbox, { oneHopRows: [{ tag_value: HOP1_PUBKEY }] }) + service = new WotService(repo as any, fetcher) + + // RELAY_B phase 2 batch throws mid-iteration + fetcher.callsFake((url: string, filter: any, timeout: number) => { + if (url === RELAY_B && timeout === PHASE2_BATCH_TIMEOUT_MS) { + return (async function* () { + yield makeEvent('evt-err', HOP1_PUBKEY, []) // yield first so we enter the for-await + throw new Error('relay down') + })() + } + return toStream([]) + }) + + await expect(service.buildGraph(settings)).to.eventually.be.fulfilled + expect(service.isReady()).to.equal(true) + }) + + it('completes build even if all relays fail for a batch', async () => { + repo = makeRepoStub(sandbox, { oneHopRows: [{ tag_value: HOP1_PUBKEY }] }) + service = new WotService(repo as any, fetcher) + + fetcher.callsFake((url: string, filter: any, timeout: number) => { + if (timeout === PHASE2_BATCH_TIMEOUT_MS) { + return (async function* () { + yield makeEvent('evt-err', HOP1_PUBKEY, []) + throw new Error('all relays down') + })() + } + return toStream([]) + }) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.fulfilled + expect(service.isReady()).to.equal(true) + }) + }) + + // ── early return path ────────────────────────────────────────────────────── + + describe('buildGraph() — early return path', () => { + it('returns [seedPubkey] when seed has no 1-hop follows', async () => { + // oneHopRows defaults to [] in makeRepoStub + const result = await service.buildGraph(baseSettings) + expect(result).to.deep.equal([SEED_PUBKEY]) + }) + + it('sets isReady() true even on early return', async () => { + await service.buildGraph(baseSettings) + expect(service.isReady()).to.equal(true) + }) + + it('does not call fetcher for phase 2 when seed has no follows', async () => { + await service.buildGraph(baseSettings) + + const phase2Calls = fetcher.getCalls().filter((c) => c.args[2] === PHASE2_BATCH_TIMEOUT_MS) + expect(phase2Calls).to.have.length(0) + }) + }) + + // ── SQL trust contract ───────────────────────────────────────────────────── + + describe('buildGraph() — SQL trust contract', () => { + it('always includes seedPubkey in result', async () => { + repo = makeRepoStub(sandbox, { + oneHopRows: [{ tag_value: HOP1_PUBKEY }], + trustedRows: [], + }) + service = new WotService(repo as any, fetcher) + + const result = await service.buildGraph(baseSettings) + expect(result).to.include(SEED_PUBKEY) + }) + + it('includes pubkeys returned by the SQL trust query', async () => { + repo = makeRepoStub(sandbox, { + oneHopRows: [{ tag_value: HOP1_PUBKEY }], + trustedRows: [{ tag_value: HOP2_PUBKEY }], + }) + service = new WotService(repo as any, fetcher) + + const result = await service.buildGraph(baseSettings) + expect(result).to.include(HOP2_PUBKEY) + }) + + it('does not include pubkeys absent from the SQL trust query result', async () => { + repo = makeRepoStub(sandbox, { + oneHopRows: [{ tag_value: HOP1_PUBKEY }], + trustedRows: [{ tag_value: HOP2_PUBKEY }], + }) + service = new WotService(repo as any, fetcher) + + const result = await service.buildGraph(baseSettings) + expect(result).to.not.include(OTHER_PUBKEY) + }) + + it('deduplicates seedPubkey if it also appears in SQL results', async () => { + repo = makeRepoStub(sandbox, { + oneHopRows: [{ tag_value: HOP1_PUBKEY }], + // seed appears in trusted rows (e.g. someone follows them back) + trustedRows: [{ tag_value: SEED_PUBKEY }, { tag_value: HOP2_PUBKEY }], + }) + service = new WotService(repo as any, fetcher) + + const result = await service.buildGraph(baseSettings) + const seedCount = result.filter((pk) => pk === SEED_PUBKEY).length + expect(seedCount).to.equal(1) + }) + }) + + // ── state transitions ────────────────────────────────────────────────────── + + describe('buildGraph() — state transitions', () => { + it('sets isReady() to true after a successful build', async () => { + await service.buildGraph(baseSettings) + expect(service.isReady()).to.equal(true) + }) + + it('returns [] immediately when a build is already in flight', async () => { + const { gen, release } = hangingStream() + fetcher.onFirstCall().returns(gen) + + const first = service.buildGraph(baseSettings) + const second = await service.buildGraph(baseSettings) + + expect(second).to.deep.equal([]) + expect(fetcher.callCount).to.equal(1) + + release() + await first + }) + + it('isReady() remains false after a thrown error', async () => { + fetcher.callsFake(async function* () { + throw new Error('boom') + }) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.rejected + expect(service.isReady()).to.equal(false) + }) + + it('building flag resets after error — subsequent build can run', async () => { + fetcher.onFirstCall().callsFake(async function* () { + throw new Error('first attempt failed') + }) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.rejected + + // reset fetcher to empty stream for second attempt + fetcher.callsFake(() => toStream([])) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.fulfilled + expect(service.isReady()).to.equal(true) + }) + }) +}) diff --git a/test/unit/utils/settings.spec.ts b/test/unit/utils/settings.spec.ts index dff99e6a..b2536589 100644 --- a/test/unit/utils/settings.spec.ts +++ b/test/unit/utils/settings.spec.ts @@ -270,6 +270,8 @@ describe('SettingsStatic', () => { expect(defaults).to.have.nested.property('wot.seedPubkey', '') expect(defaults).to.have.nested.property('wot.minimumFollowers', 1) expect(defaults).to.have.nested.property('wot.refreshIntervalHours', 24) + expect(defaults).to.have.nested.property('wot.seedRelays') + expect((defaults as any).wot.seedRelays).to.be.an('array').with.length.greaterThan(0) }) it('user config wot block overrides defaults', () => { @@ -283,6 +285,16 @@ describe('SettingsStatic', () => { expect(merged.wot?.minimumFollowers).to.equal(3) // non-overridden fields stay as defaults expect(merged.wot?.refreshIntervalHours).to.equal(24) + expect(merged.wot?.seedRelays).to.be.an('array').with.length.greaterThan(0) + }) + + it('user config seedRelays overrides default relay list', () => { + const defaults = SettingsStatic.loadAndParseYamlFile( + SettingsStatic.getDefaultSettingsFilePath() + ) + const userConfig = { wot: { seedRelays: ['wss://my-relay.com'] } } + const merged = mergeDeepRight(defaults, userConfig) as Settings + expect(merged.wot?.seedRelays).to.deep.equal(['wss://my-relay.com']) }) }) })