refactor: Adapt PTP client to new statime API structure

Co-authored-by: aider (gemini/gemini-2.5-pro-preview-05-06) <aider@aider.chat>
This commit is contained in:
Chaos Rogers 2025-07-10 16:40:00 +01:00
parent 78ea1aefed
commit 73d8634c23

View file

@ -1,11 +1,15 @@
use crate::config::Config;
use crate::sync_logic::LtcState;
use statime::{Config as PtpConfig, PtpInstance};
use statime_linux::{LinuxClock, LinuxUdpSocket};
use statime::{
config::{InstanceConfig, PortConfig, TimePropertiesDS},
filters::MovingAverageFilter,
port::{PortAction, PortState},
PtpInstance,
};
use statime_linux::{LinuxClock, LinuxUdpHandles};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::{sleep, Instant};
pub async fn start_ptp_client(state: Arc<Mutex<LtcState>>, config: Arc<Mutex<Config>>) {
loop {
@ -52,37 +56,108 @@ async fn run_ptp_session(
config: Arc<Mutex<Config>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let interface = config.lock().unwrap().ptp_interface.clone();
let initial_interface = interface.clone();
let mut ptp_config = PtpConfig::default();
ptp_config.set_iface(interface.clone());
ptp_config.set_use_hardware_timestamping(false);
// 1. Create configs
let instance_config = InstanceConfig::default();
let time_properties_ds = TimePropertiesDS::default();
// 2. Create PtpInstance
let mut ptp_instance = PtpInstance::new(instance_config, time_properties_ds);
// 3. Create PortConfig
let mut port_config = PortConfig::default();
port_config.iface = interface.into();
port_config.use_hardware_timestamping = false;
// 4. Create Clock and Filter
let clock = LinuxClock::new();
let socket = LinuxUdpSocket::new(ptp_config.clone())?;
let mut instance = PtpInstance::new(ptp_config, socket, clock);
let filter = MovingAverageFilter::new(20);
let initial_interface = interface;
// 5. Add port and run BMCA
let mut port = ptp_instance.add_port(port_config.clone(), clock, filter)?;
ptp_instance.run_bmca(&mut [&mut port]);
let mut running_port = port.end_bmca()?;
// 6. Create network handles
let (mut event_handle, mut general_handle) = LinuxUdpHandles::new(&port_config)?;
let mut last_state_update = Instant::now();
loop {
// Check for config changes that would require a restart
let (enabled, current_interface) = {
let cfg = config.lock().unwrap();
(cfg.ptp_enabled, cfg.ptp_interface.clone())
};
if !enabled || current_interface != initial_interface {
log::info!("PTP disabled or interface changed. Stopping PTP session.");
return Ok(());
}
if let Err(e) = instance.tick().await {
log::warn!("PTP tick error: {}", e);
let timer_instant = running_port
.get_next_timer_instant()
.map(tokio::time::Instant::from_std)
.unwrap_or_else(|| tokio::time::Instant::now() + Duration::from_secs(1));
let mut actions = Vec::new();
tokio::select! {
_ = tokio::time::sleep_until(timer_instant) => {
if let Some(action) = running_port.handle_timer() {
actions.push(action);
}
}
Ok((message, source_address)) = event_handle.recv() => {
if let Some(action) = running_port.handle_message(&message, source_address) {
actions.push(action);
}
}
Ok((message, source_address)) = general_handle.recv() => {
if let Some(action) = running_port.handle_message(&message, source_address) {
actions.push(action);
}
}
}
let summary = instance.get_summary();
let mut st = state.lock().unwrap();
st.ptp_offset = summary.offset;
st.ptp_state = summary.state.to_string();
for action in actions {
match action {
PortAction::Send {
destination,
data,
event,
} => {
let handle = if event {
&mut event_handle
} else {
&mut general_handle
};
if let Err(e) = handle.send(data, destination).await {
log::error!("Error sending PTP packet: {}", e);
}
}
PortAction::Reset => {
log::warn!("PTP port is resetting");
ptp_instance.run_bmca(&mut [&mut running_port.start_bmca()]);
running_port = running_port.start_bmca().end_bmca()?;
}
PortAction::UpdateMaster(_) => {
// Handled by the instance, nothing to do here
}
}
}
sleep(Duration::from_millis(200)).await;
// Update shared state periodically
if last_state_update.elapsed() > Duration::from_millis(500) {
let port_ds = running_port.get_port_ds();
let mut st = state.lock().unwrap();
st.ptp_state = port_ds.port_state.to_string();
if port_ds.port_state == PortState::Slave {
st.ptp_offset = Some(port_ds.offset_from_master.mean as f64);
} else {
st.ptp_offset = None;
}
last_state_update = Instant::now();
}
}
}