From 38c15897aefa7be333b0ed9121c998eaf4540854 Mon Sep 17 00:00:00 2001 From: Chris Frankland-Wright <85807217+cjfranko@users.noreply.github.com> Date: Mon, 7 Jul 2025 21:20:15 +0100 Subject: [PATCH] phase1 timesync setting --- timeturner.py | 240 +++++++++++++++++++++++--------------------------- 1 file changed, 108 insertions(+), 132 deletions(-) diff --git a/timeturner.py b/timeturner.py index ba24201..c1d9dec 100644 --- a/timeturner.py +++ b/timeturner.py @@ -1,149 +1,125 @@ import curses +import datetime import serial -import re -import time import subprocess -from datetime import datetime +import time +import re -SERIAL_PORT = "/dev/ttyACM0" +SERIAL_PORT = "/dev/ttyUSB0" BAUD_RATE = 115200 -UI_REFRESH_INTERVAL = 0.25 -SIGNAL_TIMEOUT = 1.5 -ltc_pattern = re.compile( - r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+fps)", re.IGNORECASE -) +line_regex = re.compile(r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+)fps") -state = { - "ltc_status": "--", - "ltc_timecode": "--:--:--:--", - "framerate": "--", - "system_clock": "--:--:--.---", - "offset_str": "--", - "lock_count": 0, - "free_count": 0, - "last_received": None, - "signal_loss": False, - "last_ltc_dt": None, -} +def parse_ltc_line(line): + match = line_regex.match(line.strip()) + if not match: + return None + status, timecode, fps = match.groups() + return status, timecode.replace(';', ':'), float(fps) +def get_system_time(): + return datetime.datetime.now() -def parse_timecode(tc_str): - h, m, s, f = map(int, tc_str.replace(";", ":").split(":")) - return h, m, s, f - - -def timecode_to_milliseconds(h, m, s, f, fps): - return int(((h * 3600 + m * 60 + s) * 1000) + (f * (1000 / fps))) - - -def get_offset(system_dt, h, m, s, f, fps): - sys_ms = (system_dt.hour * 3600 + system_dt.minute * 60 + system_dt.second) * 1000 + system_dt.microsecond // 1000 - ltc_ms = timecode_to_milliseconds(h, m, s, f, fps) - return sys_ms - ltc_ms - - -def format_offset(ms, fps): - frame_duration = 1000 / fps - frame_offset = int(round(ms / frame_duration)) - return f"{ms:+} ms ({frame_offset:+} frames)" - - -def serial_reader(ser): - global state - while ser.in_waiting: - line = ser.readline().decode(errors='ignore').strip() - match = ltc_pattern.match(line) - now = datetime.now() - - if match: - status, tc_str, fps_str = match.groups() - fps = float(fps_str.lower().replace("fps", "")) - h, m, s, f = parse_timecode(tc_str) - - state.update({ - "ltc_status": status, - "ltc_timecode": tc_str, - "framerate": fps_str, - "system_clock": now.strftime("%H:%M:%S.%f")[:-3], - "offset_str": format_offset(get_offset(now, h, m, s, f, fps), fps), - "last_received": time.time(), - "signal_loss": False, - "last_ltc_dt": f"{h:02}:{m:02}:{s:02}" - }) - if status == "LOCK": - state["lock_count"] += 1 - else: - state["free_count"] += 1 - - -def draw_ui(stdscr): - global state - curses.curs_set(0) - stdscr.nodelay(True) - curses.start_color() - - curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) # LOCK - curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) # FREE - curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) # LOST - curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) # OFFSET - +def timecode_to_dt(tc): try: - ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1) - except serial.SerialException as e: - stdscr.addstr(0, 0, f"[ERR] Couldn't open {SERIAL_PORT}: {e}") - stdscr.getch() - return + h, m, s, f = map(int, tc.split(":")) + return datetime.datetime.now().replace(hour=h, minute=m, second=s, microsecond=0) + except Exception: + return None + +def run_curses(stdscr): + curses.curs_set(0) + curses.start_color() + curses.use_default_colors() + + curses.init_pair(1, curses.COLOR_GREEN, -1) # LOCK + curses.init_pair(2, curses.COLOR_YELLOW, -1) # FREE + curses.init_pair(3, curses.COLOR_RED, -1) # LOST + curses.init_pair(4, curses.COLOR_CYAN, -1) # Offset + + ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) + lock_count = 0 + free_count = 0 + last_ltc_dt = None + last_status = "LOST" + frame_rate = 0.0 + + sync_requested = False + syncing = False while True: + now = get_system_time() + line = ser.readline().decode(errors='ignore').strip() + if line: + parsed = parse_ltc_line(line) + if parsed: + status, tc_str, fps = parsed + frame_rate = fps + last_ltc_dt = timecode_to_dt(tc_str) + last_status = status + if status == "LOCK": + lock_count += 1 + else: + free_count += 1 + + if sync_requested and not syncing: + syncing = True + # Snap to this LTC frame's HH:MM:SS (no frames) + new_time = last_ltc_dt.strftime("%H:%M:%S") + try: + subprocess.run(["sudo", "date", "-s", new_time], check=True) + sync_feedback = f"[OK] Clock set to {new_time}" + except subprocess.CalledProcessError as e: + sync_feedback = f"[ERR] Failed to sync: {e}" + sync_requested = False + syncing = False + + # Calculate offset + if last_ltc_dt: + sys_time = now.replace(microsecond=0) + offset = (sys_time - last_ltc_dt).total_seconds() + offset_ms = int(offset * 1000) + offset_frames = int(round(offset * frame_rate)) + offset_str = f"{offset_ms:+} ms ({offset_frames:+} frames)" + else: + offset_str = "n/a" + + # Draw UI + stdscr.erase() + stdscr.addstr(0, 0, "NTP Timeturner v0.5") + stdscr.addstr(2, 0, "LTC Status : ") + + if last_status == "LOCK": + stdscr.addstr("LOCK", curses.color_pair(1)) + elif last_status == "FREE": + stdscr.addstr("FREE", curses.color_pair(2)) + else: + stdscr.addstr("LOST", curses.color_pair(3)) + + stdscr.addstr(3, 0, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S') if last_ltc_dt else 'n/a'}") + stdscr.addstr(4, 0, f"Frame Rate : {frame_rate:.2f}fps") + stdscr.addstr(5, 0, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}") + stdscr.addstr(6, 0, "Sync Offset : ") + stdscr.addstr(offset_str, curses.color_pair(4)) + stdscr.addstr(7, 0, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") + stdscr.addstr(9, 0, "[S] Set system clock to LTC [Ctrl+C] Quit") + + if 'sync_feedback' in locals(): + stdscr.addstr(11, 0, sync_feedback[:curses.COLS - 1]) + del sync_feedback + + stdscr.refresh() + + # Handle input + stdscr.nodelay(True) try: - serial_reader(ser) - if state["last_received"] and (time.time() - state["last_received"]) > SIGNAL_TIMEOUT: - state["signal_loss"] = True - - stdscr.clear() - stdscr.addstr(0, 0, "NTP Timeturner v0.5") - - if state["signal_loss"]: - stdscr.attron(curses.color_pair(3)) - stdscr.addstr(2, 0, "! No LTC signal detected") - stdscr.attroff(curses.color_pair(3)) - else: - colour = curses.color_pair(1 if state['ltc_status'] == "LOCK" else 2) - stdscr.addstr(2, 0, f"LTC Status : ") - stdscr.attron(colour) - stdscr.addstr(state['ltc_status']) - stdscr.attroff(colour) - - stdscr.addstr(3, 0, f"LTC Timecode : {state['ltc_timecode']}") - stdscr.addstr(4, 0, f"Frame Rate : {state['framerate']}") - stdscr.addstr(5, 0, f"System Clock : {state['system_clock']}") - stdscr.attron(curses.color_pair(4)) - stdscr.addstr(6, 0, f"Sync Offset : {state['offset_str']}") - stdscr.attroff(curses.color_pair(4)) - stdscr.addstr(7, 0, f"Lock Ratio : {state['lock_count']} LOCK / {state['free_count']} FREE") - - stdscr.addstr(9, 0, "[S] Set system clock to LTC [Ctrl+C] Quit") - stdscr.refresh() - key = stdscr.getch() - if key in (ord('s'), ord('S')) and not state['signal_loss'] and state['last_ltc_dt']: - try: - subprocess.run(["sudo", "date", "-s", state['last_ltc_dt']], check=True) - stdscr.addstr(11, 0, "[OK] System clock updated to LTC") - except Exception as e: - stdscr.addstr(11, 0, f"[ERR] Failed to set clock: {e}") - - time.sleep(UI_REFRESH_INTERVAL) - - except KeyboardInterrupt: - break - except Exception as e: - stdscr.addstr(13, 0, f"[ERR] {e}") - time.sleep(1) - - ser.close() + if key == ord('s') or key == ord('S'): + sync_requested = True + except: + pass + time.sleep(0.1) if __name__ == "__main__": - curses.wrapper(draw_ui) + curses.wrapper(run_curses)