#!/usr/bin/env python3
"""
engine_5d.py — 5D FDTD Wave Propagation Engine for Time Ledger Theory

Source: 4D engine output at c=1.700 (resonance point). The 4D geometry
becomes the pulse. No hand-placed 5D geometry. Let the 5D structure EMERGE.

Test: TLT_5D_ENGINE_TEST_DESCRIPTION.txt (pre-registered)

Usage:
    python3 engine_5d.py                          # Single run at c_5d=2.0
    python3 engine_5d.py --sweep                  # Coarse sweep 1.5-2.5
    python3 engine_5d.py --c 1.8                  # Single run at specific c
    python3 engine_5d.py --sweep --fine 1.8 2.2   # Fine sweep in range
"""

import argparse
import json
import logging
import math
import os
import sys
import time as time_mod
from pathlib import Path

import numpy as np

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger("5d_engine")

RESULTS_DIR = Path(__file__).parent / "results_5d"
RESULTS_DIR.mkdir(exist_ok=True)

# Path to 4D source data — check multiple locations
_candidates = [
    Path(__file__).parent / "source_4d",  # Local/Hetzner deployment
    Path(__file__).parent.parent / "results" / "audited" / "TLT-4D-001_4d_engine",  # Main repo
    Path(__file__).parent.parent / "audited" / "4d_engine" / "results",  # Alt layout
]
SOURCE_4D_DIR = None
for _c in _candidates:
    if _c.exists():
        SOURCE_4D_DIR = _c
        break
if SOURCE_4D_DIR is None:
    SOURCE_4D_DIR = _candidates[0]  # Will produce clear error message


# ══════════════════════════════════════════════════════════════════════════════
# 4D SOURCE LOADER
# ══════════════════════════════════════════════════════════════════════════════

def load_4d_source(target_n=32):
    """Load the 4D engine cross-section at c=1.700 and resample to target_n.

    The cross-section is a 3D slice (x,y,z at w=midplane) of the 4D field.
    Since the 3D pattern is locked across w (>0.99998 correlation), this 3D
    pattern IS the 4D geometry's signature.

    Args:
        target_n: Desired cubic grid size of the returned (target_n,)*3 array.

    Returns:
        np.ndarray of shape (target_n, target_n, target_n).

    Raises:
        FileNotFoundError: if cross_section_c1.700.npy cannot be located.
    """
    source_file = SOURCE_4D_DIR / "cross_section_c1.700.npy"
    if not source_file.exists():
        # Try results subdirectory
        source_file = SOURCE_4D_DIR / "results" / "cross_section_c1.700.npy"
    if not source_file.exists():
        raise FileNotFoundError(
            f"4D source not found at {source_file}. "
            "Copy cross_section_c1.700.npy to the source_4d/ directory."
        )

    data = np.load(source_file)
    logger.info("Loaded 4D source: shape %s, range [%.4e, %.4e]",
                data.shape, data.min(), data.max())

    source_n = data.shape[0]
    if source_n == target_n:
        return data

    # Resample using scipy zoom if available, otherwise nearest-index fallback
    if source_n > target_n:
        try:
            from scipy.ndimage import zoom
            factor = target_n / source_n
            resampled = zoom(data, factor, order=1)
            logger.info("Resampled %d -> %d via scipy zoom", source_n, target_n)
            return resampled
        except ImportError:
            # Stride-based downsampling
            stride = source_n // target_n
            resampled = data[::stride, ::stride, ::stride][:target_n, :target_n, :target_n]
            logger.info("Resampled %d -> %d via stride=%d", source_n, target_n, stride)
            return resampled
    else:
        # Upsample (unlikely but handle it)
        try:
            from scipy.ndimage import zoom
            factor = target_n / source_n
            return zoom(data, factor, order=1)
        except ImportError:
            # Nearest-index repeat, vectorized: idx[i] = floor(i*source_n/target_n).
            # Equivalent to the naive triple loop but avoids O(target_n^3)
            # Python-level iteration.
            idx = (np.arange(target_n) * source_n) // target_n
            return data[np.ix_(idx, idx, idx)].astype(np.float64)


# ══════════════════════════════════════════════════════════════════════════════
# ENGINE CORE
# ══════════════════════════════════════════════════════════════════════════════

class Engine5D:
    """5D FDTD wave equation engine.

    Equation: d²ψ/dt² = c²∇²ψ + J(x,t)

    Source: 4D cross-section pattern, injected at v=midplane continuously.
    The geometry IS the pulse.

    First pass: no C_potential, no overflow tracking. Pure wave propagation.
    """

    def __init__(self, c_5d=2.0, N=32, n_periods=50, source_amplitude=1.0):
        """Initialize the 5D engine.

        Args:
            c_5d: 5D propagation speed (in units of c_3D=1)
            N: Grid size (N^5 cells)
            n_periods: Number of wave periods to simulate
            source_amplitude: Amplitude scaling for the 4D source pattern
        """
        self.N = N
        self.L = 3.0  # Domain: [-L, L]^5
        self.h = 2 * self.L / self.N
        self.c = c_5d

        # CFL condition for 5D: dt = safety * h / (sqrt(5) * c).
        # Using c_max >= 2.5 keeps dt identical across a sweep up to c=2.5,
        # so runs at different c are directly comparable step-for-step.
        c_max = max(c_5d, 2.5)
        self.dt = 0.8 * self.h / (math.sqrt(5.0) * c_max)

        # Physics parameters
        self.f0 = 1.0  # Base frequency
        self.T = 1.0 / self.f0
        self.source_amplitude = source_amplitude

        # Simulation
        self.n_periods = n_periods
        self.steps_per_period = int(self.T / self.dt)
        self.total_steps = self.n_periods * self.steps_per_period

        # PML
        self.n_pml = 6  # Thinner PML for memory savings
        self.sigma_max = 3.0 * self.c / (2.0 * self.h * self.n_pml)

        # Load 4D source pattern
        self.source_pattern = load_4d_source(target_n=N)
        # Normalize source pattern to unit max
        sp_max = np.max(np.abs(self.source_pattern))
        if sp_max > 0:
            self.source_pattern = self.source_pattern / sp_max

        # Allocate fields
        self._allocate()

        # Energy tracking
        self.energy_history = []

        logger.info(
            "Engine5D initialized: N=%d, c=%.3f, h=%.4f, dt=%.6f, "
            "steps=%d (%.0f periods), cells=%s",
            self.N, self.c, self.h, self.dt,
            self.total_steps, self.n_periods, f"{self.N**5:,}"
        )

    def _allocate(self):
        """Allocate 5D field arrays and precompute the PML damping factor."""
        N = self.N
        shape = (N, N, N, N, N)
        logger.info("Allocating 5D arrays: %s (%.1f MB each)...",
                    shape, N**5 * 8 / 1e6)

        self.psi = np.zeros(shape, dtype=np.float64)
        self.psi_prev = np.zeros(shape, dtype=np.float64)

        # PML damping
        self.sigma = np.zeros(shape, dtype=np.float64)
        self._setup_pml()

        # Precompute the per-cell damping multiplier once: sigma and dt are
        # fixed for the run, and exp() over the full N^5 grid is far too
        # expensive to redo on every timestep.
        self.damping = np.exp(-self.sigma * self.dt)

        # Time-averaged intensity (accumulated during final periods)
        self.intensity_avg = np.zeros(shape, dtype=np.float64)
        self.intensity_count = 0
        self.collecting_intensity = False

        mem_mb = (self.psi.nbytes * 2 + self.sigma.nbytes
                  + self.damping.nbytes + self.intensity_avg.nbytes) / 1e6
        logger.info("Total memory allocated: %.1f MB", mem_mb)

    def _setup_pml(self):
        """Set up 5D PML absorbing boundary.

        Fills self.sigma with a quadratic damping profile in the outer
        n_pml cells of every dimension; overlapping corners keep the
        maximum of the per-dimension values.
        """
        N = self.N
        for dim in range(5):
            for i in range(N):
                if i < self.n_pml:
                    d = (self.n_pml - i) / self.n_pml
                elif i >= N - self.n_pml:
                    d = (i - (N - self.n_pml - 1)) / self.n_pml
                else:
                    continue
                sigma_val = self.sigma_max * d * d
                slc = [slice(None)] * 5
                slc[dim] = i
                self.sigma[tuple(slc)] = np.maximum(
                    self.sigma[tuple(slc)], sigma_val
                )

    def _laplacian_5d(self, field):
        """Compute 5D Laplacian using central differences.

        Uses np.roll, so the stencil wraps periodically at the grid edges;
        the PML layer is what actually suppresses boundary reflections.
        """
        lap = np.zeros_like(field)
        h2 = self.h * self.h
        for dim in range(5):
            lap += (np.roll(field, 1, axis=dim)
                    + np.roll(field, -1, axis=dim)
                    - 2.0 * field) / h2
        return lap

    def _inject_source(self, t):
        """Inject 4D source pattern at v=midplane and w=midplane.

        The 4D geometry IS the pulse. The source pattern (3D cross-section)
        is injected at (x, y, z, w=mid, v=mid) with sinusoidal time
        dependence, scaled by dt² to match the FDTD update.
        """
        mid = self.N // 2
        sin_val = math.sin(2.0 * math.pi * self.f0 * t)
        # Inject source at w=mid, v=mid (a 3D source in 5D space)
        self.psi[:, :, :, mid, mid] += (
            self.source_amplitude * self.source_pattern * sin_val
            * self.dt * self.dt
        )

    def step(self, t):
        """Advance one FDTD timestep.

        Args:
            t: Current simulation time (used for the sinusoidal source).
        """
        # 5D Laplacian
        lap = self._laplacian_5d(self.psi)

        # FDTD update: ψ(t+dt) = 2ψ(t) - ψ(t-dt) + dt²·c²·∇²ψ
        dt2 = self.dt * self.dt
        psi_new = 2.0 * self.psi - self.psi_prev + dt2 * self.c**2 * lap

        # PML damping (factor precomputed once in _allocate)
        psi_new *= self.damping

        # Source injection
        self.psi_prev = self.psi
        self.psi = psi_new
        self._inject_source(t)

        # Accumulate intensity (last 20% of simulation for steady state)
        if self.collecting_intensity:
            self.intensity_avg += self.psi**2
            self.intensity_count += 1

    def run(self):
        """Run the full simulation.

        Returns:
            Tuple of (result dict, 3D cross-section at v=w=mid,
            v-axis intensity profile, w-axis intensity profile).
        """
        logger.info("Running %d steps (%.0f periods)...",
                    self.total_steps, self.n_periods)
        t_start = time_mod.monotonic()
        collect_start = int(self.total_steps * 0.8)  # Last 20%

        for step_n in range(self.total_steps):
            t = step_n * self.dt

            # Start collecting intensity for measurements
            if step_n == collect_start:
                self.collecting_intensity = True
                logger.info("  Started intensity collection (step %d)", step_n)

            # FDTD step
            self.step(t)

            # Track energy periodically
            if step_n % max(1, self.total_steps // 100) == 0:
                field_energy = float(np.sum(self.psi**2) * self.h**5)
                self.energy_history.append({
                    "step": step_n,
                    "t": round(t, 6),
                    "field_energy": field_energy,
                })

            # Progress log
            if step_n > 0 and step_n % max(1, self.total_steps // 10) == 0:
                elapsed = time_mod.monotonic() - t_start
                pct = step_n / self.total_steps * 100
                field_e = float(np.sum(self.psi**2) * self.h**5)
                logger.info("  %.0f%% (step %d/%d, E=%.4e, %.1fs elapsed)",
                            pct, step_n, self.total_steps, field_e, elapsed)

        elapsed = time_mod.monotonic() - t_start
        logger.info("Simulation complete: %.1f seconds (%.1f min)",
                    elapsed, elapsed/60)

        return self._collect_results(elapsed)

    def _collect_results(self, elapsed):
        """Collect measurements from the (time-averaged) intensity field.

        Args:
            elapsed: Wall-clock seconds the simulation took (for the report).

        Returns:
            Tuple of (result dict, cross_4d_at_vmid, v_profile, w_profile).
        """
        # Average intensity; fall back to the instantaneous field if the
        # collection window never started (very short runs).
        if self.intensity_count > 0:
            avg_intensity = self.intensity_avg / self.intensity_count
        else:
            avg_intensity = self.psi**2

        mid = self.N // 2

        # === CROSS-SECTIONS ===
        # 4D cross-section at v=mid (comparable to 4D engine output)
        cross_4d_at_vmid = avg_intensity[:, :, :, mid, mid]  # 3D: x,y,z

        # === V-AXIS INTENSITY PROFILE ===
        # Total intensity per v-slice (the primary measurement):
        # reduce over x,y,z,w in one vectorized pass.
        v_profile = avg_intensity.sum(axis=(0, 1, 2, 3))

        # === W-AXIS INTENSITY PROFILE ===
        w_profile = avg_intensity.sum(axis=(0, 1, 2, 4))

        # === PEAK ANALYSIS ===
        try:
            from scipy.ndimage import maximum_filter
            # Find peaks in the 4D cross-section at v=mid
            local_max_3d = maximum_filter(cross_4d_at_vmid, size=3) == cross_4d_at_vmid
            threshold_3d = np.mean(cross_4d_at_vmid) + 2 * np.std(cross_4d_at_vmid)
            peaks_3d = np.argwhere(local_max_3d & (cross_4d_at_vmid > threshold_3d))
            n_peaks = len(peaks_3d)
        except ImportError:
            n_peaks = -1  # scipy not available

        # === V-PROFILE PEAK COUNT ===
        # How many peaks in the v-axis intensity profile?
        v_norm = v_profile / (v_profile.max() + 1e-20)
        v_peaks = []
        for i in range(1, len(v_norm) - 1):
            if v_norm[i] > v_norm[i-1] and v_norm[i] > v_norm[i+1] and v_norm[i] > 0.1:
                v_peaks.append((i, v_norm[i]))

        # === SPREAD ===
        center = self.N // 2
        # Intensity-weighted radius at v=mid cross-section
        y_idx, x_idx = np.mgrid[0:self.N, 0:self.N]
        r_dist_2d = np.sqrt((x_idx - center)**2 + (y_idx - center)**2)
        mid_slice = cross_4d_at_vmid[:, :, mid]  # 2D: x,y at z=mid
        total_2d = mid_slice.sum()
        if total_2d > 0:
            spread = float((np.abs(mid_slice) * r_dist_2d).sum() / total_2d)
        else:
            spread = 0.0

        # === AUTOCORRELATION ===
        flat = avg_intensity.ravel()
        if len(flat) > 1 and np.std(flat) > 1e-20:
            # Sample for speed (full 5D array is huge); fixed seed keeps
            # the measurement reproducible across runs.
            sample_size = min(100000, len(flat))
            indices = np.random.RandomState(42).choice(len(flat), sample_size, replace=False)
            indices.sort()
            sample = flat[indices]
            if np.std(sample) > 1e-20:
                autocorr = float(np.corrcoef(sample[:-1], sample[1:])[0, 1])
            else:
                autocorr = 0.0
        else:
            autocorr = 0.0

        # === COMPILE RESULTS ===
        result = {
            "c_5d": self.c,
            "N": self.N,
            "total_steps": self.total_steps,
            "elapsed_seconds": round(elapsed, 2),
            "elapsed_minutes": round(elapsed / 60, 1),
            "field_energy_final": float(np.sum(self.psi**2) * self.h**5),
            "n_peaks_3d": n_peaks,
            "n_peaks_v_profile": len(v_peaks),
            "v_peak_positions": [int(p[0]) for p in v_peaks],
            "v_peak_values": [float(p[1]) for p in v_peaks],
            "spread_at_vmid": spread,
            "autocorrelation": autocorr,
            "intensity_std": float(np.std(avg_intensity)),
            "peak_to_noise": float(np.max(avg_intensity) / (np.std(avg_intensity) + 1e-20)),
            "v_profile": [float(x) for x in v_profile],
            "w_profile": [float(x) for x in w_profile],
            "energy_history_len": len(self.energy_history),
        }

        return result, cross_4d_at_vmid, v_profile, w_profile


# ══════════════════════════════════════════════════════════════════════════════
# SWEEP
# ══════════════════════════════════════════════════════════════════════════════

def run_sweep(c_values, N=32, n_periods=50):
    """Run framerate sweep across c_5d values.

    Each value gets a fresh engine; per-value artifacts (cross-section,
    profiles, JSON result) are saved to RESULTS_DIR as they complete, so a
    crash mid-sweep does not lose earlier runs.

    Args:
        c_values: Sequence of c_5d values to simulate, in order.
        N: Grid size (N^5 cells) for every run.
        n_periods: Periods to simulate per run.

    Returns:
        List of result dicts (or {"c_5d", "error"} entries for failures).
    """
    logger.info("═" * 60)
    logger.info("5D FRAMERATE SWEEP: %d values from %.3f to %.3f",
                len(c_values), c_values[0], c_values[-1])
    logger.info("N=%d (%s cells), %d periods each", N, f"{N**5:,}", n_periods)
    logger.info("═" * 60)

    all_results = []
    for i, c_val in enumerate(c_values):
        logger.info("\n═══ [%d/%d] c_5D = %.3f ═══", i+1, len(c_values), c_val)
        try:
            engine = Engine5D(c_5d=c_val, N=N, n_periods=n_periods)
            result, cross_section, v_profile, w_profile = engine.run()
            all_results.append(result)

            # Save cross-section and profiles
            np.save(RESULTS_DIR / f"cross_section_5d_c{c_val:.3f}.npy", cross_section)
            np.save(RESULTS_DIR / f"v_profile_c{c_val:.3f}.npy", v_profile)
            np.save(RESULTS_DIR / f"w_profile_c{c_val:.3f}.npy", w_profile)

            # Save individual result
            with open(RESULTS_DIR / f"result_c{c_val:.3f}.json", "w") as f:
                json.dump(result, f, indent=2)

            # Free memory
            del engine
        except Exception as e:
            logger.error("FAILED at c=%.3f: %s", c_val, str(e))
            all_results.append({"c_5d": c_val, "error": str(e)})

    # Save all results
    with open(RESULTS_DIR / "sweep_results_5d.json", "w") as f:
        json.dump(all_results, f, indent=2)

    # Print summary
    print("\n" + "═" * 80)
    print("5D SWEEP RESULTS")
    print("═" * 80)
    print(f"{'c_5D':>8s} {'V-Peaks':>8s} {'3D-Peaks':>8s} {'AutoCorr':>10s} "
          f"{'P/N':>10s} {'Spread':>8s} {'Time(m)':>8s}")
    print("-" * 80)
    for r in all_results:
        if "error" in r:
            print(f"{r['c_5d']:8.3f}  ERROR: {r['error'][:50]}")
        else:
            print(f"{r['c_5d']:8.3f} {r['n_peaks_v_profile']:8d} "
                  f"{r['n_peaks_3d']:8d} {r['autocorrelation']:10.4f} "
                  f"{r['peak_to_noise']:10.2f} {r['spread_at_vmid']:8.2f} "
                  f"{r['elapsed_minutes']:8.1f}")
    print("═" * 80)

    # Check predictions
    print("\n── PRE-REGISTERED PREDICTIONS ──")
    for r in all_results:
        if "error" not in r and r['n_peaks_v_profile'] == 3:
            print(f"  PREDICTION 1 (3 v-peaks): MATCH at c={r['c_5d']:.3f}")

    return all_results


# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="5D FDTD Wave Engine for TLT")
    parser.add_argument("--sweep", action="store_true", help="Run framerate sweep")
    parser.add_argument("--c", type=float, default=2.0, help="5D propagation speed")
    parser.add_argument("--N", type=int, default=32, help="Grid size (N^5)")
    parser.add_argument("--periods", type=int, default=50, help="Periods to simulate")
    parser.add_argument("--fine", type=float, nargs=2, metavar=("LOW", "HIGH"),
                        help="Fine sweep range (step 0.025)")
    args = parser.parse_args()

    if args.sweep:
        if args.fine:
            low, high = args.fine
            c_values = [round(low + i * 0.025, 3)
                        for i in range(int((high - low) / 0.025) + 1)]
        else:
            # Coarse sweep: 1.5 to 2.5 step 0.1
            c_values = [round(1.5 + i * 0.1, 1) for i in range(11)]
        run_sweep(c_values, N=args.N, n_periods=args.periods)
    else:
        logger.info("Single run: c_5D=%.3f, N=%d, %d periods",
                    args.c, args.N, args.periods)
        engine = Engine5D(c_5d=args.c, N=args.N, n_periods=args.periods)
        result, cross_section, v_profile, w_profile = engine.run()
        np.save(RESULTS_DIR / f"cross_section_5d_c{args.c:.3f}.npy", cross_section)
        np.save(RESULTS_DIR / f"v_profile_c{args.c:.3f}.npy", v_profile)
        np.save(RESULTS_DIR / f"w_profile_c{args.c:.3f}.npy", w_profile)
        with open(RESULTS_DIR / f"result_c{args.c:.3f}.json", "w") as f:
            json.dump(result, f, indent=2)
        print(json.dumps(result, indent=2))