From 17e9daaa039a2e8a6478d4a73dc9f581a910f9a3 Mon Sep 17 00:00:00 2001 From: Chris Frankland-Wright <85807217+cjfranko@users.noreply.github.com> Date: Mon, 7 Jul 2025 20:52:11 +0100 Subject: [PATCH] Update timeturner.py added sync system clock function --- timeturner.py | 86 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/timeturner.py b/timeturner.py index e363463..ba24201 100644 --- a/timeturner.py +++ b/timeturner.py @@ -1,22 +1,19 @@ -# -*- coding: utf-8 -*- import curses import serial import re import time +import subprocess from datetime import datetime -# Serial config SERIAL_PORT = "/dev/ttyACM0" BAUD_RATE = 115200 -UI_REFRESH_INTERVAL = 0.25 # seconds -SIGNAL_TIMEOUT = 1.5 # seconds +UI_REFRESH_INTERVAL = 0.25 +SIGNAL_TIMEOUT = 1.5 -# Regex pattern ltc_pattern = re.compile( r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+fps)", re.IGNORECASE ) -# Shared state state = { "ltc_status": "--", "ltc_timecode": "--:--:--:--", @@ -26,26 +23,32 @@ state = { "lock_count": 0, "free_count": 0, "last_received": None, - "signal_loss": False + "signal_loss": False, + "last_ltc_dt": None, } + def parse_timecode(tc_str): h, m, s, f = map(int, tc_str.replace(";", ":").split(":")) return h, m, s, f + def timecode_to_milliseconds(h, m, s, f, fps): return int(((h * 3600 + m * 60 + s) * 1000) + (f * (1000 / fps))) + def get_offset(system_dt, h, m, s, f, fps): sys_ms = (system_dt.hour * 3600 + system_dt.minute * 60 + system_dt.second) * 1000 + system_dt.microsecond // 1000 ltc_ms = timecode_to_milliseconds(h, m, s, f, fps) return sys_ms - ltc_ms + def format_offset(ms, fps): frame_duration = 1000 / fps frame_offset = int(round(ms / frame_duration)) return f"{ms:+} ms ({frame_offset:+} frames)" + def serial_reader(ser): global state while ser.in_waiting: @@ -58,70 +61,89 @@ def serial_reader(ser): fps = float(fps_str.lower().replace("fps", "")) h, m, s, f = parse_timecode(tc_str) - # Update shared state - state["ltc_status"] = status - state["ltc_timecode"] = tc_str - state["framerate"] = fps_str - state["system_clock"] = now.strftime("%H:%M:%S.%f")[:-3] - state["offset_str"] = format_offset(get_offset(now, h, m, s, f, fps), fps) - state["last_received"] = time.time() - state["signal_loss"] = False + state.update({ + "ltc_status": status, + "ltc_timecode": tc_str, + "framerate": fps_str, + "system_clock": now.strftime("%H:%M:%S.%f")[:-3], + "offset_str": format_offset(get_offset(now, h, m, s, f, fps), fps), + "last_received": time.time(), + "signal_loss": False, + "last_ltc_dt": f"{h:02}:{m:02}:{s:02}" + }) if status == "LOCK": state["lock_count"] += 1 else: state["free_count"] += 1 + def draw_ui(stdscr): global state - curses.curs_set(0) stdscr.nodelay(True) + curses.start_color() + + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) # LOCK + curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) # FREE + curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) # LOST + curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) # OFFSET try: ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1) except serial.SerialException as e: - stdscr.addstr(0, 0, f"[ERROR] Couldn't open {SERIAL_PORT}: {e}") + stdscr.addstr(0, 0, f"[ERR] Couldn't open {SERIAL_PORT}: {e}") stdscr.getch() return while True: try: - # Read as fast as possible serial_reader(ser) + if state["last_received"] and (time.time() - state["last_received"]) > SIGNAL_TIMEOUT: + state["signal_loss"] = True - # Check for signal timeout - if state["last_received"]: - elapsed = time.time() - state["last_received"] - if elapsed > SIGNAL_TIMEOUT: - state["signal_loss"] = True - - # Draw UI stdscr.clear() - stdscr.addstr(0, 0, "🕰 NTP Timeturner v0.4") + stdscr.addstr(0, 0, "NTP Timeturner v0.5") if state["signal_loss"]: - stdscr.addstr(2, 0, "⚠️ No LTC signal detected!") - stdscr.addstr(3, 0, f"Last seen: {elapsed:.2f}s ago") + stdscr.attron(curses.color_pair(3)) + stdscr.addstr(2, 0, "! No LTC signal detected") + stdscr.attroff(curses.color_pair(3)) else: - stdscr.addstr(2, 0, f"LTC Status : {state['ltc_status']}") + colour = curses.color_pair(1 if state['ltc_status'] == "LOCK" else 2) + stdscr.addstr(2, 0, f"LTC Status : ") + stdscr.attron(colour) + stdscr.addstr(state['ltc_status']) + stdscr.attroff(colour) + stdscr.addstr(3, 0, f"LTC Timecode : {state['ltc_timecode']}") stdscr.addstr(4, 0, f"Frame Rate : {state['framerate']}") stdscr.addstr(5, 0, f"System Clock : {state['system_clock']}") + stdscr.attron(curses.color_pair(4)) stdscr.addstr(6, 0, f"Sync Offset : {state['offset_str']}") + stdscr.attroff(curses.color_pair(4)) stdscr.addstr(7, 0, f"Lock Ratio : {state['lock_count']} LOCK / {state['free_count']} FREE") - stdscr.addstr(9, 0, "Press Ctrl+C to exit.") + stdscr.addstr(9, 0, "[S] Set system clock to LTC [Ctrl+C] Quit") stdscr.refresh() + + key = stdscr.getch() + if key in (ord('s'), ord('S')) and not state['signal_loss'] and state['last_ltc_dt']: + try: + subprocess.run(["sudo", "date", "-s", state['last_ltc_dt']], check=True) + stdscr.addstr(11, 0, "[OK] System clock updated to LTC") + except Exception as e: + stdscr.addstr(11, 0, f"[ERR] Failed to set clock: {e}") + time.sleep(UI_REFRESH_INTERVAL) except KeyboardInterrupt: break except Exception as e: - stdscr.addstr(11, 0, f"[EXCEPTION] {e}") - stdscr.refresh() + stdscr.addstr(13, 0, f"[ERR] {e}") time.sleep(1) ser.close() + if __name__ == "__main__": curses.wrapper(draw_ui)