From 6bceabd1a398338206a923efc00e2add22a70e87 Mon Sep 17 00:00:00 2001 From: Chris Frankland-Wright <85807217+cjfranko@users.noreply.github.com> Date: Tue, 8 Jul 2025 15:29:34 +0100 Subject: [PATCH] Update timeturner.py --- timeturner.py | 320 +++++++++++++++++++++++--------------------------- 1 file changed, 147 insertions(+), 173 deletions(-) diff --git a/timeturner.py b/timeturner.py index 2d9725e..fa0f378 100644 --- a/timeturner.py +++ b/timeturner.py @@ -1,217 +1,191 @@ import serial -import curses import time import datetime +import curses import re -import subprocess import os -import threading -import queue import json +from threading import Thread, Lock from collections import deque -SERIAL_PORT = None +# Config +CONFIG_FILE = "config.json" +DEFAULT_CONFIG = { + "serial_port": "auto", + "hardware_offset_ms": 0 +} + +# Globals +SERIAL_PORT = "/dev/ttyUSB0" BAUD_RATE = 115200 FRAME_RATE = 25.0 -CONFIG_PATH = "config.json" - -sync_pending = False -ltc_data_queue = queue.Queue() -latest_ltc = None -offset_history = deque(maxlen=20) - -lock_total = 0 -free_total = 0 hardware_offset_ms = 0 -ltc_locked = False -lock_stable_since = None -sync_enabled = False -def load_config(): - global hardware_offset_ms - try: - with open(CONFIG_PATH, "r") as f: +# Shared data +latest_line = None +latest_timestamp = None +line_lock = Lock() + +# Rolling jitter calculation +jitter_samples = deque(maxlen=25) +last_jitter_update = time.time() +displayed_jitter_ms = None +displayed_jitter_frames = None + +# Load config +if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE) as f: + try: config = json.load(f) - hardware_offset_ms = int(config.get("hardware_offset_ms", 0)) - except Exception: - hardware_offset_ms = 0 + SERIAL_PORT = config.get("serial_port", SERIAL_PORT) + hardware_offset_ms = config.get("hardware_offset_ms", 0) + except json.JSONDecodeError: + print("⚠️ Failed to parse config.json. Using defaults.") +else: + config = DEFAULT_CONFIG -def find_teensy_serial(): - for dev in os.listdir('/dev'): - if dev.startswith('ttyACM') or dev.startswith('ttyUSB'): - return f'/dev/{dev}' - return None +def auto_detect_serial_port(): + for dev in os.listdir("/dev"): + if dev.startswith("ttyACM") or dev.startswith("ttyUSB"): + return f"/dev/{dev}" + return SERIAL_PORT def parse_ltc_line(line): match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line) if not match: return None - status, hh, mm, ss, ff, fps = match.groups() return { - "status": status, - "hours": int(hh), - "minutes": int(mm), - "seconds": int(ss), - "frames": int(ff), - "frame_rate": float(fps) + "lock": match.group(1), + "hours": int(match.group(2)), + "minutes": int(match.group(3)), + "seconds": int(match.group(4)), + "frames": int(match.group(5)), + "fps": float(match.group(6)) } -def serial_thread(port, baud, q): - try: - ser = serial.Serial(port, baud, timeout=1) - while True: - line = ser.readline().decode(errors='ignore').strip() - timestamp = datetime.datetime.now() - parsed = parse_ltc_line(line) - if parsed: - q.put((parsed, timestamp)) - except Exception as e: - print(f"Serial thread error: {e}") - -def get_system_time(): - return datetime.datetime.now() - -def format_time(dt): - return dt.strftime("%H:%M:%S.%f")[:-3] +def serial_reader(port, baud): + global latest_line, latest_timestamp + ser = serial.Serial(port, baud, timeout=1) + while True: + line = ser.readline().decode(errors='ignore').strip() + if line: + with line_lock: + latest_line = line + latest_timestamp = datetime.datetime.now() def run_curses(stdscr): - global FRAME_RATE, sync_pending, SERIAL_PORT, latest_ltc - global offset_history, lock_total, free_total - global ltc_locked, lock_stable_since, sync_enabled + global displayed_jitter_ms, displayed_jitter_frames, last_jitter_update curses.curs_set(0) stdscr.nodelay(True) - stdscr.timeout(50) + stdscr.timeout(100) - load_config() + serial_port = auto_detect_serial_port() + stdscr.addstr(1, 2, f"NTP Timeturner v1.3") + stdscr.addstr(2, 2, f"Using Serial Port: {serial_port}") + stdscr.refresh() - SERIAL_PORT = find_teensy_serial() - if not SERIAL_PORT: - stdscr.addstr(0, 0, "❌ No serial device found.") - stdscr.refresh() - time.sleep(2) - return + reader_thread = Thread(target=serial_reader, args=(serial_port, BAUD_RATE), daemon=True) + reader_thread.start() - thread = threading.Thread(target=serial_thread, args=(SERIAL_PORT, BAUD_RATE, ltc_data_queue), daemon=True) - thread.start() + lock_count = 0 + free_count = 0 + last_displayed_frame = "" + sync_allowed = False + last_frame = None while True: - try: - while not ltc_data_queue.empty(): - parsed, arrival_time = ltc_data_queue.get_nowait() - latest_ltc = (parsed, arrival_time) + now = datetime.datetime.now() + with line_lock: + line = latest_line + timestamp = latest_timestamp - FRAME_RATE = parsed["frame_rate"] - status = parsed["status"] + parsed = parse_ltc_line(line) if line else None - if status == "LOCK": - lock_total += 1 - if not ltc_locked: - lock_stable_since = time.time() - ltc_locked = True - elif time.time() - lock_stable_since > 1.0: - sync_enabled = True - else: - free_total += 1 - ltc_locked = False - sync_enabled = False - lock_stable_since = None - offset_history.clear() + stdscr.erase() + stdscr.addstr(1, 2, f"NTP Timeturner v1.3") + stdscr.addstr(2, 2, f"Using Serial Port: {serial_port}") - if ltc_locked and sync_enabled: - offset_ms = (get_system_time() - arrival_time).total_seconds() * 1000 - hardware_offset_ms - offset_frames = offset_ms / (1000 / FRAME_RATE) - offset_history.append((offset_ms, offset_frames)) - - if sync_pending and ltc_locked and sync_enabled: - do_sync(stdscr, parsed, arrival_time) - sync_pending = False - - stdscr.erase() - stdscr.addstr(0, 2, "NTP Timeturner v1.2") - stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}") - - if latest_ltc: - parsed, arrival_time = latest_ltc - stdscr.addstr(3, 2, f"LTC Status : {parsed['status']}") - stdscr.addstr(4, 2, f"LTC Timecode : {parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}") - stdscr.addstr(5, 2, f"Frame Rate : {FRAME_RATE:.2f}fps") - stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}") - - if ltc_locked and sync_enabled and offset_history: - avg_ms = sum(x[0] for x in offset_history) / len(offset_history) - avg_frames = sum(x[1] for x in offset_history) / len(offset_history) - - if abs(avg_ms) < 10: - color = curses.color_pair(2) - elif abs(avg_ms) < 40: - color = curses.color_pair(3) - else: - color = curses.color_pair(1) - - stdscr.attron(color) - stdscr.addstr(7, 2, f"Sync Offset : {avg_ms:+.0f} ms ({avg_frames:+.0f} frames)") - stdscr.attroff(color) - elif parsed["status"] == "FREE": - stdscr.attron(curses.color_pair(3)) - stdscr.addstr(7, 2, "⚠️ LTC UNSYNCED — offset unavailable") - stdscr.attroff(curses.color_pair(3)) - else: - stdscr.addstr(7, 2, "Sync Offset : …") - - total = lock_total + free_total - lock_pct = (lock_total / total) * 100 if total else 0 - if ltc_locked and sync_enabled: - stdscr.addstr(8, 2, f"Lock Ratio : {lock_pct:.1f}% LOCK") - else: - stdscr.attron(curses.color_pair(3)) - stdscr.addstr(8, 2, f"Lock Ratio : {lock_pct:.1f}% (not stable)") - stdscr.attroff(curses.color_pair(3)) + if parsed: + lock_state = parsed["lock"] + if lock_state == "LOCK": + lock_count += 1 else: - stdscr.addstr(3, 2, "LTC Status : (waiting)") - stdscr.addstr(4, 2, "LTC Timecode : …") - stdscr.addstr(5, 2, "Frame Rate : …") - stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}") - stdscr.addstr(7, 2, "Sync Offset : …") - stdscr.addstr(8, 2, "Lock Ratio : …") + free_count += 1 - if sync_enabled: - stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") + stdscr.addstr(4, 2, f"LTC Status : {lock_state}", curses.color_pair(2 if lock_state == "LOCK" else 3)) + + timecode_str = f"{parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}" + stdscr.addstr(5, 2, f"LTC Timecode : {timecode_str}") + stdscr.addstr(6, 2, f"Frame Rate : {parsed['fps']:.2f}fps") + + # Approximate LTC timecode as a datetime + ltc_time = timestamp.replace( + hour=parsed["hours"], + minute=parsed["minutes"], + second=parsed["seconds"], + microsecond=int((parsed["frames"] / parsed["fps"]) * 1_000_000) + ) + + # Show system clock + system_time = datetime.datetime.now() + stdscr.addstr(7, 2, f"System Clock : {system_time.strftime('%H:%M:%S.%f')[:-3]}") + + # Only calculate jitter if LOCKED + if lock_state == "LOCK": + offset_ms = (system_time - ltc_time).total_seconds() * 1000 - hardware_offset_ms + jitter_samples.append(offset_ms) + + if time.time() - last_jitter_update >= 1.0: + avg_offset = sum(jitter_samples) / len(jitter_samples) + displayed_jitter_ms = avg_offset + frame_error = round((avg_offset / 1000) * parsed["fps"]) + displayed_jitter_frames = frame_error + last_jitter_update = time.time() + + if displayed_jitter_ms is not None: + stdscr.addstr(8, 2, f"Sync Jitter : {displayed_jitter_ms:+.0f} ms ({displayed_jitter_frames:+} frames)", + curses.color_pair(0 if abs(displayed_jitter_ms) < 5 else 3)) + + # Compare timecode match + ltc_str = ltc_time.strftime('%H:%M:%S') + sys_str = system_time.strftime('%H:%M:%S') + if ltc_str == sys_str and parsed["frames"] == int((system_time.microsecond / 1_000_000) * parsed["fps"]): + stdscr.addstr(9, 2, "Timecode Match: MATCHED", curses.color_pair(2)) + else: + stdscr.addstr(9, 2, "Timecode Match: OUT OF SYNC", curses.color_pair(3)) + + sync_allowed = True else: - stdscr.addstr(10, 2, "(Sync disabled — LTC not locked) [Ctrl+C] Quit") + stdscr.addstr(8, 2, f"Sync Jitter : -- (FREE mode)", curses.color_pair(3)) + stdscr.addstr(9, 2, f"Timecode Match: UNKNOWN", curses.color_pair(3)) + sync_allowed = False - stdscr.refresh() + total = lock_count + free_count + if total > 0: + lock_ratio = (lock_count / total) * 100 + stdscr.addstr(10, 2, f"Lock Ratio : {lock_ratio:.1f}% LOCK", curses.color_pair(2 if lock_ratio > 90 else 3)) + else: + stdscr.addstr(4, 2, "Waiting for LTC data...", curses.color_pair(3)) - key = stdscr.getch() - if key in (ord('s'), ord('S')) and latest_ltc and sync_enabled: - sync_pending = True + stdscr.addstr(12, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") - except KeyboardInterrupt: - break - except Exception as e: - stdscr.addstr(13, 2, f"⚠️ Error: {e}") - stdscr.refresh() - time.sleep(1) + key = stdscr.getch() + if key in (ord('s'), ord('S')) and parsed and sync_allowed: + os.system(f"sudo date -s \"{timecode_str.replace(':', ':')}\"") -def do_sync(stdscr, parsed, arrival_time): - try: - ms = int((parsed["frames"] / parsed["frame_rate"]) * 1000) - sync_time = arrival_time.replace( - hour=parsed["hours"], - minute=parsed["minutes"], - second=parsed["seconds"], - microsecond=(ms + hardware_offset_ms) * 1000 - ) - timestamp = sync_time.strftime("%H:%M:%S.%f")[:-3] - subprocess.run(["sudo", "date", "-s", timestamp], check=True) - stdscr.addstr(13, 2, f"✔️ Synced to LTC: {timestamp}") - except Exception as e: - stdscr.addstr(13, 2, f"❌ Sync failed: {e}") + stdscr.refresh() + time.sleep(0.05) + +def main(): + curses.wrapper(start_curses) + +def start_curses(stdscr): + curses.start_color() + curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) + run_curses(stdscr) if __name__ == "__main__": - curses.initscr() - curses.start_color() - curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) - curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) - curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) - curses.wrapper(run_curses) + main()