# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025-2026 Djura | Risk - Data - Engineering S.r.l.
import asyncio
from typing import List
import numpy as np
import scipy.optimize as optimization
from .utilities import to_json_serializable, set_ductilities, DUCTILITY_F
from .pfa_profile import PFAProfile
from .hazard_model import analytical_mafe, solve_for_im
[docs]
class Demands:
g = 9.81
def __init__(
self, data: list,
) -> None:
self.data = data
[docs]
async def estimate_drifts(self):
tasks = [
self._process_single_drift(i, model)
for i, model in enumerate(self.data)
]
results = await asyncio.gather(*tasks)
# Calculate statistics
successes = [r for r in results if r["status"] == "success"]
failures = [r for r in results if r["status"] == "error"]
success_ids = [r["index"] for r in successes]
failure_ids = [r["index"] for r in failures]
# Group errors by type
error_types = {}
for failure in failures:
error_type = failure.get("error_type", "unknown")
error_types[error_type] = error_types.get(error_type, 0) + 1
return to_json_serializable({
"status": "success",
"total": len(self.data),
"successful": len(successes),
"failed": len(failures),
"error_summary": error_types,
"successes": successes,
"failures": failures,
"success_ids": success_ids,
"failure_ids": failure_ids,
"message": f"Batch processing complete: {len(successes)} succeeded"
f", {len(failures)} failed"
})
async def _process_single_drift(self, i: int, model: dict):
# TODO add higher mode participation factor, omega_theta
try:
if "mode-shape" not in model or not model["mode-shape"]:
if model["building-type"].lower() == "frame":
model["mode-shape"] = self._get_mode_shape_frame(
model['backbone']["heights"])
else:
return {
"index": i,
"status": "error",
"error": "Building type not supported. Supported "
"types are: frame",
"error_type": "validation"
}
result = await asyncio.to_thread(
self._compute_drifts_for_model,
model
)
model.update(result)
return {
"index": i,
"status": "success",
"out": model,
}
except Exception as e:
return {
"index": i,
"status": "error",
"error": str(e),
"error_type": "computation"
}
[docs]
async def estimate_accelerations(self):
tasks = [
self._process_single_acc(i, model)
for i, model in enumerate(self.data)
]
results = await asyncio.gather(*tasks)
# Calculate statistics
successes = [r for r in results if r["status"] == "success"]
failures = [r for r in results if r["status"] == "error"]
success_ids = [r["index"] for r in successes]
failure_ids = [r["index"] for r in failures]
# Group errors by type
error_types = {}
for failure in failures:
error_type = failure.get("error_type", "unknown")
error_types[error_type] = error_types.get(error_type, 0) + 1
return to_json_serializable({
"status": "success",
"total": len(self.data),
"successful": len(successes),
"failed": len(failures),
"error_summary": error_types,
"successes": successes,
"failures": failures,
"success_ids": success_ids,
"failure_ids": failure_ids,
"message": f"Batch processing complete: {len(successes)} succeeded"
f", {len(failures)} failed"
})
async def _process_single_acc(self, i: int, model: dict):
try:
self._validate_input(model)
if "pga" not in model or model["pga"] is None:
model["pga"] = self._estimate_pga(model)
result = await asyncio.to_thread(
self._compute_acc_for_model,
model
)
model.update(result)
return {
"index": i,
"status": "success",
"out": model,
}
except Exception as e:
return {
"index": i,
"status": "error",
"error": str(e),
"error_type": "computation"
}
@staticmethod
def _compute_drifts_for_model(model: dict):
"""CPU-bound operations"""
# Compute 1st mode participation factor
masses = np.ones(len(model["mode-shape"]))
m_phi = sum(map(lambda x, y: x * y, masses,
model["mode-shape"]))
m_phi_2 = sum(map(lambda x, y: x * y * y, masses,
model["mode-shape"]))
gamma = float(m_phi / m_phi_2)
ductility_f = model["backbone"].get('ductility_f', DUCTILITY_F) + 0.01
model["dynamic-ductilities"] = set_ductilities(ductility_f)
# Estimate drifts
disps, drifts = Demands._estimate_drifts(
model['backbone']["dy"],
gamma,
model["mode-shape"],
model['backbone']["heights"],
model['dynamic-ductilities']
)
return {
"gamma": gamma,
"psd": drifts.tolist(),
"displacement": disps.tolist(),
}
@staticmethod
def _get_mode_shape_frame(heights: List[float]):
"""
Reference
---------
M. J. N. Priestley, G. M. Calvi, and M. J. Kowalsky, Displacement
Based Seismic Design of Structures. Pavia, Italy: IUSS Press, 2007.
"""
heights = np.cumsum(heights)
roof = heights[-1]
if len(heights) <= 4:
phi = heights / roof
else:
phi = 4 / 3 * (heights / roof) * (1 - heights / 4 / roof)
return phi
@staticmethod
def _estimate_drifts(
dy: float, gamma: float, phi: np.ndarray, heights: List[float],
dyn_duct: List[float]
):
phi = np.asarray(phi)
heights = np.asarray(heights)
dyn_duct = np.asarray(dyn_duct)
# Displacements
disps = phi.reshape(len(phi), 1) * dyn_duct * dy * gamma
drifts = disps.copy()
drifts[1:, :] = np.diff(disps, axis=0)
# Compute the drifts
return disps, drifts / heights.reshape(len(phi), 1)
@staticmethod
def _estimate_pga(model: dict):
"""Estimate PGA dependent on Sa-avg or Sa
Reference
---------
O'Reilly, G. J., and G. M. Calvi. 2020. “Quantifying seismic risk in
structures via simplified demand-intensity models.” Bull.
Earthq. Eng., 18 (5): 2003-2022. Springer Netherlands.
https://doi.org/10.1007/s10518-019-00776-0.
"""
# TODO, this part needs to be optimised to work with HazardModel
# TODO, of RS repository
# TODO, or perform fitting first
pga_coefs = model["hazard"]["pga"]
im_coefs = model["hazard"]["im"]
model["ims"] = np.asarray(model["sr"]) * model["backbone"]["cy"]
mafes = analytical_mafe(model["ims"], *im_coefs)
pgas = []
for mafe in mafes:
pga = optimization.leastsq(solve_for_im, 1.0, args=(
mafe, *pga_coefs), factor=1)[0][0]
pgas.append(pga)
return np.array(pgas)
@staticmethod
def _compute_acc_for_model(model: dict):
m = PFAProfile(
model["pfa-model"],
model["building-type"],
model["psd"],
model["backbone"]["period"],
model["backbone"]["heights"],
)
# For visualisation
ductility_f = model["backbone"].get('ductility_f', DUCTILITY_F) + 0.01
model["dynamic-ductilities"] = set_ductilities(ductility_f)
pfa = m.omega * model["pga"]
return {
"omega": m.omega,
"pfa": pfa.tolist(),
}
def _validate_input(self, model):
if "avg" in model["im_type"]:
# make sure that average SA naming is correct
model["im_type"] = "sa_avg"
backbone = model['backbone']
if "dy" not in backbone:
backbone["dy"] = backbone["cy"] * self.g * \
(backbone["period"] / 2 / np.pi) ** 2
elif "period" not in backbone:
backbone["period"] = np.sqrt(
backbone["dy"] / backbone["cy"] / self.g) * 2 * np.pi
elif "cy" not in backbone:
backbone["cy"] = backbone["dy"] / self.g * \
(2 * np.pi / backbone["period"]) ** 2
else:
raise ValueError("2 of 'dy', 'period' or 'cy' for each"
" case must be provided")
# Update the case
model['backbone'] = backbone