From 69419b6d59072448c719f5576d5fd29f070b3c6a Mon Sep 17 00:00:00 2001 From: Chris Frankland-Wright <85807217+cjfranko@users.noreply.github.com> Date: Mon, 7 Jul 2025 22:01:49 +0100 Subject: [PATCH] Update timeturner.py --- timeturner.py | 212 +++++++++++++++++++++++++++----------------------- 1 file changed, 114 insertions(+), 98 deletions(-) diff --git a/timeturner.py b/timeturner.py index f4099a7..77f610f 100644 --- a/timeturner.py +++ b/timeturner.py @@ -1,128 +1,144 @@ import serial +import curses import time import datetime -import curses +import re +import subprocess import os -from datetime import datetime as dt, timedelta +SERIAL_PORT = None # Will be auto-detected BAUD_RATE = 115200 -SCAN_PORTS = ["/dev/ttyACM0", "/dev/ttyUSB0", "/dev/serial0", "/dev/serial1"] -VERSION = "0.6" +FRAME_RATE = 25.0 # Default, will update from stream -def find_serial_port(): - for port in SCAN_PORTS: - if os.path.exists(port): - return port +last_ltc_time = None +last_frame = None +lock_count = 0 +free_count = 0 +sync_requested = False +sync_pending = False + +def find_teensy_serial(): + for dev in os.listdir('/dev'): + if dev.startswith('ttyACM') or dev.startswith('ttyUSB'): + return f'/dev/{dev}' return None def parse_ltc_line(line): - try: - parts = line.strip().split() - if len(parts) != 4: - return None - - status = parts[0][1:-1] # [LOCK] -> LOCK - timecode = parts[1] # e.g. 10:00:00:12 or 10:00:00;12 - framerate = float(parts[3].replace("fps", "")) - - sep = ":" if ":" in timecode else ";" - hh, mm, ss, ff = map(int, timecode.replace(";", ":").split(":")) - - now = dt.now() - ltc_dt = now.replace(hour=hh, minute=mm, second=ss, microsecond=0) - return (status, ltc_dt, framerate, ff) - except Exception: + """ + Expected format: [LOCK] 19:56:56:14 | 25.00fps + or dropframe style: [LOCK] 19:56:56;14 | 29.97fps + """ + match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line) + if not match: return None + status, hh, mm, ss, ff, fps = match.groups() + return { + "status": status, + "hours": int(hh), + "minutes": int(mm), + "seconds": int(ss), + "frames": int(ff), + "frame_rate": float(fps) + } + +def get_system_time(): + return datetime.datetime.now() + +def format_time(dt): + return dt.strftime("%H:%M:%S.%f")[:-3] # show milliseconds only def run_curses(stdscr): + global last_ltc_time, FRAME_RATE, lock_count, free_count, sync_requested, sync_pending, SERIAL_PORT + curses.curs_set(0) stdscr.nodelay(True) - stdscr.timeout(200) + stdscr.timeout(100) - port = find_serial_port() - if port is None: - stdscr.addstr(0, 0, "No serial port found. Check connection.") + # Auto-detect Teensy serial + SERIAL_PORT = find_teensy_serial() + if not SERIAL_PORT: + stdscr.addstr(0, 0, "❌ No serial device found. Connect Teensy and try again.") stdscr.refresh() time.sleep(3) return - ser = serial.Serial(port, BAUD_RATE, timeout=0.1) - buffer = "" - last_ltc_dt = None - last_frame = None - frame_rate = None - status = "FREE" - lock_count = 0 - free_count = 0 - sync_offset_ms = 0 + try: + ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) + except Exception as e: + stdscr.addstr(0, 0, f"❌ Failed to open serial port: {e}") + stdscr.refresh() + time.sleep(3) + return while True: try: - buffer += ser.read(ser.in_waiting or 1).decode("utf-8", errors="ignore") - while "\n" in buffer: - line, buffer = buffer.split("\n", 1) - result = parse_ltc_line(line) - if result: - status, ltc_dt, frame_rate, frame = result - last_ltc_dt = ltc_dt - last_frame = frame - if status == "LOCK": - lock_count += 1 - else: - free_count += 1 + line = ser.readline().decode(errors='ignore').strip() + parsed = parse_ltc_line(line) + if parsed: + if parsed["status"] == "LOCK": + lock_count += 1 + else: + free_count += 1 + + FRAME_RATE = parsed["frame_rate"] + + # Construct full datetime with microsecond precision from LTC + ms = int((parsed["frames"] / FRAME_RATE) * 1000) + ltc_dt = datetime.datetime.now().replace( + hour=parsed["hours"], + minute=parsed["minutes"], + second=parsed["seconds"], + microsecond=ms * 1000 + ) + + last_ltc_time = ltc_dt + last_frame = parsed["frames"] + + # Perform sync only if requested and we just got a new frame + if sync_pending: + do_sync(stdscr, ltc_dt) + sync_pending = False + + # Display + stdscr.erase() + stdscr.addstr(0, 2, f"NTP Timeturner v0.7") + stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}") + stdscr.addstr(3, 2, f"LTC Status : {parsed['status'] if parsed else '…'}") + stdscr.addstr(4, 2, f"LTC Timecode : {parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}" if parsed else "LTC Timecode : …") + stdscr.addstr(5, 2, f"Frame Rate : {FRAME_RATE:.2f}fps") + now = get_system_time() + stdscr.addstr(6, 2, f"System Clock : {format_time(now)}") + + # Sync offset + if last_ltc_time: + offset_ms = (now - last_ltc_time).total_seconds() * 1000 + offset_frames = offset_ms / (1000 / FRAME_RATE) + stdscr.addstr(7, 2, f"Sync Offset : {offset_ms:+.0f} ms ({offset_frames:+.0f} frames)") + + stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") + + stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") + stdscr.refresh() + + key = stdscr.getch() + if key in (ord('s'), ord('S')): + sync_pending = True + + except KeyboardInterrupt: + break except Exception as e: - stdscr.addstr(0, 0, f"Serial Error: {str(e)}") + stdscr.addstr(15, 2, f"⚠️ Error: {e}") stdscr.refresh() time.sleep(1) - return - now = dt.now() - if last_ltc_dt: - offset_td = now - last_ltc_dt - sync_offset_ms = round(offset_td.total_seconds() * 1000) - - stdscr.erase() - stdscr.addstr(0, 2, f"NTP Timeturner v{VERSION}") - stdscr.addstr(1, 2, f"Using Serial Port: {port}") - - stdscr.addstr(3, 2, f"LTC Status : {status}") - if last_ltc_dt and last_frame is not None: - stdscr.addstr(4, 2, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S')}:{last_frame:02}") - else: - stdscr.addstr(4, 2, f"LTC Timecode : ---") - - stdscr.addstr(5, 2, f"Frame Rate : {frame_rate:.2f}fps" if frame_rate else "Frame Rate : ---") - stdscr.addstr(6, 2, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}") - - if sync_offset_ms and frame_rate: - frame_duration_ms = 1000 / frame_rate - offset_frames = round(sync_offset_ms / frame_duration_ms) - stdscr.addstr(7, 2, f"Sync Offset : {sync_offset_ms:+} ms ({offset_frames:+} frames)") - else: - stdscr.addstr(7, 2, "Sync Offset : ---") - - stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") - stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") - stdscr.refresh() - - # Handle user input - try: - key = stdscr.getch() - if key in [ord('s'), ord('S')]: - if last_ltc_dt: - new_dt = last_ltc_dt.replace(microsecond=0) - date_str = new_dt.strftime("%H:%M:%S") - os.system(f"sudo date -s \"{date_str}\"") - elif key == 3: # Ctrl+C - raise KeyboardInterrupt - except: - pass - - time.sleep(0.05) +def do_sync(stdscr, ltc_dt): + try: + timestamp = ltc_dt.strftime("%H:%M:%S.%f")[:-3] # Drop microsecond to milliseconds + subprocess.run(["sudo", "date", "-s", timestamp], check=True) + stdscr.addstr(13, 2, f"✔️ System clock set to LTC: {timestamp}") + except Exception as e: + stdscr.addstr(13, 2, f"❌ Failed to sync system time: {e}") if __name__ == "__main__": - try: - curses.wrapper(run_curses) - except KeyboardInterrupt: - print("\nExited cleanly.") + curses.wrapper(run_curses)