diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7c0edc535..e79912e06 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -11,6 +11,8 @@ typedef dictionary ElectrumSyncConfig; typedef dictionary TorConfig; +typedef enum ForwardedPaymentTrackingMode; + typedef interface NodeEntropy; typedef enum WordCount; @@ -137,6 +139,19 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + [Throws=NodeError] + ForwardedPaymentDetailsPage list_forwarded_payments(PageToken? page_token); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + [Throws=NodeError] + ChannelForwardingStatsPage list_channel_forwarding_stats(PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats(PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_in_range(u64 start_timestamp, u64 end_timestamp, PageToken? page_token); + [Throws=NodeError] + ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_for_pair(ChannelId prev_channel_id, ChannelId next_channel_id, PageToken? page_token); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -379,6 +394,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; @@ -391,6 +409,12 @@ typedef string PaymentSecret; [Custom] typedef string ChannelId; +[Custom] +typedef string ChannelPairStatsId; + +[Custom] +typedef string PageToken; + [Custom] typedef string UserChannelId; @@ -417,3 +441,15 @@ typedef enum Event; typedef interface HRNResolverConfig; typedef dictionary HumanReadableNamesConfig; + +typedef dictionary ForwardedPaymentDetails; + +typedef dictionary ChannelForwardingStats; + +typedef dictionary ChannelPairForwardingStats; + +typedef dictionary ForwardedPaymentDetailsPage; + +typedef dictionary ChannelForwardingStatsPage; + +typedef dictionary ChannelPairForwardingStatsPage; diff --git a/src/builder.rs b/src/builder.rs index 8b575cc3f..27bc7b35c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,13 @@ use crate::io::utils::{ }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -77,7 +83,8 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, + ChannelPairForwardingStatsStore, DynStore, DynStoreRef, DynStoreWrapper, ForwardedPaymentStore, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, }; @@ -1397,24 +1404,48 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_all_objects( - &*kv_store_ref, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_all_objects( - &*kv_store_ref, - PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - Arc::clone(&logger_ref), - ) + let ( + payment_store_res, + forwarded_payment_store_res, + channel_forwarding_stats_res, + channel_pair_forwarding_stats_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_all_objects( + &*kv_store_ref, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_all_objects( + &*kv_store_ref, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), + ), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_all_objects( + &*kv_store_ref, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&logger_ref), ) - }); + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1443,6 +1474,48 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let channel_pair_forwarding_stats_store = match channel_pair_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelPairForwardingStatsStore::new( + stats, + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel pair forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -2245,6 +2318,9 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, + channel_pair_forwarding_stats_store, lnurl_auth, is_running, node_metrics, diff --git a/src/config.rs b/src/config.rs index ad1b91181..a043c3bd2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,6 +111,25 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort an LNURL-auth operation. pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15; +/// The mode used for tracking forwarded payments. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))] +pub enum ForwardedPaymentTrackingMode { + /// Store individual forwarded payments until they are aggregated into channel-pair buckets. + Detailed { + /// Number of minutes to retain individual forwarded payments before aggregation. + retention_minutes: u64, + }, + /// Track only per-channel aggregate statistics. + Stats, +} + +impl Default for ForwardedPaymentTrackingMode { + fn default() -> Self { + Self::Stats + } +} + #[derive(Debug, Clone)] #[cfg_attr(feature = "uniffi", derive(uniffi::Record))] /// Represents the configuration of an [`Node`] instance. @@ -130,9 +149,10 @@ pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15; /// | `route_parameters` | None | /// | `tor_config` | None | /// | `hrn_config` | HumanReadableNamesConfig::default() | +/// | `forwarded_payment_tracking_mode` | Stats | /// -/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their -/// respective default values. +/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and +/// [`ForwardedPaymentTrackingMode`] for more information regarding their respective default values. /// /// [`Node`]: crate::Node pub struct Config { @@ -205,6 +225,8 @@ pub struct Config { /// /// [BIP 353]: https://github.com/bitcoin/bips/blob/master/bip-0353.mediawiki pub hrn_config: HumanReadableNamesConfig, + /// The mode used for tracking forwarded payments. + pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -221,6 +243,7 @@ impl Default for Config { route_parameters: None, node_alias: None, hrn_config: HumanReadableNamesConfig::default(), + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/data_store.rs b/src/data_store.rs index 13afeca7e..deb92d324 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -9,11 +9,11 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use lightning::util::persist::KVStore; +use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore}; use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; -use crate::types::DynStore; +use crate::types::{DynStore, DynStoreRef}; use crate::Error; pub(crate) trait StorableObject: Clone + Readable + Writeable { @@ -179,6 +179,65 @@ where self.objects.lock().expect("lock").values().filter(f).cloned().collect::>() } + pub(crate) async fn list_page( + &self, page_token: Option, + ) -> Result<(Vec, Option), Error> { + let response = PaginatedKVStore::list_paginated( + &DynStoreRef(Arc::clone(&self.kv_store)), + &self.primary_namespace, + &self.secondary_namespace, + page_token, + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Listing object data under {}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + e + ); + Error::PersistenceFailed + })?; + + let mut objects = Vec::with_capacity(response.keys.len()); + for key in response.keys { + let data = KVStore::read( + &DynStoreRef(Arc::clone(&self.kv_store)), + &self.primary_namespace, + &self.secondary_namespace, + &key, + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Reading object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + + let object = SO::read(&mut &data[..]).map_err(|e| { + log_error!( + self.logger, + "Failed to deserialize object data for key {}/{}/{}: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + objects.push(object); + } + + Ok((objects, response.next_page_token)) + } + async fn persist(&self, object: &SO) -> Result<(), Error> { let (store_key, data) = Self::encode_object(object); self.persist_encoded(store_key, data).await diff --git a/src/event.rs b/src/event.rs index 93d274ff7..1a208c21c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,9 +7,10 @@ use core::future::Future; use core::task::{Poll, Waker}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -33,7 +34,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -48,12 +49,14 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::payment::PaymentMetadata; use crate::runtime::Runtime; use crate::types::{ - CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, KeysManager, + OnionMessenger, PaymentStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -535,6 +538,8 @@ where network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, keys_manager: Arc, runtime: Arc, @@ -555,6 +560,8 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, @@ -570,6 +577,8 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, + channel_forwarding_stats_store, peer_store, keys_manager, logger, @@ -1480,6 +1489,152 @@ where .await; } + if !prev_htlcs.is_empty() && !next_htlcs.is_empty() { + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch") + .as_secs(); + let inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + let prev_htlc_count = prev_htlcs.len() as u64; + let inbound_amount_per_htlc_msat = inbound_amount_msat / prev_htlc_count; + let inbound_amount_remainder_msat = inbound_amount_msat % prev_htlc_count; + let fee_per_htlc_msat = total_fee_earned_msat.unwrap_or(0) / prev_htlc_count; + let fee_remainder_msat = total_fee_earned_msat.unwrap_or(0) % prev_htlc_count; + let skimmed_fee_per_htlc_msat = skimmed_fee_msat.unwrap_or(0) / prev_htlc_count; + let skimmed_fee_remainder_msat = + skimmed_fee_msat.unwrap_or(0) % prev_htlc_count; + + let mut inbound_stats_by_channel = HashMap::new(); + for (idx, prev_htlc) in prev_htlcs.iter().enumerate() { + let inbound_stats = inbound_stats_by_channel + .entry(prev_htlc.channel_id) + .or_insert(ChannelForwardingStats { + channel_id: prev_htlc.channel_id, + counterparty_node_id: prev_htlc.node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: 0, + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }); + if inbound_stats.counterparty_node_id.is_none() { + inbound_stats.counterparty_node_id = prev_htlc.node_id; + } + inbound_stats.total_inbound_amount_msat += inbound_amount_per_htlc_msat + + if idx == 0 { inbound_amount_remainder_msat } else { 0 }; + inbound_stats.total_fee_earned_msat += + fee_per_htlc_msat + if idx == 0 { fee_remainder_msat } else { 0 }; + inbound_stats.total_skimmed_fee_msat += skimmed_fee_per_htlc_msat + + if idx == 0 { skimmed_fee_remainder_msat } else { 0 }; + inbound_stats.onchain_claims_count += + if claim_from_onchain_tx && idx == 0 { 1 } else { 0 }; + } + + for inbound_stats in inbound_stats_by_channel.into_values() { + self.channel_forwarding_stats_store + .insert_or_update(inbound_stats) + .await + .map_err(|e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + } + + let next_htlc_count = next_htlcs.len() as u64; + let outbound_amount_msat = outbound_amount_forwarded_msat.unwrap_or(0); + let outbound_amount_per_htlc_msat = outbound_amount_msat / next_htlc_count; + let outbound_amount_remainder_msat = outbound_amount_msat % next_htlc_count; + + let mut outbound_stats_by_channel = HashMap::new(); + for (idx, next_htlc) in next_htlcs.iter().enumerate() { + let outbound_stats = outbound_stats_by_channel + .entry(next_htlc.channel_id) + .or_insert(ChannelForwardingStats { + channel_id: next_htlc.channel_id, + counterparty_node_id: next_htlc.node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: 0, + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }); + if outbound_stats.counterparty_node_id.is_none() { + outbound_stats.counterparty_node_id = next_htlc.node_id; + } + outbound_stats.total_outbound_amount_msat += outbound_amount_per_htlc_msat + + if idx == 0 { outbound_amount_remainder_msat } else { 0 }; + } + + for outbound_stats in outbound_stats_by_channel.into_values() { + self.channel_forwarding_stats_store + .insert_or_update(outbound_stats) + .await + .map_err(|e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + } + + if matches!( + self.config.forwarded_payment_tracking_mode, + ForwardedPaymentTrackingMode::Detailed { .. } + ) { + // TODO: Once LDK exposes exact incoming/outgoing HTLC pair data for + // trampoline forwards, store detailed records for multi-HTLC forwards too. + if prev_htlcs.len() == 1 && next_htlcs.len() == 1 { + let prev_htlc = prev_htlcs.first().expect("prev_htlcs has one element"); + let next_htlc = next_htlcs.first().expect("next_htlcs has one element"); + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(self.keys_manager.get_secure_random_bytes()), + prev_channel_id: prev_htlc.channel_id, + next_channel_id: next_htlc.channel_id, + prev_user_channel_id: prev_htlc.user_channel_id.map(UserChannelId), + next_user_channel_id: next_htlc.user_channel_id.map(UserChannelId), + prev_node_id: prev_htlc.node_id, + next_node_id: next_htlc.node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + self.forwarded_payment_store.insert(forwarded_payment).await.map_err( + |e| { + log_error!( + self.logger, + "Failed to store forwarded payment: {e}" + ); + ReplayEvent() + }, + )?; + } else { + log_debug!( + self.logger, + "Skipping detailed channel-pair forwarding stats for multi-HTLC forward with {} inbound and {} outbound HTLCs", + prev_htlcs.len(), + next_htlcs.len() + ); + } + } + } + let event = Event::PaymentForwarded { prev_htlcs: prev_htlcs.into_iter().map(HTLCLocator::from).collect(), next_htlcs: next_htlcs.into_iter().map(HTLCLocator::from).collect(), diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 9bb03bb07..8854e1604 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -149,7 +149,8 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; +use crate::payment::store::{ChannelPairStatsId, ForwardedPaymentId}; +use crate::{hex_utils, PageToken, SocketAddress, UserChannelId}; uniffi::custom_type!(PublicKey, String, { remote, @@ -909,6 +910,22 @@ uniffi::custom_type!(PaymentId, String, { }, }); +uniffi::custom_type!(ForwardedPaymentId, String, { + remote, + try_lift: |val| { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + }, + lower: |obj| { + hex_utils::to_string(&obj.0) + }, +}); + uniffi::custom_type!(PaymentHash, String, { remote, try_lift: |val| { @@ -974,6 +991,33 @@ uniffi::custom_type!(ChannelId, String, { } }); +uniffi::custom_type!(ChannelPairStatsId, String, { + remote, + try_lift: |val| { + if let Some(hex_vec) = hex_utils::to_vec(&val) { + if hex_vec.len() == 72 { + let mut id = [0u8; 72]; + id.copy_from_slice(&hex_vec[..]); + return Ok(ChannelPairStatsId(id)); + } + } + Err(Error::InvalidChannelId.into()) + }, + lower: |obj| { + hex_utils::to_string(&obj.0) + } +}); + +uniffi::custom_type!(PageToken, String, { + remote, + try_lift: |val| { + Ok(PageToken::new(val)) + }, + lower: |obj| { + obj.to_string() + } +}); + uniffi::custom_type!(UserChannelId, String, { remote, try_lift: |val| { diff --git a/src/io/mod.rs b/src/io/mod.rs index a01aa59a8..5ae2bc608 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,6 +33,20 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The channel pair forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_pair_forwarding_stats"; +pub(crate) const CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/lib.rs b/src/lib.rs index c97e16fe6..a6368e653 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,8 +128,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -153,9 +153,11 @@ pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::{BaseMessageHandler, SocketAddress}; use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeAlias; use lightning::sign::EntropySource; use lightning::util::persist::KVStore; +pub use lightning::util::persist::PageToken; use lightning::util::wallet_utils::{Input, Wallet as LdkWallet}; use lightning_background_processor::process_events_async; pub use lightning_invoice; @@ -167,15 +169,21 @@ use lnurl_auth::LnurlAuth; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; +use payment::store::{ + aggregate_channel_pair_stats as aggregate_channel_pair_stats_impl, + aggregate_expired_forwarded_payments, +}; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ChannelPairForwardingStats, + ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, PaymentDetails, + SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, ChannelPairForwardingStatsStore, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; @@ -244,6 +252,9 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, + channel_pair_forwarding_stats_store: Arc, lnurl_auth: Arc, is_running: Arc>, node_metrics: Arc, @@ -254,6 +265,36 @@ pub struct Node { _leak_checker: LeakChecker, } +/// A page of forwarded payments returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ForwardedPaymentDetailsPage { + /// Forwarded payments in this page. + pub payments: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + +/// A page of channel forwarding statistics returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelForwardingStatsPage { + /// Channel forwarding statistics in this page. + pub stats: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + +/// A page of channel-pair forwarding statistics returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelPairForwardingStatsPage { + /// Channel-pair forwarding statistics in this page. + pub stats: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + impl Node { /// Starts the necessary background tasks, such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network. @@ -289,6 +330,38 @@ impl Node { let chain_source = Arc::clone(&self.chain_source); self.runtime.block_on(async move { chain_source.update_fee_rate_estimates().await })?; + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + self.runtime.block_on(async move { + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) + .await + { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_info!( + logger, + "Aggregated {payment_count} forwarded payments into {pair_count} channel pair buckets" + ); + }, + Err(e) => { + log_error!(logger, "Startup forwarded payment aggregation failed: {e}") + }, + _ => {}, + } + }); + } + } + // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); @@ -581,6 +654,50 @@ impl Node { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await }); + if let ForwardedPaymentTrackingMode::Detailed { retention_minutes } = + self.config.forwarded_payment_tracking_mode + { + if retention_minutes > 0 { + let stop_aggregation = self.stop_sender.subscribe(); + let forwarded_payment_store = Arc::clone(&self.forwarded_payment_store); + let channel_pair_stats_store = + Arc::clone(&self.channel_pair_forwarding_stats_store); + let logger = Arc::clone(&self.logger); + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = + tokio::time::interval(Duration::from_secs(retention_minutes * 60)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut stop_aggregation = stop_aggregation; + loop { + tokio::select! { + _ = stop_aggregation.changed() => break, + _ = interval.tick() => { + match aggregate_expired_forwarded_payments( + &forwarded_payment_store, + &channel_pair_stats_store, + retention_minutes, + &logger, + ) + .await + { + Ok((pair_count, payment_count)) if pair_count > 0 => { + log_debug!( + logger, + "Aggregated {} forwarded payments into {} channel pair buckets", + payment_count, + pair_count + ); + }, + Err(e) => log_error!(logger, "Periodic forwarded payment aggregation failed: {}", e), + _ => {}, + } + } + } + } + }); + } + } + let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), Arc::new(LdkWallet::new(Arc::clone(&self.wallet), Arc::clone(&self.logger))), @@ -605,6 +722,8 @@ impl Node { Arc::clone(&self.network_graph), Arc::clone(&self.liquidity_source), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), Arc::clone(&self.keys_manager), static_invoice_store, @@ -2135,6 +2254,114 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves a page of forwarded payments from the underlying paginated store. + pub fn list_forwarded_payments( + &self, page_token: Option, + ) -> Result { + let (payments, next_page_token) = + self.runtime.block_on(self.forwarded_payment_store.list_page(page_token))?; + Ok(ForwardedPaymentDetailsPage { payments, next_page_token }) + } + + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves a page of channel forwarding statistics from the underlying paginated store. + pub fn list_channel_forwarding_stats( + &self, page_token: Option, + ) -> Result { + let (stats, next_page_token) = + self.runtime.block_on(self.channel_forwarding_stats_store.list_page(page_token))?; + Ok(ChannelForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + + /// Retrieves a page of channel pair forwarding statistics from the underlying paginated store. + pub fn list_channel_pair_forwarding_stats( + &self, page_token: Option, + ) -> Result { + let (stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all channel pair forwarding statistics that match the given predicate. + pub fn list_channel_pair_forwarding_stats_with_filter< + F: FnMut(&&ChannelPairForwardingStats) -> bool, + >( + &self, f: F, + ) -> Vec { + self.channel_pair_forwarding_stats_store.list_filter(f) + } + + /// Retrieves channel pair forwarding statistics within a specific time range. + /// + /// The page token advances over the underlying channel-pair stats namespace. Filtering is + /// applied to the returned page. + pub fn list_channel_pair_forwarding_stats_in_range( + &self, start_timestamp: u64, end_timestamp: u64, page_token: Option, + ) -> Result { + let (stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + let stats = stats + .into_iter() + .filter(|stats| { + stats.bucket_start_timestamp >= start_timestamp + && stats.bucket_start_timestamp < end_timestamp + }) + .collect(); + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + + /// Retrieves all forwarding statistics buckets for a specific channel pair. + /// + /// The page token advances over the underlying channel-pair stats namespace. Filtering is + /// applied to the returned page. + pub fn list_channel_pair_forwarding_stats_for_pair( + &self, prev_channel_id: ChannelId, next_channel_id: ChannelId, + page_token: Option, + ) -> Result { + let (mut stats, next_page_token) = self + .runtime + .block_on(self.channel_pair_forwarding_stats_store.list_page(page_token))?; + stats.retain(|stats| { + stats.prev_channel_id == prev_channel_id && stats.next_channel_id == next_channel_id + }); + stats.sort_by_key(|stats| stats.bucket_start_timestamp); + Ok(ChannelPairForwardingStatsPage { stats, next_page_token }) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); @@ -2413,6 +2640,13 @@ pub(crate) fn new_channel_anchor_reserve_sats( }) } +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + aggregate_channel_pair_stats_impl(buckets) +} + #[cfg(test)] mod tests { use lightning::util::ser::{Readable, Writeable}; diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 2d3acf90e..e6860ad0f 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -24,7 +24,8 @@ pub(crate) use pending_payment_store::FundingTxCandidate; pub(crate) use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - Channel, ConfirmationStatus, LSPS2Parameters, PaymentDetails, PaymentDirection, PaymentKind, - PaymentStatus, TransactionType, + Channel, ChannelForwardingStats, ChannelPairForwardingStats, ConfirmationStatus, + ForwardedPaymentDetails, ForwardedPaymentId, LSPS2Parameters, PaymentDetails, PaymentDirection, + PaymentKind, PaymentStatus, TransactionType, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 160890895..bde5b219c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -5,6 +5,9 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bitcoin::secp256k1::PublicKey; @@ -14,6 +17,7 @@ use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; +use lightning::util::logger::Logger as _; use lightning::util::ser::{Readable, Writeable}; use lightning::{ _init_and_read_len_prefixed_tlv_fields, impl_writeable_tlv_based, @@ -22,8 +26,11 @@ use lightning::{ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning_types::string::UntrustedString; -use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::data_store::{DataStore, StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::logger::{log_debug, log_error, Logger}; +use crate::Error; +use crate::UserChannelId; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -985,3 +992,618 @@ mod tests { assert_eq!(decoded, PaymentKind::read(&mut &*reencoded).unwrap()); } } + +/// A unique identifier for a forwarded payment. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ForwardedPaymentId(pub [u8; 32]); + +impl StorableObjectId for ForwardedPaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl Writeable for ForwardedPaymentId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + self.0.write(writer) + } +} + +impl Readable for ForwardedPaymentId { + fn read(reader: &mut R) -> Result { + Ok(Self(Readable::read(reader)?)) + } +} + +/// Details of a payment that has been forwarded through this node. +/// +/// Detailed channel-pair records are only stored for forwards with exactly one incoming HTLC and +/// one outgoing HTLC. Multi-HTLC forwards are omitted from detailed channel-pair tracking until LDK +/// exposes enough data to identify the exact incoming/outgoing HTLC pairs. +/// +/// TODO: Store multi-HTLC channel-pair records once LDK exposes exact pair data for trampoline +/// forwards. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ForwardedPaymentDetails { + /// A randomly generated identifier for this forwarded payment. + pub id: ForwardedPaymentId, + /// The incoming channel id. + pub prev_channel_id: ChannelId, + /// The outgoing channel id. + pub next_channel_id: ChannelId, + /// The incoming user channel id, if available. + pub prev_user_channel_id: Option, + /// The outgoing user channel id, if available. + pub next_user_channel_id: Option, + /// The previous node id, if available. + pub prev_node_id: Option, + /// The next node id, if available. + pub next_node_id: Option, + /// The total fee earned by forwarding this payment, in millisatoshis. + pub total_fee_earned_msat: Option, + /// The skimmed fee, in millisatoshis. + pub skimmed_fee_msat: Option, + /// Whether the forwarded HTLC was claimed from an on-chain transaction. + pub claim_from_onchain_tx: bool, + /// The outbound amount forwarded, in millisatoshis. + pub outbound_amount_forwarded_msat: Option, + /// The timestamp when this payment was forwarded. + pub forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ForwardedPaymentDetails, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_user_channel_id, option), + (8, next_user_channel_id, option), + (10, prev_node_id, option), + (12, next_node_id, option), + (14, total_fee_earned_msat, option), + (16, skimmed_fee_msat, option), + (18, claim_from_onchain_tx, required), + (20, outbound_amount_forwarded_msat, option), + (22, forwarded_at_timestamp, required), +}); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct ForwardedPaymentDetailsUpdate { + id: ForwardedPaymentId, +} + +impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate { + fn id(&self) -> ForwardedPaymentId { + self.id + } +} + +impl StorableObject for ForwardedPaymentDetails { + type Id = ForwardedPaymentId; + type Update = ForwardedPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, _update: Self::Update) -> bool { + false + } + + fn to_update(&self) -> Self::Update { + ForwardedPaymentDetailsUpdate { id: self.id } + } +} + +/// Aggregate statistics for forwarded payments through a single channel. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelForwardingStats { + /// The channel id these stats apply to. + pub channel_id: ChannelId, + /// The channel counterparty node id, if known. + pub counterparty_node_id: Option, + /// Number of forwarded payments where this was the incoming channel. + pub inbound_payments_forwarded: u64, + /// Number of forwarded payments where this was the outgoing channel. + pub outbound_payments_forwarded: u64, + /// Total inbound amount forwarded through this channel, in millisatoshis. + pub total_inbound_amount_msat: u64, + /// Total outbound amount forwarded through this channel, in millisatoshis. + pub total_outbound_amount_msat: u64, + /// Total forwarding fees earned through this channel, in millisatoshis. + pub total_fee_earned_msat: u64, + /// Total skimmed fees for this channel, in millisatoshis. + pub total_skimmed_fee_msat: u64, + /// Number of forwarded HTLCs claimed from on-chain transactions. + pub onchain_claims_count: u64, + /// Timestamp of the first forward recorded for this channel. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of the latest forward recorded for this channel. + pub last_forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelForwardingStats, { + (0, channel_id, required), + (2, counterparty_node_id, option), + (4, inbound_payments_forwarded, required), + (6, outbound_payments_forwarded, required), + (8, total_inbound_amount_msat, required), + (10, total_outbound_amount_msat, required), + (12, total_fee_earned_msat, required), + (14, total_skimmed_fee_msat, required), + (16, onchain_claims_count, required), + (18, first_forwarded_at_timestamp, required), + (20, last_forwarded_at_timestamp, required), +}); + +/// Channel pair identifier, formed from previous channel, next channel, and time bucket. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct ChannelPairStatsId(pub [u8; 72]); + +impl ChannelPairStatsId { + pub fn from_channel_pair_and_bucket( + prev: &ChannelId, next: &ChannelId, bucket_start_timestamp: u64, + ) -> Self { + let mut result = [0u8; 72]; + result[0..32].copy_from_slice(&prev.0); + result[32..64].copy_from_slice(&next.0); + result[64..72].copy_from_slice(&bucket_start_timestamp.to_be_bytes()); + Self(result) + } +} + +impl Writeable for ChannelPairStatsId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + writer.write_all(&self.0) + } +} + +impl Readable for ChannelPairStatsId { + fn read(reader: &mut R) -> Result { + let mut bytes = [0u8; 72]; + reader.read_exact(&mut bytes)?; + Ok(Self(bytes)) + } +} + +impl Display for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_utils::to_string(&self.0)) + } +} + +impl Debug for ChannelPairStatsId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ChannelPairStatsId({})", hex_utils::to_string(&self.0)) + } +} + +impl StorableObjectId for ChannelPairStatsId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +/// Aggregated statistics for a specific channel pair. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct ChannelPairForwardingStats { + /// The unique channel-pair bucket id. + pub id: ChannelPairStatsId, + /// The incoming channel id. + pub prev_channel_id: ChannelId, + /// The outgoing channel id. + pub next_channel_id: ChannelId, + /// Start timestamp of this aggregation bucket. + pub bucket_start_timestamp: u64, + /// The previous node id, if available. + pub prev_node_id: Option, + /// The next node id, if available. + pub next_node_id: Option, + /// Number of payments aggregated in this bucket. + pub payment_count: u64, + /// Total inbound amount in this bucket, in millisatoshis. + pub total_inbound_amount_msat: u64, + /// Total outbound amount in this bucket, in millisatoshis. + pub total_outbound_amount_msat: u64, + /// Total forwarding fees earned in this bucket, in millisatoshis. + pub total_fee_earned_msat: u64, + /// Total skimmed fees in this bucket, in millisatoshis. + pub total_skimmed_fee_msat: u64, + /// Number of forwarded HTLCs claimed from on-chain transactions. + pub onchain_claims_count: u64, + /// Average forwarding fee per payment, in millisatoshis. + pub avg_fee_msat: u64, + /// Average inbound amount per payment, in millisatoshis. + pub avg_inbound_amount_msat: u64, + /// Timestamp of the first forward in this bucket. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of the latest forward in this bucket. + pub last_forwarded_at_timestamp: u64, + /// Timestamp when this bucket was aggregated. + pub aggregated_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelPairForwardingStats, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_node_id, option), + (8, next_node_id, option), + (10, payment_count, required), + (12, total_inbound_amount_msat, required), + (14, total_outbound_amount_msat, required), + (16, total_fee_earned_msat, required), + (18, total_skimmed_fee_msat, required), + (20, onchain_claims_count, required), + (22, avg_fee_msat, required), + (24, avg_inbound_amount_msat, required), + (26, first_forwarded_at_timestamp, required), + (28, last_forwarded_at_timestamp, required), + (30, aggregated_at_timestamp, required), + (32, bucket_start_timestamp, required), +}); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelForwardingStatsUpdate { + pub channel_id: ChannelId, + pub counterparty_node_id: Option, + pub inbound_payments_increment: u64, + pub outbound_payments_increment: u64, + pub inbound_amount_increment_msat: u64, + pub outbound_amount_increment_msat: u64, + pub fee_earned_increment_msat: u64, + pub skimmed_fee_increment_msat: u64, + pub onchain_claims_increment: u64, + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelForwardingStatsUpdate { + fn id(&self) -> ChannelId { + self.channel_id + } +} + +impl StorableObjectId for ChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl StorableObject for ChannelForwardingStats { + type Id = ChannelId; + type Update = ChannelForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.channel_id + } + + fn update(&mut self, update: Self::Update) -> bool { + debug_assert_eq!(self.channel_id, update.channel_id); + let mut updated = false; + if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() { + self.counterparty_node_id = update.counterparty_node_id; + updated = true; + } + if update.inbound_payments_increment > 0 { + self.inbound_payments_forwarded += update.inbound_payments_increment; + updated = true; + } + if update.outbound_payments_increment > 0 { + self.outbound_payments_forwarded += update.outbound_payments_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + if updated { + self.first_forwarded_at_timestamp = + self.first_forwarded_at_timestamp.min(update.timestamp); + self.last_forwarded_at_timestamp = + self.last_forwarded_at_timestamp.max(update.timestamp); + } + updated + } + + fn to_update(&self) -> Self::Update { + ChannelForwardingStatsUpdate { + channel_id: self.channel_id, + counterparty_node_id: self.counterparty_node_id, + inbound_payments_increment: self.inbound_payments_forwarded, + outbound_payments_increment: self.outbound_payments_forwarded, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelPairForwardingStatsUpdate { + pub id: ChannelPairStatsId, + pub prev_node_id: Option, + pub next_node_id: Option, + pub payment_count_increment: u64, + pub inbound_amount_increment_msat: u64, + pub outbound_amount_increment_msat: u64, + pub fee_earned_increment_msat: u64, + pub skimmed_fee_increment_msat: u64, + pub onchain_claims_increment: u64, + pub first_timestamp: u64, + pub last_timestamp: u64, +} + +impl StorableObjectUpdate for ChannelPairForwardingStatsUpdate { + fn id(&self) -> ChannelPairStatsId { + self.id + } +} + +impl StorableObject for ChannelPairForwardingStats { + type Id = ChannelPairStatsId; + type Update = ChannelPairForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: Self::Update) -> bool { + debug_assert_eq!(self.id, update.id); + let mut updated = false; + if self.prev_node_id.is_none() && update.prev_node_id.is_some() { + self.prev_node_id = update.prev_node_id; + updated = true; + } + if self.next_node_id.is_none() && update.next_node_id.is_some() { + self.next_node_id = update.next_node_id; + updated = true; + } + if update.payment_count_increment > 0 { + self.payment_count += update.payment_count_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.first_timestamp; + } else { + self.first_forwarded_at_timestamp = + self.first_forwarded_at_timestamp.min(update.first_timestamp); + } + self.last_forwarded_at_timestamp = + self.last_forwarded_at_timestamp.max(update.last_timestamp); + if self.payment_count > 0 { + self.avg_fee_msat = self.total_fee_earned_msat / self.payment_count; + self.avg_inbound_amount_msat = self.total_inbound_amount_msat / self.payment_count; + } + } + updated + } + + fn to_update(&self) -> Self::Update { + ChannelPairForwardingStatsUpdate { + id: self.id, + prev_node_id: self.prev_node_id, + next_node_id: self.next_node_id, + payment_count_increment: self.payment_count, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + first_timestamp: self.first_forwarded_at_timestamp, + last_timestamp: self.last_forwarded_at_timestamp, + } + } +} + +/// Aggregate expired forwarded payments into time-bucketed channel pair statistics. +pub(crate) async fn aggregate_expired_forwarded_payments( + forwarded_payment_store: &DataStore>, + channel_pair_stats_store: &DataStore>, + retention_minutes: u64, logger: &Arc, +) -> Result<(u64, u64), Error> { + let now = + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs(); + let cutoff = now.saturating_sub(retention_minutes.saturating_mul(60)); + let bucket_size_secs = retention_minutes.saturating_mul(60); + if bucket_size_secs == 0 { + return Ok((0, 0)); + } + + let expired = forwarded_payment_store.list_filter(|p| p.forwarded_at_timestamp < cutoff); + if expired.is_empty() { + log_debug!(logger, "No expired forwarded payments found"); + return Ok((0, 0)); + } + + let mut bucket_groups: HashMap<(ChannelId, ChannelId, u64), Vec<&ForwardedPaymentDetails>> = + HashMap::new(); + for payment in &expired { + let bucket_start = (payment.forwarded_at_timestamp / bucket_size_secs) * bucket_size_secs; + bucket_groups + .entry((payment.prev_channel_id, payment.next_channel_id, bucket_start)) + .or_default() + .push(payment); + } + + let mut aggregated_bucket_count = 0u64; + let mut removed_payment_count = 0u64; + for ((prev_channel_id, next_channel_id, bucket_start), payments) in bucket_groups { + let pair_id = ChannelPairStatsId::from_channel_pair_and_bucket( + &prev_channel_id, + &next_channel_id, + bucket_start, + ); + + let first_payment = payments[0]; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_timestamp = u64::MAX; + let mut last_timestamp = 0u64; + + for payment in &payments { + let outbound = payment.outbound_amount_forwarded_msat.unwrap_or(0); + let fee = payment.total_fee_earned_msat.unwrap_or(0); + let skimmed = payment.skimmed_fee_msat.unwrap_or(0); + total_inbound_amount_msat = + total_inbound_amount_msat.saturating_add(outbound.saturating_add(fee)); + total_outbound_amount_msat = total_outbound_amount_msat.saturating_add(outbound); + total_fee_earned_msat = total_fee_earned_msat.saturating_add(fee); + total_skimmed_fee_msat = total_skimmed_fee_msat.saturating_add(skimmed); + if payment.claim_from_onchain_tx { + onchain_claims_count += 1; + } + first_timestamp = first_timestamp.min(payment.forwarded_at_timestamp); + last_timestamp = last_timestamp.max(payment.forwarded_at_timestamp); + } + + let payment_count = payments.len() as u64; + let stats = ChannelPairForwardingStats { + id: pair_id, + prev_channel_id, + next_channel_id, + bucket_start_timestamp: bucket_start, + prev_node_id: first_payment.prev_node_id, + next_node_id: first_payment.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat: total_fee_earned_msat / payment_count, + avg_inbound_amount_msat: total_inbound_amount_msat / payment_count, + first_forwarded_at_timestamp: first_timestamp, + last_forwarded_at_timestamp: last_timestamp, + aggregated_at_timestamp: now, + }; + + channel_pair_stats_store.insert_or_update(stats).await.map_err(|e| { + log_error!(logger, "Failed to upsert channel pair stats bucket for {pair_id:?}: {e}"); + e + })?; + aggregated_bucket_count += 1; + + for payment in payments { + forwarded_payment_store.remove(&payment.id).await.map_err(|e| { + log_error!(logger, "Failed to remove forwarded payment {:?}: {}", payment.id, e); + e + })?; + removed_payment_count += 1; + } + } + + Ok((aggregated_bucket_count, removed_payment_count)) +} + +/// Aggregates multiple channel pair statistics buckets into cumulative totals. +pub fn aggregate_channel_pair_stats( + buckets: &[ChannelPairForwardingStats], +) -> ChannelPairForwardingStats { + assert!(!buckets.is_empty(), "Cannot aggregate empty bucket list"); + let first = &buckets[0]; + for bucket in &buckets[1..] { + assert_eq!(bucket.prev_channel_id, first.prev_channel_id); + assert_eq!(bucket.next_channel_id, first.next_channel_id); + } + + let mut payment_count = 0u64; + let mut total_inbound_amount_msat = 0u64; + let mut total_outbound_amount_msat = 0u64; + let mut total_fee_earned_msat = 0u64; + let mut total_skimmed_fee_msat = 0u64; + let mut onchain_claims_count = 0u64; + let mut first_forwarded_at_timestamp = u64::MAX; + let mut last_forwarded_at_timestamp = 0u64; + let mut earliest_bucket_start = u64::MAX; + for bucket in buckets { + payment_count += bucket.payment_count; + total_inbound_amount_msat += bucket.total_inbound_amount_msat; + total_outbound_amount_msat += bucket.total_outbound_amount_msat; + total_fee_earned_msat += bucket.total_fee_earned_msat; + total_skimmed_fee_msat += bucket.total_skimmed_fee_msat; + onchain_claims_count += bucket.onchain_claims_count; + first_forwarded_at_timestamp = + first_forwarded_at_timestamp.min(bucket.first_forwarded_at_timestamp); + last_forwarded_at_timestamp = + last_forwarded_at_timestamp.max(bucket.last_forwarded_at_timestamp); + earliest_bucket_start = earliest_bucket_start.min(bucket.bucket_start_timestamp); + } + let now = + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs(); + ChannelPairForwardingStats { + id: ChannelPairStatsId::from_channel_pair_and_bucket( + &first.prev_channel_id, + &first.next_channel_id, + earliest_bucket_start, + ), + prev_channel_id: first.prev_channel_id, + next_channel_id: first.next_channel_id, + bucket_start_timestamp: earliest_bucket_start, + prev_node_id: first.prev_node_id, + next_node_id: first.next_node_id, + payment_count, + total_inbound_amount_msat, + total_outbound_amount_msat, + total_fee_earned_msat, + total_skimmed_fee_msat, + onchain_claims_count, + avg_fee_msat: if payment_count > 0 { total_fee_earned_msat / payment_count } else { 0 }, + avg_inbound_amount_msat: if payment_count > 0 { + total_inbound_amount_msat / payment_count + } else { + 0 + }, + first_forwarded_at_timestamp, + last_forwarded_at_timestamp, + aggregated_at_timestamp: now, + } +} diff --git a/src/types.rs b/src/types.rs index e24db4d25..347392fcf 100644 --- a/src/types.rs +++ b/src/types.rs @@ -48,6 +48,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::ffi::maybe_wrap; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::store::{ + ChannelForwardingStats, ChannelPairForwardingStats, ForwardedPaymentDetails, +}; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -372,6 +375,10 @@ pub(crate) type BumpTransactionEventHandler = >; pub(crate) type PaymentStore = DataStore>; +pub(crate) type ForwardedPaymentStore = DataStore>; +pub(crate) type ChannelForwardingStatsStore = DataStore>; +pub(crate) type ChannelPairForwardingStatsStore = + DataStore>; /// A local, potentially user-provided, identifier of a channel. ///