From a831210f29fe6d202ae3a031c884db329b031103 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Tue, 30 Jun 2026 16:39:47 -0500 Subject: [PATCH] Add forwarded payment tracking and statistics Routing nodes and LSPs need to track forwarded payments so they can account for earned fees and profitability over time. Store forwarded payment data and aggregate it into channel and channel-pair statistics. Detailed tracking retains individual forwarding events for a configured window before aggregating them by channel pair. Stats mode keeps only per-channel forwarding totals for lightweight nodes. AI-assisted-by: OpenAI Codex --- bindings/ldk_node.udl | 36 +++ src/builder.rs | 114 ++++++-- src/config.rs | 27 +- src/data_store.rs | 63 ++++- src/event.rs | 163 ++++++++++- src/ffi/types.rs | 46 +++- src/io/mod.rs | 14 + src/lib.rs | 244 ++++++++++++++++- src/payment/mod.rs | 5 +- src/payment/store.rs | 624 +++++++++++++++++++++++++++++++++++++++++- src/types.rs | 7 + 11 files changed, 1307 insertions(+), 36 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7c0edc535..e79912e06 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -11,6 +11,8 @@ typedef dictionary ElectrumSyncConfig; typedef dictionary TorConfig; +typedef enum ForwardedPaymentTrackingMode; + typedef interface NodeEntropy; typedef enum WordCount; @@ -137,6 +139,19 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + [Throws=NodeError] + ForwardedPaymentDetailsPage list_forwarded_payments(PageToken? page_token); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + [Throws=NodeError] + ChannelForwardingStatsPage list_channel_forwarding_stats(PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats(PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_in_range(u64 start_timestamp, u64 end_timestamp, PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_for_pair(ChannelId prev_channel_id, ChannelId next_channel_id, PageToken? page_token); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -379,6 +394,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; @@ -391,6 +409,12 @@ typedef string PaymentSecret; [Custom] typedef string ChannelId; +[Custom] +typedef string ChannelPairStatsId; + +[Custom] +typedef string PageToken; + [Custom] typedef string UserChannelId; @@ -417,3 +441,15 @@ typedef enum Event; typedef interface HRNResolverConfig; typedef dictionary HumanReadableNamesConfig; + +typedef dictionary ForwardedPaymentDetails; + +typedef dictionary ChannelForwardingStats; + +typedef dictionary ChannelPairForwardingStats; + +typedef dictionary ForwardedPaymentDetailsPage; + +typedef dictionary ChannelForwardingStatsPage; + +typedef dictionary ChannelPairForwardingStatsPage; diff --git a/src/builder.rs b/src/builder.rs index 8b575cc3f..27bc7b35c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,13 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -77,7 +83,8 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, + ChannelPairForwardingStatsStore, DynStore, DynStoreRef, DynStoreWrapper, ForwardedPaymentStore, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, }; @@ -1397,24 +1404,48 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_all_objects( - &*kv_store_ref, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_all_objects( - &*kv_store_ref, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ) + let ( + payment_store_res, + forwarded_payment_store_res, + channel_forwarding_stats_res, + channel_pair_forwarding_stats_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_all_objects( + &*kv_store_ref, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_all_objects( + &*kv_store_ref, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) - }); + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1443,6 +1474,48 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_pair_forwarding_stats_store = match channel_pair_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelPairForwardingStatsStore::new( + stats, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel pair forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -2245,6 +2318,9 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, + channel_pair_forwarding_stats_store, lnurl_auth, is_running, node_metrics, diff --git a/src/config.rs b/src/config.rs index ad1b91181..a043c3bd2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,6 +111,25 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort an LNURL-auth operation. pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15; +/// The mode used for tracking forwarded payments. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))] +pub enum ForwardedPaymentTrackingMode { + /// Store individual forwarded payments until they are aggregated into channel-pair buckets. + Detailed { + /// Number of minutes to retain individual forwarded payments before aggregation. + retention_minutes: u64, + }, + /// Track only per-channel aggregate statistics. + Stats, +} + +impl Default for ForwardedPaymentTrackingMode { + fn default() -> Self { + Self::Stats + } +} + #[derive(Debug, Clone)] #[cfg_attr(feature = "uniffi", derive(uniffi::Record))] /// Represents the configuration of an [`Node`] instance. @@ -130,9 +149,10 @@ pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15; /// | `route_parameters` | None | /// | `tor_config` | None | /// | `hrn_config` | HumanReadableNamesConfig::default() | +/// | `forwarded_payment_tracking_mode` | Stats | /// -/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their -/// respective default values. +/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and +/// [`ForwardedPaymentTrackingMode`] for more information regarding their respective default values. /// /// [`Node`]: crate::Node pub struct Config { @@ -205,6 +225,8 @@ pub struct Config { /// /// [BIP 353]: https://github.com/bitcoin/bips/blob/master/bip-0353.mediawiki pub hrn_config: HumanReadableNamesConfig, + /// The mode used for tracking forwarded payments. + pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -221,6 +243,7 @@ impl Default for Config { route_parameters: None, node_alias: None, hrn_config: HumanReadableNamesConfig::default(), + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/data_store.rs b/src/data_store.rs index 13afeca7e..deb92d324 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -9,11 +9,11 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use lightning::util::persist::KVStore; +use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore}; use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; -use crate::types::DynStore; +use crate::types::{DynStore, DynStoreRef}; use crate::Error; pub(crate) trait StorableObject: Clone + Readable + Writeable { @@ -179,6 +179,65 @@ where self.objects.lock().expect("lock").values().filter(f).cloned().collect::>() } + pub(crate) async fn list_page( + &self, page_token: Option, + ) -> Result<(Vec, Option), Error> { + let response = PaginatedKVStore::list_paginated( + &DynStoreRef(Arc::clone(&self.kv_store)), + &self.primary_namespace, + &self.secondary_namespace, + page_token, + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Listing object data under {}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + e + ); + Error::PersistenceFailed + })?; + + let mut objects = Vec::with_capacity(response.keys.len()); + for key in response.keys { + let data = KVStore::read( + &DynStoreRef(Arc::clone(&self.kv_store)), + &self.primary_namespace, + &self.secondary_namespace, + &key, + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Reading object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + + let object = SO::read(&mut &data[..]).map_err(|e| { + log_error!( + self.logger, + "Failed to deserialize object data for key {}/{}/{}: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + objects.push(object); + } + + Ok((objects, response.next_page_token)) + } + async fn persist(&self, object: &SO) -> Result<(), Error> { let (store_key, data) = Self::encode_object(object); self.persist_encoded(store_key, data).await diff --git a/src/event.rs b/src/event.rs index 93d274ff7..1a208c21c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,9 +7,10 @@ use core::future::Future; use core::task::{Poll, Waker}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -33,7 +34,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -48,12 +49,14 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ - CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, KeysManager, + OnionMessenger, PaymentStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -535,6 +538,8 @@ where network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, keys_manager: Arc, runtime: Arc, @@ -555,6 +560,8 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, @@ -570,6 +577,8 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, peer_store, keys_manager, logger, @@ -1480,6 +1489,152 @@ where .await; } + if !prev_htlcs.is_empty() && !next_htlcs.is_empty() { + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch") + .as_secs(); + let inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + let prev_htlc_count = prev_htlcs.len() as u64; + let inbound_amount_per_htlc_msat = inbound_amount_msat / prev_htlc_count; + let inbound_amount_remainder_msat = inbound_amount_msat % prev_htlc_count; + let fee_per_htlc_msat = total_fee_earned_msat.unwrap_or(0) / prev_htlc_count; + let fee_remainder_msat = total_fee_earned_msat.unwrap_or(0) % prev_htlc_count; + let skimmed_fee_per_htlc_msat = skimmed_fee_msat.unwrap_or(0) / prev_htlc_count; + let skimmed_fee_remainder_msat = + skimmed_fee_msat.unwrap_or(0) % prev_htlc_count; + + let mut inbound_stats_by_channel = HashMap::new(); + for (idx, prev_htlc) in prev_htlcs.iter().enumerate() { + let inbound_stats = inbound_stats_by_channel + .entry(prev_htlc.channel_id) + .or_insert(ChannelForwardingStats { + channel_id: prev_htlc.channel_id, + counterparty_node_id: prev_htlc.node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: 0, + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }); + if inbound_stats.counterparty_node_id.is_none() { + inbound_stats.counterparty_node_id = prev_htlc.node_id; + } + inbound_stats.total_inbound_amount_msat += inbound_amount_per_htlc_msat + + if idx == 0 { inbound_amount_remainder_msat } else { 0 }; + inbound_stats.total_fee_earned_msat += + fee_per_htlc_msat + if idx == 0 { fee_remainder_msat } else { 0 }; + inbound_stats.total_skimmed_fee_msat += skimmed_fee_per_htlc_msat + + if idx == 0 { skimmed_fee_remainder_msat } else { 0 }; + inbound_stats.onchain_claims_count += + if claim_from_onchain_tx && idx == 0 { 1 } else { 0 }; + } + + for inbound_stats in inbound_stats_by_channel.into_values() { + self.channel_forwarding_stats_store + .insert_or_update(inbound_stats) + .await + .map_err(|e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + } + + let next_htlc_count = next_htlcs.len() as u64; + let outbound_amount_msat = outbound_amount_forwarded_msat.unwrap_or(0); + let outbound_amount_per_htlc_msat = outbound_amount_msat / next_htlc_count; + let outbound_amount_remainder_msat = outbound_amount_msat % next_htlc_count; + + let mut outbound_stats_by_channel = HashMap::new(); + for (idx, next_htlc) in next_htlcs.iter().enumerate() { + let outbound_stats = outbound_stats_by_channel + .entry(next_htlc.channel_id) + .or_insert(ChannelForwardingStats { + channel_id: next_htlc.channel_id, + counterparty_node_id: next_htlc.node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: 0, + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }); + if outbound_stats.counterparty_node_id.is_none() { + outbound_stats.counterparty_node_id = next_htlc.node_id; + } + outbound_stats.total_outbound_amount_msat += outbound_amount_per_htlc_msat + + if idx == 0 { outbound_amount_remainder_msat } else { 0 }; + } + + for outbound_stats in outbound_stats_by_channel.into_values() { + self.channel_forwarding_stats_store + .insert_or_update(outbound_stats) + .await + .map_err(|e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + } + + if matches!( + self.config.forwarded_payment_tracking_mode, + ForwardedPaymentTrackingMode::Detailed { .. } + ) { + // TODO: Once LDK exposes exact incoming/outgoing HTLC pair data for + // trampoline forwards, store detailed records for multi-HTLC forwards too. + if prev_htlcs.len() == 1 && next_htlcs.len() == 1 { + let prev_htlc = prev_htlcs.first().expect("prev_htlcs has one element"); + let next_htlc = next_htlcs.first().expect("next_htlcs has one element"); + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(self.keys_manager.get_secure_random_bytes()), + prev_channel_id: prev_htlc.channel_id, + next_channel_id: next_htlc.channel_id, + prev_user_channel_id: prev_htlc.user_channel_id.map(UserChannelId), + next_user_channel_id: next_htlc.user_channel_id.map(UserChannelId), + prev_node_id: prev_htlc.node_id, + next_node_id: next_htlc.node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + self.forwarded_payment_store.insert(forwarded_payment).await.map_err( + |e| { + log_error!( + self.logger, + "Failed to store forwarded payment: {e}" + ); + ReplayEvent() + }, + )?; + } else { + log_debug!( + self.logger, + "Skipping detailed channel-pair forwarding stats for multi-HTLC forward with {} inbound and {} outbound HTLCs", + prev_htlcs.len(), + next_htlcs.len() + ); + } + } + } + let event = Event::PaymentForwarded { prev_htlcs: prev_htlcs.into_iter().map(HTLCLocator::from).collect(), next_htlcs: next_htlcs.into_iter().map(HTLCLocator::from).collect(), diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 9bb03bb07..8854e1604 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -149,7 +149,8 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; +use crate::payment::store::{ChannelPairStatsId, ForwardedPaymentId}; +use crate::{hex_utils, PageToken, SocketAddress, UserChannelId}; uniffi::custom_type!(PublicKey, String, { remote, @@ -909,6 +910,22 @@ uniffi::custom_type!(PaymentId, String, { }, }); +uniffi::custom_type!(ForwardedPaymentId, String, { + remote, + try_lift: |val| { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + }, + lower: |obj| { + hex_utils::to_string(&obj.0) + }, +}); + uniffi::custom_type!(PaymentHash, String, { remote, try_lift: |val| { @@ -974,6 +991,33 @@ uniffi::custom_type!(ChannelId, String, { } }); +uniffi::custom_type!(ChannelPairStatsId, String, { + remote, + try_lift: |val| { + if let Some(hex_vec) = hex_utils::to_vec(&val) { + if hex_vec.len() == 72 { + let mut id = [0u8; 72]; + id.copy_from_slice(&hex_vec[..]); + return Ok(ChannelPairStatsId(id)); + } + } + Err(Error::InvalidChannelId.into()) + }, + lower: |obj| { + hex_utils::to_string(&obj.0) + } +}); + +uniffi::custom_type!(PageToken, String, { + remote, + try_lift: |val| { + Ok(PageToken::new(val)) + }, + lower: |obj| { + obj.to_string() + } +}); + uniffi::custom_type!(UserChannelId, String, { remote, try_lift: |val| { diff --git a/src/io/mod.rs b/src/io/mod.rs index a01aa59a8..5ae2bc608 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,6 +33,20 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel pair forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_pair_forwarding_stats"; +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index c97e16fe6..a6368e653 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,8 +128,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -153,9 +153,11 @@ pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::{BaseMessageHandler, SocketAddress}; use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeAlias; use lightning::sign::EntropySource; use lightning::util::persist::KVStore; +pub use lightning::util::persist::PageToken; use lightning::util::wallet_utils::{Input, Wallet as LdkWallet}; use lightning_background_processor::process_events_async; pub use lightning_invoice; @@ -167,15 +169,21 @@ use lnurl_auth::LnurlAuth; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; +use payment::store::{ + aggregate_channel_pair_stats as aggregate_channel_pair_stats_impl, + aggregate_expired_forwarded_payments, +}; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ChannelPairForwardingStats, + ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, PaymentDetails, + SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, ChannelPairForwardingStatsStore, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; @@ -244,6 +252,9 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + channel_pair_forwarding_stats_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -254,6 +265,36 @@ pub struct Node { _leak_checker: LeakChecker, } +/// A page of forwarded payments returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ForwardedPaymentDetailsPage { + /// Forwarded payments in this page. + pub payments: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + +/// A page of channel forwarding statistics returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelForwardingStatsPage { + /// Channel forwarding statistics in this page. + pub stats: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + +/// A page of channel-pair forwarding statistics returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelPairForwardingStatsPage { + /// Channel-pair forwarding statistics in this page. + pub stats: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + impl Node { /// Starts the necessary background tasks, such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network. @@ -289,6 +330,38 @@ impl Node { let chain_source = Arc::clone(&self.chain_source); self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?; + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + self.runtime.block_on(async move { + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) + .await + { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_info!( + logger, + "Aggregated {payment_count} forwarded payments into {pair_count} channel pair buckets" + ); + }, + Err(e) => { + log_error!(logger, "Startup forwarded payment aggregation failed: {e}") + }, + _ => {}, + } + }); + } + } + // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); @@ -581,6 +654,50 @@ impl Node { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await }); + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let stop_aggregation = self.stop_sender.subscribe(); + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = + tokio::time::interval(Duration::from_secs(retention_minutes * 60)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut stop_aggregation = stop_aggregation; + loop { + tokio::select! { + _ = stop_aggregation.changed() => break, + _ = interval.tick() => { + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) + .await + { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_debug!( + logger, + "Aggregated {} forwarded payments into {} channel pair buckets", + payment_count, + pair_count + ); + }, + Err(e) => log_error!(logger, "Periodic forwarded payment aggregation failed: {}", e), + _ => {}, + } + } + } + } + }); + } + } + let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), Arc::new(LdkWallet::new(Arc::clone(&self.wallet), Arc::clone(&self.logger))), @@ -605,6 +722,8 @@ impl Node { Arc::clone(&self.network_graph), Arc::clone(&self.liquidity_source), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), Arc::clone(&self.keys_manager), static_invoice_store, @@ -2135,6 +2254,114 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves a page of forwarded payments from the underlying paginated store. + pub fn list_forwarded_payments( + &self, page_token: Option, + ) -> Result { + let (payments, next_page_token) = + self.runtime.block_on(self.forwarded_payment_store.list_page(page_token))?; + Ok(ForwardedPaymentDetailsPage { payments, next_page_token }) + } + + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves a page of channel forwarding statistics from the underlying paginated store. + pub fn list_channel_forwarding_stats( + &self, page_token: Option, + ) -> Result { + let (stats, next_page_token) = + self.runtime.block_on(self.channel_forwarding_stats_store.list_page(page_token))?; + Ok(ChannelForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + + /// Retrieves a page of channel pair forwarding statistics from the underlying paginated store. + pub fn list_channel_pair_forwarding_stats( + &self, page_token: Option, + ) -> Result { + let (stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all channel pair forwarding statistics that match the given predicate. + pub fn list_channel_pair_forwarding_stats_with_filter< + F: FnMut(&&ChannelPairForwardingStats) -> bool, + >( + &self, f: F, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(f) + } + + /// Retrieves channel pair forwarding statistics within a specific time range. + /// + /// The page token advances over the underlying channel-pair stats namespace. Filtering is + /// applied to the returned page. + pub fn list_channel_pair_forwarding_stats_in_range( + &self, start_timestamp: u64, end_timestamp: u64, page_token: Option, + ) -> Result { + let (stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + let stats = stats + .into_iter() + .filter(|stats| { + stats.bucket_start_timestamp >= start_timestamp + && stats.bucket_start_timestamp < end_timestamp + }) + .collect(); + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all forwarding statistics buckets for a specific channel pair. + /// + /// The page token advances over the underlying channel-pair stats namespace. Filtering is + /// applied to the returned page. + pub fn list_channel_pair_forwarding_stats_for_pair( + &self, prev_channel_id: ChannelId, next_channel_id: ChannelId, + page_token: Option, + ) -> Result { + let (mut stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + stats.retain(|stats| { + stats.prev_channel_id == prev_channel_id && stats.next_channel_id == next_channel_id + }); + stats.sort_by_key(|stats| stats.bucket_start_timestamp); + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); @@ -2413,6 +2640,13 @@ pub(crate) fn new_channel_anchor_reserve_sats( }) } +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + aggregate_channel_pair_stats_impl(buckets) +} + #[cfg(test)] mod tests { use lightning::util::ser::{Readable, Writeable}; diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 2d3acf90e..e6860ad0f 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -24,7 +24,8 @@ pub(crate) use pending_payment_store::FundingTxCandidate; pub(crate) use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - Channel, ConfirmationStatus, LSPS2Parameters, PaymentDetails, PaymentDirection, PaymentKind, - PaymentStatus, TransactionType, + Channel, ChannelForwardingStats, ChannelPairForwardingStats, ConfirmationStatus, + ForwardedPaymentDetails, ForwardedPaymentId, LSPS2Parameters, PaymentDetails, PaymentDirection, + PaymentKind, PaymentStatus, TransactionType, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 160890895..bde5b219c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -5,6 +5,9 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bitcoin::secp256k1::PublicKey; @@ -14,6 +17,7 @@ use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; +use lightning::util::logger::Logger as _; use lightning::util::ser::{Readable, Writeable}; use lightning::{ _init_and_read_len_prefixed_tlv_fields, impl_writeable_tlv_based, @@ -22,8 +26,11 @@ use lightning::{ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning_types::string::UntrustedString; -use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::data_store::{DataStore, StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::logger::{log_debug, log_error, Logger}; +use crate::Error; +use crate::UserChannelId; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -985,3 +992,618 @@ mod tests { assert_eq!(decoded, PaymentKind::read(&mut &*reencoded).unwrap()); } } + +/// A unique identifier for a forwarded payment. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ForwardedPaymentId(pub [u8; 32]); + +impl StorableObjectId for ForwardedPaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl Writeable for ForwardedPaymentId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + self.0.write(writer) + } +} + +impl Readable for ForwardedPaymentId { + fn read(reader: &mut R) -> Result { + Ok(Self(Readable::read(reader)?)) + } +} + +/// Details of a payment that has been forwarded through this node. +/// +/// Detailed channel-pair records are only stored for forwards with exactly one incoming HTLC and +/// one outgoing HTLC. Multi-HTLC forwards are omitted from detailed channel-pair tracking until LDK +/// exposes enough data to identify the exact incoming/outgoing HTLC pairs. +/// +/// TODO: Store multi-HTLC channel-pair records once LDK exposes exact pair data for trampoline +/// forwards. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ForwardedPaymentDetails { + /// A randomly generated identifier for this forwarded payment. + pub id: ForwardedPaymentId, + /// The incoming channel id. + pub prev_channel_id: ChannelId, + /// The outgoing channel id. + pub next_channel_id: ChannelId, + /// The incoming user channel id, if available. + pub prev_user_channel_id: Option, + /// The outgoing user channel id, if available. + pub next_user_channel_id: Option, + /// The previous node id, if available. + pub prev_node_id: Option, + /// The next node id, if available. + pub next_node_id: Option, + /// The total fee earned by forwarding this payment, in millisatoshis. + pub total_fee_earned_msat: Option, + /// The skimmed fee, in millisatoshis. + pub skimmed_fee_msat: Option, + /// Whether the forwarded HTLC was claimed from an on-chain transaction. + pub claim_from_onchain_tx: bool, + /// The outbound amount forwarded, in millisatoshis. + pub outbound_amount_forwarded_msat: Option, + /// The timestamp when this payment was forwarded. + pub forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ForwardedPaymentDetails, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_user_channel_id, option), + (8, next_user_channel_id, option), + (10, prev_node_id, option), + (12, next_node_id, option), + (14, total_fee_earned_msat, option), + (16, skimmed_fee_msat, option), + (18, claim_from_onchain_tx, required), + (20, outbound_amount_forwarded_msat, option), + (22, forwarded_at_timestamp, required), +}); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct ForwardedPaymentDetailsUpdate { + id: ForwardedPaymentId, +} + +impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate { + fn id(&self) -> ForwardedPaymentId { + self.id + } +} + +impl StorableObject for ForwardedPaymentDetails { + type Id = ForwardedPaymentId; + type Update = ForwardedPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, _update: Self::Update) -> bool { + false + } + + fn to_update(&self) -> Self::Update { + ForwardedPaymentDetailsUpdate { id: self.id } + } +} + +/// Aggregate statistics for forwarded payments through a single channel. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelForwardingStats { + /// The channel id these stats apply to. + pub channel_id: ChannelId, + /// The channel counterparty node id, if known. + pub counterparty_node_id: Option, + /// Number of forwarded payments where this was the incoming channel. + pub inbound_payments_forwarded: u64, + /// Number of forwarded payments where this was the outgoing channel. + pub outbound_payments_forwarded: u64, + /// Total inbound amount forwarded through this channel, in millisatoshis. + pub total_inbound_amount_msat: u64, + /// Total outbound amount forwarded through this channel, in millisatoshis. + pub total_outbound_amount_msat: u64, + /// Total forwarding fees earned through this channel, in millisatoshis. + pub total_fee_earned_msat: u64, + /// Total skimmed fees for this channel, in millisatoshis. + pub total_skimmed_fee_msat: u64, + /// Number of forwarded HTLCs claimed from on-chain transactions. + pub onchain_claims_count: u64, + /// Timestamp of the first forward recorded for this channel. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of the latest forward recorded for this channel. + pub last_forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelForwardingStats, { + (0, channel_id, required), + (2, counterparty_node_id, option), + (4, inbound_payments_forwarded, required), + (6, outbound_payments_forwarded, required), + (8, total_inbound_amount_msat, required), + (10, total_outbound_amount_msat, required), + (12, total_fee_earned_msat, required), + (14, total_skimmed_fee_msat, required), + (16, onchain_claims_count, required), + (18, first_forwarded_at_timestamp, required), + (20, last_forwarded_at_timestamp, required), +}); + +/// Channel pair identifier, formed from previous channel, next channel, and time bucket. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct ChannelPairStatsId(pub [u8; 72]); + +impl ChannelPairStatsId { + pub fn from_channel_pair_and_bucket( + prev: &ChannelId, next: &ChannelId, bucket_start_timestamp: u64, + ) -> Self { + let mut result = [0u8; 72]; + result[0..32].copy_from_slice(&prev.0); + result[32..64].copy_from_slice(&next.0); + result[64..72].copy_from_slice(&bucket_start_timestamp.to_be_bytes()); + Self(result) + } +} + +impl Writeable for ChannelPairStatsId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + writer.write_all(&self.0) + } +} + +impl Readable for ChannelPairStatsId { + fn read(reader: &mut R) -> Result { + let mut bytes = [0u8; 72]; + reader.read_exact(&mut bytes)?; + Ok(Self(bytes)) + } +} + +impl Display for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_utils::to_string(&self.0)) + } +} + +impl Debug for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ChannelPairStatsId({})", hex_utils::to_string(&self.0)) + } +} + +impl StorableObjectId for ChannelPairStatsId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +/// Aggregated statistics for a specific channel pair. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelPairForwardingStats { + /// The unique channel-pair bucket id. + pub id: ChannelPairStatsId, + /// The incoming channel id. + pub prev_channel_id: ChannelId, + /// The outgoing channel id. + pub next_channel_id: ChannelId, + /// Start timestamp of this aggregation bucket. + pub bucket_start_timestamp: u64, + /// The previous node id, if available. + pub prev_node_id: Option, + /// The next node id, if available. + pub next_node_id: Option, + /// Number of payments aggregated in this bucket. + pub payment_count: u64, + /// Total inbound amount in this bucket, in millisatoshis. + pub total_inbound_amount_msat: u64, + /// Total outbound amount in this bucket, in millisatoshis. + pub total_outbound_amount_msat: u64, + /// Total forwarding fees earned in this bucket, in millisatoshis. + pub total_fee_earned_msat: u64, + /// Total skimmed fees in this bucket, in millisatoshis. + pub total_skimmed_fee_msat: u64, + /// Number of forwarded HTLCs claimed from on-chain transactions. + pub onchain_claims_count: u64, + /// Average forwarding fee per payment, in millisatoshis. + pub avg_fee_msat: u64, + /// Average inbound amount per payment, in millisatoshis. + pub avg_inbound_amount_msat: u64, + /// Timestamp of the first forward in this bucket. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of the latest forward in this bucket. + pub last_forwarded_at_timestamp: u64, + /// Timestamp when this bucket was aggregated. + pub aggregated_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelPairForwardingStats, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_node_id, option), + (8, next_node_id, option), + (10, payment_count, required), + (12, total_inbound_amount_msat, required), + (14, total_outbound_amount_msat, required), + (16, total_fee_earned_msat, required), + (18, total_skimmed_fee_msat, required), + (20, onchain_claims_count, required), + (22, avg_fee_msat, required), + (24, avg_inbound_amount_msat, required), + (26, first_forwarded_at_timestamp, required), + (28, last_forwarded_at_timestamp, required), + (30, aggregated_at_timestamp, required), + (32, bucket_start_timestamp, required), +}); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelForwardingStatsUpdate { + pub channel_id: ChannelId, + pub counterparty_node_id: Option, + pub inbound_payments_increment: u64, + pub outbound_payments_increment: u64, + pub inbound_amount_increment_msat: u64, + pub outbound_amount_increment_msat: u64, + pub fee_earned_increment_msat: u64, + pub skimmed_fee_increment_msat: u64, + pub onchain_claims_increment: u64, + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelForwardingStatsUpdate { + fn id(&self) -> ChannelId { + self.channel_id + } +} + +impl StorableObjectId for ChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl StorableObject for ChannelForwardingStats { + type Id = ChannelId; + type Update = ChannelForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.channel_id + } + + fn update(&mut self, update: Self::Update) -> bool { + debug_assert_eq!(self.channel_id, update.channel_id); + let mut updated = false; + if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() { + self.counterparty_node_id = update.counterparty_node_id; + updated = true; + } + if update.inbound_payments_increment > 0 { + self.inbound_payments_forwarded += update.inbound_payments_increment; + updated = true; + } + if update.outbound_payments_increment > 0 { + self.outbound_payments_forwarded += update.outbound_payments_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + if updated { + self.first_forwarded_at_timestamp = + self.first_forwarded_at_timestamp.min(update.timestamp); + self.last_forwarded_at_timestamp = + self.last_forwarded_at_timestamp.max(update.timestamp); + } + updated + } + + fn to_update(&self) -> Self::Update { + ChannelForwardingStatsUpdate { + channel_id: self.channel_id, + counterparty_node_id: self.counterparty_node_id, + inbound_payments_increment: self.inbound_payments_forwarded, + outbound_payments_increment: self.outbound_payments_forwarded, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelPairForwardingStatsUpdate { + pub id: ChannelPairStatsId, + pub prev_node_id: Option, + pub next_node_id: Option, + pub payment_count_increment: u64, + pub inbound_amount_increment_msat: u64, + pub outbound_amount_increment_msat: u64, + pub fee_earned_increment_msat: u64, + pub skimmed_fee_increment_msat: u64, + pub onchain_claims_increment: u64, + pub first_timestamp: u64, + pub last_timestamp: u64, +} + +impl StorableObjectUpdate for ChannelPairForwardingStatsUpdate { + fn id(&self) -> ChannelPairStatsId { + self.id + } +} + +impl StorableObject for ChannelPairForwardingStats { + type Id = ChannelPairStatsId; + type Update = ChannelPairForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: Self::Update) -> bool { + debug_assert_eq!(self.id, update.id); + let mut updated = false; + if self.prev_node_id.is_none() && update.prev_node_id.is_some() { + self.prev_node_id = update.prev_node_id; + updated = true; + } + if self.next_node_id.is_none() && update.next_node_id.is_some() { + self.next_node_id = update.next_node_id; + updated = true; + } + if update.payment_count_increment > 0 { + self.payment_count += update.payment_count_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.first_timestamp; + } else { + self.first_forwarded_at_timestamp = + self.first_forwarded_at_timestamp.min(update.first_timestamp); + } + self.last_forwarded_at_timestamp = + self.last_forwarded_at_timestamp.max(update.last_timestamp); + if self.payment_count > 0 { + self.avg_fee_msat = self.total_fee_earned_msat / self.payment_count; + self.avg_inbound_amount_msat = self.total_inbound_amount_msat / self.payment_count; + } + } + updated + } + + fn to_update(&self) -> Self::Update { + ChannelPairForwardingStatsUpdate { + id: self.id, + prev_node_id: self.prev_node_id, + next_node_id: self.next_node_id, + payment_count_increment: self.payment_count, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + first_timestamp: self.first_forwarded_at_timestamp, + last_timestamp: self.last_forwarded_at_timestamp, + } + } +} + +/// Aggregate expired forwarded payments into time-bucketed channel pair statistics. +pub(crate) async fn aggregate_expired_forwarded_payments( + forwarded_payment_store: &DataStore>, + channel_pair_stats_store: &DataStore>, + retention_minutes: u64, logger: &Arc, +) -> Result<(u64, u64), Error> { + let now = + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs(); + let cutoff = now.saturating_sub(retention_minutes.saturating_mul(60)); + let bucket_size_secs = retention_minutes.saturating_mul(60); + if bucket_size_secs == 0 { + return Ok((0, 0)); + } + + let expired = forwarded_payment_store.list_filter(|p| p.forwarded_at_timestamp < cutoff); + if expired.is_empty() { + log_debug!(logger, "No expired forwarded payments found"); + return Ok((0, 0)); + } + + let mut bucket_groups: HashMap<(ChannelId, ChannelId, u64), Vec<&ForwardedPaymentDetails>> = + HashMap::new(); + for payment in &expired { + let bucket_start = (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs; + bucket_groups + .entry((payment.prev_channel_id, payment.next_channel_id, bucket_start)) + .or_default() + .push(payment); + } + + let mut aggregated_bucket_count = 0u64; + let mut removed_payment_count = 0u64; + for ((prev_channel_id, next_channel_id, bucket_start), payments) in bucket_groups { + let pair_id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel_id, + &next_channel_id, + bucket_start, + ); + + let first_payment = payments[0]; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_timestamp = u64::MAX; + let mut last_timestamp = 0u64; + + for payment in &payments { + let outbound = payment.outbound_amount_forwarded_msat.unwrap_or(0); + let fee = payment.total_fee_earned_msat.unwrap_or(0); + let skimmed = payment.skimmed_fee_msat.unwrap_or(0); + total_inbound_amount_msat = + total_inbound_amount_msat.saturating_add(outbound.saturating_add(fee)); + total_outbound_amount_msat = total_outbound_amount_msat.saturating_add(outbound); + total_fee_earned_msat = total_fee_earned_msat.saturating_add(fee); + total_skimmed_fee_msat = total_skimmed_fee_msat.saturating_add(skimmed); + if payment.claim_from_onchain_tx { + onchain_claims_count += 1; + } + first_timestamp = first_timestamp.min(payment.forwarded_at_timestamp); + last_timestamp = last_timestamp.max(payment.forwarded_at_timestamp); + } + + let payment_count = payments.len() as u64; + let stats = ChannelPairForwardingStats { + id: pair_id, + prev_channel_id, + next_channel_id, + bucket_start_timestamp: bucket_start, + prev_node_id: first_payment.prev_node_id, + next_node_id: first_payment.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat: total_fee_earned_msat / payment_count, + avg_inbound_amount_msat: total_inbound_amount_msat / payment_count, + first_forwarded_at_timestamp: first_timestamp, + last_forwarded_at_timestamp: last_timestamp, + aggregated_at_timestamp: now, + }; + + channel_pair_stats_store.insert_or_update(stats).await.map_err(|e| { + log_error!(logger, "Failed to upsert channel pair stats bucket for {pair_id:?}: {e}"); + e + })?; + aggregated_bucket_count += 1; + + for payment in payments { + forwarded_payment_store.remove(&payment.id).await.map_err(|e| { + log_error!(logger, "Failed to remove forwarded payment {:?}: {}", payment.id, e); + e + })?; + removed_payment_count += 1; + } + } + + Ok((aggregated_bucket_count, removed_payment_count)) +} + +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + assert!(!buckets.is_empty(), "Cannot aggregate empty bucket list"); + let first = &buckets[0]; + for bucket in &buckets[1..] { + assert_eq!(bucket.prev_channel_id, first.prev_channel_id); + assert_eq!(bucket.next_channel_id, first.next_channel_id); + } + + let mut payment_count = 0u64; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_forwarded_at_timestamp = u64::MAX; + let mut last_forwarded_at_timestamp = 0u64; + let mut earliest_bucket_start = u64::MAX; + for bucket in buckets { + payment_count += bucket.payment_count; + total_inbound_amount_msat += bucket.total_inbound_amount_msat; + total_outbound_amount_msat += bucket.total_outbound_amount_msat; + total_fee_earned_msat += bucket.total_fee_earned_msat; + total_skimmed_fee_msat += bucket.total_skimmed_fee_msat; + onchain_claims_count += bucket.onchain_claims_count; + first_forwarded_at_timestamp = + first_forwarded_at_timestamp.min(bucket.first_forwarded_at_timestamp); + last_forwarded_at_timestamp = + last_forwarded_at_timestamp.max(bucket.last_forwarded_at_timestamp); + earliest_bucket_start = earliest_bucket_start.min(bucket.bucket_start_timestamp); + } + let now = + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs(); + ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &first.prev_channel_id, + &first.next_channel_id, + earliest_bucket_start, + ), + prev_channel_id: first.prev_channel_id, + next_channel_id: first.next_channel_id, + bucket_start_timestamp: earliest_bucket_start, + prev_node_id: first.prev_node_id, + next_node_id: first.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat: if payment_count > 0 { total_fee_earned_msat / payment_count } else { 0 }, + avg_inbound_amount_msat: if payment_count > 0 { + total_inbound_amount_msat / payment_count + } else { + 0 + }, + first_forwarded_at_timestamp, + last_forwarded_at_timestamp, + aggregated_at_timestamp: now, + } +} diff --git a/src/types.rs b/src/types.rs index e24db4d25..347392fcf 100644 --- a/src/types.rs +++ b/src/types.rs @@ -48,6 +48,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::ffi::maybe_wrap; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::store::{ + ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails, +}; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -372,6 +375,10 @@ pub(crate) type BumpTransactionEventHandler = >; pub(crate) type PaymentStore = DataStore>; +pub(crate) type ForwardedPaymentStore = DataStore>; +pub(crate) type ChannelForwardingStatsStore = DataStore>; +pub(crate) type ChannelPairForwardingStatsStore = + DataStore>; /// A local, potentially user-provided, identifier of a channel. ///