From b862e8d3074d2cca8f1ecd952e22438cc5df4d1f Mon Sep 17 00:00:00 2001 From: John Rogers Date: Thu, 10 Jul 2025 17:50:58 +0100 Subject: [PATCH] fix: Update PTP socket and clock init for statime-linux API Co-authored-by: aider (gemini/gemini-2.5-pro-preview-05-06) --- Cargo.toml | 1 + src/ptp.rs | 49 +++++++++++++++++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fde8597..af37961 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ tokio = { version = "1", features = ["full"] } log = "0.4" env_logger = "0.11" rand = "0.8" +socket2 = "0.4" diff --git a/src/ptp.rs b/src/ptp.rs index bb236dc..60da193 100644 --- a/src/ptp.rs +++ b/src/ptp.rs @@ -11,9 +11,12 @@ use statime::{ time::{Duration as PtpDuration, Interval}, OverlayClock, PtpInstance, SharedClock, }; -use statime_linux::{clock::LinuxClock, socket::udp::LinuxUdpHandles}; +use socket2::{Domain, Protocol, Socket, Type}; +use statime_linux::clock::LinuxClock; +use std::net::SocketAddrV4; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::net::UdpSocket; use tokio::time::{sleep, Instant}; pub async fn start_ptp_client(state: Arc>, config: Arc>) { @@ -94,12 +97,31 @@ async fn run_ptp_session( }; // 4. Create Clock and Filter - let clock = SharedClock::new(OverlayClock::new(LinuxClock::new())); + let clock = SharedClock::new(OverlayClock::new(LinuxClock::open("/dev/ptp0")?)); let filter_config = 0.1; // Filter coefficient // 5. Create network handles - let (mut event_handle, mut general_handle) = - LinuxUdpHandles::new(&port_config, &interface)?; + fn create_socket( + interface: &str, + port: u16, + ) -> Result> { + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + #[cfg(target_os = "linux")] + if let Err(e) = socket.bind_device(Some(interface.as_bytes())) { + log::warn!( + "Failed to bind to device '{}', maybe you need to be root? Error: {}", + interface, + e + ); + } + socket.set_reuse_address(true)?; + let address = SocketAddrV4::new("0.0.0.0".parse().unwrap(), port); + socket.bind(&address.into())?; + Ok(UdpSocket::from_std(socket.into())?) + } + + let event_socket = create_socket(&interface, 319)?; + let general_socket = create_socket(&interface, 320)?; // 6. Add port and run BMCA let mut port = ptp_instance.add_port(port_config, filter_config, clock, thread_rng())?; @@ -109,24 +131,27 @@ async fn run_ptp_session( let mut last_state_update = Instant::now(); let mut actions: Vec<_> = initial_actions.collect(); + let mut event_buf = [0u8; 1500]; + let mut general_buf = [0u8; 1500]; loop { for action in actions { match action { PortAction::SendEvent { data, - link_local: _, + destination, .. } => { - if let Err(e) = event_handle.send(data, None).await { + if let Err(e) = event_socket.send_to(data, destination.into()).await { log::error!("Error sending PTP event packet: {}", e); } } PortAction::SendGeneral { data, - link_local: _, + destination, + .. } => { - if let Err(e) = general_handle.send(data, None).await { + if let Err(e) = general_socket.send_to(data, destination.into()).await { log::error!("Error sending PTP general packet: {}", e); } } @@ -155,11 +180,11 @@ async fn run_ptp_session( _ = tokio::time::sleep_until(timer_instant) => { actions.extend(running_port.handle_timer()); } - Ok((message, source_address)) = event_handle.recv() => { - actions.extend(running_port.handle_message(&message, source_address)); + Ok((len, source_address)) = event_socket.recv_from(&mut event_buf) => { + actions.extend(running_port.handle_message(&event_buf[..len], source_address)); } - Ok((message, source_address)) = general_handle.recv() => { - actions.extend(running_port.handle_message(&message, source_address)); + Ok((len, source_address)) = general_socket.recv_from(&mut general_buf) => { + actions.extend(running_port.handle_message(&general_buf[..len], source_address)); } }