Update timeturner.py

This commit is contained in:
Chris Frankland-Wright 2025-07-07 20:39:49 +01:00 committed by GitHub
parent 61a8fd9434
commit 929daea7cb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -8,23 +8,30 @@ from datetime import datetime
# Serial config # Serial config
SERIAL_PORT = "/dev/ttyACM0" SERIAL_PORT = "/dev/ttyACM0"
BAUD_RATE = 115200 BAUD_RATE = 115200
REFRESH_INTERVAL = 0.5 # seconds UI_REFRESH_INTERVAL = 0.25 # seconds
SIGNAL_TIMEOUT = 1.5 # seconds
# Regex pattern # Regex pattern
ltc_pattern = re.compile( ltc_pattern = re.compile(
r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+fps)", re.IGNORECASE r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+fps)", re.IGNORECASE
) )
# Stats # Shared state
lock_count = 0 state = {
free_count = 0 "ltc_status": "--",
last_frame = None "ltc_timecode": "--:--:--:--",
drift_warnings = [] "framerate": "--",
"system_clock": "--:--:--.---",
"offset_str": "--",
"lock_count": 0,
"free_count": 0,
"last_received": None,
"signal_loss": False
}
def parse_timecode(tc_str): def parse_timecode(tc_str):
sep = ":" if ":" in tc_str else ";"
h, m, s, f = map(int, tc_str.replace(";", ":").split(":")) h, m, s, f = map(int, tc_str.replace(";", ":").split(":"))
return h, m, s, f, sep return h, m, s, f
def timecode_to_milliseconds(h, m, s, f, fps): def timecode_to_milliseconds(h, m, s, f, fps):
return int(((h * 3600 + m * 60 + s) * 1000) + (f * (1000 / fps))) return int(((h * 3600 + m * 60 + s) * 1000) + (f * (1000 / fps)))
@ -39,79 +46,78 @@ def format_offset(ms, fps):
frame_offset = int(round(ms / frame_duration)) frame_offset = int(round(ms / frame_duration))
return f"{ms:+} ms ({frame_offset:+} frames)" return f"{ms:+} ms ({frame_offset:+} frames)"
def serial_reader(ser):
global state
while ser.in_waiting:
line = ser.readline().decode(errors='ignore').strip()
match = ltc_pattern.match(line)
now = datetime.now()
if match:
status, tc_str, fps_str = match.groups()
fps = float(fps_str.lower().replace("fps", ""))
h, m, s, f = parse_timecode(tc_str)
# Update shared state
state["ltc_status"] = status
state["ltc_timecode"] = tc_str
state["framerate"] = fps_str
state["system_clock"] = now.strftime("%H:%M:%S.%f")[:-3]
state["offset_str"] = format_offset(get_offset(now, h, m, s, f, fps), fps)
state["last_received"] = time.time()
state["signal_loss"] = False
if status == "LOCK":
state["lock_count"] += 1
else:
state["free_count"] += 1
def draw_ui(stdscr): def draw_ui(stdscr):
global lock_count, free_count, last_frame, drift_warnings global state
curses.curs_set(0) curses.curs_set(0)
stdscr.nodelay(True) stdscr.nodelay(True)
try: try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1)
except serial.SerialException as e: except serial.SerialException as e:
stdscr.addstr(0, 0, f"[ERROR] Couldn't open {SERIAL_PORT}: {e}") stdscr.addstr(0, 0, f"[ERROR] Couldn't open {SERIAL_PORT}: {e}")
stdscr.getch() stdscr.getch()
return return
# Init variables
ltc_status = "--"
ltc_timecode = "--:--:--:--"
framerate = "--"
offset_str = "--"
while True: while True:
try: try:
line = ser.readline().decode(errors='ignore').strip() # Read as fast as possible
match = ltc_pattern.match(line) serial_reader(ser)
now = datetime.now()
if match: # Check for signal timeout
status, tc_str, fps_str = match.groups() if state["last_received"]:
ltc_status = status elapsed = time.time() - state["last_received"]
ltc_timecode = tc_str if elapsed > SIGNAL_TIMEOUT:
framerate = fps_str state["signal_loss"] = True
fps = float(fps_str.lower().replace("fps", ""))
h, m, s, f, sep = parse_timecode(tc_str) # Draw UI
current_frame = timecode_to_milliseconds(h, m, s, f, fps)
# Drift detection
if last_frame is not None:
expected = last_frame + int(1000 / fps)
if abs(current_frame - expected) > int(2 * (1000 / fps)):
drift_warnings.append(f"Drift: Δ{current_frame - last_frame} ms")
if len(drift_warnings) > 3:
drift_warnings.pop(0)
last_frame = current_frame
# Sync offset
offset_ms = get_offset(now, h, m, s, f, fps)
offset_str = format_offset(offset_ms, fps)
# Stats
if ltc_status == "LOCK":
lock_count += 1
else:
free_count += 1
# UI
stdscr.clear() stdscr.clear()
stdscr.addstr(0, 0, "🕰 NTP Timeturner v0.3") stdscr.addstr(0, 0, "🕰 NTP Timeturner v0.4")
stdscr.addstr(2, 0, f"LTC Status : {ltc_status}")
stdscr.addstr(3, 0, f"LTC Timecode : {ltc_timecode}") if state["signal_loss"]:
stdscr.addstr(4, 0, f"Frame Rate : {framerate}") stdscr.addstr(2, 0, "⚠️ No LTC signal detected!")
stdscr.addstr(5, 0, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}") stdscr.addstr(3, 0, f"Last seen: {elapsed:.2f}s ago")
stdscr.addstr(6, 0, f"Sync Offset : {offset_str}") else:
stdscr.addstr(7, 0, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") stdscr.addstr(2, 0, f"LTC Status : {state['ltc_status']}")
if drift_warnings: stdscr.addstr(3, 0, f"LTC Timecode : {state['ltc_timecode']}")
stdscr.addstr(9, 0, f"⚠️ {drift_warnings[-1]}") stdscr.addstr(4, 0, f"Frame Rate : {state['framerate']}")
stdscr.addstr(11, 0, "Press Ctrl+C to exit.") stdscr.addstr(5, 0, f"System Clock : {state['system_clock']}")
stdscr.addstr(6, 0, f"Sync Offset : {state['offset_str']}")
stdscr.addstr(7, 0, f"Lock Ratio : {state['lock_count']} LOCK / {state['free_count']} FREE")
stdscr.addstr(9, 0, "Press Ctrl+C to exit.")
stdscr.refresh() stdscr.refresh()
time.sleep(REFRESH_INTERVAL) time.sleep(UI_REFRESH_INTERVAL)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
except Exception as e: except Exception as e:
stdscr.addstr(13, 0, f"[EXCEPTION] {e}") stdscr.addstr(11, 0, f"[EXCEPTION] {e}")
stdscr.refresh() stdscr.refresh()
time.sleep(1) time.sleep(1)