diff --git a/crates/consensus/src/qbft/component.rs b/crates/consensus/src/qbft/component.rs index 13524a87..442bf778 100644 --- a/crates/consensus/src/qbft/component.rs +++ b/crates/consensus/src/qbft/component.rs @@ -21,6 +21,7 @@ use crate::{ use pluto_core::{ corepb::v1::{consensus as pbconsensus, core as pbcore, priority as pbpriority}, deadline::{AddOutcome, DeadlinerHandle}, + gater::DutyGaterFn, qbft, types::{Duty, DutyType}, }; @@ -41,9 +42,6 @@ pub type Broadcaster = Arc< + 'static, >; -/// Duty admission gate. -pub type DutyGater = Arc bool + Send + Sync + 'static>; - /// Sink for completed sniffer instances. pub type SnifferSink = Arc; @@ -77,7 +75,7 @@ pub struct Config { /// Duty deadline scheduler. pub deadliner: DeadlinerHandle, /// Duty admission gate. - pub duty_gater: DutyGater, + pub duty_gater: DutyGaterFn, /// External message broadcaster. pub broadcaster: Broadcaster, /// Completed sniffer sink. @@ -263,7 +261,7 @@ pub struct Consensus { local_peer_idx: i64, privkey: SecretKey, deadliner: DeadlinerHandle, - duty_gater: DutyGater, + duty_gater: DutyGaterFn, broadcaster: Broadcaster, sniffer: SnifferSink, timer_func: RoundTimerFunc, diff --git a/crates/consensus/src/qbft/mod.rs b/crates/consensus/src/qbft/mod.rs index 6ce9bc90..0c2ff9ed 100644 --- a/crates/consensus/src/qbft/mod.rs +++ b/crates/consensus/src/qbft/mod.rs @@ -5,7 +5,7 @@ pub(crate) mod definition; pub(crate) mod runner; pub use component::{ - BroadcastResult, Broadcaster, Config, Consensus, DutyGater, Error, Peer, Result, SnifferSink, + BroadcastResult, Broadcaster, Config, Consensus, Error, Peer, Result, SnifferSink, SubscriberResult, }; pub use runner::{Error as RunnerError, Result as RunnerResult}; diff --git a/crates/core/src/gater.rs b/crates/core/src/gater.rs new file mode 100644 index 00000000..4afafeee --- /dev/null +++ b/crates/core/src/gater.rs @@ -0,0 +1,311 @@ +//! Duty gater — rejects duties whose type is invalid or that are too far in the +//! future. + +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use pluto_eth2api::{EthBeaconNodeApiClient, EthBeaconNodeApiClientError}; + +use crate::{ + clock::{ChronoClock, Clock}, + types::Duty, +}; + +/// Shared, callable duty-gating predicate: the value form that the wire +/// components (parsigex, consensus) accept and invoke per duty. +pub type DutyGaterFn = Arc bool + Send + Sync + 'static>; + +/// Default number of epochs into the future for which duties are accepted. +const DEFAULT_ALLOWED_FUTURE_EPOCHS: u64 = 2; + +/// Errors returned while constructing a [`DutyGater`]. +#[derive(Debug, thiserror::Error)] +pub enum GaterError { + /// Failed to fetch beacon node configuration. + #[error("Failed to fetch beacon node configuration: {0}")] + BeaconNodeConfigError(#[from] EthBeaconNodeApiClientError), + + /// The slot duration is not a positive whole number of milliseconds + /// (sub-millisecond, or too large to fit `u64`), so it cannot be used as a + /// divisor in the millisecond-resolution epoch arithmetic. + #[error("Slot duration is not a positive number of milliseconds")] + InvalidSlotDuration, +} + +/// Result type for gater operations. +pub type Result = std::result::Result; + +/// Gates duties by type and recency. +/// +/// [`DutyGater::allows`] returns `true` only when a duty may be processed. It +/// rejects duties received from peers over the wire whose type is invalid or +/// whose epoch is more than `allowed_future_epochs` beyond the current epoch. +/// It does **not** reject duties in the past — that is the responsibility of +/// the [`crate::deadline`] component. +pub struct DutyGater { + genesis_time: DateTime, + /// Slot duration in milliseconds. Always ≥ 1, enforced in + /// [`DutyGater::with_options`]. + slot_duration_ms: u64, + /// Slots per epoch. Guaranteed non-zero by the `fetch_slots_config` + /// contract. + slots_per_epoch: u64, + allowed_future_epochs: u64, + clock: Box, +} + +impl DutyGater { + /// Builds a gater from a beacon node client using production defaults: a + /// real wall clock and a `DEFAULT_ALLOWED_FUTURE_EPOCHS` future-epoch + /// budget. + pub async fn new(client: &EthBeaconNodeApiClient) -> Result { + Self::with_options(client, Box::new(ChronoClock), DEFAULT_ALLOWED_FUTURE_EPOCHS).await + } + + /// Builds a gater with an injected clock and future-epoch budget. The + /// single fetch path shared with [`DutyGater::new`]; the overrides + /// exist for tests. + async fn with_options( + client: &EthBeaconNodeApiClient, + clock: Box, + allowed_future_epochs: u64, + ) -> Result { + let genesis_time = client.fetch_genesis_time().await?; + let (slot_duration, slots_per_epoch) = client.fetch_slots_config().await?; + + // Work in whole milliseconds. `as_millis()` is u128 (SECONDS_PER_SLOT + // keeps it tiny); reject a zero (sub-millisecond) or overflowing value + // rather than divide by zero in `current_epoch`. + let slot_duration_ms = u64::try_from(slot_duration.as_millis()) + .ok() + .filter(|&ms| ms != 0) + .ok_or(GaterError::InvalidSlotDuration)?; + + Ok(Self { + genesis_time, + slot_duration_ms, + slots_per_epoch, + allowed_future_epochs, + clock, + }) + } + + /// Returns `true` if `duty` may be processed: its type is valid and its + /// epoch is no more than `allowed_future_epochs` beyond the current epoch. + #[must_use] + pub fn allows(&self, duty: &Duty) -> bool { + if !duty.duty_type.is_valid() { + return false; + } + + let duty_epoch = duty + .slot + .inner() + .checked_div(self.slots_per_epoch) + .expect("slots_per_epoch is non-zero (fetch_slots_config contract)"); + + duty_epoch + <= self + .current_epoch() + .saturating_add(self.allowed_future_epochs) + } + + /// Converts this gater into the shared callable [`DutyGaterFn`] consumed by + /// the wire components. + #[must_use] + pub fn into_fn(self) -> DutyGaterFn { + let gater = Arc::new(self); + Arc::new(move |duty: &Duty| gater.allows(duty)) + } + + /// Current epoch derived from the injected clock and genesis time. + fn current_epoch(&self) -> u64 { + let elapsed_ms = self + .clock + .now() + .signed_duration_since(self.genesis_time) + .num_milliseconds(); + + // Pre-genesis instants clamp to epoch 0: the gater is only built after + // genesis, so this path is unreachable in practice, and treating a + // negative elapsed time as a huge future slot would be nonsense. + let elapsed_ms = u64::try_from(elapsed_ms).unwrap_or(0); + + let current_slot = elapsed_ms + .checked_div(self.slot_duration_ms) + .expect("slot_duration_ms >= 1 (enforced in with_options)"); + + current_slot + .checked_div(self.slots_per_epoch) + .expect("slots_per_epoch is non-zero (fetch_slots_config contract)") + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use chrono::{DateTime, Utc}; + use pluto_testutil::BeaconMock; + + use super::*; + use crate::types::{DutyType, SlotNumber}; + + /// A fixed clock returning `now` regardless of when it is called. + fn fixed_clock(now: DateTime) -> Box { + Box::new(move || now) + } + + /// Builds a gater from hand-set configuration and a fixed clock, for + /// non-async coverage that needs no beacon node. + fn gater( + genesis_time: DateTime, + slot_duration_ms: u64, + slots_per_epoch: u64, + allowed_future_epochs: u64, + now: DateTime, + ) -> DutyGater { + DutyGater { + genesis_time, + slot_duration_ms, + slots_per_epoch, + allowed_future_epochs, + clock: fixed_clock(now), + } + } + + fn attester(slot: u64) -> Duty { + Duty::new_attester_duty(SlotNumber::new(slot)) + } + + fn duty_with_type(slot: u64, duty_type: DutyType) -> Duty { + Duty { + slot: SlotNumber::new(slot), + duty_type, + } + } + + /// genesis == now (current epoch 0), 1s slots, 2 slots/epoch, 2 future + /// epochs allowed ⇒ slots 0-5 accepted. + #[tokio::test] + async fn duty_gater() { + // Genesis round-trips through the beacon API as whole seconds, so pin + // `now` to a whole second to make the injected clock equal genesis. + let now = DateTime::from_timestamp(Utc::now().timestamp(), 0).expect("valid timestamp"); + + let bmock = BeaconMock::builder() + .genesis_time(now) + .slot_duration(Duration::from_secs(1)) + .slots_per_epoch(2) + .build() + .await + .expect("build beacon mock"); + + let gater = DutyGater::with_options(bmock.client(), fixed_clock(now), 2) + .await + .expect("build gater"); + + // Allowed: slots 0-5 (epochs 0, 1, 2 ≤ budget 2). + for slot in 0..=5 { + assert!( + gater.allows(&attester(slot)), + "slot {slot} should be allowed" + ); + } + + // Disallowed: slot 6 onwards (epoch 3+). + for slot in [6, 7, 1000] { + assert!( + !gater.allows(&attester(slot)), + "slot {slot} should be disallowed" + ); + } + + // Invalid duty types are rejected regardless of slot. + assert!(!gater.allows(&duty_with_type(0, DutyType::Unknown))); + assert!(!gater.allows(&duty_with_type( + 1, + DutyType::DutySentinel(Box::new(DutyType::Attester)) + ))); + } + + /// Smoke test of the public `new` entrypoint (real `ChronoClock`, default + /// budget) against a mainnet-like 12s/32-slot config. Genesis is pinned to + /// ~now, so `current_epoch` stays 0 for the whole test (epochs are 384s + /// long) and the default future-epoch budget of 2 is locked in: slot 96 + /// (epoch 3) would only be allowed if the default were 3. + #[tokio::test] + async fn new_defaults() { + let now = DateTime::from_timestamp(Utc::now().timestamp(), 0).expect("valid timestamp"); + + let bmock = BeaconMock::builder() + .genesis_time(now) + .slot_duration(Duration::from_secs(12)) + .slots_per_epoch(32) + .build() + .await + .expect("build beacon mock"); + + let gater = DutyGater::new(bmock.client()).await.expect("build gater"); + + assert!(gater.allows(&attester(0))); // current epoch + assert!(gater.allows(&attester(95))); // epoch 2 (= budget) + assert!(!gater.allows(&attester(96))); // epoch 3 (> budget) + } + + /// Non-async coverage of the epoch boundary with a non-zero current epoch + /// (the async test above only exercises current epoch 0). + #[test] + fn epoch_boundary() { + let genesis = DateTime::from_timestamp(1_600_000_000, 0).expect("valid timestamp"); + // 100s after genesis at 1s slots ⇒ slot 100 ⇒ epoch 3 (32 slots/epoch). + let now = DateTime::from_timestamp(1_600_000_100, 0).expect("valid timestamp"); + // Budget = current epoch 3 + 2 = 5 ⇒ duty epoch ≤ 5 (slot ≤ 191) allowed. + let gater = gater(genesis, 1_000, 32, 2, now); + + assert!(gater.allows(&attester(96))); // current epoch (3) + assert!(gater.allows(&attester(128))); // N+1 + assert!(gater.allows(&attester(160))); // N+2 start + assert!(gater.allows(&attester(191))); // N+2 end + assert!(!gater.allows(&attester(192))); // N+3 + assert!(!gater.allows(&attester(10_000))); + } + + /// Pre-genesis instants clamp to epoch 0. + #[test] + fn pre_genesis_clamps_to_epoch_zero() { + let genesis = DateTime::from_timestamp(1_600_000_100, 0).expect("valid timestamp"); + let now = DateTime::from_timestamp(1_600_000_000, 0).expect("valid timestamp"); + // Budget = epoch 0 + 2 = 2 ⇒ slot ≤ 95 (epoch ≤ 2) allowed at 32 slots/epoch. + let gater = gater(genesis, 1_000, 32, 2, now); + + assert!(gater.allows(&attester(0))); + assert!(gater.allows(&attester(95))); // epoch 2 + assert!(!gater.allows(&attester(96))); // epoch 3 + } + + /// `into_fn` yields a callable predicate equivalent to `allows`, usable + /// where a `DutyGaterFn` (e.g. `pluto_parsigex::DutyGater`) is expected. + #[test] + fn into_fn_matches_allows() { + let genesis = DateTime::from_timestamp(1_600_000_000, 0).expect("valid timestamp"); + let now = DateTime::from_timestamp(1_600_000_100, 0).expect("valid timestamp"); + let gater_fn: DutyGaterFn = gater(genesis, 1_000, 32, 2, now).into_fn(); + + assert!(gater_fn(&attester(191))); + assert!(!gater_fn(&attester(192))); + assert!(!gater_fn(&duty_with_type(0, DutyType::Unknown))); + } + + #[test] + fn invalid_type_rejected() { + let genesis = DateTime::from_timestamp(1_600_000_000, 0).expect("valid timestamp"); + let gater = gater(genesis, 1_000, 32, 2, genesis); + + assert!(!gater.allows(&duty_with_type(0, DutyType::Unknown))); + assert!(!gater.allows(&duty_with_type( + 0, + DutyType::DutySentinel(Box::new(DutyType::Attester)) + ))); + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index a53341bd..6daff9d9 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,6 +26,10 @@ pub mod deadline; /// Clock abstraction over the current time. pub mod clock; +/// Duty gater — rejects duties whose type is invalid or that are too far in the +/// future. +pub mod gater; + /// parsigdb pub mod parsigdb; diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index 67eabd2b..e5153ff3 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -49,6 +49,7 @@ use tracing::warn; use pluto_core::{ deadline::{DeadlinerTask, NeverExpiringCalculator}, + gater::DutyGaterFn, parsigdb::memory::{ InternalSubscriberError, MemDB, MemDBError, internal_subscriber, threshold_subscriber, }, @@ -101,8 +102,6 @@ impl Default for DataByPubkey { } } -type DutyGaterFn = Arc bool + Send + Sync + 'static>; - /// Errors returned by exchanger operations. #[derive(Debug, thiserror::Error)] pub enum ExchangerError { @@ -331,7 +330,9 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use super::{Exchanger, SIG_DEPOSIT_DATA, SIG_LOCK, SIG_VALIDATOR_REG, SigTypeStore}; + use super::{ + DutyGaterFn, Exchanger, SIG_DEPOSIT_DATA, SIG_LOCK, SIG_VALIDATOR_REG, SigTypeStore, + }; fn available_tcp_port() -> anyhow::Result { let listener = TcpListener::bind("127.0.0.1:0")?; @@ -375,7 +376,7 @@ mod tests { let p2p_context = P2PContext::new(peer_ids.to_vec()); let verifier: pluto_parsigex::Verifier = Arc::new(|_duty, _pk, _psig| Box::pin(async { Ok(()) })); - let duty_gater: pluto_parsigex::DutyGater = + let duty_gater: DutyGaterFn = Arc::new(|duty: &pluto_core::types::Duty| duty.duty_type == DutyType::Signature); ParsexBehaviour::new(ParsexConfig::new( @@ -515,7 +516,7 @@ mod tests { let verifier: pluto_parsigex::Verifier = Arc::new(|_duty, _pk, _psig| Box::pin(async { Ok(()) })); - let duty_gater: pluto_parsigex::DutyGater = + let duty_gater: DutyGaterFn = Arc::new(|duty: &pluto_core::types::Duty| duty.duty_type == DutyType::Signature); let config = ParsexConfig::new(peer_ids[i], p2p_context.clone(), verifier, duty_gater); diff --git a/crates/parsigex/examples/parsigex.rs b/crates/parsigex/examples/parsigex.rs index 13481c59..06663a60 100644 --- a/crates/parsigex/examples/parsigex.rs +++ b/crates/parsigex/examples/parsigex.rs @@ -68,6 +68,7 @@ use libp2p::{ }; use pluto_cluster::lock::Lock; use pluto_core::{ + gater::DutyGaterFn, signeddata::SignedRandao, types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, }; @@ -81,7 +82,7 @@ use pluto_p2p::{ peer::peer_id_from_key, relay::{RelayManager, RelayManagerEvent}, }; -use pluto_parsigex::{self as parsigex, DutyGater, Event, Handle, Verifier}; +use pluto_parsigex::{self as parsigex, Event, Handle, Verifier}; use pluto_tracing::TracingConfig; use tokio::fs; use tokio_util::sync::CancellationToken; @@ -247,7 +248,7 @@ async fn main() -> Result<()> { let verifier: Verifier = std::sync::Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })); - let duty_gater: DutyGater = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); + let duty_gater: DutyGaterFn = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); let handle_slot = std::sync::Arc::new(tokio::sync::Mutex::new(1_u64)); let p2p_config = P2PConfig { diff --git a/crates/parsigex/src/behaviour.rs b/crates/parsigex/src/behaviour.rs index 5247127c..a243ba72 100644 --- a/crates/parsigex/src/behaviour.rs +++ b/crates/parsigex/src/behaviour.rs @@ -22,7 +22,10 @@ use libp2p::{ }; use tokio::sync::{RwLock, mpsc, oneshot}; -use pluto_core::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_core::{ + gater::DutyGaterFn, + types::{Duty, ParSignedData, ParSignedDataSet, PubKey}, +}; use pluto_p2p::p2p_context::P2PContext; use super::{Handler, encode_message}; @@ -39,9 +42,6 @@ pub type VerifyFuture = pub type Verifier = Arc VerifyFuture + Send + Sync + 'static>; -/// Duty gate callback type. -pub type DutyGater = Arc bool + Send + Sync + 'static>; - /// Future returned by received subscriber callbacks. pub type ReceivedSubFuture = Pin + Send + 'static>>; @@ -187,7 +187,7 @@ pub struct Config { peer_id: PeerId, p2p_context: P2PContext, verifier: Verifier, - duty_gater: DutyGater, + duty_gater: DutyGaterFn, timeout: Duration, } @@ -197,7 +197,7 @@ impl Config { peer_id: PeerId, p2p_context: P2PContext, verifier: Verifier, - duty_gater: DutyGater, + duty_gater: DutyGaterFn, ) -> Self { Self { peer_id, diff --git a/crates/parsigex/src/handler.rs b/crates/parsigex/src/handler.rs index c7788b49..44203f2a 100644 --- a/crates/parsigex/src/handler.rs +++ b/crates/parsigex/src/handler.rs @@ -19,9 +19,12 @@ use libp2p::{ }; use tokio::time::timeout; -use pluto_core::types::{Duty, ParSignedDataSet}; +use pluto_core::{ + gater::DutyGaterFn, + types::{Duty, ParSignedDataSet}, +}; -use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; +use super::{PROTOCOL_NAME, Verifier, protocol}; use crate::error::Failure; /// Command sent from the behaviour to a handler. @@ -78,14 +81,14 @@ type ActiveFuture = BoxFuture<'static, Option>; pub struct Handler { timeout: Duration, verifier: Verifier, - duty_gater: DutyGater, + duty_gater: DutyGaterFn, pending_open: VecDeque, active_futures: FuturesUnordered, } impl Handler { /// Creates a new handler for one connection. - pub fn new(timeout: Duration, verifier: Verifier, duty_gater: DutyGater) -> Self { + pub fn new(timeout: Duration, verifier: Verifier, duty_gater: DutyGaterFn) -> Self { Self { timeout, verifier, @@ -247,7 +250,7 @@ impl ConnectionHandler for Handler { async fn do_recv( mut stream: libp2p::swarm::Stream, verifier: Verifier, - duty_gater: DutyGater, + duty_gater: DutyGaterFn, ) -> Result<(Duty, ParSignedDataSet), Failure> { let bytes = protocol::recv_message(&mut stream) .await diff --git a/crates/parsigex/src/lib.rs b/crates/parsigex/src/lib.rs index e1abdfd3..4ff310ec 100644 --- a/crates/parsigex/src/lib.rs +++ b/crates/parsigex/src/lib.rs @@ -6,8 +6,7 @@ mod handler; mod protocol; pub use behaviour::{ - Behaviour, Config, DutyGater, Event, Handle, ReceivedSub, ReceivedSubFuture, Verifier, - received_subscriber, + Behaviour, Config, Event, Handle, ReceivedSub, ReceivedSubFuture, Verifier, received_subscriber, }; pub use error::{Error, Failure, Result, VerifyError}; pub use handler::Handler;