diff --git a/timeturner.py b/timeturner.py index 91aadd4..f4099a7 100644 --- a/timeturner.py +++ b/timeturner.py @@ -1,153 +1,128 @@ -import curses -import datetime import serial -import subprocess import time -import re -import glob +import datetime +import curses import os +from datetime import datetime as dt, timedelta BAUD_RATE = 115200 -line_regex = re.compile(r"\[(LOCK|FREE)\]\s+(\d{2}:\d{2}:\d{2}[:;]\d{2})\s+\|\s+([\d.]+)fps") +SCAN_PORTS = ["/dev/ttyACM0", "/dev/ttyUSB0", "/dev/serial0", "/dev/serial1"] +VERSION = "0.6" def find_serial_port(): - candidates = sorted(glob.glob('/dev/ttyACM*') + glob.glob('/dev/ttyUSB*')) - for port in candidates: - try: - s = serial.Serial(port, BAUD_RATE, timeout=1) - s.close() + for port in SCAN_PORTS: + if os.path.exists(port): return port - except serial.SerialException: - continue return None def parse_ltc_line(line): - match = line_regex.match(line.strip()) - if not match: - return None - status, timecode, fps = match.groups() - return status, timecode.replace(';', ':'), float(fps) - -def get_system_time(): - return datetime.datetime.now() - -def timecode_to_dt(tc): try: - h, m, s, f = map(int, tc.split(":")) - return datetime.datetime.now().replace(hour=h, minute=m, second=s, microsecond=0) + parts = line.strip().split() + if len(parts) != 4: + return None + + status = parts[0][1:-1] # [LOCK] -> LOCK + timecode = parts[1] # e.g. 10:00:00:12 or 10:00:00;12 + framerate = float(parts[3].replace("fps", "")) + + sep = ":" if ":" in timecode else ";" + hh, mm, ss, ff = map(int, timecode.replace(";", ":").split(":")) + + now = dt.now() + ltc_dt = now.replace(hour=hh, minute=mm, second=ss, microsecond=0) + return (status, ltc_dt, framerate, ff) except Exception: return None def run_curses(stdscr): curses.curs_set(0) - curses.start_color() - curses.use_default_colors() + stdscr.nodelay(True) + stdscr.timeout(200) - curses.init_pair(1, curses.COLOR_GREEN, -1) - curses.init_pair(2, curses.COLOR_YELLOW, -1) - curses.init_pair(3, curses.COLOR_RED, -1) - curses.init_pair(4, curses.COLOR_CYAN, -1) - - serial_port = find_serial_port() - if not serial_port: - stdscr.addstr(0, 0, "Could not find Teensy serial port (ACM/USB).") - stdscr.refresh() - time.sleep(3) - return - - try: - ser = serial.Serial(serial_port, BAUD_RATE, timeout=0.1) - except Exception as e: - stdscr.addstr(0, 0, f"Failed to open {serial_port}: {e}") + port = find_serial_port() + if port is None: + stdscr.addstr(0, 0, "No serial port found. Check connection.") stdscr.refresh() time.sleep(3) return + ser = serial.Serial(port, BAUD_RATE, timeout=0.1) + buffer = "" + last_ltc_dt = None + last_frame = None + frame_rate = None + status = "FREE" lock_count = 0 free_count = 0 - last_ltc_dt = None - last_status = "LOST" - frame_rate = 0.0 - - sync_requested = False - syncing = False - - read_buffer = "" + sync_offset_ms = 0 while True: - now = get_system_time() try: - data = ser.read(128).decode(errors='ignore') - read_buffer += data - while '\n' in read_buffer: - line, read_buffer = read_buffer.split('\n', 1) - parsed = parse_ltc_line(line) - if parsed: - status, tc_str, fps = parsed - frame_rate = fps - last_ltc_dt = timecode_to_dt(tc_str) - last_status = status + buffer += ser.read(ser.in_waiting or 1).decode("utf-8", errors="ignore") + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + result = parse_ltc_line(line) + if result: + status, ltc_dt, frame_rate, frame = result + last_ltc_dt = ltc_dt + last_frame = frame if status == "LOCK": lock_count += 1 else: free_count += 1 - if sync_requested and not syncing: - syncing = True - new_time = last_ltc_dt.strftime("%H:%M:%S") - try: - subprocess.run(["sudo", "date", "-s", new_time], check=True) - sync_feedback = f"[OK] Clock set to {new_time}" - except subprocess.CalledProcessError as e: - sync_feedback = f"[ERR] Failed to sync: {e}" - sync_requested = False - syncing = False except Exception as e: - pass # ignore serial read errors + stdscr.addstr(0, 0, f"Serial Error: {str(e)}") + stdscr.refresh() + time.sleep(1) + return + now = dt.now() if last_ltc_dt: - sys_time = now.replace(microsecond=0) - offset = (sys_time - last_ltc_dt).total_seconds() - offset_ms = int(offset * 1000) - offset_frames = int(round(offset * frame_rate)) - offset_str = f"{offset_ms:+} ms ({offset_frames:+} frames)" - else: - offset_str = "n/a" + offset_td = now - last_ltc_dt + sync_offset_ms = round(offset_td.total_seconds() * 1000) stdscr.erase() - stdscr.addstr(0, 0, "NTP Timeturner v0.5") - stdscr.addstr(1, 0, f"Using Serial Port: {serial_port}") - stdscr.addstr(3, 0, "LTC Status : ") - if last_status == "LOCK": - stdscr.addstr("LOCK", curses.color_pair(1)) - elif last_status == "FREE": - stdscr.addstr("FREE", curses.color_pair(2)) + stdscr.addstr(0, 2, f"NTP Timeturner v{VERSION}") + stdscr.addstr(1, 2, f"Using Serial Port: {port}") + + stdscr.addstr(3, 2, f"LTC Status : {status}") + if last_ltc_dt and last_frame is not None: + stdscr.addstr(4, 2, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S')}:{last_frame:02}") else: - stdscr.addstr("LOST", curses.color_pair(3)) + stdscr.addstr(4, 2, f"LTC Timecode : ---") - stdscr.addstr(4, 0, f"LTC Timecode : {last_ltc_dt.strftime('%H:%M:%S') if last_ltc_dt else 'n/a'}") - stdscr.addstr(5, 0, f"Frame Rate : {frame_rate:.2f}fps") - stdscr.addstr(6, 0, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}") - stdscr.addstr(7, 0, "Sync Offset : ") - stdscr.addstr(offset_str, curses.color_pair(4)) - stdscr.addstr(8, 0, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") - stdscr.addstr(10, 0, "[S] Set system clock to LTC [Ctrl+C] Quit") + stdscr.addstr(5, 2, f"Frame Rate : {frame_rate:.2f}fps" if frame_rate else "Frame Rate : ---") + stdscr.addstr(6, 2, f"System Clock : {now.strftime('%H:%M:%S.%f')[:-3]}") - if 'sync_feedback' in locals(): - stdscr.addstr(12, 0, sync_feedback[:curses.COLS - 1]) - del sync_feedback + if sync_offset_ms and frame_rate: + frame_duration_ms = 1000 / frame_rate + offset_frames = round(sync_offset_ms / frame_duration_ms) + stdscr.addstr(7, 2, f"Sync Offset : {sync_offset_ms:+} ms ({offset_frames:+} frames)") + else: + stdscr.addstr(7, 2, "Sync Offset : ---") + stdscr.addstr(8, 2, f"Lock Ratio : {lock_count} LOCK / {free_count} FREE") + stdscr.addstr(10, 2, "[S] Set system clock to LTC [Ctrl+C] Quit") stdscr.refresh() - stdscr.nodelay(True) + # Handle user input try: key = stdscr.getch() - if key == ord('s') or key == ord('S'): - sync_requested = True + if key in [ord('s'), ord('S')]: + if last_ltc_dt: + new_dt = last_ltc_dt.replace(microsecond=0) + date_str = new_dt.strftime("%H:%M:%S") + os.system(f"sudo date -s \"{date_str}\"") + elif key == 3: # Ctrl+C + raise KeyboardInterrupt except: pass - time.sleep(0.1) + time.sleep(0.05) if __name__ == "__main__": - curses.wrapper(run_curses) + try: + curses.wrapper(run_curses) + except KeyboardInterrupt: + print("\nExited cleanly.")