Source code for camb.check_accuracy

"""Check numerical stability of CAMB parameters against a higher-accuracy reference run.

The command-line interface compares spectra and derived parameters from an input
``.ini`` file with a reference calculation using boosted accuracy settings. It
can also plot fractional differences, estimate a fiducial CMB delta chi-squared,
search for minimal top-level boosts, and refine which underlying component
accuracy settings are most relevant.

Examples::

    camb check_accuracy inifiles/params.ini
    camb check_accuracy inifiles/params.ini --plot-dir accuracy_plots
    camb check_accuracy inifiles/params.ini --find-minimal-boosts --refine-accuracy-components
"""

from __future__ import annotations

import argparse
import itertools
import math
import time
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path

import numpy as np

DEFAULT_ACCURACY_SETTINGS = {
    "AccuracyBoost": 2.0,
    "lSampleBoost": 2.0,
    "lAccuracyBoost": 2.0,
    "IntTolBoost": 2.0,
    "DoLateRadTruncation": True,
}

DEFAULT_NOISE_CONFIGS = {
    "planck": {
        "noise_muK_arcmin_T": 45.0,
        "noise_muK_arcmin_P": 70.0,
        "fwhm_arcmin": 7.0,
        "fsky": 0.6,
    },
    "so": {  # optimistic extended 2034 numbers
        "noise_muK_arcmin_T": 2.6,
        "noise_muK_arcmin_P": 3.67,
        "fwhm_arcmin": 1.4,
        "fsky": 0.6,
    },
}

TT_EE_TOLERANCES = [(0, 3e-3), (600, 1e-3), (3500, 3e-3), (6000, 2e-2)]
BB_TOLERANCES = [(0, 5e-3), (1000, 1e-2), (6000, 2e-2), (8000, 1e-1)]
LENSING_TOLERANCES = [(0, 5e-3), (2000, 5e-3), (6000, 2e-2)]
MPK_TOLERANCE = 1e-3
DERIVED_TOLERANCE = 1e-3

CL_COLUMNS = {"TT": 0, "EE": 1, "BB": 2, "TE": 3}
OMEGA_K_FLAT = 5e-7
BASE_ACCURACY_KEYS = ("AccuracyBoost", "lSampleBoost", "lAccuracyBoost", "IntTolBoost")
GLOBAL_BOOST_COMPONENT_KEYS = (
    "TimeStepBoost",
    "BackgroundTimeStepBoost",
    "TimeSwitchBoost",
    "IntTolBoost",
    "SourcekAccuracyBoost",
    "IntkAccuracyBoost",
    "TransferkBoost",
    "NonFlatIntAccuracyBoost",
    "BessIntBoost",
    "LensingBoost",
    "NonlinSourceBoost",
    "BesselBoost",
    "LimberBoost",
    "SourceLimberBoost",
    "KmaxBoost",
    "neutrino_q_boost",
)
COMPONENT_REFINEMENT_KEYS = GLOBAL_BOOST_COMPONENT_KEYS
DISCRETE_ACCURACY_VALUES = {"neutrino_q_boost": (1.0, 1.5, 2.5)}


@dataclass(frozen=True)
class RangeTolerance:
    start: int
    stop: int | None
    tolerance: float

    @property
    def label(self) -> str:
        if self.stop is None:
            return f"L >= {self.start}"
        return f"{self.start} <= L < {self.stop}"


[docs] @dataclass class RunOutput: label: str params: object results: object cpu_time: float wall_time: float derived: dict[str, float] lensed_cls: np.ndarray | None lens_potential_cls: np.ndarray | None matter_power: MatterPowerData | None
@dataclass(frozen=True) class MatterPowerData: k: np.ndarray z: np.ndarray pk: np.ndarray requested_kmax: float npoints: int @dataclass class StatRow: quantity: str range_label: str max_abs: float rms: float tolerance: float passed: bool location: str = ""
[docs] @dataclass class ComparisonResult: derived_rows: list[StatRow] cl_rows: list[StatRow] lensing_rows: list[StatRow] mpk_rows: list[StatRow] @property def passed(self) -> bool: rows = self.derived_rows + self.cl_rows + self.lensing_rows + self.mpk_rows return all(row.passed for row in rows) @property def worst_failure(self) -> StatRow | None: failures = [ row for row in self.derived_rows + self.cl_rows + self.lensing_rows + self.mpk_rows if not row.passed ] if not failures: return None return max(failures, key=stat_score)
def comparison_rows(comparison: ComparisonResult) -> list[StatRow]: return comparison.derived_rows + comparison.cl_rows + comparison.lensing_rows + comparison.mpk_rows def stat_score(row: StatRow) -> float: if not math.isfinite(row.max_abs): return math.inf if row.tolerance == 0: return 0.0 if row.max_abs == 0 else math.inf score = abs(row.max_abs) / abs(row.tolerance) return score if math.isfinite(score) else math.inf def comparison_score(comparison: ComparisonResult) -> float: rows = comparison_rows(comparison) if not rows: return 0.0 return max(stat_score(row) for row in rows)
[docs] @dataclass(frozen=True) class NoiseConfig: name: str noise_muK_arcmin_T: float noise_muK_arcmin_P: float fwhm_arcmin: float fsky: float lmin: int lmax: int | None fields: tuple[str, ...]
@dataclass(frozen=True) class ChiSquaredResult: delta_chi2: float lmin: int lmax: int fields: tuple[str, ...] fsky: float noise_muK_arcmin_T: float noise_muK_arcmin_P: float fwhm_arcmin: float worst_ell: int worst_delta_chi2: float
[docs] @dataclass class AccuracyCheckResult: standard: RunOutput reference: RunOutput comparison: ComparisonResult chi2: ChiSquaredResult | None = None
@dataclass(frozen=True) class PassingCandidate: settings: dict[str, float | bool] comparison: ComparisonResult run: RunOutput | None
[docs] @dataclass(frozen=True) class SearchResult: settings: dict[str, float | bool] comparison: ComparisonResult run: RunOutput | None = None fastest: PassingCandidate | None = None def __iter__(self): yield self.settings yield self.comparison
def tolerance_ranges(spec: list[tuple[int, float]]) -> list[RangeTolerance]: return [ RangeTolerance(start, spec[index + 1][0] if index + 1 < len(spec) else None, tolerance) for index, (start, tolerance) in enumerate(spec) ] def positive_int(value: str) -> int: parsed = int(value) if parsed < 1: raise argparse.ArgumentTypeError("must be at least 1") return parsed def comma_separated_floats(value: str) -> list[float]: values = [float(item.strip()) for item in value.split(",") if item.strip()] if not values: raise argparse.ArgumentTypeError("must contain at least one value") return sorted(set(values)) def cmb_fields(value: str) -> tuple[str, ...]: fields = tuple(field.strip().upper() for field in value.replace(",", " ").split() if field.strip()) allowed = {"T", "E", "B"} if not fields or any(field not in allowed for field in fields): raise argparse.ArgumentTypeError("fields must be a comma- or space-separated subset of T,E,B") if len(set(fields)) != len(fields): raise argparse.ArgumentTypeError("fields must not contain duplicates") return fields def build_parser(prog: str | None = None) -> argparse.ArgumentParser: """Build the command-line parser for the accuracy checker.""" parser = argparse.ArgumentParser( prog=prog, description="Check stability of one CAMB ini file against a boosted-accuracy calculation." ) parser.add_argument("ini_file", help="CAMB ini file to load with camb.read_ini") parser.add_argument( "--no-validate", action="store_true", help="pass no_validate=True to camb.read_ini", ) parser.add_argument( "--lmax", type=positive_int, help="maximum L to compare; default is common calculated lmax", ) parser.add_argument( "--set-for-lmax", type=positive_int, help="call params.set_for_lmax before running; if --lmax is unset, this is also used for comparison", ) parser.add_argument( "--lens-margin", type=int, help="lens margin to use for both runs; can be set with or without --set-for-lmax", ) parser.add_argument( "--lens-potential-accuracy", type=float, help="lens_potential_accuracy to use for both runs; can be set with or without --set-for-lmax", ) parser.add_argument( "--reference-lens-margin", type=int, help="lens margin override for the boosted reference only", ) parser.add_argument( "--reference-lens-potential-accuracy", type=float, help="lens_potential_accuracy override for the boosted reference only", ) parser.add_argument( "--mpk-npoints", type=positive_int, default=500, help="number of k samples for matter power comparison grid", ) parser.add_argument( "--mpk-kmin", type=float, default=1e-4, help="minimum k/h for matter power comparison", ) parser.add_argument("--mpk-tolerance", type=float, default=None) parser.add_argument("--derived-tolerance", type=float, default=DERIVED_TOLERANCE) parser.add_argument( "--plot-dir", type=Path, help="write plots of fractional errors to this directory", ) parser.add_argument( "--chi2", action="store_true", help="calculate a fiducial Gaussian CMB delta chi-squared for standard vs boosted C_L", ) parser.add_argument( "--chi2-config", choices=tuple(DEFAULT_NOISE_CONFIGS) + ("custom",), default="so", help="noise configuration for --chi2", ) parser.add_argument("--chi2-lmin", type=positive_int, default=2) parser.add_argument( "--chi2-lmax", type=positive_int, help="maximum L for --chi2; defaults to comparison lmax", ) parser.add_argument("--chi2-fields", type=cmb_fields, default=("T", "E", "B")) parser.add_argument("--noise-muk-arcmin-t", type=float, help="temperature white noise for --chi2") parser.add_argument("--noise-muk-arcmin-p", type=float, help="polarization white noise for --chi2") parser.add_argument("--beam-fwhm-arcmin", type=float, help="Gaussian beam FWHM for --chi2") parser.add_argument("--fsky", type=float, help="sky fraction for --chi2") parser.add_argument( "--find-minimal-boosts", action="store_true", help="search for the lowest-cost boosted settings that pass against the high-accuracy run", ) parser.add_argument( "--exhaustive-boost-search", action="store_true", help="continue boost search after a passing single-parameter path is found", ) parser.add_argument( "--refine-accuracy-components", action="store_true", help="after --find-minimal-boosts, try AccuracyBoost=1 with underlying accuracy fields tweaked instead", ) parser.add_argument( "--component-search-size", type=positive_int, default=4, help="maximum number of component accuracy fields to try together", ) parser.add_argument( "--search-grid", type=comma_separated_floats, help="optional comma-separated seed values for the four boost parameters; the default search does not need this", ) parser.add_argument( "--search-tolerance", type=float, default=0.02, help="target precision for refining numeric boost values", ) parser.add_argument( "--max-search-runs", type=positive_int, help="maximum candidate runs for boost search", ) parser.add_argument( "--accuracy-boost", type=float, default=DEFAULT_ACCURACY_SETTINGS["AccuracyBoost"], ) parser.add_argument( "--l-sample-boost", type=float, default=DEFAULT_ACCURACY_SETTINGS["lSampleBoost"], ) parser.add_argument( "--l-accuracy-boost", type=float, default=DEFAULT_ACCURACY_SETTINGS["lAccuracyBoost"], ) parser.add_argument("--int-tol-boost", type=float, default=DEFAULT_ACCURACY_SETTINGS["IntTolBoost"]) parser.add_argument( "--do-late-rad-truncation", action=argparse.BooleanOptionalAction, default=DEFAULT_ACCURACY_SETTINGS["DoLateRadTruncation"], ) return parser def requested_accuracy_settings(args: argparse.Namespace) -> dict[str, float | bool]: return { "AccuracyBoost": args.accuracy_boost, "lSampleBoost": args.l_sample_boost, "lAccuracyBoost": args.l_accuracy_boost, "IntTolBoost": args.int_tol_boost, "DoLateRadTruncation": args.do_late_rad_truncation, } def apply_accuracy_settings(params, settings: dict[str, float | bool], *, boost_from_raw: bool = False) -> None: for key, setting in settings.items(): if key == "DoLateRadTruncation": params.DoLateRadTruncation = bool(setting) continue if not hasattr(params.Accuracy, key): raise ValueError(f"Unknown accuracy setting {key}") value = float(setting) if boost_from_raw: value = max(float(getattr(params.Accuracy, key)), value) setattr(params.Accuracy, key, value) def copy_params(params): return params.copy() def apply_lensing_settings( params, *, set_for_lmax: int | None = None, lens_margin: int | None = None, lens_potential_accuracy: float | None = None, ) -> None: if set_for_lmax is None and lens_margin is None and lens_potential_accuracy is None: return lens_accuracy = 0.0 if lens_potential_accuracy is None else lens_potential_accuracy if set_for_lmax is not None: params.set_for_lmax( set_for_lmax, lens_margin=150 if lens_margin is None else lens_margin, lens_potential_accuracy=lens_accuracy, nonlinear=current_uses_nonlinear_lensing(params), ) return margin = 0 if lens_margin is None else lens_margin target_lmax = params.max_l - margin if params.DoLensing else params.max_l max_eta_k = params.max_eta_k if lens_potential_accuracy is None else None params.set_for_lmax( max(1, target_lmax), max_eta_k=max_eta_k, lens_margin=margin, lens_potential_accuracy=lens_accuracy, nonlinear=current_uses_nonlinear_lensing(params), ) def current_uses_nonlinear_lensing(params) -> bool: return str(params.NonLinear) in {"NonLinear_lens", "NonLinear_both"} def load_params( ini_file: Path, *, no_validate: bool, settings: dict[str, float | bool] | None = None, set_for_lmax: int | None = None, lens_margin: int | None = None, lens_potential_accuracy: float | None = None, ): import camb params = camb.read_ini(str(ini_file), no_validate=no_validate) apply_lensing_settings( params, set_for_lmax=set_for_lmax, lens_margin=lens_margin, lens_potential_accuracy=lens_potential_accuracy, ) if settings: apply_accuracy_settings(params, settings) return params def run_params_case( params, label: str, *, lmax: int | None, mpk_kmin: float, mpk_npoints: int, ) -> RunOutput: import camb cpu_start = time.process_time() wall_start = time.perf_counter() results = camb.get_results(params) cpu_time = time.process_time() - cpu_start wall_time = time.perf_counter() - wall_start derived = results.get_derived_params() if params.WantDerivedParameters else {} lensed_cls = get_lensed_cls(results, lmax) lens_potential_cls = get_lens_potential_cls(results, lmax) matter_power = get_matter_power(results, mpk_kmin, mpk_npoints) return RunOutput( label=label, params=params, results=results, cpu_time=cpu_time, wall_time=wall_time, derived=derived, lensed_cls=lensed_cls, lens_potential_cls=lens_potential_cls, matter_power=matter_power, ) def run_case( ini_file: Path, label: str, *, no_validate: bool, accuracy_settings: dict[str, float | bool] | None, lmax: int | None, set_for_lmax: int | None, lens_margin: int | None, lens_potential_accuracy: float | None, mpk_kmin: float, mpk_npoints: int, ) -> RunOutput: params = load_params( ini_file, no_validate=no_validate, settings=accuracy_settings, set_for_lmax=set_for_lmax, lens_margin=lens_margin, lens_potential_accuracy=lens_potential_accuracy, ) return run_params_case( params, label, lmax=lmax, mpk_kmin=mpk_kmin, mpk_npoints=mpk_npoints, ) def run_timing_summary(run: RunOutput) -> str: return f"cpu={run.cpu_time:.2f}s wall={run.wall_time:.2f}s" def run_timing_key(run: RunOutput) -> tuple[float, float]: return run.cpu_time, run.wall_time def get_lensed_cls(results, lmax: int | None) -> np.ndarray | None: params = results.Params if not (params.WantCls and params.Want_CMB): return None return results.get_total_cls(lmax) def get_lens_potential_cls(results, lmax: int | None) -> np.ndarray | None: params = results.Params if not (params.WantCls and params.Want_CMB_lensing): return None return results.get_lens_potential_cls(lmax) def get_matter_power(results, minkh: float, npoints: int) -> MatterPowerData | None: params = results.Params if not params.WantTransfer: return None requested_kmax = params.Transfer.kmax / (params.H0 / 100.0) if requested_kmax <= minkh: return None nonlinear = str(params.NonLinear) in {"NonLinear_pk", "NonLinear_both"} k, z, pk = results.get_linear_matter_power_spectrum( have_power_spectra=True, nonlinear=nonlinear, ) mask = k >= minkh if not np.any(mask): return None return MatterPowerData(k=k[mask], z=z, pk=pk[:, mask], requested_kmax=requested_kmax, npoints=npoints) def fractional_delta(values: np.ndarray, reference: np.ndarray, *, floor: float = 0.0) -> np.ndarray: denominator = np.abs(reference) if floor: denominator = np.maximum(denominator, floor) with np.errstate(divide="ignore", invalid="ignore"): delta = (values - reference) / denominator return delta def normalized_te_delta(values: np.ndarray, reference: np.ndarray) -> np.ndarray: denominator = np.sqrt(np.maximum(reference[:, CL_COLUMNS["TT"]] * reference[:, CL_COLUMNS["EE"]], 0.0)) with np.errstate(divide="ignore", invalid="ignore"): delta = (values[:, CL_COLUMNS["TE"]] - reference[:, CL_COLUMNS["TE"]]) / denominator return delta def finite_stats( quantity: str, range_label: str, errors: np.ndarray, tolerance: float, locations: np.ndarray | None = None, ) -> StatRow: finite = np.isfinite(errors) if not np.all(finite): bad_index = int(np.flatnonzero(~finite)[0]) location = "" if locations is not None: location = str(locations[bad_index]) return StatRow( quantity, range_label, math.inf, math.inf, tolerance, False, location or "non-finite sample", ) if not np.any(finite): return StatRow( quantity, range_label, math.inf, math.inf, tolerance, False, "no finite samples", ) valid_errors = errors[finite] max_index = int(np.argmax(np.abs(valid_errors))) max_abs = float(np.abs(valid_errors[max_index])) rms = float(np.sqrt(np.mean(valid_errors**2))) location = "" if locations is not None: location = str(locations[finite][max_index]) return StatRow(quantity, range_label, max_abs, rms, tolerance, max_abs <= tolerance, location) def compare_derived(standard: RunOutput, reference: RunOutput, tolerance: float) -> list[StatRow]: rows = [] for name in reference.derived: if name not in standard.derived: continue ref = float(reference.derived[name]) value = float(standard.derived[name]) denominator = abs(ref) if ref else 1.0 error = abs((value - ref) / denominator) rows.append( StatRow( f"derived:{name}", "fractional", error, error, tolerance, error <= tolerance, ) ) return rows def compare_cls(standard: RunOutput, reference: RunOutput) -> list[StatRow]: if standard.lensed_cls is None or reference.lensed_cls is None: return [] lmax = min(standard.lensed_cls.shape[0], reference.lensed_cls.shape[0]) - 1 standard_cls = standard.lensed_cls[: lmax + 1] reference_cls = reference.lensed_cls[: lmax + 1] ell = np.arange(lmax + 1) rows = [] for quantity in ("TT", "EE"): errors = fractional_delta( standard_cls[:, CL_COLUMNS[quantity]], reference_cls[:, CL_COLUMNS[quantity]], ) rows.extend(compare_l_ranges(quantity, errors, ell, tolerance_ranges(TT_EE_TOLERANCES), min_ell=2)) te_errors = normalized_te_delta(standard_cls, reference_cls) rows.extend(compare_l_ranges("TE", te_errors, ell, tolerance_ranges(TT_EE_TOLERANCES), min_ell=2)) bb_errors = fractional_delta(standard_cls[:, CL_COLUMNS["BB"]], reference_cls[:, CL_COLUMNS["BB"]]) rows.extend(compare_l_ranges("BB", bb_errors, ell, tolerance_ranges(bb_tolerances(standard.params)), min_ell=2)) return rows def bb_tolerances(params) -> list[tuple[int, float]]: if not bool(params.Accuracy.AccurateBB): return [(0, 2e-2), (8000, 1e-1)] return BB_TOLERANCES def compare_lensing(standard: RunOutput, reference: RunOutput) -> list[StatRow]: if standard.lens_potential_cls is None or reference.lens_potential_cls is None: return [] lmax = min(standard.lens_potential_cls.shape[0], reference.lens_potential_cls.shape[0]) - 1 standard_pp = standard.lens_potential_cls[: lmax + 1, 0] reference_pp = reference.lens_potential_cls[: lmax + 1, 0] ell = np.arange(lmax + 1) errors = fractional_delta(standard_pp, reference_pp) return compare_l_ranges("lens PP", errors, ell, tolerance_ranges(LENSING_TOLERANCES), min_ell=2) def compare_l_ranges( quantity: str, errors: np.ndarray, ell: np.ndarray, ranges: Iterable[RangeTolerance], *, min_ell: int, ) -> list[StatRow]: rows = [] for range_tolerance in ranges: mask = ell >= max(min_ell, range_tolerance.start) if range_tolerance.stop is not None: mask &= ell < range_tolerance.stop if not np.any(mask): continue selected_ell = ell[mask] rows.append( finite_stats( quantity, ell_range_label(selected_ell, range_tolerance), errors[mask], range_tolerance.tolerance, locations=ell[mask], ) ) return rows def ell_range_label(selected_ell: np.ndarray, range_tolerance: RangeTolerance) -> str: start = int(selected_ell[0]) stop = int(selected_ell[-1]) if range_tolerance.stop is None: return f"L >= {start}" if stop < range_tolerance.stop - 1: return f"{start} <= L <= {stop}" return range_tolerance.label def compare_matter_power(standard: RunOutput, reference: RunOutput, tolerance: float) -> list[StatRow]: if standard.matter_power is None or reference.matter_power is None: return [] common_k, z, standard_pk, reference_pk = common_matter_power_grid(standard.matter_power, reference.matter_power) errors = fractional_delta(standard_pk, reference_pk) z_grid, k_grid = np.meshgrid(z, common_k, indexing="ij") locations = np.array([f"z={z_value:.6g}, k/h={k:.6g}" for z_value, k in zip(z_grid.ravel(), k_grid.ravel())]) range_label = f"all z, {common_k[0]:.4g} <= k/h <= {common_k[-1]:.4g}" return [finite_stats("matter P(k)", range_label, errors.ravel(), tolerance, locations=locations)] def common_matter_power_grid( standard_matter_power: MatterPowerData, reference_matter_power: MatterPowerData, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: std_k, std_z, std_pk = ( standard_matter_power.k, standard_matter_power.z, standard_matter_power.pk, ) ref_k, ref_z, ref_pk = ( reference_matter_power.k, reference_matter_power.z, reference_matter_power.pk, ) if std_pk.shape[0] != ref_pk.shape[0] or not np.allclose(std_z, ref_z): raise ValueError("Matter power redshift grids differ") kmin = max(float(std_k[0]), float(ref_k[0])) requested_kmax = min(standard_matter_power.requested_kmax, reference_matter_power.requested_kmax) kmax = min(float(std_k[-1]), float(ref_k[-1]), requested_kmax) if kmax <= kmin: raise ValueError("Matter power k grids do not overlap") npoints = min(standard_matter_power.npoints, reference_matter_power.npoints) common_k = np.exp(np.linspace(np.log(kmin), np.log(kmax), npoints)) standard_pk = interpolate_pk_to_grid(std_k, std_pk, common_k) reference_pk = interpolate_pk_to_grid(ref_k, ref_pk, common_k) return common_k, std_z, standard_pk, reference_pk def interpolate_pk_to_grid(k: np.ndarray, pk: np.ndarray, common_k: np.ndarray) -> np.ndarray: log_k = np.log(k) log_common_k = np.log(common_k) interpolated = np.empty((pk.shape[0], len(common_k))) for index, values in enumerate(pk): if np.all(values > 0): interpolated[index] = np.exp(np.interp(log_common_k, log_k, np.log(values))) else: interpolated[index] = np.interp(log_common_k, log_k, values) return interpolated def compare_runs( standard: RunOutput, reference: RunOutput, *, derived_tolerance: float, mpk_tolerance: float, ) -> ComparisonResult: return ComparisonResult( derived_rows=compare_derived(standard, reference, derived_tolerance), cl_rows=compare_cls(standard, reference), lensing_rows=compare_lensing(standard, reference), mpk_rows=compare_matter_power(standard, reference, mpk_tolerance), )
[docs] def compare_params_accuracy( params, *, reference_accuracy_settings: dict[str, float | bool] | None = None, comparator_accuracy_settings: dict[str, float | bool] | None = None, lmax: int | None = None, set_for_lmax: int | None = None, lens_margin: int | None = None, lens_potential_accuracy: float | None = None, reference_lens_margin: int | None = None, reference_lens_potential_accuracy: float | None = None, mpk_kmin: float = 1e-4, mpk_npoints: int = 500, derived_tolerance: float = DERIVED_TOLERANCE, mpk_tolerance: float | None = None, chi2_config: NoiseConfig | None = None, ) -> AccuracyCheckResult: """Compare a :class:`~camb.model.CAMBparams` object against a higher-accuracy copy. The input parameters are copied before modification. The returned result includes both CAMB runs, row-wise comparison statistics, and optionally a fiducial CMB delta chi-squared estimate. """ if mpk_tolerance is None: mpk_tolerance = MPK_TOLERANCE if params.Transfer.high_precision else 3e-3 reference_accuracy_settings = reference_accuracy_settings or DEFAULT_ACCURACY_SETTINGS standard_params = copy_params(params) apply_lensing_settings( standard_params, set_for_lmax=set_for_lmax, lens_margin=lens_margin, lens_potential_accuracy=lens_potential_accuracy, ) if comparator_accuracy_settings: apply_accuracy_settings(standard_params, comparator_accuracy_settings) reference_params = copy_params(params) apply_lensing_settings( reference_params, set_for_lmax=set_for_lmax, lens_margin=reference_lens_margin if reference_lens_margin is not None else lens_margin, lens_potential_accuracy=( reference_lens_potential_accuracy if reference_lens_potential_accuracy is not None else lens_potential_accuracy ), ) apply_accuracy_settings(reference_params, reference_accuracy_settings, boost_from_raw=True) lmax = lmax or set_for_lmax standard = run_params_case( standard_params, "standard", lmax=lmax, mpk_kmin=mpk_kmin, mpk_npoints=mpk_npoints, ) reference = run_params_case( reference_params, "boosted", lmax=lmax, mpk_kmin=mpk_kmin, mpk_npoints=mpk_npoints, ) comparison = compare_runs( standard, reference, derived_tolerance=derived_tolerance, mpk_tolerance=mpk_tolerance, ) chi2 = calculate_cmb_delta_chi2(standard, reference, chi2_config) if chi2_config else None return AccuracyCheckResult(standard=standard, reference=reference, comparison=comparison, chi2=chi2)
def chi2_noise_config(args: argparse.Namespace) -> NoiseConfig: if args.chi2_config == "custom": defaults = {} else: defaults = DEFAULT_NOISE_CONFIGS[args.chi2_config] noise_t = args.noise_muk_arcmin_t if noise_t is None: noise_t = defaults.get("noise_muK_arcmin_T") noise_p = args.noise_muk_arcmin_p if noise_p is None: noise_p = defaults.get("noise_muK_arcmin_P", noise_t) fwhm = args.beam_fwhm_arcmin if fwhm is None: fwhm = defaults.get("fwhm_arcmin") fsky = args.fsky if fsky is None: fsky = defaults.get("fsky", 1.0) missing = [ name for name, value in ( ("noise_muK_arcmin_T", noise_t), ("noise_muK_arcmin_P", noise_p), ("fwhm_arcmin", fwhm), ) if value is None ] if missing: raise ValueError(f"--chi2-config custom requires {', '.join(missing)}") if fsky <= 0: raise ValueError("--fsky must be positive") return NoiseConfig( name=args.chi2_config, noise_muK_arcmin_T=float(noise_t), noise_muK_arcmin_P=float(noise_p), fwhm_arcmin=float(fwhm), fsky=float(fsky), lmin=args.chi2_lmin, lmax=args.chi2_lmax or effective_lmax(args), fields=args.chi2_fields, ) def white_noise_from_muK_arcmin(noise_muK_arcmin: float) -> float: return (noise_muK_arcmin * np.pi / (180.0 * 60.0)) ** 2 def make_cmb_noise_spectra(ell: np.ndarray, config: NoiseConfig) -> dict[str, np.ndarray]: noise_var_t = white_noise_from_muK_arcmin(config.noise_muK_arcmin_T) noise_var_p = white_noise_from_muK_arcmin(config.noise_muK_arcmin_P) fwhm_degrees = config.fwhm_arcmin / 60.0 xlc = 180.0 * np.sqrt(8.0 * np.log(2.0)) / np.pi sigma2 = (fwhm_degrees / xlc) ** 2 beam_fac = np.exp(ell * (ell + 1.0) * sigma2) dl_fac = ell * (ell + 1.0) / (2.0 * np.pi) return { "T": dl_fac * noise_var_t * beam_fac, "E": dl_fac * noise_var_p * beam_fac, "B": dl_fac * noise_var_p * beam_fac, } def calculate_cmb_delta_chi2(standard: RunOutput, reference: RunOutput, config: NoiseConfig) -> ChiSquaredResult | None: if not ( standard.params.WantCls and reference.params.WantCls and standard.params.Want_CMB and reference.params.Want_CMB ): return None lmax_available = min(standard.results.Params.max_l, reference.results.Params.max_l) if config.lmax is not None: lmax_available = min(lmax_available, config.lmax) lmin = max(2, config.lmin) if lmax_available < lmin: return None standard_cls = standard.results.get_total_cls(lmax_available, CMB_unit="muK") reference_cls = reference.results.get_total_cls(lmax_available, CMB_unit="muK") ell = np.arange(lmax_available + 1) noise = make_cmb_noise_spectra(ell, config) delta_chi2 = 0.0 worst_ell = lmin worst_delta_chi2 = -math.inf for multipole in range(lmin, lmax_available + 1): model_cov = cmb_covariance_at_l(standard_cls[multipole], noise, multipole, config.fields) fiducial_cov = cmb_covariance_at_l(reference_cls[multipole], noise, multipole, config.fields) ell_chi2 = gaussian_delta_chi2_at_l(model_cov, fiducial_cov, multipole, config.fsky) delta_chi2 += ell_chi2 if ell_chi2 > worst_delta_chi2: worst_delta_chi2 = ell_chi2 worst_ell = multipole return ChiSquaredResult( delta_chi2=delta_chi2, lmin=lmin, lmax=lmax_available, fields=config.fields, fsky=config.fsky, noise_muK_arcmin_T=config.noise_muK_arcmin_T, noise_muK_arcmin_P=config.noise_muK_arcmin_P, fwhm_arcmin=config.fwhm_arcmin, worst_ell=worst_ell, worst_delta_chi2=worst_delta_chi2, ) def cmb_covariance_at_l(cls: np.ndarray, noise: dict[str, np.ndarray], ell: int, fields: tuple[str, ...]) -> np.ndarray: covariance = np.zeros((len(fields), len(fields))) for i, field_i in enumerate(fields): for j, field_j in enumerate(fields): if i == j: covariance[i, j] = cls[CL_COLUMNS[field_i + field_i]] + noise[field_i][ell] elif {field_i, field_j} == {"T", "E"}: covariance[i, j] = cls[CL_COLUMNS["TE"]] return covariance def gaussian_delta_chi2_at_l( model_cov: np.ndarray, fiducial_cov: np.ndarray, ell: int, fsky: float, ) -> float: sign_model, logdet_model = np.linalg.slogdet(model_cov) sign_fiducial, logdet_fiducial = np.linalg.slogdet(fiducial_cov) if sign_model <= 0 or sign_fiducial <= 0: return math.inf trace = np.trace(np.linalg.solve(model_cov, fiducial_cov)) return float(fsky * (2 * ell + 1) * (trace + logdet_model - logdet_fiducial - model_cov.shape[0])) def print_chi2_result(result: ChiSquaredResult | None, config_name: str) -> None: if result is None: print("\nFiducial CMB delta chi-squared: not calculated") return fields = ",".join(result.fields) print("\nFiducial CMB delta chi-squared:") print( f" config={config_name}, fields={fields}, L={result.lmin}..{result.lmax}, " f"fsky={result.fsky:g}, beam={result.fwhm_arcmin:g} arcmin" ) print(f" noise T={result.noise_muK_arcmin_T:g} muK-arcmin, P={result.noise_muK_arcmin_P:g} muK-arcmin") print( f" delta chi2={result.delta_chi2:.6g}; largest per-L contribution={result.worst_delta_chi2:.6g} at L={result.worst_ell}" ) def print_run_summary(standard: RunOutput, reference: RunOutput) -> None: print("Runs:") for run in (standard, reference): print(f" {run.label}: {run_timing_summary(run)}") if reference.cpu_time: print(f" cpu ratio {standard.label}/{reference.label}: {standard.cpu_time / reference.cpu_time:.3g}") if reference.wall_time: print(f" wall ratio {standard.label}/{reference.label}: {standard.wall_time / reference.wall_time:.3g}") def print_table(title: str, rows: list[StatRow]) -> None: if not rows: print(f"\n{title}: not calculated") return print(f"\n{title}:") print(f" {'quantity':<20} {'range':<20} {'max abs':>12} {'rms':>12} {'tol':>12} status") for row in rows: max_abs = "nan" if math.isnan(row.max_abs) else f"{row.max_abs:.4g}" rms = "nan" if math.isnan(row.rms) else f"{row.rms:.4g}" status = "OK" if row.passed else "FAIL" location = f" at {row.location}" if row.location and row.location != "no finite samples" else "" print( f" {row.quantity:<20} {row.range_label:<20} {max_abs:>12} {rms:>12} " f"{row.tolerance:>12.4g} {status}{location}" ) def print_derived_table(rows: list[StatRow]) -> None: title = "Derived parameter fractional changes" if not rows: print(f"\n{title}: not calculated") return print(f"\n{title}:") print(f" {'quantity':<20} {'frac change':>12} {'tol':>12} status") for row in rows: value = "nan" if math.isnan(row.max_abs) else f"{row.max_abs:.4g}" status = "OK" if row.passed else "FAIL" print(f" {row.quantity:<20} {value:>12} {row.tolerance:>12.4g} {status}") def print_comparison(comparison: ComparisonResult) -> None: print_derived_table(comparison.derived_rows) print_table("Lensed CMB spectra errors", comparison.cl_rows) print_table("Lensing potential errors", comparison.lensing_rows) print_table("Matter power errors", comparison.mpk_rows) def failure_summary(comparison: ComparisonResult) -> str: worst = comparison.worst_failure if worst is None: return "FAIL: at least one comparison exceeded its tolerance." location = f" at {worst.location}" if worst.location else "" return ( "FAIL: worst failure is " f"{worst.quantity} ({worst.range_label}) max abs {worst.max_abs:.4g} > tol {worst.tolerance:.4g}" f"{location}." ) def comparison_status(comparison: ComparisonResult) -> str: score = comparison_score(comparison) if comparison.passed: return f"PASS score={score:.4g}" worst = comparison.worst_failure if worst is None: return f"FAIL score={score:.4g}" location = f" at {worst.location}" if worst.location else "" return ( f"FAIL score={score:.4g}; worst={worst.quantity} {worst.range_label}{location}, " f"{worst.max_abs:.4g}/{worst.tolerance:.4g}" ) def search_timing_summary(result: SearchResult) -> list[str]: lines = [] if result.run is not None: lines.append(f" selected timing: {run_timing_summary(result.run)}") fastest = result.fastest if ( fastest is not None and fastest.run is not None and settings_key(fastest.settings) != settings_key(result.settings) ): lines.append( " fastest passing candidate seen: " f"{run_timing_summary(fastest.run)} with {format_settings(changed_settings(fastest.settings, result.settings))}" ) return lines def print_search_timing_summary(result: SearchResult) -> None: for line in search_timing_summary(result): print(line) def plot_errors(standard: RunOutput, reference: RunOutput, plot_dir: Path) -> None: import matplotlib.pyplot as plt plot_dir.mkdir(parents=True, exist_ok=True) if standard.lensed_cls is not None and reference.lensed_cls is not None: lmax = min(standard.lensed_cls.shape[0], reference.lensed_cls.shape[0]) - 1 ell = np.arange(lmax + 1) std_cls = standard.lensed_cls[: lmax + 1] ref_cls = reference.lensed_cls[: lmax + 1] plt.figure(figsize=(9, 5)) for quantity in ("TT", "EE", "BB"): errors = fractional_delta(std_cls[:, CL_COLUMNS[quantity]], ref_cls[:, CL_COLUMNS[quantity]]) plot_log_l_errors(plt, ell, errors, quantity) plot_log_l_errors(plt, ell, normalized_te_delta(std_cls, ref_cls), "TE/sqrt(TT*EE)") plt.axhline(0, color="black", linewidth=0.8) plt.xlabel("L") plt.ylabel("fractional error") plt.legend() plt.tight_layout() path = plot_dir / "lensed_cls_errors.png" plt.savefig(path, dpi=150) plt.close() print(f"Wrote {path}") if standard.lens_potential_cls is not None and reference.lens_potential_cls is not None: lmax = ( min( standard.lens_potential_cls.shape[0], reference.lens_potential_cls.shape[0], ) - 1 ) ell = np.arange(lmax + 1) errors = fractional_delta( standard.lens_potential_cls[: lmax + 1, 0], reference.lens_potential_cls[: lmax + 1, 0], ) plt.figure(figsize=(9, 5)) plot_log_l_errors(plt, ell, errors, "PP") plt.axhline(0, color="black", linewidth=0.8) plt.xlabel("L") plt.ylabel("fractional error") plt.legend() plt.tight_layout() path = plot_dir / "lens_potential_errors.png" plt.savefig(path, dpi=150) plt.close() print(f"Wrote {path}") if standard.matter_power is not None and reference.matter_power is not None: std_k, std_z, std_pk, ref_pk = common_matter_power_grid(standard.matter_power, reference.matter_power) errors = fractional_delta(std_pk, ref_pk) plt.figure(figsize=(9, 5)) for index, z in enumerate(std_z): plt.semilogx(std_k, errors[index], label=f"z={z:.4g}") plt.axhline(0, color="black", linewidth=0.8) plt.xlabel("k/h") plt.ylabel("fractional error") plt.legend() plt.tight_layout() path = plot_dir / "matter_power_errors.png" plt.savefig(path, dpi=150) plt.close() print(f"Wrote {path}") def plot_log_l_errors(plt, ell: np.ndarray, errors: np.ndarray, label: str) -> None: mask = (ell >= 2) & np.isfinite(errors) if np.any(mask): plt.semilogx(ell[mask], errors[mask], label=label, alpha=0.85) def settings_cost(settings: dict[str, float | bool], raw_settings: dict[str, float | bool]) -> tuple[float, int, tuple]: changed = 0 product = 1.0 values = [] numeric_keys = sorted(key for key in set(settings) | set(raw_settings) if key != "DoLateRadTruncation") for key in numeric_keys: value = float(settings[key]) raw_value = float(raw_settings[key]) changed += int(not math.isclose(value, raw_value)) product *= value values.append(value) if "DoLateRadTruncation" in settings or "DoLateRadTruncation" in raw_settings: changed += int(settings.get("DoLateRadTruncation") != raw_settings.get("DoLateRadTruncation")) values.append(int(bool(settings.get("DoLateRadTruncation")))) return product, changed, tuple(values) def raw_accuracy_settings(params) -> dict[str, float | bool]: return { "AccuracyBoost": float(params.Accuracy.AccuracyBoost), "lSampleBoost": float(params.Accuracy.lSampleBoost), "lAccuracyBoost": float(params.Accuracy.lAccuracyBoost), "IntTolBoost": float(params.Accuracy.IntTolBoost), "DoLateRadTruncation": bool(params.DoLateRadTruncation), } def component_accuracy_settings(params) -> dict[str, float | bool]: settings = {"AccuracyBoost": 1.0} for key in component_refinement_keys(params): settings[key] = float(getattr(params.Accuracy, key)) settings["DoLateRadTruncation"] = bool(params.DoLateRadTruncation) return settings def component_refinement_keys(params) -> tuple[str, ...]: keys = list(COMPONENT_REFINEMENT_KEYS) if abs(float(getattr(params, "omk", 0.0))) <= OMEGA_K_FLAT: keys.remove("NonFlatIntAccuracyBoost") if not uses_nonlinear_sources(params): keys.remove("NonlinSourceBoost") if not has_redshift_windows(params): keys.remove("KmaxBoost") keys.remove("SourceLimberBoost") return tuple(keys) def uses_nonlinear_sources(params) -> bool: return str(getattr(params, "NonLinear", "NonLinear_none")) in {"NonLinear_pk", "NonLinear_both"} def has_redshift_windows(params) -> bool: source_windows = getattr(params, "SourceWindows", None) if source_windows is not None: try: return len(source_windows) >= 1 except TypeError: pass return int(getattr(params, "num_redshiftwindows", 0)) >= 1 def component_target_settings( raw_settings: dict[str, float | bool], passing_settings: dict[str, float | bool], ) -> dict[str, float | bool]: global_boost = float(passing_settings.get("AccuracyBoost", 1.0)) target = dict(raw_settings) target["AccuracyBoost"] = 1.0 target["DoLateRadTruncation"] = passing_settings.get( "DoLateRadTruncation", raw_settings.get("DoLateRadTruncation", True) ) for key in GLOBAL_BOOST_COMPONENT_KEYS: if key not in raw_settings: continue direct_value = float(passing_settings.get(key, raw_settings[key])) boosted_value = float(raw_settings[key]) * global_boost target_value = max( float(raw_settings[key]), direct_value, boosted_value, ) target[key] = next_meaningful_accuracy_value(key, float(raw_settings[key]), target_value) for key in ("lSampleBoost", "lAccuracyBoost"): if key in raw_settings: target[key] = max( float(raw_settings[key]), float(passing_settings.get(key, raw_settings[key])), ) return target def accuracy_search_target_settings( raw_settings: dict[str, float | bool], requested_settings: dict[str, float | bool], ) -> dict[str, float | bool]: target = dict(requested_settings) for key in BASE_ACCURACY_KEYS: target[key] = max(float(raw_settings[key]), float(requested_settings[key])) target["DoLateRadTruncation"] = bool(requested_settings["DoLateRadTruncation"]) return target def next_meaningful_accuracy_value(key: str, raw_value: float, target_value: float) -> float: if key not in DISCRETE_ACCURACY_VALUES: return target_value for value in DISCRETE_ACCURACY_VALUES[key]: if value >= raw_value and value >= target_value: return value return max(raw_value, DISCRETE_ACCURACY_VALUES[key][-1]) def search_candidates( raw_settings: dict[str, float | bool], high_accuracy_settings: dict[str, float | bool], grid: Iterable[float] | None, ) -> list[dict[str, float | bool]]: if grid is None: return [] candidate_values = {} for key in BASE_ACCURACY_KEYS: raw_value = float(raw_settings[key]) high_value = max(raw_value, float(high_accuracy_settings[key])) candidate_values[key] = sorted({max(raw_value, value) for value in grid if value <= high_value} | {high_value}) late_rad_values = sorted( { bool(raw_settings["DoLateRadTruncation"]), bool(high_accuracy_settings["DoLateRadTruncation"]), }, key=int, ) candidates = [ { "AccuracyBoost": accuracy_boost, "lSampleBoost": l_sample_boost, "lAccuracyBoost": l_accuracy_boost, "IntTolBoost": int_tol_boost, "DoLateRadTruncation": do_late_rad_truncation, } for accuracy_boost, l_sample_boost, l_accuracy_boost, int_tol_boost, do_late_rad_truncation in itertools.product( candidate_values["AccuracyBoost"], candidate_values["lSampleBoost"], candidate_values["lAccuracyBoost"], candidate_values["IntTolBoost"], late_rad_values, ) ] return sorted(candidates, key=lambda settings: settings_cost(settings, raw_settings)) def bracket_then_refine_boost_subset( subset: tuple[str, ...], raw_settings: dict[str, float | bool], target_settings: dict[str, float | bool], late_rad_value: bool, evaluator, tolerance: float, ) -> tuple[dict[str, float | bool], ComparisonResult | None] | None: if not any(float(target_settings[key]) > float(raw_settings[key]) for key in subset): return None settings = subset_target_settings(raw_settings, target_settings, subset, late_rad_value) comparison = evaluator(settings) if comparison is None: return settings, None if not comparison.passed: return None print(f"Found passing subset {subset}; refining within bracket.") return refine_numeric_boosts( settings, raw_settings, evaluator, tolerance, cost_function=settings_cost, ) def subset_target_settings( raw_settings: dict[str, float | bool], target_settings: dict[str, float | bool], subset: tuple[str, ...], late_rad_value: bool, ) -> dict[str, float | bool]: settings = dict(raw_settings) settings["DoLateRadTruncation"] = late_rad_value for key in subset: settings[key] = target_settings[key] return settings def find_minimal_boosts( ini_file: Path, args: argparse.Namespace, reference: RunOutput, high_accuracy_settings: dict[str, float | bool], raw_comparison: ComparisonResult | None = None, raw_run: RunOutput | None = None, ) -> SearchResult | tuple[None, None]: raw_params = load_params( ini_file, no_validate=args.no_validate, set_for_lmax=args.set_for_lmax, lens_margin=args.lens_margin, lens_potential_accuracy=args.lens_potential_accuracy, ) raw_settings = raw_accuracy_settings(raw_params) target_settings = accuracy_search_target_settings(raw_settings, high_accuracy_settings) candidates = search_candidates(raw_settings, target_settings, args.search_grid) comparisons: dict[tuple, ComparisonResult] = {} runs: dict[tuple, RunOutput] = {} comparison_runs: dict[int, RunOutput] = {} if raw_comparison is not None: raw_key = settings_key(raw_settings) comparisons[raw_key] = raw_comparison if raw_run is not None: runs[raw_key] = raw_run comparison_runs[id(raw_comparison)] = raw_run fastest: PassingCandidate | None = None runs_done = 0 def make_result(settings: dict[str, float | bool], comparison: ComparisonResult) -> SearchResult: key = settings_key(refine_discrete_accuracy_settings(settings, raw_settings)) return SearchResult(settings, comparison, runs.get(key) or comparison_runs.get(id(comparison)), fastest) def run_candidate(settings: dict[str, float | bool]) -> ComparisonResult | None: nonlocal fastest, runs_done settings = refine_discrete_accuracy_settings(settings, raw_settings) key = settings_key(settings) if key in comparisons: return comparisons[key] if args.max_search_runs is not None and runs_done >= args.max_search_runs: return None runs_done += 1 print(f" [{runs_done}] {format_settings(settings)}") candidate = run_case( ini_file, "candidate", no_validate=args.no_validate, accuracy_settings=settings, lmax=effective_lmax(args), set_for_lmax=args.set_for_lmax, lens_margin=args.lens_margin, lens_potential_accuracy=args.lens_potential_accuracy, mpk_kmin=args.mpk_kmin, mpk_npoints=args.mpk_npoints, ) runs[key] = candidate print(f" {run_timing_summary(candidate)}") comparison = compare_runs( candidate, reference, derived_tolerance=args.derived_tolerance, mpk_tolerance=args.mpk_tolerance, ) comparisons[key] = comparison comparison_runs[id(comparison)] = candidate if comparison.passed: passing = PassingCandidate(settings, comparison, candidate) if fastest is None or run_timing_key(candidate) < run_timing_key(fastest.run): fastest = passing print(f" {comparison_status(comparison)}") return comparison raw_comparison = run_candidate(raw_settings) if raw_comparison is None: return None, None if raw_comparison.passed: return make_result(raw_settings, raw_comparison) numeric_keys = BASE_ACCURACY_KEYS late_rad_values = [bool(raw_settings["DoLateRadTruncation"])] if target_settings["DoLateRadTruncation"] != raw_settings["DoLateRadTruncation"]: late_rad_values.append(bool(target_settings["DoLateRadTruncation"])) exhaustive = getattr(args, "exhaustive_boost_search", False) is True if target_settings["DoLateRadTruncation"] != raw_settings["DoLateRadTruncation"]: settings = dict(raw_settings) settings["DoLateRadTruncation"] = target_settings["DoLateRadTruncation"] comparison = run_candidate(settings) if comparison is None: return None, None if comparison.passed: return make_result(settings, comparison) print("\nSearching single boost parameters from values close to raw...") best_settings = None best_comparison = None best_cost = None for subset_size in range(1, len(numeric_keys) + 1): if subset_size == 2 and best_settings is not None and not exhaustive: return make_result(best_settings, best_comparison) if subset_size == 2: if best_settings is None: print("\nNo single-parameter path passed; searching boost parameter combinations...") else: print("\nExhaustive search requested; searching boost parameter combinations...") elif subset_size > 2: print(f"\nSearching {subset_size}-parameter boost combinations...") for late_rad_value in late_rad_values: for subset in itertools.combinations(numeric_keys, subset_size): result = bracket_then_refine_boost_subset( subset, raw_settings, target_settings, late_rad_value, run_candidate, args.search_tolerance, ) if result is None: continue refined_settings, refined_comparison = result if refined_comparison is None: if best_settings is not None: print("Search run budget exhausted; returning the best passing subset found so far.") return make_result(best_settings, best_comparison) print("Search run budget exhausted before finding a passing subset.") return None, None cost = settings_cost(refined_settings, raw_settings) if best_cost is None or cost < best_cost: best_settings = refined_settings best_comparison = refined_comparison best_cost = cost if best_settings is not None and not exhaustive: return make_result(best_settings, best_comparison) if best_settings is not None: return make_result(best_settings, best_comparison) if not candidates: return None, None print(f"\nAdaptive combination search did not pass; trying {len(candidates)} optional seed candidates...") for settings in candidates: comparison = run_candidate(settings) if comparison is None: if best_settings is not None: print("Search run budget exhausted; returning the best passing settings found so far.") return make_result(best_settings, best_comparison) print("Search run budget exhausted before finding a passing coarse candidate.") return None, None if comparison.passed: print("Found passing coarse settings; refining numeric boosts.") refined_settings, refined_comparison = refine_numeric_boosts( settings, raw_settings, run_candidate, args.search_tolerance, cost_function=settings_cost, ) return make_result(refined_settings, refined_comparison) return None, None def refine_accuracy_components( ini_file: Path, args: argparse.Namespace, reference: RunOutput, reference_accuracy_settings: dict[str, float | bool], ) -> SearchResult | tuple[None, ComparisonResult | None]: raw_params = load_params( ini_file, no_validate=args.no_validate, set_for_lmax=args.set_for_lmax, lens_margin=args.lens_margin, lens_potential_accuracy=args.lens_potential_accuracy, ) target_accuracy_settings = accuracy_search_target_settings( raw_accuracy_settings(raw_params), reference_accuracy_settings ) raw_settings = component_accuracy_settings(raw_params) target_settings = component_target_settings(raw_settings, target_accuracy_settings) comparisons: dict[tuple, ComparisonResult] = {} runs: dict[tuple, RunOutput] = {} comparison_runs: dict[int, RunOutput] = {} fastest: PassingCandidate | None = None runs_done = 0 def make_result(settings: dict[str, float | bool], comparison: ComparisonResult) -> SearchResult: key = settings_key(refine_discrete_accuracy_settings(settings, raw_settings)) return SearchResult(settings, comparison, runs.get(key) or comparison_runs.get(id(comparison)), fastest) def run_candidate(settings: dict[str, float | bool]) -> ComparisonResult | None: nonlocal fastest, runs_done settings = refine_discrete_accuracy_settings(settings, raw_settings) key = settings_key(settings) if key in comparisons: return comparisons[key] if args.max_search_runs is not None and runs_done >= args.max_search_runs: return None runs_done += 1 print(f" [component {runs_done}] {format_settings(changed_settings(settings, raw_settings))}") candidate = run_case( ini_file, "component-candidate", no_validate=args.no_validate, accuracy_settings=settings, lmax=effective_lmax(args), set_for_lmax=args.set_for_lmax, lens_margin=args.lens_margin, lens_potential_accuracy=args.lens_potential_accuracy, mpk_kmin=args.mpk_kmin, mpk_npoints=args.mpk_npoints, ) runs[key] = candidate print(f" {run_timing_summary(candidate)}") comparison = compare_runs( candidate, reference, derived_tolerance=args.derived_tolerance, mpk_tolerance=args.mpk_tolerance, ) comparisons[key] = comparison comparison_runs[id(comparison)] = candidate if comparison.passed: passing = PassingCandidate(settings, comparison, candidate) if fastest is None or run_timing_key(candidate) < run_timing_key(fastest.run): fastest = passing print(f" {comparison_status(comparison)}") return comparison keys = tuple( key for key in raw_settings if key != "DoLateRadTruncation" and target_settings[key] > raw_settings[key] ) max_size = min(args.component_search_size, len(keys)) print("\nRefining with AccuracyBoost=1 and component accuracy parameters...") if not keys: return None, None all_target = dict(raw_settings) all_target["DoLateRadTruncation"] = target_settings["DoLateRadTruncation"] for key in keys: all_target[key] = target_settings[key] print("Checking all component targets first...") all_target_comparison = run_candidate(all_target) if all_target_comparison is None: print("Component search run budget exhausted.") return None, None if not all_target_comparison.passed: print( "All component targets with AccuracyBoost=1 still fail; " "the passing run likely depends on direct AccuracyBoost effects." ) print(failure_summary(all_target_comparison)) return None, all_target_comparison print("Pruning unnecessary components from the all-target pass...") pruned_settings, pruned_comparison = greedy_prune_component_settings( all_target, raw_settings, keys, run_candidate, ) if pruned_comparison is None: print("Component search run budget exhausted.") return None, None pruned_keys = component_changed_numeric_keys(pruned_settings, raw_settings) if component_settings_cost(pruned_settings, raw_settings) < component_settings_cost(all_target, raw_settings): print(f"Greedy pruning reduced component count to {len(pruned_keys)}.") if not pruned_keys: return make_result(pruned_settings, pruned_comparison) best_settings = None best_cost = None max_size = min(max_size, len(pruned_keys)) if max_size >= 1: print("Checking target-valued subsets within the pruned component set...") for subset_size in range(1, max_size + 1): for subset in itertools.combinations(pruned_keys, subset_size): settings = subset_target_settings( raw_settings, target_settings, subset, bool(target_settings["DoLateRadTruncation"]), ) comparison = run_candidate(settings) if comparison is None: print("Component search run budget exhausted.") return None, None if not comparison.passed: continue cost = component_settings_cost(settings, raw_settings) if best_cost is None or cost < best_cost: best_settings = settings best_cost = cost if best_settings is not None: print("Found passing component subset; refining values.") refined_settings, refined_comparison = refine_numeric_boosts( best_settings, raw_settings, run_candidate, args.search_tolerance, cost_function=component_settings_cost, ) return make_result(refined_settings, refined_comparison) print("No smaller target-valued subset passed; refining the pruned component set.") refined_settings, refined_comparison = refine_numeric_boosts( pruned_settings, raw_settings, run_candidate, args.search_tolerance, cost_function=component_settings_cost, ) return make_result(refined_settings, refined_comparison) def greedy_prune_component_settings( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], keys: tuple[str, ...], evaluator, ) -> tuple[dict[str, float | bool], ComparisonResult | None]: current = dict(settings) current_comparison = evaluator(current) if current_comparison is None or not current_comparison.passed: return current, current_comparison removable_keys = [ key for key in keys if key in current and not math.isclose(float(current[key]), float(raw_settings[key])) ] improved = True while improved: improved = False best_trial = None best_comparison = None best_cost = None for key in removable_keys: if math.isclose(float(current[key]), float(raw_settings[key])): continue trial = dict(current) trial[key] = raw_settings[key] comparison = evaluator(trial) if comparison is None: return current, None if not comparison.passed: continue cost = component_settings_cost(trial, raw_settings) if best_cost is None or cost < best_cost: best_trial = trial best_comparison = comparison best_cost = cost if best_trial is not None: current = best_trial current_comparison = best_comparison improved = True return current, current_comparison def component_changed_numeric_keys( settings: dict[str, float | bool], raw_settings: dict[str, float | bool] ) -> tuple[str, ...]: return tuple( key for key, value in settings.items() if key not in {"AccuracyBoost", "DoLateRadTruncation"} and not math.isclose(float(value), float(raw_settings[key])) ) def changed_settings( settings: dict[str, float | bool], raw_settings: dict[str, float | bool] ) -> dict[str, float | bool]: return { key: value for key, value in settings.items() if key == "AccuracyBoost" or key == "DoLateRadTruncation" or not math.isclose(float(value), float(raw_settings[key])) } def component_settings_cost(settings: dict[str, float | bool], raw_settings: dict[str, float | bool]) -> tuple: changed = changed_settings(settings, raw_settings) numeric = [ float(value) / max(float(raw_settings[key]), 1e-30) for key, value in changed.items() if key != "DoLateRadTruncation" ] return len(numeric), math.prod(numeric) if numeric else 1.0, tuple(sorted(changed)) def refine_numeric_boosts( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, tolerance: float, cost_function=None, ) -> tuple[dict[str, float | bool], ComparisonResult]: cost_function = settings_cost if cost_function is None else cost_function initial_settings = refine_discrete_accuracy_settings(settings, raw_settings) initial_comparison = evaluator(initial_settings) if initial_comparison is None or not initial_comparison.passed: raise RuntimeError("refine_numeric_boosts requires an initially passing settings point") numeric_keys = tuple( key for key in initial_settings if key != "DoLateRadTruncation" and float(initial_settings[key]) - float(raw_settings[key]) > tolerance ) best_settings, best_comparison = maybe_restore_late_rad_truncation( initial_settings, raw_settings, evaluator, initial_comparison ) if len(numeric_keys) == 1: key = numeric_keys[0] best_settings, best_comparison = refine_single_numeric_boost( best_settings, raw_settings, evaluator, tolerance, key, best_comparison, ) best_settings, best_comparison = maybe_restore_late_rad_truncation( best_settings, raw_settings, evaluator, best_comparison, ) return rounded_numeric_settings(best_settings), best_comparison starts: list[tuple[dict[str, float | bool], ComparisonResult]] = [(best_settings, best_comparison)] balanced_settings, balanced_comparison = refine_balanced_path( initial_settings, raw_settings, evaluator, tolerance, numeric_keys ) if balanced_comparison is not None: starts.append( maybe_restore_late_rad_truncation(balanced_settings, raw_settings, evaluator, balanced_comparison) ) key_orders = refinement_key_orders(numeric_keys) for start_settings, start_comparison in starts: for key_order in key_orders: refined_settings, refined_comparison = coordinate_refine_numeric_boosts( start_settings, raw_settings, evaluator, tolerance, key_order, start_comparison, ) if cost_function(refined_settings, raw_settings) < cost_function(best_settings, raw_settings): best_settings = refined_settings best_comparison = refined_comparison return rounded_numeric_settings(best_settings), best_comparison def maybe_restore_late_rad_truncation( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, comparison: ComparisonResult, ) -> tuple[dict[str, float | bool], ComparisonResult]: current = dict(settings) current_comparison = comparison if current.get("DoLateRadTruncation") != raw_settings.get("DoLateRadTruncation"): trial = dict(current) trial["DoLateRadTruncation"] = raw_settings["DoLateRadTruncation"] comparison = evaluator(trial) if comparison is not None and comparison.passed: current = trial current_comparison = comparison return current, current_comparison def refine_balanced_path( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, tolerance: float, numeric_keys: tuple[str, ...], ) -> tuple[dict[str, float | bool], ComparisonResult | None]: continuous_keys = tuple(key for key in numeric_keys if key not in DISCRETE_ACCURACY_VALUES) if len(continuous_keys) != len(numeric_keys): settings = refine_discrete_accuracy_settings(settings, raw_settings) if not numeric_keys: comparison = evaluator(settings) return dict(settings), comparison if not continuous_keys: comparison = evaluator(settings) return dict(settings), comparison span = max(float(settings[key]) - float(raw_settings[key]) for key in continuous_keys) if span <= tolerance: comparison = evaluator(settings) return dict(settings), comparison low = 0.0 high = 1.0 current = dict(settings) current_comparison = evaluator(current) while (high - low) * span > tolerance: mid = (low + high) / 2 trial = dict(settings) for key in continuous_keys: trial[key] = float(raw_settings[key]) + mid * (float(settings[key]) - float(raw_settings[key])) comparison = evaluator(trial) if comparison is None: break if comparison.passed: current = trial current_comparison = comparison high = mid else: low = mid return current, current_comparison def refinement_key_orders(numeric_keys: tuple[str, ...]) -> list[tuple[str, ...]]: if not numeric_keys: return [()] orders = [numeric_keys, tuple(reversed(numeric_keys))] orders.extend(numeric_keys[index:] + numeric_keys[:index] for index in range(1, len(numeric_keys))) return list(dict.fromkeys(orders)) def coordinate_refine_numeric_boosts( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, tolerance: float, key_order: tuple[str, ...], comparison: ComparisonResult, ) -> tuple[dict[str, float | bool], ComparisonResult]: current = dict(settings) current_comparison = comparison improved = True while improved: improved = False for key in key_order: before = float(current[key]) current, current_comparison = refine_single_numeric_boost( current, raw_settings, evaluator, tolerance, key, current_comparison ) improved = improved or float(current[key]) < before - tolerance / 2 return current, current_comparison def refine_single_numeric_boost( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, tolerance: float, key: str, comparison: ComparisonResult, ) -> tuple[dict[str, float | bool], ComparisonResult]: current = dict(settings) current_comparison = comparison if key in DISCRETE_ACCURACY_VALUES: return refine_discrete_numeric_boost(current, raw_settings, evaluator, key, current_comparison) low = float(raw_settings[key]) high = float(current[key]) if high - low <= tolerance: return current, current_comparison while high - low > tolerance: mid = (low + high) / 2 trial = dict(current) trial[key] = mid trial_comparison = evaluator(trial) if trial_comparison is None: break if trial_comparison.passed: current = trial current_comparison = trial_comparison high = mid else: low = mid return current, current_comparison def refine_discrete_accuracy_settings(settings: dict[str, float | bool], raw_settings: dict[str, float | bool]): refined = dict(settings) for key in DISCRETE_ACCURACY_VALUES: if key in refined and key in raw_settings: refined[key] = next_meaningful_accuracy_value(key, float(raw_settings[key]), float(refined[key])) return refined def refine_discrete_numeric_boost( settings: dict[str, float | bool], raw_settings: dict[str, float | bool], evaluator, key: str, comparison: ComparisonResult, ) -> tuple[dict[str, float | bool], ComparisonResult]: current = dict(settings) current_comparison = comparison candidates = discrete_accuracy_candidates(key, float(raw_settings[key]), float(current[key])) for value in candidates: trial = dict(current) trial[key] = value trial_comparison = evaluator(trial) if trial_comparison is not None and trial_comparison.passed: current = trial current_comparison = trial_comparison break return current, current_comparison def discrete_accuracy_candidates(key: str, raw_value: float, high_value: float) -> tuple[float, ...]: values = [raw_value] values.extend(value for value in DISCRETE_ACCURACY_VALUES[key] if raw_value < value <= high_value) if high_value not in values: values.append(high_value) return tuple(dict.fromkeys(values)) def rounded_numeric_settings( settings: dict[str, float | bool], ) -> dict[str, float | bool]: rounded = dict(settings) for key, value in rounded.items(): if key != "DoLateRadTruncation": rounded[key] = round(float(value), 6) return rounded def settings_key(settings: dict[str, float | bool]) -> tuple: return tuple( (key, bool(value) if key == "DoLateRadTruncation" else round(float(value), 8)) for key, value in sorted(settings.items()) ) def format_settings(settings: dict[str, float | bool]) -> str: preferred_order = ( "AccuracyBoost", "lSampleBoost", "lAccuracyBoost", "IntTolBoost", "DoLateRadTruncation", ) ordered_keys = [key for key in preferred_order if key in settings] + sorted( key for key in settings if key not in preferred_order ) return ", ".join(f"{key}={settings[key]}" for key in ordered_keys) def effective_lmax(args: argparse.Namespace) -> int | None: return args.lmax or args.set_for_lmax def main(argv: list[str] | None = None, *, prog: str | None = None) -> int: """Run the accuracy checker command-line interface.""" parser = build_parser(prog=prog) args = parser.parse_args(argv) ini_file = Path(args.ini_file) high_accuracy_settings = requested_accuracy_settings(args) print(f"Input ini: {ini_file}") print(f"High-accuracy settings: {format_settings(high_accuracy_settings)}") base_params = load_params(ini_file, no_validate=args.no_validate) if args.mpk_tolerance is None: args.mpk_tolerance = MPK_TOLERANCE if base_params.Transfer.high_precision else 3e-3 noise_config = None if args.chi2: try: noise_config = chi2_noise_config(args) except ValueError as exc: parser.error(str(exc)) result = compare_params_accuracy( base_params, reference_accuracy_settings=high_accuracy_settings, lmax=effective_lmax(args), set_for_lmax=args.set_for_lmax, lens_margin=args.lens_margin, lens_potential_accuracy=args.lens_potential_accuracy, reference_lens_margin=args.reference_lens_margin, reference_lens_potential_accuracy=args.reference_lens_potential_accuracy, mpk_kmin=args.mpk_kmin, mpk_npoints=args.mpk_npoints, derived_tolerance=args.derived_tolerance, mpk_tolerance=args.mpk_tolerance, chi2_config=noise_config, ) standard = result.standard reference = result.reference comparison = result.comparison print_run_summary(standard, reference) print_comparison(comparison) if args.chi2: print_chi2_result(result.chi2, noise_config.name) if args.plot_dir: plot_errors(standard, reference, args.plot_dir) if args.find_minimal_boosts: search_result = find_minimal_boosts( ini_file, args, reference, high_accuracy_settings, raw_comparison=comparison, raw_run=standard, ) settings, search_comparison = search_result if settings is None: print("\nNo passing candidate settings found.") print(f"\n{failure_summary(comparison)}") return 1 else: print(f"\nMinimal passing settings from search: {format_settings(settings)}") if isinstance(search_result, SearchResult): print_search_timing_summary(search_result) if search_comparison: print_comparison(search_comparison) if args.refine_accuracy_components: component_result = refine_accuracy_components(ini_file, args, reference, high_accuracy_settings) component_settings, component_comparison = component_result if component_settings is None: print("\nNo passing AccuracyBoost=1 component settings found.") else: print( "\nPassing AccuracyBoost=1 component settings: " f"{format_settings(changed_settings(component_settings, component_accuracy_settings(base_params)))}" ) if isinstance(component_result, SearchResult): print_search_timing_summary(component_result) if component_comparison: print_comparison(component_comparison) return 0 if comparison.passed: print("\nPASS: standard results are stable against boosted accuracy settings.") return 0 print(f"\n{failure_summary(comparison)}") return 1 if __name__ == "__main__": raise SystemExit(main())