From 78ea1aefed498a913bbb375b34a4798d17dfcf5e Mon Sep 17 00:00:00 2001 From: John Rogers Date: Thu, 10 Jul 2025 16:33:47 +0100 Subject: [PATCH] feat: Integrate statime for PTP time sync Co-authored-by: aider (gemini/gemini-2.5-pro-preview-05-06) --- Cargo.toml | 7 +++- config.json | 4 ++- src/config.rs | 57 ++++++++++++++++++++---------- src/main.rs | 45 ++++++++++++++++-------- src/ptp.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++++ src/sync_logic.rs | 5 +++ src/ui.rs | 34 +++++++++++++++--- 7 files changed, 200 insertions(+), 40 deletions(-) create mode 100644 src/ptp.rs diff --git a/Cargo.toml b/Cargo.toml index 8f7f726..d596203 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,9 @@ crossterm = "0.29" regex = "1.11" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -notify = "8.1.0" \ No newline at end of file +notify = "8.1.0" +statime = "0.4.0" +statime-linux = "0.4.0" +tokio = { version = "1", features = ["full"] } +log = "0.4" +env_logger = "0.11" diff --git a/config.json b/config.json index 5ba71c3..147ce1b 100644 --- a/config.json +++ b/config.json @@ -1,3 +1,5 @@ { - "hardware_offset_ms": 20 + "hardware_offset_ms": 20, + "ptp_enabled": true, + "ptp_interface": "eth0" } diff --git a/src/config.rs b/src/config.rs index 2504450..372e543 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,50 +12,69 @@ use std::{ sync::{Arc, Mutex}, }; -#[derive(Deserialize)] +#[derive(Deserialize, Clone, Debug)] pub struct Config { pub hardware_offset_ms: i64, + #[serde(default)] + pub ptp_enabled: bool, + #[serde(default = "default_ptp_interface")] + pub ptp_interface: String, +} + +fn default_ptp_interface() -> String { + "eth0".to_string() +} + +impl Default for Config { + fn default() -> Self { + Self { + hardware_offset_ms: 0, + ptp_enabled: false, + ptp_interface: default_ptp_interface(), + } + } } impl Config { pub fn load(path: &PathBuf) -> Self { let mut file = match File::open(path) { Ok(f) => f, - Err(_) => return Self { hardware_offset_ms: 0 }, + Err(_) => return Self::default(), }; let mut contents = String::new(); if file.read_to_string(&mut contents).is_err() { - return Self { hardware_offset_ms: 0 }; + return Self::default(); } - serde_json::from_str(&contents).unwrap_or(Self { hardware_offset_ms: 0 }) + serde_json::from_str(&contents).unwrap_or_else(|e| { + eprintln!("Failed to parse config.json: {}, using default", e); + Self::default() + }) } } -pub fn watch_config(path: &str) -> Arc> { - let initial = Config::load(&PathBuf::from(path)).hardware_offset_ms; - let offset = Arc::new(Mutex::new(initial)); +pub fn watch_config(path: &str) -> Arc> { + let initial_config = Config::load(&PathBuf::from(path)); + let config = Arc::new(Mutex::new(initial_config)); - // Owned PathBuf for watch() call let watch_path = PathBuf::from(path); - // Clone for moving into the closure let watch_path_for_cb = watch_path.clone(); - let offset_for_cb = Arc::clone(&offset); + let config_for_cb = Arc::clone(&config); std::thread::spawn(move || { - // Move `watch_path_for_cb` into the callback - let mut watcher: RecommendedWatcher = recommended_watcher(move |res: NotifyResult| { + let event_handler = move |res: NotifyResult| { if let Ok(evt) = res { if matches!(evt.kind, EventKind::Modify(_)) { let new_cfg = Config::load(&watch_path_for_cb); - let mut hw = offset_for_cb.lock().unwrap(); - *hw = new_cfg.hardware_offset_ms; - eprintln!("🔄 Reloaded hardware_offset_ms = {}", *hw); + eprintln!("🔄 Reloaded config.json: {:?}", new_cfg); + let mut cfg = config_for_cb.lock().unwrap(); + *cfg = new_cfg; } } - }) - .expect("Failed to create file watcher"); + }; + + let mut watcher: RecommendedWatcher = + recommended_watcher(event_handler).expect("Failed to create file watcher"); - // Use the original `watch_path` here watcher .watch(&watch_path, RecursiveMode::NonRecursive) .expect("Failed to watch config.json"); @@ -65,5 +84,5 @@ pub fn watch_config(path: &str) -> Arc> { } }); - offset + config } diff --git a/src/main.rs b/src/main.rs index 68cf071..85f4383 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,22 @@ // src/main.rs + mod config; -mod sync_logic; +mod ptp; mod serial_input; +mod sync_logic; mod ui; use crate::config::watch_config; -use crate::sync_logic::LtcState; +use crate::ptp::start_ptp_client; use crate::serial_input::start_serial_thread; +use crate::sync_logic::LtcState; use crate::ui::start_ui; use std::{ fs, path::Path, - sync::{Arc, Mutex, mpsc}, + sync::{mpsc, Arc, Mutex}, thread, }; @@ -30,13 +33,15 @@ fn ensure_config() { } } -fn main() { +#[tokio::main] +async fn main() { // 🔄 Ensure there's always a config.json present ensure_config(); + env_logger::init(); // 1️⃣ Start watching config.json for changes - let hw_offset = watch_config("config.json"); - println!("🔧 Watching config.json (hardware_offset_ms)..."); + let config_arc = watch_config("config.json"); + println!("🔧 Watching config.json..."); // 2️⃣ Channel for raw LTC frames let (tx, rx) = mpsc::channel(); @@ -62,19 +67,29 @@ fn main() { }); } - // 5️⃣ Spawn the UI renderer thread, passing the live offset Arc + // 5️⃣ Spawn PTP client task { - let ui_state = ltc_state.clone(); - let offset_clone = hw_offset.clone(); - let port = "/dev/ttyACM0".to_string(); - thread::spawn(move || { - println!("🖥️ UI thread launched"); - start_ui(ui_state, port, offset_clone); + let ptp_state = ltc_state.clone(); + let config_clone = config_arc.clone(); + tokio::spawn(async move { + println!("🚀 PTP task launched"); + start_ptp_client(ptp_state, config_clone).await; }); } - // 6️⃣ Keep main thread alive - println!("📡 Main thread entering loop..."); + // 6️⃣ Spawn the UI renderer thread, passing the live config Arc + { + let ui_state = ltc_state.clone(); + let config_clone = config_arc.clone(); + let port = "/dev/ttyACM0".to_string(); + thread::spawn(move || { + println!("🖥️ UI thread launched"); + start_ui(ui_state, port, config_clone); + }); + } + + // 7️⃣ Keep main thread alive for LTC frames + println!("📡 Main thread entering LTC frame loop..."); for _frame in rx { // no-op } diff --git a/src/ptp.rs b/src/ptp.rs new file mode 100644 index 0000000..be92719 --- /dev/null +++ b/src/ptp.rs @@ -0,0 +1,88 @@ + +use crate::config::Config; +use crate::sync_logic::LtcState; +use statime::{Config as PtpConfig, PtpInstance}; +use statime_linux::{LinuxClock, LinuxUdpSocket}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::time::sleep; + +pub async fn start_ptp_client(state: Arc>, config: Arc>) { + loop { + let (enabled, interface) = { + let cfg = config.lock().unwrap(); + (cfg.ptp_enabled, cfg.ptp_interface.clone()) + }; + + if !enabled { + { + let mut st = state.lock().unwrap(); + if st.ptp_state != "Disabled" { + st.ptp_state = "Disabled".to_string(); + st.ptp_offset = None; + log::info!("PTP client disabled via config."); + } + } + sleep(Duration::from_secs(5)).await; + continue; + } + + log::info!("Starting PTP client on interface {}", interface); + { + let mut st = state.lock().unwrap(); + st.ptp_state = format!("Starting on {}", interface); + } + + let result = run_ptp_session(state.clone(), config.clone()).await; + + if let Err(e) = result { + log::error!("PTP client error: {}", e); + let mut st = state.lock().unwrap(); + st.ptp_state = format!("Error: {}", e); + st.ptp_offset = None; + } + + // Wait before retrying or checking config again + sleep(Duration::from_secs(5)).await; + } +} + +async fn run_ptp_session( + state: Arc>, + config: Arc>, +) -> Result<(), Box> { + let interface = config.lock().unwrap().ptp_interface.clone(); + + let mut ptp_config = PtpConfig::default(); + ptp_config.set_iface(interface.clone()); + ptp_config.set_use_hardware_timestamping(false); + + let clock = LinuxClock::new(); + let socket = LinuxUdpSocket::new(ptp_config.clone())?; + let mut instance = PtpInstance::new(ptp_config, socket, clock); + + let initial_interface = interface; + + loop { + let (enabled, current_interface) = { + let cfg = config.lock().unwrap(); + (cfg.ptp_enabled, cfg.ptp_interface.clone()) + }; + + if !enabled || current_interface != initial_interface { + log::info!("PTP disabled or interface changed. Stopping PTP session."); + return Ok(()); + } + + if let Err(e) = instance.tick().await { + log::warn!("PTP tick error: {}", e); + } + + let summary = instance.get_summary(); + let mut st = state.lock().unwrap(); + st.ptp_offset = summary.offset; + st.ptp_state = summary.state.to_string(); + + sleep(Duration::from_millis(200)).await; + } +} diff --git a/src/sync_logic.rs b/src/sync_logic.rs index c96bb0c..4af5a32 100644 --- a/src/sync_logic.rs +++ b/src/sync_logic.rs @@ -45,6 +45,9 @@ pub struct LtcState { pub offset_history: VecDeque, pub last_match_status: String, pub last_match_check: i64, + // PTP state + pub ptp_offset: Option, + pub ptp_state: String, } impl LtcState { @@ -56,6 +59,8 @@ impl LtcState { offset_history: VecDeque::with_capacity(20), last_match_status: "UNKNOWN".into(), last_match_check: 0, + ptp_offset: None, + ptp_state: "Initializing".into(), } } diff --git a/src/ui.rs b/src/ui.rs index 66c485f..dc49025 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -17,21 +17,25 @@ use crossterm::{ terminal::{self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen}, }; +use crate::config::Config; use crate::sync_logic::LtcState; -/// Launch the TUI; reads `offset` live from the file-watcher. +/// Launch the TUI; reads `config` live from the file-watcher. pub fn start_ui( state: Arc>, serial_port: String, - offset: Arc>, + config: Arc>, ) { let mut stdout = stdout(); execute!(stdout, EnterAlternateScreen).unwrap(); terminal::enable_raw_mode().unwrap(); loop { - // 1️⃣ Read current hardware offset - let hw_offset_ms = *offset.lock().unwrap(); + // 1️⃣ Read current config + let (hw_offset_ms, ptp_enabled) = { + let cfg = config.lock().unwrap(); + (cfg.hardware_offset_ms, cfg.ptp_enabled) + }; // 2️⃣ Measure & record jitter only when LOCKED; clear on FREE { @@ -97,6 +101,28 @@ pub fn start_ui( .unwrap(); } + // PTP Status Display + if ptp_enabled { + if let Ok(st) = state.lock() { + let (ptp_state_str, ptp_offset_val) = + (st.ptp_state.clone(), st.ptp_offset); + + let offset_display = if let Some(offset) = ptp_offset_val { + format!("{:.3}μs", offset / 1000.0) + } else { + "N/A".to_string() + }; + + queue!( + stdout, + MoveTo(45, 1), Print("PTP Status"), + MoveTo(45, 2), Print(format!("State : {}", ptp_state_str)), + MoveTo(45, 3), Print(format!("Offset : {}", offset_display)), + ) + .unwrap(); + } + } + // Footer queue!( stdout,