Source code for djura.hazard_consistency.hazard_model

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025-2026 Djura | Risk - Data - Engineering S.r.l.
from typing import List, Union
from pathlib import Path
import json
import numpy as np
from scipy.optimize import fsolve

from .hazard_fit import analytical_mafe, HazardFit
from .models import HazardModelSchema


[docs] class HazardModel: def __init__(self, method: str = 'analytical-sac-fema') -> None: """Initialize Hazard module Parameters ---------- method : str Method to fit the hazard data, by default 'analytical-sac-fema' 'analytical-sac-fema' - analytical approach (recommended) 'least-squares' - least squares fitting method 'power-law' - loglinear power law constrained at two intensity levels 'analytical' - analytical approach, not SAC/FEMA-compatible """ if method == "": method = "analytical-sac-fema" self.method = method.lower() def _remove_zeros(self, x, y) -> np.array: """Removes trailing zeros to avoid bias in fitting Parameters ---------- x : List y : List Returns ------- ndarray, ndarray """ x = np.array(x) y = np.array(y) x = x[y > 1.00000000e-12] y = y[y > 1.00000000e-12] return x, y
[docs] def read_hazard(self, filename: Union[Path, str]): """Reads provided hazard data Parameters ------- filename : Path or str, "*.txt", "*.json" Returns ------- data : dict s: list intensity measure values in [g] poe: list Probability of exceedance (PoE) """ filename = Path(filename) extension = filename.suffix if extension == ".txt": s, poes = np.transpose(np.loadtxt(filename)) data = {'s': list(s), 'poe': list(poes)} elif extension == ".json": data = json.load(open(filename)) else: raise ValueError("Extension of hazard file not supported or file " "format is wrong! Supported options: " ".pickle, .json") return data
[docs] def fit_hazard_model( self, s: List, mafe: List, iterator: List[int] = None, dbe: float = 475., mce: float = 10000.) -> HazardModelSchema: """Runs the fitting function Parameters ------- s : List intensity measure values in [g] mafe : List Mean annual frequency of exceeding an IM value (MAFE) iterator : List[int] List of 3 integer values where the fitting will be prioritized Necessary only for method="analytical-sac-fema" dbe : float First return period Only for haz_fit='power-law' mce : float Second return period Only for haz_fit='power-law' Returns ------- HazardModelSchema s : List Original IM range mafe : List Original MAFE s_fit : List Fitted IM range mafe_fit : List Fitted MAFE coef : List[float] if method is 'analytical': Fitting coefficients [H_asy, s_asy, alpha] if method is 'power-law': SAC/FEMA-compatible coefficients, [k0, k1] else: SAC/FEMA-compatible coefficients, [k0, k1, k2] method : str Fitting method name """ # Range of IM values, where fitting will be performed im_range = np.linspace(min(s), max(s), 1000) # Initialize model model = HazardFit(im_range, s, mafe) # Get rid of trailing zeros s, mafe = self._remove_zeros(s, mafe) if self.method == 'analytical-sac-fema': out = model.analytical_sac_fema(iterator) elif self.method == 'least-squares': out = model.least_squares() elif self.method == 'power-law': out = model.power_law(dbe, mce) elif self.method == 'analytical': out = model.analytical() else: raise ValueError('Wrong hazard fitting model!') return out
@staticmethod def _to_ndarray(data): if isinstance(data, list): return np.asarray(data) return data def _return_period_to_poe(self, return_period, investigation_time): return 1 - np.exp(-investigation_time / self._to_ndarray(return_period)) def _return_period_to_apoe(self, return_period): return 1 - np.exp(-1 / self._to_ndarray(return_period)) def _mafe_to_apoe(self, mafe): return 1 - np.exp(-self._to_ndarray(mafe)) def _apoe_to_mafe(self, apoe): return -np.log(1 - self._to_ndarray(apoe)) def _poe_to_mafe(self, poe, investigation_time): return -np.log(1 - self._to_ndarray(poe)) / investigation_time def _return_period_to_mafe(self, return_period): return 1 / self._to_ndarray(return_period) def _mafe_to_return_period(self, mafe): return 1 / self._to_ndarray(mafe) def _mafe_to_poe(self, mafe, investigation_time): return 1 - np.exp(-self._to_ndarray(mafe) * investigation_time)
[docs] def get_return_period( self, poe: float = None, apoe: float = None, mafe: float = None, investigation_time: float = 50.) -> float: """Get Return Period Parameters ---------- poe : float, optional Probability of exceedance (POE), by default None apoe : float, optional Annual probability of exceedance (APOE), by default None mafe : float, optional Mean annual frequency of exceeding (MAFE) an IM value, by default None investigation_time : float, optional Investigation time in years, by default 50 Returns ------- float Return period Raises ------ ValueError Must provide one of the following input arguments: poe, apoe, mafe """ if mafe is not None: return self._mafe_to_return_period(mafe) if apoe is not None: mafe = self._apoe_to_mafe(apoe) return self._mafe_to_return_period(mafe) if poe is not None: mafe = self._poe_to_mafe(poe, investigation_time) return self._mafe_to_return_period(mafe) if poe is None and apoe is None and mafe is None: raise ValueError("Must provide one of the following " "input arguments: poe, apoe, mafe")
[docs] def get_poe( self, return_period: float = None, apoe: float = None, mafe: float = None, investigation_time: float = 50.) -> float: """Get probability of exceedance (POE) Parameters ---------- return_period : float, optional Return period, by default None apoe : float, optional Annual probability of exceedance (APOE), by default None mafe : float, optional Mean annual frequency of exceeding (MAFE) an IM value, by default None investigation_time : float, optional Investigation time in years, by default 50 Returns ------- float Probability of exceedance (POE) Raises ------ ValueError Must provide one of the following input arguments: return_period, apoe, mafe """ if return_period is not None: return self._return_period_to_poe(return_period, investigation_time) if apoe is not None: mafe = self._apoe_to_mafe(apoe) return self._mafe_to_poe(mafe, investigation_time) if mafe is not None: return self._mafe_to_poe(mafe, investigation_time) if return_period is None and apoe is None and mafe is None: raise ValueError( "Must provide one of the following input arguments:" " return_period, apoe, mafe")
[docs] def get_apoe(self, return_period: float = None, poe: float = None, mafe: float = None, investigation_time: float = 50.) -> float: """Get annual probability of exceedance (APOE) Parameters ---------- return_period : float, optional Return period, by default None poe : float, optional Probability of exceedance (POE), by default None mafe : float, optional Mean annual frequency of exceeding (MAFE) an IM value, by default None investigation_time : float, optional Investigation time in years, by default 50 Returns ------- float Annual probability of exceedance (APOE) Raises ------ ValueError Must provide one of the following input arguments: return_period, poe, mafe """ if return_period is not None: return self._return_period_to_apoe(return_period) if poe is not None: mafe = self._poe_to_mafe(poe, investigation_time) return self._mafe_to_apoe(mafe) if mafe is not None: return self._mafe_to_apoe(mafe) if return_period is None and poe is None and mafe is None: raise ValueError( "Must provide one of the following input arguments: " "return_period, poe, mafe")
[docs] def get_mafe( self, return_period: float = None, poe: float = None, apoe: float = None, investigation_time: float = 50.) -> float: """Get mean annual frequency (rate) of exceeding (MAFE) an IM value Parameters ---------- return_period : float, optional Return period, by default None poe : float, optional Probability of exceedance (POE), by default None apoe : float, optional Annual probability of exceedance (APOE), by default None investigation_time : float, optional Investigation time in years, by default 50 Returns ------- float Mean annual frequency of exceeding (MAFE) an IM value Raises ------ ValueError Must provide one of the following input arguments: return_period, apoe, poe """ if return_period is not None: return self._return_period_to_mafe(return_period) if apoe is not None: return self._apoe_to_mafe(apoe) if poe is not None: return self._poe_to_mafe(poe, investigation_time) if return_period is None and apoe is None and poe is None: raise ValueError( "Must provide one of the following input arguments:" " return_period, apoe, poe")
[docs] def get_mafe_limit_state( self, k0: float, k1: float, k2: float = 0.0, s: float = None, return_period: float = None, beta: float = 0.0 ) -> float: """Derive mean annual frequency of exceeding a limit state Parameters ---------- k0 : float SAC/FEMA-compatible coefficient k1 : float SAC/FEMA-compatible coefficient k2 : float SAC/FEMA-compatible coefficient, by default 0. s : float, optional Intensity measure level, by default None return_period : float, optional Return period level, by default None beta: float, optional Aleatory uncertainty, by default 0.0 Returns ------- float Mean annual frequency of exceedance of a limit state Raises ------- ValueError If both 's' and 'return_period' are None """ if k2 is None: k2 = 0.0 if s is None and return_period is None: raise ValueError("You must provide 's' or 'return_period'") # Compute MAFE of IM if return_period is not None: mafe = 1 / return_period else: mafe = analytical_mafe(s, k0, k1, k2) # Compute mean annual frequency of exceedance of a limit state p = 1 / (1 + 2 * k2 * beta**2) mafe_ls = np.sqrt(p) * k0**(1 - p) * mafe**p * \ np.exp(1 / 2 * p * k1**2 * beta**2) return mafe_ls
[docs] def get_im( self, mafe: float, k0: float, k1: float, k2: float = 0.) -> float: """Get intensity measure (IM) value Parameters ---------- mafe : float Mean annual frequency of exceeding (MAFE) an IM value k0 : float SAC/FEMA-compatible coefficient k1 : float SAC/FEMA-compatible coefficient k2 : float SAC/FEMA-compatible coefficient, by default 0. Returns ------- float intensity measure (IM) """ if k2 is None: k2 = 0.0 def func(x): return mafe - analytical_mafe(x, k0, k1, k2) s = fsolve(func, x0=0.1) return float(s)
[docs] def get_im_ls( self, mafe_ls: float, k0: float, k1: float, k2: float = 0., beta: float = 0., ) -> float: """Get intensity measure (IM) value for Limit state Parameters ---------- mafe : float Mean annual frequency of exceeding (MAFE) an IM value k0 : float SAC/FEMA-compatible coefficient k1 : float SAC/FEMA-compatible coefficient k2 : float SAC/FEMA-compatible coefficient, by default 0. Returns ------- float intensity measure (IM) """ if k2 is None: k2 = 0.0 def func(x): return mafe_ls - self.get_mafe_limit_state( k0, k1, k2, s=x, beta=beta) s = fsolve(func, x0=0.1) return float(s)
[docs] def get_beta_ls( self, mafe_ls: float, k0: float, k1: float, k2: float = 0., mu: float = 0., ) -> float: """Get intensity measure (IM) value for Limit state Parameters ---------- mafe : float Mean annual frequency of exceeding (MAFE) an IM value k0 : float SAC/FEMA-compatible coefficient k1 : float SAC/FEMA-compatible coefficient k2 : float SAC/FEMA-compatible coefficient, by default 0. Returns ------- float intensity measure (IM) """ if k2 is None: k2 = 0.0 def func(x): return mafe_ls - self.get_mafe_limit_state( k0, k1, k2, s=mu, beta=x) s = fsolve(func, x0=0.1) return float(s)