Update timeturner.py

This commit is contained in:
Chris Frankland-Wright 2025-07-08 15:36:27 +01:00 committed by GitHub
parent 393998a521
commit 9fbf61fc1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,183 +1,217 @@
import serial import serial
import curses
import time import time
import datetime import datetime
import curses
import re import re
import subprocess
import os import os
import threading
import queue
import json import json
from threading import Thread, Lock
from collections import deque from collections import deque
# Config SERIAL_PORT = None
CONFIG_FILE = "config.json"
DEFAULT_CONFIG = {
"serial_port": "auto",
"hardware_offset_ms": 0
}
# Globals
SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 115200 BAUD_RATE = 115200
FRAME_RATE = 25.0 FRAME_RATE = 25.0
CONFIG_PATH = "config.json"
sync_pending = False
ltc_data_queue = queue.Queue()
latest_ltc = None
offset_history = deque(maxlen=20)
lock_total = 0
free_total = 0
hardware_offset_ms = 0 hardware_offset_ms = 0
ltc_locked = False
lock_stable_since = None
sync_enabled = False
# Shared data def load_config():
latest_line = None global hardware_offset_ms
latest_timestamp = None
line_lock = Lock()
# Sync Jitter buffer
offset_buffer = deque(maxlen=50)
# Load config
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE) as f:
try: try:
with open(CONFIG_PATH, "r") as f:
config = json.load(f) config = json.load(f)
SERIAL_PORT = config.get("serial_port", SERIAL_PORT) hardware_offset_ms = int(config.get("hardware_offset_ms", 0))
hardware_offset_ms = config.get("hardware_offset_ms", 0) except Exception:
except json.JSONDecodeError: hardware_offset_ms = 0
print("⚠️ Failed to parse config.json. Using defaults.")
else:
config = DEFAULT_CONFIG
def auto_detect_serial_port(): def find_teensy_serial():
for dev in os.listdir("/dev"): for dev in os.listdir('/dev'):
if dev.startswith("ttyACM") or dev.startswith("ttyUSB"): if dev.startswith('ttyACM') or dev.startswith('ttyUSB'):
return f"/dev/{dev}" return f'/dev/{dev}'
return SERIAL_PORT return None
def parse_ltc_line(line): def parse_ltc_line(line):
match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line) match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line)
if not match: if not match:
return None return None
status, hh, mm, ss, ff, fps = match.groups()
return { return {
"lock": match.group(1), "status": status,
"hours": int(match.group(2)), "hours": int(hh),
"minutes": int(match.group(3)), "minutes": int(mm),
"seconds": int(match.group(4)), "seconds": int(ss),
"frames": int(match.group(5)), "frames": int(ff),
"fps": float(match.group(6)) "frame_rate": float(fps)
} }
def serial_reader(port, baud): def serial_thread(port, baud, q):
global latest_line, latest_timestamp try:
ser = serial.Serial(port, baud, timeout=1) ser = serial.Serial(port, baud, timeout=1)
while True: while True:
line = ser.readline().decode(errors='ignore').strip() line = ser.readline().decode(errors='ignore').strip()
if line: timestamp = datetime.datetime.now()
with line_lock: parsed = parse_ltc_line(line)
latest_line = line if parsed:
latest_timestamp = datetime.datetime.now() q.put((parsed, timestamp))
except Exception as e:
print(f"Serial thread error: {e}")
def get_system_time():
return datetime.datetime.now()
def format_time(dt):
return dt.strftime("%H:%M:%S.%f")[:-3]
def run_curses(stdscr): def run_curses(stdscr):
global FRAME_RATE, sync_pending, SERIAL_PORT, latest_ltc
global offset_history, lock_total, free_total
global ltc_locked, lock_stable_since, sync_enabled
curses.curs_set(0) curses.curs_set(0)
stdscr.nodelay(True) stdscr.nodelay(True)
stdscr.timeout(100) stdscr.timeout(50)
serial_port = auto_detect_serial_port() load_config()
stdscr.addstr(1, 2, f"NTP Timeturner v1.3")
stdscr.addstr(2, 2, f"Using Serial Port: {serial_port}") SERIAL_PORT = find_teensy_serial()
if not SERIAL_PORT:
stdscr.addstr(0, 0, "❌ No serial device found.")
stdscr.refresh() stdscr.refresh()
time.sleep(2)
return
reader_thread = Thread(target=serial_reader, args=(serial_port, BAUD_RATE), daemon=True) thread = threading.Thread(target=serial_thread, args=(SERIAL_PORT, BAUD_RATE, ltc_data_queue), daemon=True)
reader_thread.start() thread.start()
lock_count = 0
free_count = 0
sync_allowed = False
while True: while True:
now = datetime.datetime.now() try:
with line_lock: while not ltc_data_queue.empty():
line = latest_line parsed, arrival_time = ltc_data_queue.get_nowait()
timestamp = latest_timestamp latest_ltc = (parsed, arrival_time)
parsed = parse_ltc_line(line) if line else None FRAME_RATE = parsed["frame_rate"]
status = parsed["status"]
if status == "LOCK":
lock_total += 1
if not ltc_locked:
lock_stable_since = time.time()
ltc_locked = True
elif time.time() - lock_stable_since > 1.0:
sync_enabled = True
else:
free_total += 1
ltc_locked = False
sync_enabled = False
lock_stable_since = None
offset_history.clear()
if ltc_locked and sync_enabled:
offset_ms = (get_system_time() - arrival_time).total_seconds() * 1000 - hardware_offset_ms
offset_frames = offset_ms / (1000 / FRAME_RATE)
offset_history.append((offset_ms, offset_frames))
if sync_pending and ltc_locked and sync_enabled:
do_sync(stdscr, parsed, arrival_time)
sync_pending = False
stdscr.erase() stdscr.erase()
stdscr.addstr(1, 2, f"NTP Timeturner v1.3") stdscr.addstr(0, 2, "NTP Timeturner v1.2")
stdscr.addstr(2, 2, f"Using Serial Port: {serial_port}") stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}")
if parsed: if latest_ltc:
lock_state = parsed["lock"] parsed, arrival_time = latest_ltc
if lock_state == "LOCK": stdscr.addstr(3, 2, f"LTC Status : {parsed['status']}")
lock_count += 1 stdscr.addstr(4, 2, f"LTC Timecode : {parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}")
stdscr.addstr(5, 2, f"Frame Rate : {FRAME_RATE:.2f}fps")
stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}")
if ltc_locked and sync_enabled and offset_history:
avg_ms = sum(x[0] for x in offset_history) / len(offset_history)
avg_frames = sum(x[1] for x in offset_history) / len(offset_history)
if abs(avg_ms) < 10:
color = curses.color_pair(2)
elif abs(avg_ms) < 40:
color = curses.color_pair(3)
else: else:
free_count += 1 color = curses.color_pair(1)
stdscr.addstr(4, 2, f"LTC Status : {lock_state}", curses.color_pair(2 if lock_state == "LOCK" else 3)) stdscr.attron(color)
stdscr.addstr(7, 2, f"Sync Offset : {avg_ms:+.0f} ms ({avg_frames:+.0f} frames)")
stdscr.attroff(color)
elif parsed["status"] == "FREE":
stdscr.attron(curses.color_pair(3))
stdscr.addstr(7, 2, "⚠️ LTC UNSYNCED — offset unavailable")
stdscr.attroff(curses.color_pair(3))
else:
stdscr.addstr(7, 2, "Sync Offset : …")
timecode_str = f"{parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}" total = lock_total + free_total
stdscr.addstr(5, 2, f"LTC Timecode : {timecode_str}") lock_pct = (lock_total / total) * 100 if total else 0
stdscr.addstr(6, 2, f"Frame Rate : {parsed['fps']:.2f}fps") if ltc_locked and sync_enabled:
stdscr.addstr(8, 2, f"Lock Ratio : {lock_pct:.1f}% LOCK")
else:
stdscr.attron(curses.color_pair(3))
stdscr.addstr(8, 2, f"Lock Ratio : {lock_pct:.1f}% (not stable)")
stdscr.attroff(curses.color_pair(3))
else:
stdscr.addstr(3, 2, "LTC Status : (waiting)")
stdscr.addstr(4, 2, "LTC Timecode : …")
stdscr.addstr(5, 2, "Frame Rate : …")
stdscr.addstr(6, 2, f"System Clock : {format_time(get_system_time())}")
stdscr.addstr(7, 2, "Sync Offset : …")
stdscr.addstr(8, 2, "Lock Ratio : …")
# Generate LTC datetime object if sync_enabled:
ltc_time = datetime.datetime.now().replace( stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit")
else:
stdscr.addstr(10, 2, "(Sync disabled — LTC not locked) [Ctrl+C] Quit")
stdscr.refresh()
key = stdscr.getch()
if key in (ord('s'), ord('S')) and latest_ltc and sync_enabled:
sync_pending = True
except KeyboardInterrupt:
break
except Exception as e:
stdscr.addstr(13, 2, f"⚠️ Error: {e}")
stdscr.refresh()
time.sleep(1)
def do_sync(stdscr, parsed, arrival_time):
try:
ms = int((parsed["frames"] / parsed["frame_rate"]) * 1000)
sync_time = arrival_time.replace(
hour=parsed["hours"], hour=parsed["hours"],
minute=parsed["minutes"], minute=parsed["minutes"],
second=parsed["seconds"], second=parsed["seconds"],
microsecond=int((parsed["frames"] / parsed["fps"]) * 1_000_000) microsecond=(ms + hardware_offset_ms) * 1000
) )
timestamp = sync_time.strftime("%H:%M:%S.%f")[:-3]
# Show system clock subprocess.run(["sudo", "date", "-s", timestamp], check=True)
system_time = datetime.datetime.now() stdscr.addstr(13, 2, f"✔️ Synced to LTC: {timestamp}")
stdscr.addstr(7, 2, f"System Clock : {system_time.strftime('%H:%M:%S.%f')[:-3]}") except Exception as e:
stdscr.addstr(13, 2, f"❌ Sync failed: {e}")
# Calculate Sync Jitter
if lock_state == "LOCK":
offset_ms = (system_time - ltc_time).total_seconds() * 1000 - hardware_offset_ms
offset_buffer.append(offset_ms)
if offset_buffer:
avg_offset = sum(offset_buffer) / len(offset_buffer)
frame_error = round((avg_offset / 1000) * parsed["fps"])
stdscr.addstr(8, 2, f"Sync Jitter : {avg_offset:+.0f} ms ({frame_error:+} frames)",
curses.color_pair(0 if abs(avg_offset) < 5 else 3))
else:
stdscr.addstr(8, 2, f"Sync Jitter : --")
else:
stdscr.addstr(8, 2, f"Sync Jitter : -- (FREE mode)", curses.color_pair(3))
# Timecode Match (HH:MM:SS only)
ltc_time_str = ltc_time.strftime('%H:%M:%S')
sys_time_str = system_time.strftime('%H:%M:%S')
if lock_state == "LOCK":
if ltc_time_str == sys_time_str:
stdscr.addstr(9, 2, "Timecode Match: MATCHED", curses.color_pair(2))
else:
stdscr.addstr(9, 2, "Timecode Match: OUT OF SYNC", curses.color_pair(3))
else:
stdscr.addstr(9, 2, "Timecode Match: UNKNOWN", curses.color_pair(3))
sync_allowed = lock_state == "LOCK"
# Lock Ratio
total = lock_count + free_count
if total > 0:
lock_ratio = (lock_count / total) * 100
stdscr.addstr(10, 2, f"Lock Ratio : {lock_ratio:.1f}% LOCK", curses.color_pair(2 if lock_ratio > 90 else 3))
else:
stdscr.addstr(4, 2, "Waiting for LTC data...", curses.color_pair(3))
stdscr.addstr(12, 2, "[S] Set system clock to LTC [Ctrl+C] Quit")
key = stdscr.getch()
if key in (ord('s'), ord('S')) and parsed and sync_allowed:
clock_str = f"{parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}"
os.system(f"sudo date -s \"{clock_str}\"")
stdscr.refresh()
time.sleep(0.05)
def main():
curses.wrapper(start_curses)
def start_curses(stdscr):
curses.start_color()
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)
run_curses(stdscr)
if __name__ == "__main__": if __name__ == "__main__":
main() curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.wrapper(run_curses)