diff --git a/timeturner.py b/timeturner.py index 2d34ae2..943b32e 100644 --- a/timeturner.py +++ b/timeturner.py @@ -2,115 +2,107 @@ """ timeturner.py -NTP Timeturner — LTC-to-NTP time server for Raspberry Pi -Now with a colourful LTC status light 🌈🟢 +NTP Timeturner Core UI +Displays LTC signal probe info using curses, updated in real-time. """ -import os -import sys -import time -import logging -import json import curses -from datetime import datetime, timedelta +import threading +import time +import numpy as np +import sounddevice as sd -CONFIG_PATH = "config.json" +# --- CONFIGURATION --- +SAMPLERATE = 48000 +CHANNELS = 1 +PROBE_INTERVAL = 1.0 # seconds +MIN_EDGES = 1000 -DEFAULT_CONFIG = { - "ltc_device": "/dev/audio", - "offset": { - "hours": 0, - "minutes": 0, - "seconds": 0, - "milliseconds": 0 - }, - "frame_rate": 25 +status = { + "count": 0, + "avg_width_ms": 0.0, + "short_pct": 0.0, + "long_pct": 0.0, + "verdict": "Waiting for signal...", } -TIMETURNER_SPINNER = ['⧗', '⧖', '⧗', '⧖', '🕰'] -HEARTBEAT_PULSE = ['●', '○'] -def load_config(path=CONFIG_PATH): - if not os.path.exists(path): - logging.warning("Config file not found, using defaults.") - return DEFAULT_CONFIG - with open(path, "r") as f: - return json.load(f) +# --- LTC PROBE THREAD --- +def detect_rising_edges(signal): + above_zero = signal > 0 + edges = np.where(np.logical_and(~above_zero[:-1], above_zero[1:]))[0] + return edges -def read_ltc_time(): - return datetime.utcnow() +def cluster_durations(durations): + if len(durations) < 2: + return None, None -def apply_offset(base_time, offset): - delta = timedelta( - hours=offset.get("hours", 0), - minutes=offset.get("minutes", 0), - seconds=offset.get("seconds", 0), - milliseconds=offset.get("milliseconds", 0) - ) - return base_time + delta + durations = np.array(durations) + mean1, mean2 = np.min(durations), np.max(durations) -def set_system_time(new_time): - formatted = new_time.strftime('%Y-%m-%d %H:%M:%S') - logging.info(f"Setting system time to: {formatted}") - # os.system(f"sudo timedatectl set-time \"{formatted}\"") + for _ in range(10): + group1 = durations[np.abs(durations - mean1) < np.abs(durations - mean2)] + group2 = durations[np.abs(durations - mean1) >= np.abs(durations - mean2)] + if len(group1) == 0 or len(group2) == 0: + break + mean1 = np.mean(group1) + mean2 = np.mean(group2) -def draw_dashboard(stdscr, ltc_time, adjusted_time, config, frame): - stdscr.clear() - - # Setup strings - offset = config["offset"] - offset_str = f"{offset['hours']:02}:{offset['minutes']:02}:{offset['seconds']:02}.{offset['milliseconds']:03}" - spinner = TIMETURNER_SPINNER[frame % len(TIMETURNER_SPINNER)] - heartbeat = HEARTBEAT_PULSE[frame % len(HEARTBEAT_PULSE)] - - # Hardcoded LTC status - ltc_status = "LOCKED" - ltc_colour = curses.color_pair(2) # Green - - # Draw - stdscr.addstr(0, 0, f"┌───────────────────────────────┐") - stdscr.addstr(1, 0, f"│ {spinner} NTP Timeturner {spinner} │") - stdscr.addstr(2, 0, f"├───────────────────────────────┤") - stdscr.addstr(3, 0, f"│ LTC Time: {ltc_time.strftime('%H:%M:%S:%f')[:-3]} │") - stdscr.addstr(4, 0, f"│ Offset Applied: +{offset_str:<17}│") - stdscr.addstr(5, 0, f"│ System Time: {adjusted_time.strftime('%H:%M:%S')} │") - stdscr.addstr(6, 0, f"│ Frame Rate: {config['frame_rate']} fps │") - stdscr.addstr(7, 0, f"│ LTC Status: ") - stdscr.addstr("● ", ltc_colour) - stdscr.addstr(f"{ltc_status:<14}│") - stdscr.addstr(8, 0, f"│ NTP Broadcast: PENDING │") - stdscr.addstr(9, 0, f"├───────────────────────────────┤") - stdscr.addstr(10, 0, f"│ System Status: {heartbeat} │") - stdscr.addstr(11, 0, f"│ [Ctrl+C to exit] │") - stdscr.addstr(12, 0, f"└───────────────────────────────┘") - - stdscr.refresh() - -def start_timeturner(stdscr): - curses.curs_set(0) - curses.start_color() - curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) # Not used yet - curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) # LOCKED - curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # UNSTABLE - - stdscr.nodelay(True) - stdscr.timeout(1000) - - config = load_config() - frame = 0 + short = group1 if mean1 < mean2 else group2 + long = group2 if mean1 < mean2 else group1 + return short, long +def analyze_signal(): + global status while True: try: - ltc_time = read_ltc_time() - adjusted_time = apply_offset(ltc_time, config["offset"]) - set_system_time(adjusted_time) - draw_dashboard(stdscr, ltc_time, adjusted_time, config, frame) - frame += 1 + audio = sd.rec(int(PROBE_INTERVAL * SAMPLERATE), samplerate=SAMPLERATE, + channels=CHANNELS, dtype='float32') + sd.wait() + signal = audio.flatten() + edges = detect_rising_edges(signal) + durations = np.diff(edges) / SAMPLERATE + short, long = cluster_durations(durations) + + if short is None or long is None or len(durations) < MIN_EDGES: + status["verdict"] = "❌ No signal or not enough pulses" + continue + + status.update({ + "count": len(durations), + "avg_width_ms": np.mean(durations) * 1000, + "short_pct": (len(short) / len(durations)) * 100, + "long_pct": (len(long) / len(durations)) * 100, + "verdict": "✅ LTC-like signal detected" if 10 <= (len(short) / len(durations)) * 100 <= 90 + else "⚠️ Pulse imbalance — possible noise or non-LTC" + }) + except Exception as e: + status["verdict"] = f"⚠️ Audio error: {e}" + +# --- CURSES UI --- +def draw_ui(stdscr): + curses.curs_set(0) + stdscr.nodelay(True) + stdscr.timeout(500) + + while True: + stdscr.clear() + stdscr.addstr(0, 2, "🕰️ NTP Timeturner - Live LTC Monitor", curses.A_BOLD) + stdscr.addstr(2, 4, f"Pulses captured: {status['count']}") + stdscr.addstr(3, 4, f"Avg pulse width: {status['avg_width_ms']:.2f} ms") + stdscr.addstr(4, 4, f"Short pulse ratio: {status['short_pct']:.1f}%") + stdscr.addstr(5, 4, f"Long pulse ratio: {status['long_pct']:.1f}%") + stdscr.addstr(7, 4, f"Status: {status['verdict']}") + stdscr.addstr(9, 4, "Press Ctrl+C to exit.") + stdscr.refresh() + + try: + time.sleep(1) except KeyboardInterrupt: break +# --- ENTRY POINT --- if __name__ == "__main__": - logging.basicConfig(level=logging.INFO, - format="[%(asctime)s] %(levelname)s: %(message)s") - logging.info("✨ Timeturner console mode started.") - curses.wrapper(start_timeturner) + probe_thread = threading.Thread(target=analyze_signal, daemon=True) + probe_thread.start() + curses.wrapper(draw_ui)