Update timeturner.py

This commit is contained in:
Chris Frankland-Wright 2025-07-07 22:01:49 +01:00 committed by GitHub
parent 3a7492d307
commit 69419b6d59
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,128 +1,144 @@
import serial import serial
import curses
import time import time
import datetime import datetime
import curses import re
import subprocess
import os import os
from datetime import datetime as dt, timedelta
SERIAL_PORT = None # Will be auto-detected
BAUD_RATE = 115200 BAUD_RATE = 115200
SCAN_PORTS = ["/dev/ttyACM0", "/dev/ttyUSB0", "/dev/serial0", "/dev/serial1"] FRAME_RATE = 25.0 # Default, will update from stream
VERSION = "0.6"
def find_serial_port(): last_ltc_time = None
for port in SCAN_PORTS: last_frame = None
if os.path.exists(port): lock_count = 0
return port free_count = 0
sync_requested = False
sync_pending = False
def find_teensy_serial():
for dev in os.listdir('/dev'):
if dev.startswith('ttyACM') or dev.startswith('ttyUSB'):
return f'/dev/{dev}'
return None return None
def parse_ltc_line(line): def parse_ltc_line(line):
try: """
parts = line.strip().split() Expected format: [LOCK] 19:56:56:14 | 25.00fps
if len(parts) != 4: or dropframe style: [LOCK] 19:56:56;14 | 29.97fps
"""
match = re.match(r"\[(LOCK|FREE)\]\s+(\d{2}):(\d{2}):(\d{2})[:;](\d{2})\s+\|\s+([\d.]+)fps", line)
if not match:
return None return None
status, hh, mm, ss, ff, fps = match.groups()
return {
"status": status,
"hours": int(hh),
"minutes": int(mm),
"seconds": int(ss),
"frames": int(ff),
"frame_rate": float(fps)
}
status = parts[0][1:-1] # [LOCK] -> LOCK def get_system_time():
timecode = parts[1] # e.g. 10:00:00:12 or 10:00:00;12 return datetime.datetime.now()
framerate = float(parts[3].replace("fps", ""))
sep = ":" if ":" in timecode else ";" def format_time(dt):
hh, mm, ss, ff = map(int, timecode.replace(";", ":").split(":")) return dt.strftime("%H:%M:%S.%f")[:-3] # show milliseconds only
now = dt.now()
ltc_dt = now.replace(hour=hh, minute=mm, second=ss, microsecond=0)
return (status, ltc_dt, framerate, ff)
except Exception:
return None
def run_curses(stdscr): def run_curses(stdscr):
global last_ltc_time, FRAME_RATE, lock_count, free_count, sync_requested, sync_pending, SERIAL_PORT
curses.curs_set(0) curses.curs_set(0)
stdscr.nodelay(True) stdscr.nodelay(True)
stdscr.timeout(200) stdscr.timeout(100)
port = find_serial_port() # Auto-detect Teensy serial
if port is None: SERIAL_PORT = find_teensy_serial()
stdscr.addstr(0, 0, "No serial port found. Check connection.") if not SERIAL_PORT:
stdscr.addstr(0, 0, "❌ No serial device found. Connect Teensy and try again.")
stdscr.refresh() stdscr.refresh()
time.sleep(3) time.sleep(3)
return return
ser = serial.Serial(port, BAUD_RATE, timeout=0.1) try:
buffer = "" ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
last_ltc_dt = None except Exception as e:
last_frame = None stdscr.addstr(0, 0, f"❌ Failed to open serial port: {e}")
frame_rate = None stdscr.refresh()
status = "FREE" time.sleep(3)
lock_count = 0 return
free_count = 0
sync_offset_ms = 0
while True: while True:
try: try:
buffer += ser.read(ser.in_waiting or 1).decode("utf-8", errors="ignore") line = ser.readline().decode(errors='ignore').strip()
while "\n" in buffer: parsed = parse_ltc_line(line)
line, buffer = buffer.split("\n", 1)
result = parse_ltc_line(line) if parsed:
if result: if parsed["status"] == "LOCK":
status, ltc_dt, frame_rate, frame = result
last_ltc_dt = ltc_dt
last_frame = frame
if status == "LOCK":
lock_count += 1 lock_count += 1
else: else:
free_count += 1 free_count += 1
except Exception as e: FRAME_RATE = parsed["frame_rate"]
stdscr.addstr(0, 0, f"Serial Error: {str(e)}")
stdscr.refresh()
time.sleep(1)
return
now = dt.now() # Construct full datetime with microsecond precision from LTC
if last_ltc_dt: ms = int((parsed["frames"] / FRAME_RATE) * 1000)
offset_td = now - last_ltc_dt ltc_dt = datetime.datetime.now().replace(
sync_offset_ms = round(offset_td.total_seconds() * 1000) hour=parsed["hours"],
minute=parsed["minutes"],
second=parsed["seconds"],
microsecond=ms * 1000
)
last_ltc_time = ltc_dt
last_frame = parsed["frames"]
# Perform sync only if requested and we just got a new frame
if sync_pending:
do_sync(stdscr, ltc_dt)
sync_pending = False
# Display
stdscr.erase() stdscr.erase()
stdscr.addstr(0, 2, f"NTP Timeturner v{VERSION}") stdscr.addstr(0, 2, f"NTP Timeturner v0.7")
stdscr.addstr(1, 2, f"Using Serial Port: {port}") stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}")
stdscr.addstr(3, 2, f"LTC Status : {parsed['status'] if parsed else ''}")
stdscr.addstr(4, 2, f"LTC Timecode : {parsed['hours']:02}:{parsed['minutes']:02}:{parsed['seconds']:02}:{parsed['frames']:02}" if parsed else "LTC Timecode : …")
stdscr.addstr(5, 2, f"Frame Rate : {FRAME_RATE:.2f}fps")
now = get_system_time()
stdscr.addstr(6, 2, f"System Clock : {format_time(now)}")
stdscr.addstr(3, 2, f"LTC Status : {status}") # Sync offset
if last_ltc_dt and last_frame is not None: if last_ltc_time:
stdscr.addstr(4, 2, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S')}:{last_frame:02}") offset_ms = (now - last_ltc_time).total_seconds() * 1000
else: offset_frames = offset_ms / (1000 / FRAME_RATE)
stdscr.addstr(4, 2, f"LTC Timecode : ---") stdscr.addstr(7, 2, f"Sync Offset : {offset_ms:+.0f} ms ({offset_frames:+.0f} frames)")
stdscr.addstr(5, 2, f"Frame Rate : {frame_rate:.2f}fps" if frame_rate else "Frame Rate : ---")
stdscr.addstr(6, 2, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}")
if sync_offset_ms and frame_rate:
frame_duration_ms = 1000 / frame_rate
offset_frames = round(sync_offset_ms / frame_duration_ms)
stdscr.addstr(7, 2, f"Sync Offset : {sync_offset_ms:+} ms ({offset_frames:+} frames)")
else:
stdscr.addstr(7, 2, "Sync Offset : ---")
stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE")
stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit")
stdscr.refresh() stdscr.refresh()
# Handle user input
try:
key = stdscr.getch() key = stdscr.getch()
if key in [ord('s'), ord('S')]: if key in (ord('s'), ord('S')):
if last_ltc_dt: sync_pending = True
new_dt = last_ltc_dt.replace(microsecond=0)
date_str = new_dt.strftime("%H:%M:%S")
os.system(f"sudo date -s \"{date_str}\"")
elif key == 3: # Ctrl+C
raise KeyboardInterrupt
except:
pass
time.sleep(0.05) except KeyboardInterrupt:
break
except Exception as e:
stdscr.addstr(15, 2, f"⚠️ Error: {e}")
stdscr.refresh()
time.sleep(1)
def do_sync(stdscr, ltc_dt):
try:
timestamp = ltc_dt.strftime("%H:%M:%S.%f")[:-3] # Drop microsecond to milliseconds
subprocess.run(["sudo", "date", "-s", timestamp], check=True)
stdscr.addstr(13, 2, f"✔️ System clock set to LTC: {timestamp}")
except Exception as e:
stdscr.addstr(13, 2, f"❌ Failed to sync system time: {e}")
if __name__ == "__main__": if __name__ == "__main__":
try:
curses.wrapper(run_curses) curses.wrapper(run_curses)
except KeyboardInterrupt:
print("\nExited cleanly.")