NTP-Timeturner/timeturner.py
Chris Frankland-Wright a1da396874 refactor: Use rational numbers for accurate frame rate calculations
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-08-02 12:26:17 +01:00

258 lines
9.2 KiB
Python

import serial
import curses
import time
import datetime
import re
import subprocess
import os
import threading
import queue
import json
from collections import deque
from fractions import Fraction
SERIAL_PORT = None
BAUD_RATE = 115200
FRAME_RATE = Fraction(25, 1)
CONFIG_PATH = "config.json"
sync_pending = False
ltc_data_queue = queue.Queue()
latest_ltc = None
offset_history = deque(maxlen=20)
lock_total = 0
free_total = 0
hardware_offset_ms = 0
ltc_locked = False
lock_stable_since = None
sync_enabled = False
last_match_check = 0
timecode_match_status = "UNKNOWN"
def framerate_str_to_fraction(s):
if s == "23.98": return Fraction(24000, 1001)
if s == "24.00": return Fraction(24, 1)
if s == "25.00": return Fraction(25, 1)
if s == "29.97": return Fraction(30000, 1001)
if s == "30.00": return Fraction(30, 1)
return None
def load_config():
global hardware_offset_ms
try:
with open(CONFIG_PATH, "r") as f:
config = json.load(f)
hardware_offset_ms = int(config.get("hardware_offset_ms", 0))
except Exception:
hardware_offset_ms = 0
def find_teensy_serial():
for dev in os.listdir('/dev'):
if dev.startswith('ttyACM') or dev.startswith('ttyUSB'):
return f'/dev/{dev}'
return None
def parse_ltc_line(line):
match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line)
if not match:
return None
status, hh, mm, ss, ff, fps = match.groups()
rate = framerate_str_to_fraction(fps)
if not rate:
return None
return {
"status": status,
"hours": int(hh),
"minutes": int(mm),
"seconds": int(ss),
"frames": int(ff),
"frame_rate": rate
}
def serial_thread(port, baud, q):
try:
ser = serial.Serial(port, baud, timeout=1)
while True:
line = ser.readline().decode(errors='ignore').strip()
timestamp = datetime.datetime.now()
parsed = parse_ltc_line(line)
if parsed:
q.put((parsed, timestamp))
except Exception as e:
print(f"Serial thread error: {e}")
def get_system_time():
return datetime.datetime.now()
def format_time(dt):
return dt.strftime("%H:%M:%S.%f")[:-3]
def run_curses(stdscr):
global FRAME_RATE, sync_pending, SERIAL_PORT, latest_ltc
global offset_history, lock_total, free_total
global ltc_locked, lock_stable_since, sync_enabled
global last_match_check, timecode_match_status
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.timeout(50)
load_config()
SERIAL_PORT = find_teensy_serial()
if not SERIAL_PORT:
stdscr.addstr(0, 0, "X No serial device found.")
stdscr.refresh()
time.sleep(2)
return
thread = threading.Thread(target=serial_thread, args=(SERIAL_PORT, BAUD_RATE, ltc_data_queue), daemon=True)
thread.start()
while True:
try:
now = time.time()
while not ltc_data_queue.empty():
parsed, arrival_time = ltc_data_queue.get_nowait()
latest_ltc = (parsed, arrival_time)
FRAME_RATE = parsed["frame_rate"]
status = parsed["status"]
if status == "LOCK":
lock_total += 1
if not ltc_locked:
lock_stable_since = time.time()
ltc_locked = True
elif time.time() - lock_stable_since > 1.0:
sync_enabled = True
else:
free_total += 1
ltc_locked = False
sync_enabled = False
lock_stable_since = None
offset_history.clear()
timecode_match_status = "UNKNOWN"
if ltc_locked and sync_enabled:
offset_ms = (get_system_time() - arrival_time).total_seconds() * 1000 - hardware_offset_ms
offset_frames = offset_ms / (1000 / FRAME_RATE)
offset_history.append((offset_ms, offset_frames))
if sync_pending and ltc_locked and sync_enabled:
do_sync(stdscr, parsed, arrival_time)
sync_pending = False
# Check timecode match every 5 seconds
if latest_ltc and now - last_match_check > 5:
parsed, _ = latest_ltc
system_time = get_system_time()
if (parsed["hours"] == system_time.hour and
parsed["minutes"] == system_time.minute and
parsed["seconds"] == system_time.second):
timecode_match_status = "IN SYNC"
else:
timecode_match_status = "OUT OF SYNC"
last_match_check = now
stdscr.erase()
stdscr.addstr(0, 2, "NTP Timeturner v2 - Python Final")
stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}")
if latest_ltc:
parsed, arrival_time = latest_ltc
stdscr.addstr(3, 2, f"LTC Status : {parsed['status']}")
stdscr.addstr(4, 2, f"LTC Timecode : {parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}")
stdscr.addstr(5, 2, f"Frame Rate : {float(FRAME_RATE):.2f}fps")
stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}")
if ltc_locked and sync_enabled and offset_history:
avg_ms = sum(x[0] for x in offset_history) / len(offset_history)
avg_frames = sum(x[1] for x in offset_history) / len(offset_history)
if abs(avg_ms) < 10:
color = curses.color_pair(2)
elif abs(avg_ms) < 40:
color = curses.color_pair(3)
else:
color = curses.color_pair(1)
stdscr.attron(color)
stdscr.addstr(7, 2, f"Sync Jitter : {avg_ms:+.0f} ms ({avg_frames:+.0f} frames)")
stdscr.attroff(color)
elif parsed["status"] == "FREE":
stdscr.attron(curses.color_pair(3))
stdscr.addstr(7, 2, "⚠ LTC UNSYNCED — offset unavailable")
stdscr.attroff(curses.color_pair(3))
else:
stdscr.addstr(7, 2, "Sync Jitter : …")
# Timecode Match
if timecode_match_status == "IN SYNC":
stdscr.attron(curses.color_pair(2))
elif timecode_match_status == "OUT OF SYNC":
stdscr.attron(curses.color_pair(1))
stdscr.addstr(8, 2, f"Sync Status : {timecode_match_status}")
stdscr.attroff(curses.color_pair(1))
stdscr.attroff(curses.color_pair(2))
total = lock_total + free_total
lock_pct = (lock_total / total) * 100 if total else 0
if ltc_locked and sync_enabled:
stdscr.addstr(9, 2, f"Lock Ratio : {lock_pct:.1f}% LOCK")
else:
stdscr.attron(curses.color_pair(3))
stdscr.addstr(9, 2, f"Lock Ratio : {lock_pct:.1f}% (not stable)")
stdscr.attroff(curses.color_pair(3))
else:
stdscr.addstr(3, 2, "LTC Status : (waiting)")
stdscr.addstr(4, 2, "LTC Timecode : …")
stdscr.addstr(5, 2, "Frame Rate : …")
stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}")
stdscr.addstr(7, 2, "Sync Jitter : …")
stdscr.addstr(8, 2, "Sync Status : …")
stdscr.addstr(9, 2, "Lock Ratio : …")
if sync_enabled:
stdscr.addstr(11, 2, "[S] Set system clock to LTC [Ctrl+C] Quit")
else:
stdscr.addstr(11, 2, "(Sync disabled — LTC not locked) [Ctrl+C] Quit")
stdscr.refresh()
key = stdscr.getch()
if key in (ord('s'), ord('S')) and latest_ltc and sync_enabled:
sync_pending = True
except KeyboardInterrupt:
break
except Exception as e:
stdscr.addstr(13, 2, f"⚠ Error: {e}")
stdscr.refresh()
time.sleep(1)
def do_sync(stdscr, parsed, arrival_time):
try:
ms = int((parsed["frames"] / parsed["frame_rate"]) * 1000)
sync_time = arrival_time.replace(
hour=parsed["hours"],
minute=parsed["minutes"],
second=parsed["seconds"],
microsecond=(ms + hardware_offset_ms) * 1000
)
timestamp = sync_time.strftime("%H:%M:%S.%f")[:-3]
subprocess.run(["sudo", "date", "-s", timestamp], check=True)
stdscr.addstr(13, 2, f"✔ Synced to LTC: {timestamp}")
except Exception as e:
stdscr.addstr(13, 2, f"X Sync failed: {e}")
if __name__ == "__main__":
curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.wrapper(run_curses)