added hardware offset with config.json

This commit is contained in:
Chris Frankland-Wright 2025-07-07 22:31:53 +01:00 committed by GitHub
parent 5c6add264f
commit 2672563d2b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,11 +7,13 @@ import subprocess
import os import os
import threading import threading
import queue import queue
import json
from collections import deque from collections import deque
SERIAL_PORT = None SERIAL_PORT = None
BAUD_RATE = 115200 BAUD_RATE = 115200
FRAME_RATE = 25.0 FRAME_RATE = 25.0
CONFIG_PATH = "config.json"
sync_pending = False sync_pending = False
ltc_data_queue = queue.Queue() ltc_data_queue = queue.Queue()
@ -20,6 +22,16 @@ offset_history = deque(maxlen=20)
lock_total = 0 lock_total = 0
free_total = 0 free_total = 0
hardware_offset_ms = 0
def load_config():
global hardware_offset_ms
try:
with open(CONFIG_PATH, "r") as f:
config = json.load(f)
hardware_offset_ms = int(config.get("hardware_offset_ms", 0))
except Exception:
hardware_offset_ms = 0
def find_teensy_serial(): def find_teensy_serial():
for dev in os.listdir('/dev'): for dev in os.listdir('/dev'):
@ -66,6 +78,8 @@ def run_curses(stdscr):
stdscr.nodelay(True) stdscr.nodelay(True)
stdscr.timeout(50) stdscr.timeout(50)
load_config()
SERIAL_PORT = find_teensy_serial() SERIAL_PORT = find_teensy_serial()
if not SERIAL_PORT: if not SERIAL_PORT:
stdscr.addstr(0, 0, "❌ No serial device found.") stdscr.addstr(0, 0, "❌ No serial device found.")
@ -78,19 +92,16 @@ def run_curses(stdscr):
while True: while True:
try: try:
# Pull latest from queue
while not ltc_data_queue.empty(): while not ltc_data_queue.empty():
parsed, arrival_time = ltc_data_queue.get_nowait() parsed, arrival_time = ltc_data_queue.get_nowait()
latest_ltc = (parsed, arrival_time) latest_ltc = (parsed, arrival_time)
# Update lock ratio counters
if parsed["status"] == "LOCK": if parsed["status"] == "LOCK":
lock_total += 1 lock_total += 1
else: else:
free_total += 1 free_total += 1
# Record sync offset offset_ms = (get_system_time() - arrival_time).total_seconds() * 1000 - hardware_offset_ms
offset_ms = (get_system_time() - arrival_time).total_seconds() * 1000
offset_frames = offset_ms / (1000 / parsed["frame_rate"]) offset_frames = offset_ms / (1000 / parsed["frame_rate"])
offset_history.append((offset_ms, offset_frames)) offset_history.append((offset_ms, offset_frames))
@ -98,9 +109,8 @@ def run_curses(stdscr):
do_sync(stdscr, parsed, arrival_time) do_sync(stdscr, parsed, arrival_time)
sync_pending = False sync_pending = False
# Draw UI
stdscr.erase() stdscr.erase()
stdscr.addstr(0, 2, "NTP Timeturner v1.0") stdscr.addstr(0, 2, "NTP Timeturner v1.1")
stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}") stdscr.addstr(1, 2, f"Using Serial Port: {SERIAL_PORT}")
if latest_ltc: if latest_ltc:
@ -116,7 +126,6 @@ def run_curses(stdscr):
avg_ms = sum(x[0] for x in offset_history) / len(offset_history) avg_ms = sum(x[0] for x in offset_history) / len(offset_history)
avg_frames = sum(x[1] for x in offset_history) / len(offset_history) avg_frames = sum(x[1] for x in offset_history) / len(offset_history)
# Optional colour coding
if abs(avg_ms) < 10: if abs(avg_ms) < 10:
color = curses.color_pair(2) color = curses.color_pair(2)
elif abs(avg_ms) < 40: elif abs(avg_ms) < 40:
@ -162,7 +171,7 @@ def do_sync(stdscr, parsed, arrival_time):
hour=parsed["hours"], hour=parsed["hours"],
minute=parsed["minutes"], minute=parsed["minutes"],
second=parsed["seconds"], second=parsed["seconds"],
microsecond=ms * 1000 microsecond=(ms + hardware_offset_ms) * 1000
) )
timestamp = sync_time.strftime("%H:%M:%S.%f")[:-3] timestamp = sync_time.strftime("%H:%M:%S.%f")[:-3]
subprocess.run(["sudo", "date", "-s", timestamp], check=True) subprocess.run(["sudo", "date", "-s", timestamp], check=True)
@ -171,10 +180,9 @@ def do_sync(stdscr, parsed, arrival_time):
stdscr.addstr(13, 2, f"❌ Sync failed: {e}") stdscr.addstr(13, 2, f"❌ Sync failed: {e}")
if __name__ == "__main__": if __name__ == "__main__":
# Optional: enable basic color scheme
curses.initscr() curses.initscr()
curses.start_color() curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) # bad curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # perfect curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # ok curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.wrapper(run_curses) curses.wrapper(run_curses)