Source code for djura.vulnerability_modeller.demands

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025-2026 Djura | Risk - Data - Engineering S.r.l.
import asyncio
from typing import List
import numpy as np
import scipy.optimize as optimization

from .utilities import to_json_serializable, set_ductilities, DUCTILITY_F
from .pfa_profile import PFAProfile
from .hazard_model import analytical_mafe, solve_for_im


[docs] class Demands: g = 9.81 def __init__( self, data: list, ) -> None: self.data = data
[docs] async def estimate_drifts(self): tasks = [ self._process_single_drift(i, model) for i, model in enumerate(self.data) ] results = await asyncio.gather(*tasks) # Calculate statistics successes = [r for r in results if r["status"] == "success"] failures = [r for r in results if r["status"] == "error"] success_ids = [r["index"] for r in successes] failure_ids = [r["index"] for r in failures] # Group errors by type error_types = {} for failure in failures: error_type = failure.get("error_type", "unknown") error_types[error_type] = error_types.get(error_type, 0) + 1 return to_json_serializable({ "status": "success", "total": len(self.data), "successful": len(successes), "failed": len(failures), "error_summary": error_types, "successes": successes, "failures": failures, "success_ids": success_ids, "failure_ids": failure_ids, "message": f"Batch processing complete: {len(successes)} succeeded" f", {len(failures)} failed" })
async def _process_single_drift(self, i: int, model: dict): # TODO add higher mode participation factor, omega_theta try: if "mode-shape" not in model or not model["mode-shape"]: if model["building-type"].lower() == "frame": model["mode-shape"] = self._get_mode_shape_frame( model['backbone']["heights"]) else: return { "index": i, "status": "error", "error": "Building type not supported. Supported " "types are: frame", "error_type": "validation" } result = await asyncio.to_thread( self._compute_drifts_for_model, model ) model.update(result) return { "index": i, "status": "success", "out": model, } except Exception as e: return { "index": i, "status": "error", "error": str(e), "error_type": "computation" }
[docs] async def estimate_accelerations(self): tasks = [ self._process_single_acc(i, model) for i, model in enumerate(self.data) ] results = await asyncio.gather(*tasks) # Calculate statistics successes = [r for r in results if r["status"] == "success"] failures = [r for r in results if r["status"] == "error"] success_ids = [r["index"] for r in successes] failure_ids = [r["index"] for r in failures] # Group errors by type error_types = {} for failure in failures: error_type = failure.get("error_type", "unknown") error_types[error_type] = error_types.get(error_type, 0) + 1 return to_json_serializable({ "status": "success", "total": len(self.data), "successful": len(successes), "failed": len(failures), "error_summary": error_types, "successes": successes, "failures": failures, "success_ids": success_ids, "failure_ids": failure_ids, "message": f"Batch processing complete: {len(successes)} succeeded" f", {len(failures)} failed" })
async def _process_single_acc(self, i: int, model: dict): try: self._validate_input(model) if "pga" not in model or model["pga"] is None: model["pga"] = self._estimate_pga(model) result = await asyncio.to_thread( self._compute_acc_for_model, model ) model.update(result) return { "index": i, "status": "success", "out": model, } except Exception as e: return { "index": i, "status": "error", "error": str(e), "error_type": "computation" } @staticmethod def _compute_drifts_for_model(model: dict): """CPU-bound operations""" # Compute 1st mode participation factor masses = np.ones(len(model["mode-shape"])) m_phi = sum(map(lambda x, y: x * y, masses, model["mode-shape"])) m_phi_2 = sum(map(lambda x, y: x * y * y, masses, model["mode-shape"])) gamma = float(m_phi / m_phi_2) ductility_f = model["backbone"].get('ductility_f', DUCTILITY_F) + 0.01 model["dynamic-ductilities"] = set_ductilities(ductility_f) # Estimate drifts disps, drifts = Demands._estimate_drifts( model['backbone']["dy"], gamma, model["mode-shape"], model['backbone']["heights"], model['dynamic-ductilities'] ) return { "gamma": gamma, "psd": drifts.tolist(), "displacement": disps.tolist(), } @staticmethod def _get_mode_shape_frame(heights: List[float]): """ Reference --------- M. J. N. Priestley, G. M. Calvi, and M. J. Kowalsky, Displacement Based Seismic Design of Structures. Pavia, Italy: IUSS Press, 2007. """ heights = np.cumsum(heights) roof = heights[-1] if len(heights) <= 4: phi = heights / roof else: phi = 4 / 3 * (heights / roof) * (1 - heights / 4 / roof) return phi @staticmethod def _estimate_drifts( dy: float, gamma: float, phi: np.ndarray, heights: List[float], dyn_duct: List[float] ): phi = np.asarray(phi) heights = np.asarray(heights) dyn_duct = np.asarray(dyn_duct) # Displacements disps = phi.reshape(len(phi), 1) * dyn_duct * dy * gamma drifts = disps.copy() drifts[1:, :] = np.diff(disps, axis=0) # Compute the drifts return disps, drifts / heights.reshape(len(phi), 1) @staticmethod def _estimate_pga(model: dict): """Estimate PGA dependent on Sa-avg or Sa Reference --------- O'Reilly, G. J., and G. M. Calvi. 2020. “Quantifying seismic risk in structures via simplified demand-intensity models.” Bull. Earthq. Eng., 18 (5): 2003-2022. Springer Netherlands. https://doi.org/10.1007/s10518-019-00776-0. """ # TODO, this part needs to be optimised to work with HazardModel # TODO, of RS repository # TODO, or perform fitting first pga_coefs = model["hazard"]["pga"] im_coefs = model["hazard"]["im"] model["ims"] = np.asarray(model["sr"]) * model["backbone"]["cy"] mafes = analytical_mafe(model["ims"], *im_coefs) pgas = [] for mafe in mafes: pga = optimization.leastsq(solve_for_im, 1.0, args=( mafe, *pga_coefs), factor=1)[0][0] pgas.append(pga) return np.array(pgas) @staticmethod def _compute_acc_for_model(model: dict): m = PFAProfile( model["pfa-model"], model["building-type"], model["psd"], model["backbone"]["period"], model["backbone"]["heights"], ) # For visualisation ductility_f = model["backbone"].get('ductility_f', DUCTILITY_F) + 0.01 model["dynamic-ductilities"] = set_ductilities(ductility_f) pfa = m.omega * model["pga"] return { "omega": m.omega, "pfa": pfa.tolist(), } def _validate_input(self, model): if "avg" in model["im_type"]: # make sure that average SA naming is correct model["im_type"] = "sa_avg" backbone = model['backbone'] if "dy" not in backbone: backbone["dy"] = backbone["cy"] * self.g * \ (backbone["period"] / 2 / np.pi) ** 2 elif "period" not in backbone: backbone["period"] = np.sqrt( backbone["dy"] / backbone["cy"] / self.g) * 2 * np.pi elif "cy" not in backbone: backbone["cy"] = backbone["dy"] / self.g * \ (2 * np.pi / backbone["period"]) ** 2 else: raise ValueError("2 of 'dy', 'period' or 'cy' for each" " case must be provided") # Update the case model['backbone'] = backbone