first iteration

This commit is contained in:
Chris Frankland-Wright 2025-06-24 22:02:43 +01:00
parent 0c423f30ab
commit d931f61da3

View file

@ -2,115 +2,107 @@
""" """
timeturner.py timeturner.py
NTP Timeturner LTC-to-NTP time server for Raspberry Pi NTP Timeturner Core UI
Now with a colourful LTC status light 🌈🟢 Displays LTC signal probe info using curses, updated in real-time.
""" """
import os
import sys
import time
import logging
import json
import curses import curses
from datetime import datetime, timedelta import threading
import time
import numpy as np
import sounddevice as sd
CONFIG_PATH = "config.json" # --- CONFIGURATION ---
SAMPLERATE = 48000
CHANNELS = 1
PROBE_INTERVAL = 1.0 # seconds
MIN_EDGES = 1000
DEFAULT_CONFIG = { status = {
"ltc_device": "/dev/audio", "count": 0,
"offset": { "avg_width_ms": 0.0,
"hours": 0, "short_pct": 0.0,
"minutes": 0, "long_pct": 0.0,
"seconds": 0, "verdict": "Waiting for signal...",
"milliseconds": 0
},
"frame_rate": 25
} }
TIMETURNER_SPINNER = ['', '', '', '', '🕰']
HEARTBEAT_PULSE = ['', '']
def load_config(path=CONFIG_PATH): # --- LTC PROBE THREAD ---
if not os.path.exists(path): def detect_rising_edges(signal):
logging.warning("Config file not found, using defaults.") above_zero = signal > 0
return DEFAULT_CONFIG edges = np.where(np.logical_and(~above_zero[:-1], above_zero[1:]))[0]
with open(path, "r") as f: return edges
return json.load(f)
def read_ltc_time(): def cluster_durations(durations):
return datetime.utcnow() if len(durations) < 2:
return None, None
def apply_offset(base_time, offset): durations = np.array(durations)
delta = timedelta( mean1, mean2 = np.min(durations), np.max(durations)
hours=offset.get("hours", 0),
minutes=offset.get("minutes", 0),
seconds=offset.get("seconds", 0),
milliseconds=offset.get("milliseconds", 0)
)
return base_time + delta
def set_system_time(new_time): for _ in range(10):
formatted = new_time.strftime('%Y-%m-%d %H:%M:%S') group1 = durations[np.abs(durations - mean1) < np.abs(durations - mean2)]
logging.info(f"Setting system time to: {formatted}") group2 = durations[np.abs(durations - mean1) >= np.abs(durations - mean2)]
# os.system(f"sudo timedatectl set-time \"{formatted}\"") if len(group1) == 0 or len(group2) == 0:
break
mean1 = np.mean(group1)
mean2 = np.mean(group2)
def draw_dashboard(stdscr, ltc_time, adjusted_time, config, frame): short = group1 if mean1 < mean2 else group2
stdscr.clear() long = group2 if mean1 < mean2 else group1
return short, long
# Setup strings
offset = config["offset"]
offset_str = f"{offset['hours']:02}:{offset['minutes']:02}:{offset['seconds']:02}.{offset['milliseconds']:03}"
spinner = TIMETURNER_SPINNER[frame % len(TIMETURNER_SPINNER)]
heartbeat = HEARTBEAT_PULSE[frame % len(HEARTBEAT_PULSE)]
# Hardcoded LTC status
ltc_status = "LOCKED"
ltc_colour = curses.color_pair(2) # Green
# Draw
stdscr.addstr(0, 0, f"┌───────────────────────────────┐")
stdscr.addstr(1, 0, f"{spinner} NTP Timeturner {spinner}")
stdscr.addstr(2, 0, f"├───────────────────────────────┤")
stdscr.addstr(3, 0, f"│ LTC Time: {ltc_time.strftime('%H:%M:%S:%f')[:-3]}")
stdscr.addstr(4, 0, f"│ Offset Applied: +{offset_str:<17}")
stdscr.addstr(5, 0, f"│ System Time: {adjusted_time.strftime('%H:%M:%S')}")
stdscr.addstr(6, 0, f"│ Frame Rate: {config['frame_rate']} fps │")
stdscr.addstr(7, 0, f"│ LTC Status: ")
stdscr.addstr("", ltc_colour)
stdscr.addstr(f"{ltc_status:<14}")
stdscr.addstr(8, 0, f"│ NTP Broadcast: PENDING │")
stdscr.addstr(9, 0, f"├───────────────────────────────┤")
stdscr.addstr(10, 0, f"│ System Status: {heartbeat}")
stdscr.addstr(11, 0, f"│ [Ctrl+C to exit] │")
stdscr.addstr(12, 0, f"└───────────────────────────────┘")
stdscr.refresh()
def start_timeturner(stdscr):
curses.curs_set(0)
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) # Not used yet
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # LOCKED
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # UNSTABLE
stdscr.nodelay(True)
stdscr.timeout(1000)
config = load_config()
frame = 0
def analyze_signal():
global status
while True: while True:
try: try:
ltc_time = read_ltc_time() audio = sd.rec(int(PROBE_INTERVAL * SAMPLERATE), samplerate=SAMPLERATE,
adjusted_time = apply_offset(ltc_time, config["offset"]) channels=CHANNELS, dtype='float32')
set_system_time(adjusted_time) sd.wait()
draw_dashboard(stdscr, ltc_time, adjusted_time, config, frame) signal = audio.flatten()
frame += 1 edges = detect_rising_edges(signal)
durations = np.diff(edges) / SAMPLERATE
short, long = cluster_durations(durations)
if short is None or long is None or len(durations) < MIN_EDGES:
status["verdict"] = "❌ No signal or not enough pulses"
continue
status.update({
"count": len(durations),
"avg_width_ms": np.mean(durations) * 1000,
"short_pct": (len(short) / len(durations)) * 100,
"long_pct": (len(long) / len(durations)) * 100,
"verdict": "✅ LTC-like signal detected" if 10 <= (len(short) / len(durations)) * 100 <= 90
else "⚠️ Pulse imbalance — possible noise or non-LTC"
})
except Exception as e:
status["verdict"] = f"⚠️ Audio error: {e}"
# --- CURSES UI ---
def draw_ui(stdscr):
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.timeout(500)
while True:
stdscr.clear()
stdscr.addstr(0, 2, "🕰️ NTP Timeturner - Live LTC Monitor", curses.A_BOLD)
stdscr.addstr(2, 4, f"Pulses captured: {status['count']}")
stdscr.addstr(3, 4, f"Avg pulse width: {status['avg_width_ms']:.2f} ms")
stdscr.addstr(4, 4, f"Short pulse ratio: {status['short_pct']:.1f}%")
stdscr.addstr(5, 4, f"Long pulse ratio: {status['long_pct']:.1f}%")
stdscr.addstr(7, 4, f"Status: {status['verdict']}")
stdscr.addstr(9, 4, "Press Ctrl+C to exit.")
stdscr.refresh()
try:
time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
# --- ENTRY POINT ---
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, probe_thread = threading.Thread(target=analyze_signal, daemon=True)
format="[%(asctime)s] %(levelname)s: %(message)s") probe_thread.start()
logging.info("✨ Timeturner console mode started.") curses.wrapper(draw_ui)
curses.wrapper(start_timeturner)