accurate millisecond count

This commit is contained in:
Chris Frankland-Wright 2025-07-07 21:54:17 +01:00 committed by GitHub
parent 007f8ffd7d
commit 3a7492d307
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,153 +1,128 @@
import curses
import datetime
import serial import serial
import subprocess
import time import time
import re import datetime
import glob import curses
import os import os
from datetime import datetime as dt, timedelta
BAUD_RATE = 115200 BAUD_RATE = 115200
line_regex = re.compile(r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+)fps") SCAN_PORTS = ["/dev/ttyACM0", "/dev/ttyUSB0", "/dev/serial0", "/dev/serial1"]
VERSION = "0.6"
def find_serial_port(): def find_serial_port():
candidates = sorted(glob.glob('/dev/ttyACM*') + glob.glob('/dev/ttyUSB*')) for port in SCAN_PORTS:
for port in candidates: if os.path.exists(port):
try:
s = serial.Serial(port, BAUD_RATE, timeout=1)
s.close()
return port return port
except serial.SerialException:
continue
return None return None
def parse_ltc_line(line): def parse_ltc_line(line):
match = line_regex.match(line.strip())
if not match:
return None
status, timecode, fps = match.groups()
return status, timecode.replace(';', ':'), float(fps)
def get_system_time():
return datetime.datetime.now()
def timecode_to_dt(tc):
try: try:
h, m, s, f = map(int, tc.split(":")) parts = line.strip().split()
return datetime.datetime.now().replace(hour=h, minute=m, second=s, microsecond=0) if len(parts) != 4:
return None
status = parts[0][1:-1] # [LOCK] -> LOCK
timecode = parts[1] # e.g. 10:00:00:12 or 10:00:00;12
framerate = float(parts[3].replace("fps", ""))
sep = ":" if ":" in timecode else ";"
hh, mm, ss, ff = map(int, timecode.replace(";", ":").split(":"))
now = dt.now()
ltc_dt = now.replace(hour=hh, minute=mm, second=ss, microsecond=0)
return (status, ltc_dt, framerate, ff)
except Exception: except Exception:
return None return None
def run_curses(stdscr): def run_curses(stdscr):
curses.curs_set(0) curses.curs_set(0)
curses.start_color() stdscr.nodelay(True)
curses.use_default_colors() stdscr.timeout(200)
curses.init_pair(1, curses.COLOR_GREEN, -1) port = find_serial_port()
curses.init_pair(2, curses.COLOR_YELLOW, -1) if port is None:
curses.init_pair(3, curses.COLOR_RED, -1) stdscr.addstr(0, 0, "No serial port found. Check connection.")
curses.init_pair(4, curses.COLOR_CYAN, -1)
serial_port = find_serial_port()
if not serial_port:
stdscr.addstr(0, 0, "Could not find Teensy serial port (ACM/USB).")
stdscr.refresh()
time.sleep(3)
return
try:
ser = serial.Serial(serial_port, BAUD_RATE, timeout=0.1)
except Exception as e:
stdscr.addstr(0, 0, f"Failed to open {serial_port}: {e}")
stdscr.refresh() stdscr.refresh()
time.sleep(3) time.sleep(3)
return return
ser = serial.Serial(port, BAUD_RATE, timeout=0.1)
buffer = ""
last_ltc_dt = None
last_frame = None
frame_rate = None
status = "FREE"
lock_count = 0 lock_count = 0
free_count = 0 free_count = 0
last_ltc_dt = None sync_offset_ms = 0
last_status = "LOST"
frame_rate = 0.0
sync_requested = False
syncing = False
read_buffer = ""
while True: while True:
now = get_system_time()
try: try:
data = ser.read(128).decode(errors='ignore') buffer += ser.read(ser.in_waiting or 1).decode("utf-8", errors="ignore")
read_buffer += data while "\n" in buffer:
while '\n' in read_buffer: line, buffer = buffer.split("\n", 1)
line, read_buffer = read_buffer.split('\n', 1) result = parse_ltc_line(line)
parsed = parse_ltc_line(line) if result:
if parsed: status, ltc_dt, frame_rate, frame = result
status, tc_str, fps = parsed last_ltc_dt = ltc_dt
frame_rate = fps last_frame = frame
last_ltc_dt = timecode_to_dt(tc_str)
last_status = status
if status == "LOCK": if status == "LOCK":
lock_count += 1 lock_count += 1
else: else:
free_count += 1 free_count += 1
if sync_requested and not syncing:
syncing = True
new_time = last_ltc_dt.strftime("%H:%M:%S")
try:
subprocess.run(["sudo", "date", "-s", new_time], check=True)
sync_feedback = f"[OK] Clock set to {new_time}"
except subprocess.CalledProcessError as e:
sync_feedback = f"[ERR] Failed to sync: {e}"
sync_requested = False
syncing = False
except Exception as e: except Exception as e:
pass # ignore serial read errors stdscr.addstr(0, 0, f"Serial Error: {str(e)}")
stdscr.refresh()
time.sleep(1)
return
now = dt.now()
if last_ltc_dt: if last_ltc_dt:
sys_time = now.replace(microsecond=0) offset_td = now - last_ltc_dt
offset = (sys_time - last_ltc_dt).total_seconds() sync_offset_ms = round(offset_td.total_seconds() * 1000)
offset_ms = int(offset * 1000)
offset_frames = int(round(offset * frame_rate))
offset_str = f"{offset_ms:+} ms ({offset_frames:+} frames)"
else:
offset_str = "n/a"
stdscr.erase() stdscr.erase()
stdscr.addstr(0, 0, "NTP Timeturner v0.5") stdscr.addstr(0, 2, f"NTP Timeturner v{VERSION}")
stdscr.addstr(1, 0, f"Using Serial Port: {serial_port}") stdscr.addstr(1, 2, f"Using Serial Port: {port}")
stdscr.addstr(3, 0, "LTC Status : ")
if last_status == "LOCK": stdscr.addstr(3, 2, f"LTC Status : {status}")
stdscr.addstr("LOCK", curses.color_pair(1)) if last_ltc_dt and last_frame is not None:
elif last_status == "FREE": stdscr.addstr(4, 2, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S')}:{last_frame:02}")
stdscr.addstr("FREE", curses.color_pair(2))
else: else:
stdscr.addstr("LOST", curses.color_pair(3)) stdscr.addstr(4, 2, f"LTC Timecode : ---")
stdscr.addstr(4, 0, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S') if last_ltc_dt else 'n/a'}") stdscr.addstr(5, 2, f"Frame Rate : {frame_rate:.2f}fps" if frame_rate else "Frame Rate : ---")
stdscr.addstr(5, 0, f"Frame Rate : {frame_rate:.2f}fps") stdscr.addstr(6, 2, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}")
stdscr.addstr(6, 0, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}")
stdscr.addstr(7, 0, "Sync Offset : ")
stdscr.addstr(offset_str, curses.color_pair(4))
stdscr.addstr(8, 0, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE")
stdscr.addstr(10, 0, "[S] Set system clock to LTC [Ctrl+C] Quit")
if 'sync_feedback' in locals(): if sync_offset_ms and frame_rate:
stdscr.addstr(12, 0, sync_feedback[:curses.COLS - 1]) frame_duration_ms = 1000 / frame_rate
del sync_feedback offset_frames = round(sync_offset_ms / frame_duration_ms)
stdscr.addstr(7, 2, f"Sync Offset : {sync_offset_ms:+} ms ({offset_frames:+} frames)")
else:
stdscr.addstr(7, 2, "Sync Offset : ---")
stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE")
stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit")
stdscr.refresh() stdscr.refresh()
stdscr.nodelay(True) # Handle user input
try: try:
key = stdscr.getch() key = stdscr.getch()
if key == ord('s') or key == ord('S'): if key in [ord('s'), ord('S')]:
sync_requested = True if last_ltc_dt:
new_dt = last_ltc_dt.replace(microsecond=0)
date_str = new_dt.strftime("%H:%M:%S")
os.system(f"sudo date -s \"{date_str}\"")
elif key == 3: # Ctrl+C
raise KeyboardInterrupt
except: except:
pass pass
time.sleep(0.1) time.sleep(0.05)
if __name__ == "__main__": if __name__ == "__main__":
curses.wrapper(run_curses) try:
curses.wrapper(run_curses)
except KeyboardInterrupt:
print("\nExited cleanly.")