""" window_correction_numba.py Numba-accelerated port of the Fortran-77 WINDOW algorithm (Price 1983). """ from __future__ import annotations import numpy as np from numba import njit, prange # ---------------------------------------------------------------------- # Physical constants # ---------------------------------------------------------------------- EM = 28.967 GO = 9.78 R = 8314.34 C1 = 1.191e-9 C2 = 1.439 B = R / (GO * EM) ANV_DEFAULT = 934.6 EPSLN_DEFAULT = 0.98 BADATA = 999.0 MISSING = -9999.0 PLEV = np.array([100., 200., 300., 500., 700., 900.]) DPRES = np.array([10., 20., 30., 50., 75., 100.]) # Max sublayers we ever expect after intermediate-level insertion. # With smallest DPRES=10 mb and surface~1013 mb -> top~0.05 mb, # 256 is comfortably safe. _MAX_SUB = 256 # ---------------------------------------------------------------------- # Scalar physics kernels (njit, inlined into hot loops) # ---------------------------------------------------------------------- @njit(cache=True, inline='always', fastmath=True) def plank(temp_k, anv): c1p = C1 * anv ** 3 c2p = C2 * anv return c1p / (np.exp(c2p / temp_k) - 1.0) @njit(cache=True, inline='always', fastmath=True) def aplank(radiance, anv): c1p = C1 * anv ** 3 c2p = C2 * anv return c2p / np.log(1.0 + c1p / radiance) @njit(cache=True, inline='always', fastmath=True) def dtaudp(w, t, e, p): dtaudp1 = ((4.18 + 5578.0 * np.exp(-7.87e-3 * w)) * np.exp(1800.0 * (1.0 / t - 1.0 / 296.0)) * (e / p + 0.002) * (0.622 / (101.3 * GO)) * e) return dtaudp1 + (0.00004 + e / 60000.0) # ---------------------------------------------------------------------- # Humidity -> vapor pressure (njit, in-place style on a copy) # ---------------------------------------------------------------------- @njit(cache=True) def humidity_to_vapor_pressure(ei, ti, pi_, ivap): n = ei.shape[0] out = np.empty(n) for i in range(n): v = ei[i] if ivap == 1: # rel humidity (%) expt = 7.5 * (ti[i] - 273.15) / (237.3 + ti[i] - 273.15) v = v / 100.0 * 6.11 * 10.0 ** expt elif ivap == 2: # abs humidity v = v * R * ti[i] / 18.02 elif ivap == 3: # mixing ratio (g/kg) v = v * pi_[i] / (0.622 + v) elif ivap == 4: # specific humidity v = v * pi_[i] / (0.378 * v + 0.622) elif ivap == 5: # dewpoint depression td = ti[i] - v - 273.15 expt = 7.5 * td / (237.3 + td) v = 6.11 * 10.0 ** expt elif ivap == 6: # vapor pressure (mb) pass elif ivap == 7: # dewpoint (K) td = v - 273.15 expt = 7.5 * td / (237.3 + td) v = 6.11 * 10.0 ** expt elif ivap == 8: # dewpoint (C) td = v expt = 7.5 * td / (237.3 + td) v = 6.11 * 10.0 ** expt if v < 0.01: v = 0.01 out[i] = v return out # ---------------------------------------------------------------------- # Single-profile window correction (Numba) # Returns: (tc15, tc55, pw, iflag, rdini, rdfin) # ---------------------------------------------------------------------- @njit(cache=True, fastmath=True) def window_profile_nb(pi_, ta, ei_in, angle, surt0, surt1, anv, epsln, ivap, plev, dpres): kz = pi_.shape[0] nlev = kz - 1 # ---- bad surface temps ---- if surt0 == 0.0 or surt1 == 0.0: return BADATA, BADATA, -9999.0, 2, np.nan, np.nan if angle > 90.0: angle = 80.0 ti = ta.copy() ei = ei_in.copy() # ---- humidity -> vapor pressure (only first nlev like Fortran) ---- ei[:nlev] = humidity_to_vapor_pressure(ei[:nlev], ti[:nlev], pi_[:nlev], ivap) # ---- precipitable water (cm) ---- sumpr = 0.0 for i in range(1, nlev): denom = pi_[i] - pi_[i - 1] if denom == 0.0: continue precip = (-0.622 / GO * 10.0 * ((ei[i - 1] * pi_[i] - ei[i] * pi_[i - 1]) * np.log(pi_[i] / pi_[i - 1]) / denom + ei[i] - ei[i - 1])) sumpr += precip # ---- intermediate-level insertion + Simpson optical-depth integration ---- P = np.empty(_MAX_SUB) Earr = np.empty(_MAX_SUB) TT = np.empty(_MAX_SUB) EMIS = np.empty(_MAX_SUB) TAU = np.empty(_MAX_SUB) P[0] = pi_[0] Earr[0] = ei[0] TT[0] = ti[0] n_sub = 1 # number of P/E/TT entries written j = 0 mlev = 0 dpp = 0.0 optd = 0.0 egr = (ei[1] - ei[0]) / (pi_[1] - pi_[0]) tgr = (ti[1] - ti[0]) / (pi_[1] - pi_[0]) max_iter = 10 * kz it = 0 while it < max_iter and n_sub < _MAX_SUB: it += 1 ilp = int(((pi_[j] - dpp) - pi_[j + 1]) / dpres[mlev] + 0.999) if ilp < 1: ilp = 1 dlp = ((pi_[j] - dpp) - pi_[j + 1]) / ilp p_prev = P[n_sub - 1] e_prev = Earr[n_sub - 1] tt_prev = TT[n_sub - 1] p_next = p_prev - dlp e_next = e_prev + egr * (p_next - p_prev) tt_next = tt_prev + tgr * (p_next - p_prev) emis_layer = 0.5 * (plank(tt_prev, anv) + plank(tt_next, anv)) tau_layer = ((p_prev - p_next) * (dtaudp(anv, tt_prev, e_prev, p_prev) + dtaudp(anv, tt_next, e_next, p_next) + 4.0 * dtaudp(anv, 0.5 * (tt_prev + tt_next), 0.5 * (e_prev + e_next), 0.5 * (p_prev + p_next))) / 6.0) EMIS[n_sub - 1] = emis_layer TAU[n_sub - 1] = tau_layer optd += tau_layer P[n_sub] = p_next Earr[n_sub] = e_next TT[n_sub] = tt_next n_sub += 1 # Hit the next reported input level? if abs(p_next - pi_[j + 1]) <= 0.01: j += 1 if j + 1 < kz: dp = pi_[j + 1] - pi_[j] if dp != 0.0: egr = (ei[j + 1] - ei[j]) / dp tgr = (ti[j + 1] - ti[j]) / dp dpp = 0.0 # Coarsen sub-layer thickness once we're above PLEV(mlev) if (pi_[0] - p_next) >= plev[mlev] and mlev < plev.shape[0] - 1: mlev += 1 dpp = pi_[j] - p_next # Reached the top reported level? if abs(p_next - pi_[nlev]) < 0.1: break # M = number of integrated sublayers (matches Fortran's M) M = n_sub - 1 # ---- radiative-transfer integration (upwelling sky radiance) ---- cs = np.cos(angle * np.pi / 180.0) if cs <= 0.0: return MISSING, MISSING, MISSING, 2, np.nan, np.nan trans = np.exp(-optd / cs) # Compute SKY iteratively. Only the top value (SKY[M]) is needed # downstream, so we use a scalar accumulator instead of an array. if M >= 1: sky = EMIS[0] * (1.0 - np.exp(-TAU[0] / cs)) else: sky = 0.0 for i in range(2, M + 1): sky = EMIS[i - 1] + (sky - EMIS[i - 1]) * np.exp(-TAU[i - 1] / cs) sky_top = sky # ---- surface-temperature correction (Fortran DO 760) ---- tc15_out = BADATA tc55_out = BADATA rdini = np.nan rdfin = np.nan iflag = 0 for i in range(2): surt_i = surt0 if i == 0 else surt1 if trans <= 0.001: grndrad = 0.0 else: grndrad = (plank(surt_i, anv) - sky_top * (1.0 + trans * (1.0 - epsln))) / trans if grndrad <= 0.0: # Cloudy point -- bail out, return missing return MISSING, MISSING, MISSING, 2, np.nan, np.nan trad = aplank(grndrad / epsln, anv) corr = trad - surt_i if i == 0: tc15_out = trad - surt_i rdini = surt_i + corr else: tc55_out = trad - surt_i rdfin = surt_i + corr return tc15_out, tc55_out, sumpr, iflag, rdini, rdfin # ---------------------------------------------------------------------- # Parallel grid driver (Numba, prange across analysis-grid points) # ---------------------------------------------------------------------- @njit(cache=True, parallel=True, fastmath=True) def window_grid_nb(pres, theta, spfh, satang, radini, radfin, iflag, lookup_i, lookup_j, anv, epsln, ivap, plev, dpres): """ Apply the window correction over a 2-D analysis grid in parallel. Inputs (all NumPy arrays): pres (nx_model, ny_model, kz) pressure in Pa theta (nx_model, ny_model, kz) temperature (K) spfh (nx_model, ny_model, kz) specific humidity (or per `ivap`) satang (ilg, jlg) view angle (deg) -- per Fortran's final assignment ANGLE = satang radini, radfin (ilg, jlg) surface BTs at two times (K) iflag (ilg, jlg) input QC flag (0 = process) lookup_i, lookup_j (ilg, jlg) map analysis grid -> model grid plev, dpres pressure-bin / sublayer-thickness arrays Returns ------- tc15, tc55, pw, iflag_out : (ilg, jlg) ndarrays """ ilg = radini.shape[0] jlg = radini.shape[1] kz = pres.shape[2] tc15 = np.full((ilg, jlg), MISSING) tc55 = np.full((ilg, jlg), MISSING) pw = np.full((ilg, jlg), MISSING) iflag_out = iflag.copy() # Parallel over the outer (ja) loop; the inner ia loop runs serially # within each thread. This avoids race conditions on the output arrays. for ja in prange(jlg): PI_col = np.empty(kz) TA_col = np.empty(kz) EI_col = np.empty(kz) for ia in range(ilg): if iflag_out[ia, ja] >= 1: # Already flagged: leave outputs at MISSING continue ipt = lookup_i[ia, ja] jpt = lookup_j[ia, ja] if ipt == -9999 or jpt == -9999: ipt = 0 jpt = 0 # Surface pressure check (use surface-most model level) sfpr = pres[ipt, jpt, kz - 1] / 100.0 if sfpr == 0.0: continue # Build the column with vertical index flipped so [0] = surface for i in range(kz): k_model = kz - 1 - i PI_col[i] = pres[ipt, jpt, k_model] / 100.0 TA_col[i] = theta[ipt, jpt, k_model] EI_col[i] = spfh[ipt, jpt, k_model] angle = satang[ia, ja] tc15_v, tc55_v, pw_v, iflag_v, _rdi, _rdf = window_profile_nb( PI_col, TA_col, EI_col, angle, radini[ia, ja], radfin[ia, ja], anv, epsln, ivap, plev, dpres ) tc15[ia, ja] = tc15_v tc55[ia, ja] = tc55_v pw[ia, ja] = pw_v iflag_out[ia, ja] = iflag_v return tc15, tc55, pw, iflag_out # ---------------------------------------------------------------------- # Convenience wrapper with sane defaults # ---------------------------------------------------------------------- def run_window_correction(pres, theta, spfh, satang, radini, radfin, iflag, lookup_i, lookup_j, anv=ANV_DEFAULT, epsln=EPSLN_DEFAULT, ivap=4): """ Pure-Python convenience wrapper that calls the Numba kernel. Casts inputs to contiguous float64 / int64 arrays (matches the kernel signatures) and supplies the PLEV / DPRES tables. Parameters ---------- pres : (nx_model, ny_model, kz) pressure in Pa theta : (nx_model, ny_model, kz) temperature in K spfh : (nx_model, ny_model, kz) specific humidity (or as per `ivap`) satang : (ilg, jlg) view angle in degrees radini : (ilg, jlg) surface brightness temperature, time 1 (K) radfin : (ilg, jlg) surface brightness temperature, time 2 (K) iflag : (ilg, jlg) integer QC flag (0 = process, >=1 = skip) lookup_i, lookup_j : (ilg, jlg) integer indices mapping analysis grid -> model grid anv : wavenumber (cm^-1), default = GOES-I imager 10.7 micron epsln : surface emissivity ivap : humidity-type index (4 = specific humidity) Returns ------- tc15, tc55, pw, iflag_out : (ilg, jlg) ndarrays tc15, tc55 : brightness-temperature corrections (K) pw : column precipitable water (cm) iflag_out : updated QC flag (2 = cloudy / could not be corrected) """ return window_grid_nb( np.ascontiguousarray(pres, dtype=np.float64), np.ascontiguousarray(theta, dtype=np.float64), np.ascontiguousarray(spfh, dtype=np.float64), np.ascontiguousarray(satang, dtype=np.float64), np.ascontiguousarray(radini, dtype=np.float64), np.ascontiguousarray(radfin, dtype=np.float64), np.ascontiguousarray(iflag, dtype=np.int64), np.ascontiguousarray(lookup_i, dtype=np.int64), np.ascontiguousarray(lookup_j, dtype=np.int64), float(anv), float(epsln), int(ivap), PLEV.astype(np.float64), DPRES.astype(np.float64), ) # ---------------------------------------------------------------------- # Example / smoke test # ---------------------------------------------------------------------- if __name__ == "__main__": # Single-column smoke test using the Numba kernel directly. PI_test = np.array([1013., 950., 900., 850., 800., 700., 600., 500., 400., 300., 250., 200., 150., 100., 70., 50., 30., 20., 10., 5., 2., 1., 0.5, 0.2, 0.1, 0.05]) nlev = len(PI_test) TA_test = np.linspace(295., 215., nlev) EI_test = 0.012 * np.exp(-(PI_test[0] - PI_test) / 300.) tc15, tc55, pw, iflag, rdi, rdf = window_profile_nb( PI_test, TA_test, EI_test, 55.0, # angle (deg) 300.0, 305.0, # surt0, surt1 ANV_DEFAULT, EPSLN_DEFAULT, 4, PLEV.astype(np.float64), DPRES.astype(np.float64) ) print(f"Precipitable water (cm) : {pw:.3f}") print(f"BT correction tc15 (K) : {tc15:.3f}") print(f"BT correction tc55 (K) : {tc55:.3f}") print(f"iflag : {iflag}")