#!/usr/bin/env python3 """ CMB Angular Power Spectrum Comparison ================================================================================ Computes the angular power spectrum from a v6c simulation run and compares the structure to the known CMB power spectrum pattern. Method: 1. Run v6c for a short period to build up 3D structure 2. Extract a spherical shell from the 3D intensity_cumulative field (analogous to the CMB being a spherical shell at last scattering) 3. Compute the angular power spectrum C(l) via spherical harmonics 4. Compare peak positions and relative amplitudes to CMB data The CMB acoustic peaks are at multipoles l ≈ 200, 540, 800. Our grid is 128³, so we can resolve up to l_max ≈ N/2 = 64. This means we can't reach l=200 directly, but we CAN check: - Whether the power spectrum has PEAKS (not flat) - The RATIO between peak positions (should be ~1:2.7:4 for CMB) - The overall spectral shape (rising to first peak, then oscillating) We also measure sigma/mean at different fill levels to find what fill level corresponds to the CMB's ΔT/T ≈ 10⁻⁵. """ import json import math import numpy as np import time as time_mod from datetime import datetime, timezone from pathlib import Path # Import the simulation import sys sys.path.insert(0, str(Path(__file__).parent)) from importlib import import_module # We need the simulation classes — import from the v6c module spec = import_module("SIM-003_v6c_wave_reinjection") CascadeEngine = spec.CascadeEngine DimensionalGrid = spec.DimensionalGrid RESULTS_DIR = Path(__file__).parent / "results_cmb_comparison" RESULTS_DIR.mkdir(exist_ok=True) def extract_spherical_shell(grid_3d, radius_frac=0.5): """Extract intensity values on a spherical shell at given radius fraction. Returns: (theta_array, phi_array, values_array) theta: polar angle [0, pi] phi: azimuthal angle [0, 2*pi] """ N = grid_3d.N ic = grid_3d.intensity_cumulative.astype(np.float64) cx, cy, cz = N // 2, N // 2, N // 2 R = int(radius_frac * N / 2) if R < 2: R = 2 # Sample the shell at regular angular intervals n_theta = 180 # polar resolution n_phi = 360 # azimuthal resolution thetas = np.linspace(0.01, np.pi - 0.01, n_theta) phis = np.linspace(0, 2 * np.pi, n_phi, endpoint=False) shell = np.zeros((n_theta, n_phi)) for i, theta in enumerate(thetas): for j, phi in enumerate(phis): x = int(cx + R * np.sin(theta) * np.cos(phi)) y = int(cy + R * np.sin(theta) * np.sin(phi)) z = int(cz + R * np.cos(theta)) if 0 <= x < N and 0 <= y < N and 0 <= z < N: shell[i, j] = ic[x, y, z] return thetas, phis, shell def angular_power_spectrum(shell, l_max=60): """Compute angular power spectrum C(l) from a spherical shell. Uses the azimuthal FFT at each latitude ring, then combines to approximate the spherical harmonic decomposition. For a proper Cl analysis you'd use healpy/SHT, but this gives the essential structure: where the peaks are and their ratios. """ n_theta, n_phi = shell.shape # Remove mean (like CMB: analyze fluctuations, not baseline) mean_val = np.mean(shell) if mean_val > 0: delta = (shell - mean_val) / mean_val # fractional fluctuation (like ΔT/T) else: delta = shell # Compute power spectrum # Method: FFT along phi (azimuthal) at each theta, then average over theta # This gives C(m) where m is the azimuthal mode number # For a statistically isotropic field, C(l) ≈ C(m) at m=l Cl = np.zeros(l_max + 1) for i in range(n_theta): row = delta[i, :] fft = np.fft.rfft(row) power = np.abs(fft) ** 2 # Weight by sin(theta) for proper spherical measure weight = np.sin(np.linspace(0.01, np.pi - 0.01, n_theta)[i]) for l in range(min(l_max + 1, len(power))): Cl[l] += power[l] * weight # Normalize total_weight = np.sum(np.sin(np.linspace(0.01, np.pi - 0.01, n_theta))) if total_weight > 0: Cl /= total_weight # Convention: l(l+1)Cl / 2pi (standard CMB plot normalization) l_arr = np.arange(l_max + 1) Dl = l_arr * (l_arr + 1) * Cl / (2 * np.pi) Dl[0] = 0 # monopole Dl[1] = 0 # dipole (remove like CMB) return l_arr, Cl, Dl def measure_fluctuation_history(engine, n_cycles, sample_interval=50): """Run simulation and record sigma/mean at intervals. This tells us what fill level produces CMB-like fluctuation amplitudes. """ spc = int(engine.T_pulse / engine.dt_master) total_steps = n_cycles * spc history = [] report_steps = sample_interval * spc eq_measured = {1: False, 2: False, 3: False} for step in range(total_steps): t = step * engine.dt_master # Run one step of the cascade (simplified from engine.run) overflow_1d = engine.grids[1].step(t) if 1 in engine.grids else 0 if not eq_measured[1] and engine.grids[1].is_equilibrium: engine._measure_overflow_quantum(1) eq_measured[1] = True if overflow_1d > 0: engine.overflow_buffer[1] += overflow_1d overflow_2d = 0.0 if 2 in engine.grids: q1 = engine.overflow_quantum[1] if q1 is not None: if not engine.grids[2].active and engine.overflow_buffer[1] >= q1: engine.grids[2].active = True if engine.grids[2].active: if not engine.grids[2].injecting and engine.overflow_buffer[1] >= q1: engine.grids[2].start_cascade_pulse(q1, t) engine.overflow_buffer[1] -= q1 engine.pulse_count[1] += 1 for _ in range(max(1, int(engine.grids[2].dt / engine.dt_master))): overflow_2d = engine.grids[2].step(t) if not eq_measured[2] and engine.grids[2].is_equilibrium: engine._measure_overflow_quantum(2) eq_measured[2] = True if overflow_2d > 0: engine.overflow_buffer[2] += overflow_2d if 3 in engine.grids: q2 = engine.overflow_quantum[2] if q2 is not None: if not engine.grids[3].active and engine.overflow_buffer[2] >= q2: engine.grids[3].active = True if engine.grids[3].active: if not engine.grids[3].injecting and engine.overflow_buffer[2] >= q2: engine.grids[3].start_cascade_pulse(q2, t) engine.overflow_buffer[2] -= q2 engine.pulse_count[2] += 1 for _ in range(max(1, int(engine.grids[3].dt / engine.dt_master))): engine.grids[3].step(t) if not eq_measured[3] and engine.grids[3].is_equilibrium: engine._measure_overflow_quantum(3) eq_measured[3] = True # Sample at intervals if step > 0 and step % report_steps == 0: cycle = step / spc for d in sorted(engine.grids): g = engine.grids[d] r = g.r_local mean_r = float(np.mean(r)) std_r = float(np.std(r)) min_r = float(np.min(r)) max_r = float(np.max(r)) global_fill = g._get_global_fill() sigma_over_mean = std_r / mean_r if mean_r > 0 else 0 history.append({ "cycle": int(cycle), "dim": d, "global_r": g.global_r, "global_fill": global_fill, "mean_r": mean_r, "std_r": std_r, "min_r": min_r, "max_r": max_r, "sigma_over_mean": sigma_over_mean, "is_eq": g.is_equilibrium, "active": g.active, }) # Progress s3 = engine.grids.get(3) if s3 and s3.active: print(f" c={cycle:.0f} 3D: gr={s3.global_r:.4f} σ/μ={history[-1]['sigma_over_mean']:.6f} " f"fill={s3._get_global_fill():.3f}") return history def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument("--grid-3d", type=int, default=128) parser.add_argument("--cycles", type=int, default=2000) parser.add_argument("--label", type=str, default="") args = parser.parse_args() N3 = args.grid_3d label = args.label or f"N{N3}" print("=" * 70) print(f"CMB ANGULAR POWER SPECTRUM COMPARISON — {label}") print(f" {datetime.now(timezone.utc).isoformat()}") print(f" 3D grid: {N3}³ = {N3**3:,} cells") print(f" Memory: ~{N3**3 * 4 * 5 / 1e6:.0f} MB for field arrays") print("=" * 70) print(f"\n--- Running v6c simulation (Fe, {args.cycles} cycles, {N3}³) ---") engine = CascadeEngine("Fe", delta_r_base=0.0001, N1=2048, N2=256, N3=N3, max_dim=3) t_start = time_mod.monotonic() history = measure_fluctuation_history(engine, n_cycles=args.cycles, sample_interval=100) elapsed = time_mod.monotonic() - t_start print(f"\n Simulation complete in {elapsed:.1f}s") # Extract 3D state g3 = engine.grids.get(3) if g3 is None or not g3.active: print(" 3D never activated — need more cycles or different parameters") return print(f"\n 3D state: global_r={g3.global_r:.4f}, σ={float(np.std(g3.r_local)):.4f}") # --- Angular Power Spectrum at multiple radii --- print("\n--- Computing angular power spectra ---") spectra = {} for r_frac in [0.3, 0.5, 0.7]: print(f" Shell at r={r_frac:.1f}×R...") thetas, phis, shell = extract_spherical_shell(g3, radius_frac=r_frac) mean_shell = np.mean(shell) std_shell = np.std(shell) delta_T_over_T = std_shell / mean_shell if mean_shell > 0 else 0 print(f" mean={mean_shell:.4e}, σ={std_shell:.4e}, σ/μ={delta_T_over_T:.6f}") # l_max scales with grid: more resolution = higher multipoles l_max = min(N3 // 2, 120) l_arr, Cl, Dl = angular_power_spectrum(shell, l_max=l_max) # Find peaks in Dl peaks = [] for i in range(2, len(Dl) - 1): if Dl[i] > Dl[i-1] and Dl[i] > Dl[i+1] and Dl[i] > 0.01 * np.max(Dl[2:]): peaks.append({"l": int(l_arr[i]), "Dl": float(Dl[i])}) peaks.sort(key=lambda p: -p["Dl"]) spectra[f"r={r_frac}"] = { "radius_frac": r_frac, "mean_intensity": float(mean_shell), "std_intensity": float(std_shell), "delta_T_over_T": delta_T_over_T, "peaks": peaks[:10], "l": [int(x) for x in l_arr], "Dl": [float(x) for x in Dl], "Cl": [float(x) for x in Cl], } if peaks: peak_strs = ["l=" + str(p["l"]) for p in peaks[:5]] print(" Top peaks: " + ", ".join(peak_strs)) if len(peaks) >= 2: ratio = peaks[1]["l"] / peaks[0]["l"] if peaks[0]["l"] > 0 else 0 print(" Peak ratio (2nd/1st): %.2f (CMB: ~2.7)" % ratio) else: print(f" No peaks detected") # --- Fluctuation amplitude vs fill level --- print("\n--- σ/μ vs fill level (CMB comparison) ---") print(f" CMB target: ΔT/T ≈ 1.1 × 10⁻⁵") print(f" {'Cycle':>6} {'Dim':>3} {'Fill':>6} {'σ/μ':>12} {'global_r':>8}") print(f" {'-'*45}") # Show 3D history only for h in history: if h["dim"] == 3 and h["active"]: marker = " <<<" if abs(h["sigma_over_mean"] - 1.1e-5) < 5e-5 else "" print(f" {h['cycle']:>6} {h['dim']:>3} {h['global_fill']:>6.3f} " f"{h['sigma_over_mean']:>12.6e} {h['global_r']:>8.4f}{marker}") # --- Save results --- results = { "element": "Fe", "timestamp": datetime.now(timezone.utc).isoformat(), "simulation": { "n_cycles": args.cycles, "grid_3d": N3, "label": label, "delta_r": 0.0001, "final_3d_global_r": g3.global_r, "final_3d_std": float(np.std(g3.r_local)), "final_3d_fill": g3._get_global_fill(), }, "spectra": spectra, "fluctuation_history": history, "cmb_reference": { "delta_T_over_T": 1.1e-5, "first_peak_l": 200, "second_peak_l": 540, "third_peak_l": 800, "peak_ratio_2_1": 2.7, "peak_ratio_3_1": 4.0, "note": "Our grid l_max=64, so we compare spectral SHAPE not absolute peak positions" } } ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") out = RESULTS_DIR / f"cmb_{label}_{ts}.json" with open(out, "w") as f: json.dump(results, f, indent=2, default=float) print(f"\nResults: {out}") if __name__ == "__main__": main()