From 3d48f50eabeea85100bfb7f8114efe9d64bd3fdd Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Mon, 18 Aug 2025 14:28:09 -0700 Subject: [PATCH 1/9] added indpdt_verif2 function for test --- cfr/reconres.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/cfr/reconres.py b/cfr/reconres.py index d6686f9c..7b30b28a 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -247,6 +247,72 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve p_success(f">>> Records Number: {len(indpdt_info)}") return indpdt_info + def indpdt_verif2(self, job_path, verbose=False, calib_period=(1850,2000), min_verif_len=10): + """ + Perform independent verification (version 2). + """ + job = ReconJob() # Remove 'cfr.' since you're already inside the cfr module + job.load(job_path) + indpdt_info = [] + + for path_index, path in enumerate(self.paths): # Use self.paths + lbls = self.proxy_labels[path_index] # Use self.proxy_labels + + # Load a full prior pool (so TRW can get 'pr', etc.) + job.load_clim(tag="analysis", + path_dict={"tas": path}, + anom_period=(1951, 1980)) + + # Ensure a prior pool exists (from the saved job config) for any other vars PSMs might need + job.load_clim(tag="prior", + path_dict=None, + anom_period=(850, 1850)) + + # Make the forward step use recon tas (1–2000) while leaving other vars (e.g., pr) as in prior + job.prior['tas'] = job.analysis['tas'] + + job.forward_psms(verbose=verbose) + if verbose: + print(f">>> Validating against {path}") + + # compare pseudo-proxy to observed + calib_PDB = job.proxydb.filter(by="tag", keys=["calibrated"]) + for pname, proxy in calib_PDB.records.items(): + detail = getattr(proxy.psm, 'calib_details', {}) + attr = { + 'name': pname, + 'seasonality': detail.get('seasonality', None), + 'assim': True if pname in lbls['pids_assim'] else False if pname in lbls['pids_eval'] else None, + } + + reconstructed = pd.DataFrame({'time': proxy.pseudo.time, 'estimated': proxy.pseudo.value}) + real = pd.DataFrame({'time': proxy.time, 'observed': proxy.value}) + Df = real.dropna().merge(reconstructed, on='time', how='inner').set_index('time').sort_index() + + masks = { + 'all': None, + 'in': (Df.index >= calib_period[0]) & (Df.index <= calib_period[1]), + 'before': (Df.index < calib_period[0]), + } + for mname, m in masks.items(): + Dfm = Df if m is None else Df[m] + if len(Dfm) < min_verif_len: + corr = np.nan; ce = np.nan + else: + corr = Dfm[['observed','estimated']].corr().iloc[0,1] + ce = utils.coefficient_efficiency(Dfm.observed.values, Dfm.estimated.values) + attr[f'{mname}_corr'] = corr + attr[f'{mname}_ce'] = ce + + indpdt_info.append(attr) + + self.indpdt_info = pd.DataFrame(indpdt_info) # Store as instance attribute + if verbose: + p_success(f">>> indpdt verification completed, results stored in ReconRes.indpdt_info") + p_success(f">>> Records Number: {len(self.indpdt_info)}") + + return self.indpdt_info + def plot_indpdt_verif(self): """ Plot the indpdt verification results. From 1af7aedabe1d673fc294104f19f948e7f986ef90 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Wed, 18 Mar 2026 21:47:57 -0700 Subject: [PATCH 2/9] fixes for indpdt_verif function to not trim prior range --- cfr/reconres.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cfr/reconres.py b/cfr/reconres.py index 7b30b28a..7abca98f 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -187,6 +187,17 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve }, anom_period=(1951, 1980), ) + # Reconstruction files use integer year coordinates, not datetime64, + # so annualize() cannot access .month on them. Mark the field as + # already annualized so ClimateField.annualize() returns it as-is. + job.prior['tas'].da.attrs['annualized'] = 1 + # Clear only the model.tas cache so forward_psms() re-fetches it + # from the newly loaded prior. Leave other variables (e.g. model.pr) + # intact — they came from the original prior in the pickle and are + # still valid; we have not replaced them in job.prior. + for pobj in job.proxydb.records.values(): + if 'clim' in pobj.__dict__ and 'model.tas' in pobj.clim: + del pobj.clim['model.tas'] job.forward_psms(verbose=verbose) if verbose: p_success(f">>> Prior loaded from {path}") From 6760b8beeaaca3ae6506dfd599ef987bf30c37f3 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:55:00 -0700 Subject: [PATCH 3/9] update to proxy.py for duplicate lat/lon fix for indpdt_verif function --- cfr/proxy.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cfr/proxy.py b/cfr/proxy.py index 8edda375..2a09d346 100644 --- a/cfr/proxy.py +++ b/cfr/proxy.py @@ -482,7 +482,14 @@ def get_clim(self, fields, tag=None, verbose=False, search_dist=5, load=True, ** if tag is not None: name = f'{tag}.{name}' - nda = field.da.sel(lat=self.lat, lon=self.lon, **_kwargs) + da = field.da + if da.indexes.get('lat') is not None and not da.indexes['lat'].is_unique: + _, lat_idx = np.unique(da['lat'].values, return_index=True) + da = da.isel(lat=lat_idx) + if da.indexes.get('lon') is not None and not da.indexes['lon'].is_unique: + _, lon_idx = np.unique(da['lon'].values, return_index=True) + da = da.isel(lon=lon_idx) + nda = da.sel(lat=self.lat, lon=self.lon, **_kwargs) if np.all(np.isnan(nda.values)) and search_dist is not None: for i in range(1, search_dist+1): p_header(f'{self.pid} >>> Nearest climate is NaN. Searching around within distance of {i} deg ...') From 85800afbc818bf5f22563c6641f520ffa5341976 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:29:56 -0700 Subject: [PATCH 4/9] indpdt_verif plotting fix calib_period change in reconres.py --- cfr/reconres.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cfr/reconres.py b/cfr/reconres.py index 7abca98f..dc912bbf 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -253,6 +253,7 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve indpdt_info.append(attr_dict) indpdt_info = pd.DataFrame(indpdt_info) self.indpdt_info = indpdt_info + self.indpdt_calib_period = calib_period if verbose: p_success(f">>> indpdt verification completed, results stored in ReconRes.indpdt_info") p_success(f">>> Records Number: {len(indpdt_info)}") @@ -328,5 +329,6 @@ def plot_indpdt_verif(self): """ Plot the indpdt verification results. """ - fig, axs = visual.plot_indpdt_dist(self.indpdt_info) + calib_period = getattr(self, 'indpdt_calib_period', [1850, 2000]) + fig, axs = visual.plot_indpdt_dist(self.indpdt_info, calib_period=calib_period) return fig, axs \ No newline at end of file From 6fa12954e4f2fb6c9904832bd65122b595050430 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:33:05 -0700 Subject: [PATCH 5/9] update model.pr cache --- cfr/reconres.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/cfr/reconres.py b/cfr/reconres.py index dc912bbf..70534082 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -180,24 +180,34 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve indpdt_info = [] for path_index ,path in enumerate(self.paths): proxy_labels = self.proxy_labels[path_index] + # Save original prior fields before load_clim wipes job.prior. + saved_prior = {k: v for k, v in job.prior.items()} + # Detect which prior variables are available in the reconstruction file. + with xr.open_dataset(path) as ds_check: + recon_vars_in_file = [k for k in saved_prior if k in ds_check] + path_dict = {vn: path for vn in recon_vars_in_file} job.load_clim( tag="prior", - path_dict={ - "tas": path, - }, + path_dict=path_dict, anom_period=(1951, 1980), ) + # Restore any prior variables not present in the reconstruction file + # (e.g. pr if only tas was reconstructed). + for k, v in saved_prior.items(): + if k not in job.prior: + job.prior[k] = v # Reconstruction files use integer year coordinates, not datetime64, - # so annualize() cannot access .month on them. Mark the field as - # already annualized so ClimateField.annualize() returns it as-is. - job.prior['tas'].da.attrs['annualized'] = 1 - # Clear only the model.tas cache so forward_psms() re-fetches it - # from the newly loaded prior. Leave other variables (e.g. model.pr) - # intact — they came from the original prior in the pickle and are - # still valid; we have not replaced them in job.prior. + # so annualize() cannot access .month on them. Mark all reconstructed + # fields as already annualized so ClimateField.annualize() returns them as-is. + for vn in recon_vars_in_file: + job.prior[vn].da.attrs['annualized'] = 1 + # Clear all model.* clim caches so forward_psms() re-fetches every + # variable (tas from the reconstruction, pr etc. from the restored prior). for pobj in job.proxydb.records.values(): - if 'clim' in pobj.__dict__ and 'model.tas' in pobj.clim: - del pobj.clim['model.tas'] + if 'clim' in pobj.__dict__: + stale_keys = [k for k in pobj.clim if k.startswith('model.')] + for k in stale_keys: + del pobj.clim[k] job.forward_psms(verbose=verbose) if verbose: p_success(f">>> Prior loaded from {path}") From 0c36d4e0a99261f56972b553e5cb3ab12d82a73e Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:39:24 -0700 Subject: [PATCH 6/9] Update reconres.py --- cfr/reconres.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/cfr/reconres.py b/cfr/reconres.py index 70534082..01871420 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -168,7 +168,7 @@ def load_proxylabels(self, verbose=False): if verbose: p_success(f">>> ReconRes.proxy_labels created") - def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_verif_len=10): + def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_verif_len=10, debug=False): """ Perform independent verification. job_path (str): the path to the job. @@ -209,6 +209,17 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve for k in stale_keys: del pobj.clim[k] job.forward_psms(verbose=verbose) + if debug and path_index == 0: + trw_proxies = [(pid, p) for pid, p in job.proxydb.records.items() + if getattr(p, 'ptype', '') == 'tree.TRW'] + for pid, p in trw_proxies[:2]: + print(f"\n[debug] {pid} ({p.ptype})") + for key in ['model.tas', 'model.pr']: + if hasattr(p, 'clim') and key in p.clim and p.clim[key] is not None: + da = p.clim[key].da + print(f" {key}: {da.time.values[0]} → {da.time.values[-1]}, len={len(da.time)}") + else: + print(f" {key}: NOT FOUND") if verbose: p_success(f">>> Prior loaded from {path}") # compare the pesudo-proxy records with the real records From a62e6041e5cc7e811fbb81e61b59fca6b6b0ffba Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Wed, 25 Mar 2026 10:51:50 -0700 Subject: [PATCH 7/9] Update reconres.py --- cfr/reconres.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cfr/reconres.py b/cfr/reconres.py index 01871420..1e0de4ea 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -228,6 +228,7 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve detail = proxy.psm.calib_details attr_dict = {} attr_dict['name'] = pname + attr_dict['ptype'] = proxy.ptype attr_dict['seasonality'] = detail['seasonality'] if pname in proxy_labels['pids_assim']: attr_dict['assim'] = True From 20ddedfa32a20e8f5d89c339cdfc843e71002534 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:19:16 -0700 Subject: [PATCH 8/9] Update reconres.py --- cfr/reconres.py | 98 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 28 deletions(-) diff --git a/cfr/reconres.py b/cfr/reconres.py index 1e0de4ea..1c17d45b 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -168,47 +168,77 @@ def load_proxylabels(self, verbose=False): if verbose: p_success(f">>> ReconRes.proxy_labels created") - def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_verif_len=10, debug=False): + def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000), min_verif_len=10, debug=False): """ Perform independent verification. job_path (str): the path to the job. verbose (bool, optional): print verbose information. Defaults to False. + debug (bool, optional): print diagnostic info on the first iteration. Defaults to False. """ - # load the reconstructions for the "prior" - job = ReconJob() - job.load(job_path) + try: + import psutil, os as _os + def _rss_mb(): + return psutil.Process(_os.getpid()).memory_info().rss / 1e6 + except ImportError: + def _rss_mb(): + return float('nan') + indpdt_info = [] - for path_index ,path in enumerate(self.paths): + for path_index, path in enumerate(self.paths): + # Recreate job each iteration so proxy clim caches and prior fields + # never accumulate across ensemble members. + job = ReconJob() + job.load(job_path) + proxy_labels = self.proxy_labels[path_index] - # Save original prior fields before load_clim wipes job.prior. - saved_prior = {k: v for k, v in job.prior.items()} - # Detect which prior variables are available in the reconstruction file. + + # Identify which prior variables are present in the reconstruction file. + # Variables absent from the file (e.g. pr in a tas-only reconstruction) + # are kept from the freshly loaded original prior. with xr.open_dataset(path) as ds_check: - recon_vars_in_file = [k for k in saved_prior if k in ds_check] - path_dict = {vn: path for vn in recon_vars_in_file} + recon_vars_in_file = [k for k in job.prior if k in ds_check] + non_recon_prior = {k: v for k, v in job.prior.items() if k not in recon_vars_in_file} + job.load_clim( tag="prior", - path_dict=path_dict, + path_dict={vn: path for vn in recon_vars_in_file}, anom_period=(1951, 1980), ) - # Restore any prior variables not present in the reconstruction file - # (e.g. pr if only tas was reconstructed). - for k, v in saved_prior.items(): - if k not in job.prior: - job.prior[k] = v - # Reconstruction files use integer year coordinates, not datetime64, - # so annualize() cannot access .month on them. Mark all reconstructed - # fields as already annualized so ClimateField.annualize() returns them as-is. + # Restore variables not in the reconstruction file from the original prior. + if non_recon_prior and path_index == 0: + p_warning(f">>> Variables {list(non_recon_prior.keys())} not found in reconstruction file — using original prior (e.g. CCSM4) for these variables.") + job.prior.update(non_recon_prior) + del non_recon_prior + + # Mark reconstructed fields as already annualized (integer year coords) + # and collapse any ensemble dimension by mean before get_clim extracts + # a point time series, to avoid OOM at full proxy/ensemble scale. for vn in recon_vars_in_file: + if 'ens' in job.prior[vn].da.dims: + job.prior[vn].da = job.prior[vn].da.mean(dim='ens') job.prior[vn].da.attrs['annualized'] = 1 - # Clear all model.* clim caches so forward_psms() re-fetches every - # variable (tas from the reconstruction, pr etc. from the restored prior). + + # Clear only the model.* keys each proxy's PSM actually needs, + # so forward_psms() re-fetches them from the updated prior. for pobj in job.proxydb.records.values(): - if 'clim' in pobj.__dict__: - stale_keys = [k for k in pobj.clim if k.startswith('model.')] - for k in stale_keys: - del pobj.clim[k] + if 'clim' not in pobj.__dict__: + continue + for vn in pobj.psm.climate_required: + key = f'model.{vn}' + if key in pobj.clim: + del pobj.clim[key] + + if debug and path_index == 0: + trw_proxies = [(pid, p) for pid, p in job.proxydb.records.items() + if getattr(p, 'ptype', '') == 'tree.TRW'] + print(f"[debug] total calibrated proxies: {job.proxydb.filter(by='tag', keys=['calibrated']).nrec}") + print(f"[debug] TRW proxies: {len(trw_proxies)}") + print(f"[debug] prior keys loaded: {list(job.prior.keys())}") + print(f"[debug] recon vars from file: {recon_vars_in_file}") + print(f"[debug] RSS before forward_psms: {_rss_mb():.0f} MB") + job.forward_psms(verbose=verbose) + if debug and path_index == 0: trw_proxies = [(pid, p) for pid, p in job.proxydb.records.items() if getattr(p, 'ptype', '') == 'tree.TRW'] @@ -220,9 +250,20 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve print(f" {key}: {da.time.values[0]} → {da.time.values[-1]}, len={len(da.time)}") else: print(f" {key}: NOT FOUND") + + # Drop heavy model.* clim fields now that pseudo values are computed. + # obs.* fields are left intact as they are small and may be needed. + for pobj in job.proxydb.records.values(): + if hasattr(pobj, 'clim'): + pobj.clim = {k: v for k, v in pobj.clim.items() if not k.startswith('model.')} + + if debug and path_index == 0: + print(f"[debug] RSS after clim cleanup: {_rss_mb():.0f} MB") + if verbose: p_success(f">>> Prior loaded from {path}") - # compare the pesudo-proxy records with the real records + + # Compare pseudo-proxy records with real proxy observations. calib_PDB = job.proxydb.filter(by="tag", keys=["calibrated"]) for i, (pname, proxy) in enumerate(calib_PDB.records.items()): detail = proxy.psm.calib_details @@ -254,8 +295,8 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve Df.astype(float) masks = { "all": None, - "in": (Df.index >= calib_period[0]) & (Df.index <= calib_period[1]), # in the calibration period - "before": (Df.index < calib_period[0]), # before the calibration period + "in": (Df.index >= calib_period[0]) & (Df.index <= calib_period[1]), + "before": (Df.index < calib_period[0]), } for mask_name, mask in masks.items(): if mask is not None: @@ -273,6 +314,7 @@ def indpdt_verif(self, job_path, verbose=False, calib_period=(1850, 2000),min_ve attr_dict[mask_name + '_corr'] = corr attr_dict[mask_name + '_ce'] = ce indpdt_info.append(attr_dict) + indpdt_info = pd.DataFrame(indpdt_info) self.indpdt_info = indpdt_info self.indpdt_calib_period = calib_period From dc05a45898ecc100dd592de49d6967a0782ab747 Mon Sep 17 00:00:00 2001 From: tanaya-g <140018475+tanaya-g@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:20:21 -0700 Subject: [PATCH 9/9] remove old indpdt_verif2 test function --- cfr/reconres.py | 66 ------------------------------------------------- 1 file changed, 66 deletions(-) diff --git a/cfr/reconres.py b/cfr/reconres.py index 1c17d45b..599121c6 100644 --- a/cfr/reconres.py +++ b/cfr/reconres.py @@ -322,73 +322,7 @@ def _rss_mb(): p_success(f">>> indpdt verification completed, results stored in ReconRes.indpdt_info") p_success(f">>> Records Number: {len(indpdt_info)}") return indpdt_info - - def indpdt_verif2(self, job_path, verbose=False, calib_period=(1850,2000), min_verif_len=10): - """ - Perform independent verification (version 2). - """ - job = ReconJob() # Remove 'cfr.' since you're already inside the cfr module - job.load(job_path) - indpdt_info = [] - - for path_index, path in enumerate(self.paths): # Use self.paths - lbls = self.proxy_labels[path_index] # Use self.proxy_labels - - # Load a full prior pool (so TRW can get 'pr', etc.) - job.load_clim(tag="analysis", - path_dict={"tas": path}, - anom_period=(1951, 1980)) - # Ensure a prior pool exists (from the saved job config) for any other vars PSMs might need - job.load_clim(tag="prior", - path_dict=None, - anom_period=(850, 1850)) - - # Make the forward step use recon tas (1–2000) while leaving other vars (e.g., pr) as in prior - job.prior['tas'] = job.analysis['tas'] - - job.forward_psms(verbose=verbose) - if verbose: - print(f">>> Validating against {path}") - - # compare pseudo-proxy to observed - calib_PDB = job.proxydb.filter(by="tag", keys=["calibrated"]) - for pname, proxy in calib_PDB.records.items(): - detail = getattr(proxy.psm, 'calib_details', {}) - attr = { - 'name': pname, - 'seasonality': detail.get('seasonality', None), - 'assim': True if pname in lbls['pids_assim'] else False if pname in lbls['pids_eval'] else None, - } - - reconstructed = pd.DataFrame({'time': proxy.pseudo.time, 'estimated': proxy.pseudo.value}) - real = pd.DataFrame({'time': proxy.time, 'observed': proxy.value}) - Df = real.dropna().merge(reconstructed, on='time', how='inner').set_index('time').sort_index() - - masks = { - 'all': None, - 'in': (Df.index >= calib_period[0]) & (Df.index <= calib_period[1]), - 'before': (Df.index < calib_period[0]), - } - for mname, m in masks.items(): - Dfm = Df if m is None else Df[m] - if len(Dfm) < min_verif_len: - corr = np.nan; ce = np.nan - else: - corr = Dfm[['observed','estimated']].corr().iloc[0,1] - ce = utils.coefficient_efficiency(Dfm.observed.values, Dfm.estimated.values) - attr[f'{mname}_corr'] = corr - attr[f'{mname}_ce'] = ce - - indpdt_info.append(attr) - - self.indpdt_info = pd.DataFrame(indpdt_info) # Store as instance attribute - if verbose: - p_success(f">>> indpdt verification completed, results stored in ReconRes.indpdt_info") - p_success(f">>> Records Number: {len(self.indpdt_info)}") - - return self.indpdt_info - def plot_indpdt_verif(self): """ Plot the indpdt verification results.