#!/usr/bin/env python3 """ geometry_probe.py — 4D Geometry Probe: Multi-Slice W-Profile Probes the 4D interference pattern shape by saving intensity cross-sections at 9 w-slices (along the 4th spatial dimension) for each c_4D value. DOES NOT modify the engine physics. Uses engine_4d.Engine4D exactly as-is. Only the measurement extraction is new (multi-slice instead of midplane-only). Test description: TLT_4D_GEOMETRY_PROBE_test_description.txt Pre-registered: 2026-03-21 Usage: python3 geometry_probe.py # Full sweep 1.618-1.810 python3 geometry_probe.py --c 1.716 # Single c value python3 geometry_probe.py --phase 3 # Phase 3 resolution """ import argparse import json import logging import math import os import sys import time as time_mod from pathlib import Path import numpy as np # Import the UNMODIFIED engine sys.path.insert(0, str(Path(__file__).parent.parent / "audited" / "4d_engine")) from engine_4d import Engine4D, get_form_a_vertices, get_form_b_vertices_normalized logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") logger = logging.getLogger("geometry_probe") RESULTS_DIR = Path(__file__).parent.parent.parent / "results" / "unaudited" / "geometry_probe" RESULTS_DIR.mkdir(parents=True, exist_ok=True) def get_w_slice_indices(N, n_pml=10): """Return 9 evenly-spaced w-indices avoiding PML boundary layers. The usable range is [n_pml, N-1-n_pml]. We place 9 points evenly within this range including both endpoints. """ lo = n_pml hi = N - 1 - n_pml indices = [int(round(lo + i * (hi - lo) / 8)) for i in range(9)] return indices def analyze_3d_slice(data_3d, h, L, N): """Analyze a single 3D cross-section slice. Returns dict with: integrated_intensity, n_peaks, radial_extent_90, center_of_mass_offset, peak_positions (top 30). """ from scipy.ndimage import maximum_filter integrated = float(np.sum(data_3d) * h ** 3) mean_val = np.mean(data_3d) std_val = np.std(data_3d) # Peak detection if std_val > 1e-15: local_max = maximum_filter(data_3d, size=3) == data_3d threshold = mean_val + 2 * std_val peaks = np.argwhere(local_max & (data_3d > threshold)) n_peaks = len(peaks) else: peaks = np.array([]).reshape(0, 3) n_peaks = 0 # Top 30 peak positions with intensities peak_positions = [] if n_peaks > 0: peak_vals = [float(data_3d[tuple(p)]) for p in peaks] sorted_idx = np.argsort(peak_vals)[::-1][:30] for idx in sorted_idx: p = peaks[idx] peak_positions.append({ "grid": [int(p[0]), int(p[1]), int(p[2])], "physical": [float((p[d] * h) - L) for d in range(3)], "intensity": peak_vals[idx], }) # Radial extent (radius enclosing 90% of integrated intensity) mid = N // 2 total = np.sum(data_3d) if total > 0: # Vectorized radial binning (replaces O(N^3) triple loop) r_max = mid coords = np.indices(data_3d.shape).astype(np.float64) r_field = np.sqrt((coords[0] - mid)**2 + (coords[1] - mid)**2 + (coords[2] - mid)**2).astype(int) r_field = np.clip(r_field, 0, r_max) radial_cumulative = np.bincount(r_field.ravel(), weights=data_3d.ravel(), minlength=r_max + 1)[:r_max + 1] # Find 90% enclosure radius cumsum = np.cumsum(radial_cumulative) r90_idx = np.searchsorted(cumsum, 0.9 * total) radial_extent_90 = float(r90_idx * h) else: radial_extent_90 = 0.0 # Center of mass offset from geometric center if total > 0: coords = np.indices(data_3d.shape).astype(float) com = np.array([np.sum(coords[d] * data_3d) / total for d in range(3)]) com_physical = com * h - L com_offset = float(np.linalg.norm(com_physical)) else: com_offset = 0.0 return { "integrated_intensity": integrated, "n_peaks": n_peaks, "radial_extent_90": radial_extent_90, "center_of_mass_offset": com_offset, "peak_positions": peak_positions, } def run_geometry_probe(c_4d, phase=2, n_periods=50): """Run a single geometry probe at the given c_4D value.""" logger.info("Geometry probe: c_4D=%.4f, phase=%d, %d periods", c_4d, phase, n_periods) engine = Engine4D(c_4d=c_4d, phase=phase, n_periods=n_periods) result, _, full_intensity = engine.run() # Get w-slice indices w_indices = get_w_slice_indices(engine.N, engine.n_pml) # Final time-averaged intensity final_intensity = engine.intensity_avg / max(engine.intensity_count, 1) # Analyze each w-slice w_slices = {} slice_stack = [] for w_idx in w_indices: slice_3d = final_intensity[:, :, :, w_idx] slice_stack.append(slice_3d) w_physical = (w_idx * engine.h) - engine.L analysis = analyze_3d_slice(slice_3d, engine.h, engine.L, engine.N) analysis["w_index"] = w_idx analysis["w_physical"] = float(w_physical) w_slices[str(w_idx)] = analysis # W-profile: integrated intensity as function of w w_profile = { "w_indices": w_indices, "w_physical": [float((w * engine.h) - engine.L) for w in w_indices], "integrated_intensity": [w_slices[str(w)]["integrated_intensity"] for w in w_indices], "n_peaks": [w_slices[str(w)]["n_peaks"] for w in w_indices], "radial_extent_90": [w_slices[str(w)]["radial_extent_90"] for w in w_indices], "com_offset": [w_slices[str(w)]["center_of_mass_offset"] for w in w_indices], } # W-profile aggregate metrics intensities = w_profile["integrated_intensity"] max_intensity = max(intensities) if max_intensity > 0: # FWHM half_max = max_intensity / 2 above_half = [i for i, v in enumerate(intensities) if v >= half_max] if above_half: fwhm_indices = above_half[-1] - above_half[0] + 1 fwhm_physical = fwhm_indices * (w_profile["w_physical"][-1] - w_profile["w_physical"][0]) / (len(intensities) - 1) else: fwhm_physical = 0.0 # Asymmetry: left half vs right half mid_slice = len(intensities) // 2 left_sum = sum(intensities[:mid_slice]) right_sum = sum(intensities[mid_slice + 1:]) asymmetry = float((right_sum - left_sum) / (right_sum + left_sum)) if (right_sum + left_sum) > 0 else 0.0 else: fwhm_physical = 0.0 asymmetry = 0.0 # Peak shift: do peak x,y,z positions drift across w-slices? peak_centroids = [] for w_idx in w_indices: peaks = w_slices[str(w_idx)]["peak_positions"] if peaks: avg_pos = np.mean([p["physical"] for p in peaks[:10]], axis=0) peak_centroids.append(avg_pos.tolist()) else: peak_centroids.append([0.0, 0.0, 0.0]) if len(peak_centroids) >= 2 and any(np.any(np.array(p) != 0) for p in peak_centroids): peak_drift = float(np.linalg.norm( np.array(peak_centroids[-1]) - np.array(peak_centroids[0]) )) else: peak_drift = 0.0 # Compile full result probe_result = { **result, # All standard engine metrics "probe_type": "geometry_probe", "w_slice_count": len(w_indices), "w_profile": w_profile, "w_fwhm": float(fwhm_physical), "w_asymmetry": float(asymmetry), "w_peak_drift": float(peak_drift), "w_slices": w_slices, } # Save results c_str = f"{c_4d:.3f}" with open(RESULTS_DIR / f"result_c{c_str}.json", "w") as f: json.dump(probe_result, f, indent=2) # Save slice stack as compressed numpy np.savez_compressed( RESULTS_DIR / f"slices_c{c_str}.npz", **{f"w{w_idx}": s for w_idx, s in zip(w_indices, slice_stack)} ) logger.info(" c=%.3f: fwhm=%.3f, asymmetry=%.4f, peak_drift=%.4f, " "w_peaks=%s", c_4d, fwhm_physical, asymmetry, peak_drift, [w_slices[str(w)]["n_peaks"] for w in w_indices]) return probe_result def run_full_probe(phase=2, n_periods=50): """Run the full geometry probe sweep: c = 1.618 to 1.810, step 0.004.""" c_values = [round(1.618 + i * 0.004, 3) for i in range(49)] # Ensure we include exact key values key_values = [1.618, 1.625, 1.667, 1.700, 1.707, 1.716, 1.732, 1.750, 1.800] for kv in key_values: if not any(abs(c - kv) < 0.001 for c in c_values): c_values.append(kv) c_values = sorted(set([round(c, 3) for c in c_values])) logger.info("=" * 60) logger.info("GEOMETRY PROBE SWEEP: %d values from %.3f to %.3f", len(c_values), c_values[0], c_values[-1]) logger.info("Phase %d (N=%d), %d periods, 9 w-slices each", phase, {1: 32, 2: 48, 3: 64}.get(phase, 48), n_periods) logger.info("=" * 60) all_results = [] t_start = time_mod.monotonic() for i, c_val in enumerate(c_values): logger.info("\n--- [%d/%d] c_4D = %.3f ---", i + 1, len(c_values), c_val) probe = run_geometry_probe(c_val, phase=phase, n_periods=n_periods) all_results.append(probe) elapsed = time_mod.monotonic() - t_start # Save aggregate summary summary = { "probe_type": "geometry_probe_sweep", "phase": phase, "n_periods": n_periods, "n_c_values": len(c_values), "c_range": [c_values[0], c_values[-1]], "total_elapsed_seconds": round(elapsed, 1), "results": all_results, } with open(RESULTS_DIR / "probe_summary.json", "w") as f: json.dump(summary, f, indent=2) # Print summary table print("\n" + "=" * 90) print("GEOMETRY PROBE RESULTS") print("=" * 90) print(f"{'c_4D':>7s} {'Peaks':>5s} {'P/N':>10s} {'FWHM':>8s} " f"{'Asym':>8s} {'Drift':>8s} {'W-peaks (9 slices)':>30s}") print("-" * 90) for r in all_results: wp = [r["w_profile"]["n_peaks"][i] for i in range(len(r["w_profile"]["n_peaks"]))] print(f"{r['c_4d']:7.3f} {r['n_peaks']:5d} {r['peak_to_noise']:10.2f} " f"{r['w_fwhm']:8.3f} {r['w_asymmetry']:8.4f} {r['w_peak_drift']:8.4f} " f"{wp}") print("-" * 90) print(f"Total runtime: {elapsed/3600:.1f} hours") print(f"Results: {RESULTS_DIR}") return all_results if __name__ == "__main__": parser = argparse.ArgumentParser(description="4D Geometry Probe") parser.add_argument("--c", type=float, default=None, help="Single c value") parser.add_argument("--phase", type=int, default=2, choices=[1, 2, 3]) parser.add_argument("--periods", type=int, default=50) args = parser.parse_args() if args.c is not None: run_geometry_probe(args.c, phase=args.phase, n_periods=args.periods) else: run_full_probe(phase=args.phase, n_periods=args.periods)