From 73d8634c23fe24312c4973bc2aa9f43e2d566b72 Mon Sep 17 00:00:00 2001 From: John Rogers Date: Thu, 10 Jul 2025 16:40:00 +0100 Subject: [PATCH] refactor: Adapt PTP client to new statime API structure Co-authored-by: aider (gemini/gemini-2.5-pro-preview-05-06) --- src/ptp.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 18 deletions(-) diff --git a/src/ptp.rs b/src/ptp.rs index be92719..faecebd 100644 --- a/src/ptp.rs +++ b/src/ptp.rs @@ -1,11 +1,15 @@ - use crate::config::Config; use crate::sync_logic::LtcState; -use statime::{Config as PtpConfig, PtpInstance}; -use statime_linux::{LinuxClock, LinuxUdpSocket}; +use statime::{ + config::{InstanceConfig, PortConfig, TimePropertiesDS}, + filters::MovingAverageFilter, + port::{PortAction, PortState}, + PtpInstance, +}; +use statime_linux::{LinuxClock, LinuxUdpHandles}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use tokio::time::sleep; +use tokio::time::{sleep, Instant}; pub async fn start_ptp_client(state: Arc>, config: Arc>) { loop { @@ -52,37 +56,108 @@ async fn run_ptp_session( config: Arc>, ) -> Result<(), Box> { let interface = config.lock().unwrap().ptp_interface.clone(); + let initial_interface = interface.clone(); - let mut ptp_config = PtpConfig::default(); - ptp_config.set_iface(interface.clone()); - ptp_config.set_use_hardware_timestamping(false); + // 1. Create configs + let instance_config = InstanceConfig::default(); + let time_properties_ds = TimePropertiesDS::default(); + // 2. Create PtpInstance + let mut ptp_instance = PtpInstance::new(instance_config, time_properties_ds); + + // 3. Create PortConfig + let mut port_config = PortConfig::default(); + port_config.iface = interface.into(); + port_config.use_hardware_timestamping = false; + + // 4. Create Clock and Filter let clock = LinuxClock::new(); - let socket = LinuxUdpSocket::new(ptp_config.clone())?; - let mut instance = PtpInstance::new(ptp_config, socket, clock); + let filter = MovingAverageFilter::new(20); - let initial_interface = interface; + // 5. Add port and run BMCA + let mut port = ptp_instance.add_port(port_config.clone(), clock, filter)?; + ptp_instance.run_bmca(&mut [&mut port]); + let mut running_port = port.end_bmca()?; + + // 6. Create network handles + let (mut event_handle, mut general_handle) = LinuxUdpHandles::new(&port_config)?; + + let mut last_state_update = Instant::now(); loop { + // Check for config changes that would require a restart let (enabled, current_interface) = { let cfg = config.lock().unwrap(); (cfg.ptp_enabled, cfg.ptp_interface.clone()) }; - if !enabled || current_interface != initial_interface { log::info!("PTP disabled or interface changed. Stopping PTP session."); return Ok(()); } - if let Err(e) = instance.tick().await { - log::warn!("PTP tick error: {}", e); + let timer_instant = running_port + .get_next_timer_instant() + .map(tokio::time::Instant::from_std) + .unwrap_or_else(|| tokio::time::Instant::now() + Duration::from_secs(1)); + + let mut actions = Vec::new(); + + tokio::select! { + _ = tokio::time::sleep_until(timer_instant) => { + if let Some(action) = running_port.handle_timer() { + actions.push(action); + } + } + Ok((message, source_address)) = event_handle.recv() => { + if let Some(action) = running_port.handle_message(&message, source_address) { + actions.push(action); + } + } + Ok((message, source_address)) = general_handle.recv() => { + if let Some(action) = running_port.handle_message(&message, source_address) { + actions.push(action); + } + } } - let summary = instance.get_summary(); - let mut st = state.lock().unwrap(); - st.ptp_offset = summary.offset; - st.ptp_state = summary.state.to_string(); + for action in actions { + match action { + PortAction::Send { + destination, + data, + event, + } => { + let handle = if event { + &mut event_handle + } else { + &mut general_handle + }; + if let Err(e) = handle.send(data, destination).await { + log::error!("Error sending PTP packet: {}", e); + } + } + PortAction::Reset => { + log::warn!("PTP port is resetting"); + ptp_instance.run_bmca(&mut [&mut running_port.start_bmca()]); + running_port = running_port.start_bmca().end_bmca()?; + } + PortAction::UpdateMaster(_) => { + // Handled by the instance, nothing to do here + } + } + } - sleep(Duration::from_millis(200)).await; + // Update shared state periodically + if last_state_update.elapsed() > Duration::from_millis(500) { + let port_ds = running_port.get_port_ds(); + let mut st = state.lock().unwrap(); + st.ptp_state = port_ds.port_state.to_string(); + if port_ds.port_state == PortState::Slave { + st.ptp_offset = Some(port_ds.offset_from_master.mean as f64); + } else { + st.ptp_offset = None; + } + last_state_update = Instant::now(); + } } }