# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025-2026 Djura | Risk - Data - Engineering S.r.l.
import numpy as np
import importlib
from pydantic import BaseModel, Field, model_validator
from typing import Optional, List
from scipy.stats import lognorm
import traceback
from ..vulnerability_modeller.utilities import (
filter_args, to_json_serializable, DUCTILITY_F, set_ductilities)
[docs]
class BackboneModel(BaseModel):
period: float
period_c: Optional[float] = None
case: Optional[str] = None
site: Optional[str] = None
period_g: Optional[float] = None
period_cc: Optional[float] = None
ductility: Optional[float] = None
ductility_f: Optional[float] = DUCTILITY_F
hardening_ratio: Optional[float] = None
ah: Optional[float] = None
damping: Optional[float] = None
# Plots
r_plot: Optional[List[float]] = Field(default=None, alias="r-plot")
mu_plot: Optional[List[float]] = Field(default=None, alias="mu-plot")
[docs]
@model_validator(mode='before')
def str_to_lower(cls, values):
for field, value in values.items():
if isinstance(value, str):
values[field] = value.lower()
return values
[docs]
@model_validator(mode='after')
def compute_backbone_plots(cls, model):
if model.hardening_ratio is not None and model.ductility is not None \
and model.ductility_f is not None:
ah = model.hardening_ratio
mu = model.ductility
mu_f = model.ductility_f
model.r_plot = [0, 1, 1 + ah * (mu - 1), 0]
model.mu_plot = [0, 1, mu, mu_f]
else:
model.r_plot = None
model.mu_plot = None
return model
[docs]
class EDPIMModel(BaseModel):
hysteresis: str = "bilin"
im_type: Optional[str] = "sa"
method: Optional[str] = "shahnazaryan-oreilly"
backbone: BackboneModel
[docs]
@model_validator(mode='before')
def str_to_lower(cls, values):
for field, value in values.items():
if isinstance(value, str):
values[field] = value.lower()
return values
[docs]
class EDPIMInfillModel(BaseModel):
period: float
c_y: float
c_rp: float
mu_h: float
mu_s: float
mu_rp: float
mu_ult: float
im_type: Optional[str] = "sa"
[docs]
@model_validator(mode='before')
def str_to_lower(cls, values):
for field, value in values.items():
if isinstance(value, str):
values[field] = value.lower()
return values
[docs]
class EDPIMIsolModel(BaseModel):
R: float
mu: float
delta_max: float = 1.5
def _ductility_based(data, func):
ductility_f = data.get('ductility_f', DUCTILITY_F) + 0.01
DYNAMIC_DUCTILITY = set_ductilities(ductility_f)
# Make prediction
outs = np.zeros(DYNAMIC_DUCTILITY.shape)
disps = np.zeros(DYNAMIC_DUCTILITY.shape)
for i, dyn_duct in enumerate(DYNAMIC_DUCTILITY):
if callable(func):
data["dynamic_ductility"] = dyn_duct
prediction = func(**data)
else:
return {
"status": "error",
"message": "Wrong method or something went wrong"
}
outs[i] = float(prediction["median"])
if "dispersion" in prediction:
disps[i] = prediction["dispersion"]
return {
"strength-ratios": outs,
"ductilities": DYNAMIC_DUCTILITY,
"dispersions": disps,
}
def _xgb(data, func):
ductility_f = data.get('dynamic_ductility', DUCTILITY_F) + 0.01
DYNAMIC_DUCTILITY = set_ductilities(ductility_f)
if callable(func):
data["dynamic_ductility"] = DYNAMIC_DUCTILITY
prediction = func(**data)
return {
"strength-ratios": prediction['median'],
"ductilities": DYNAMIC_DUCTILITY,
"dispersions": prediction['dispersion'],
}
return {
"status": "error",
"message": "Wrong method or something went wrong"
}
def _strength_ratio_based(data, func):
STRENGTH_RATIO = np.arange(0.01, 12.01, 0.01)
# Make prediction
outs = np.zeros(STRENGTH_RATIO.shape)
disps = np.zeros(STRENGTH_RATIO.shape)
for i, sr in enumerate(STRENGTH_RATIO):
if callable(func):
data["strength_ratio"] = sr
prediction = func(**data)
else:
return {
"status": "error",
"message": "Wrong method or something went wrong"
}
outs[i] = float(prediction["median"])
if "dispersion" in prediction:
disps[i] = prediction["dispersion"]
return {
"strength-ratios": STRENGTH_RATIO,
"ductilities": outs,
"dispersions": disps,
}
[docs]
def edp_im(data: EDPIMModel):
data = EDPIMModel.model_validate(data)
hysteresis = data.hysteresis
im_type = data.im_type
backbone = data.backbone.model_dump()
method = data.method
if not backbone:
return {"status": "error", "message": "Backbone missing"}
module = None
warnings = []
if method == "shahnazaryan-oreilly" and hysteresis == "bilin":
from .XGBPredict import XGBPredict
module = XGBPredict(
im_type=im_type, collapse=False)
func = module.make_prediction
backbone["scaler"], backbone["model"], backbone["dispersions"] = \
module.model
elif method != "shahnazaryan-oreilly" and im_type == "sa":
module = importlib.import_module(
f"djura.edp_im.r_mu_t.{method}")
func = getattr(module, 'make_prediction', None)
if not module:
return {
"status": "error",
"message": "Hysteresis method incorrect or missing"
}
backbone["dynamic_ductility"] = backbone['ductility_f']
backbone["strength_ratio"] = None
filtered_data = filter_args(func, backbone)
# Make prediction
if method == "guerrini":
outs = _strength_ratio_based(filtered_data, func)
elif method == "shahnazaryan-oreilly":
outs = _xgb(filtered_data, func)
warnings = module.WARNINGS
else:
outs = _ductility_based(filtered_data, func)
dispersions = outs.get("dispersions")
sr = outs.get("strength-ratios")
if isinstance(dispersions, float):
# No case for this condition yet
pass
elif dispersions is not None:
sr16 = lognorm.ppf(0.16, s=dispersions, scale=sr)
sr84 = lognorm.ppf(0.84, s=dispersions, scale=sr)
sr16[dispersions == 0] = sr[dispersions == 0]
sr84[dispersions == 0] = sr[dispersions == 0]
outs["strength-ratios-16%"] = sr16
outs["strength-ratios-84%"] = sr84
return to_json_serializable({
"status": "success",
"data": outs,
"message": "Predictions success",
"warnings": warnings,
})
[docs]
def edp_im_isol(data: EDPIMIsolModel):
data = EDPIMIsolModel.model_validate(data)
r = data.R
mu = data.mu
if r is None or mu is None:
return {"status": "error", "message": "'R' or 'mu' missing"}
a1, b1, c1 = 0.337475, 0.8083212, 1.02
a2, c2 = 1.182557, -0.08815
a3, b3, c3 = 0.857213, 0.4183158, 0.4027454
gamma = a1 * mu**b1 * r**c1
kappa = a2 * r**c2
beta = a3 * mu**b3 * r**c3
delta = np.arange(0.01, data.delta_max, 0.01)
rho = (delta / gamma) ** (1 / kappa)
sa_t_iso = rho * mu
outs = {"rho": rho, "deltas": delta, "beta": beta, "sa_t_iso": sa_t_iso}
return to_json_serializable({
"status": "success",
"data": outs,
"message": "Predictions success"
})
[docs]
def edp_im_infill(data: EDPIMInfillModel):
data = EDPIMInfillModel.model_validate(data)
period = data.period
c_y = data.c_y
c_rp = data.c_rp
mu_h = data.mu_h
mu_s = data.mu_s
mu_rp = data.mu_rp
mu_ult = data.mu_ult
im_type = data.im_type
mu = np.arange(0.01, mu_ult, 0.01)
if im_type == "sa_avg":
rho = np.zeros(mu.shape)
a = 0.704 * (period / c_y) ** 0.1595 - 0.239
b = 1.813 * (c_rp * (mu_rp - mu_s)) ** 0.0473 - 1.98
rho = np.exp(a * np.log(mu) + b)
rho[mu <= 1.0] = np.exp(
np.log(mu[mu <= 1.0]) + b)
return to_json_serializable({
"status": "success",
"data": {
"strength-ratios": rho,
"ductilities": mu,
"dispersion": 0.27,
},
"message": "Predictions success"
})
elif im_type == "sa":
if mu_h == mu_s:
mu_s = mu_h + 0.01
def _adjust(data, last):
if len(data) == 0:
return np.array([])
data = np.maximum.accumulate(data)
first = data[0]
data = data - (first - last)
return data
sr = np.zeros(mu.shape) + 1
sr16 = np.zeros(mu.shape) + 1
sr84 = np.zeros(mu.shape) + 1
# Hardening branch
h_idxs = (mu > 1) & (mu <= mu_h)
a1, b1 = _hard_branch_infill(period)
sr[h_idxs] = a1[1] * mu[h_idxs] ** b1[1]
sr16[h_idxs] = a1[0] * mu[h_idxs] ** b1[0]
sr84[h_idxs] = a1[2] * mu[h_idxs] ** b1[2]
sr[h_idxs] = _adjust(sr[h_idxs], 1)
sr16[h_idxs] = _adjust(sr16[h_idxs], 1)
sr84[h_idxs] = _adjust(sr84[h_idxs], 1)
# Softening branch
s_idxs = (mu > mu_h) & (mu <= mu_s)
a2, b2, g2 = _soft_branch_infill(period)
sr16[s_idxs] = a2[0] * mu[s_idxs] ** 2 + b2[0] * mu[s_idxs] + g2[0]
sr[s_idxs] = a2[1] * mu[s_idxs] ** 2 + b2[1] * sr16[s_idxs] + g2[1]
sr84[s_idxs] = a2[2] * mu[s_idxs] ** 2 + b2[2] * sr[s_idxs] + g2[2]
sr[s_idxs] = _adjust(sr[s_idxs], sr[h_idxs][-1])
sr16[s_idxs] = _adjust(sr16[s_idxs], sr16[h_idxs][-1])
sr84[s_idxs] = _adjust(sr84[s_idxs], sr84[h_idxs][-1])
# Residual plateau branch
rp_idxs = (mu > mu_s) & (mu <= mu_rp)
a3, b3 = _res_branch_infill(period)
sr[rp_idxs] = a3[1] * mu[rp_idxs] + b3[1]
sr16[rp_idxs] = a3[0] * mu[rp_idxs] + b3[0]
sr84[rp_idxs] = a3[2] * mu[rp_idxs] + b3[2]
sr[rp_idxs] = _adjust(sr[rp_idxs], sr[s_idxs][-1])
sr16[rp_idxs] = _adjust(sr16[rp_idxs], sr16[s_idxs][-1])
sr84[rp_idxs] = _adjust(sr84[rp_idxs], sr84[s_idxs][-1])
# Strength degradation branch
sd_idxs = (mu > mu_rp)
a4, b4 = _sd_branch_infill(period)
sr[sd_idxs] = a4[1] * mu[sd_idxs] + b4[1]
sr16[sd_idxs] = a4[0] * mu[sd_idxs] + b4[0]
sr84[sd_idxs] = a4[2] * mu[sd_idxs] + b4[2]
sr[sd_idxs] = _adjust(sr[sd_idxs], sr[rp_idxs][-1])
sr16[sd_idxs] = _adjust(sr16[sd_idxs], sr16[rp_idxs][-1])
sr84[sd_idxs] = _adjust(sr84[sd_idxs], sr84[rp_idxs][-1])
sr[np.isnan(sr)] = 1
sr16[np.isinf(sr16)] = 1e9
sr84[np.isinf(sr84)] = 1e9
# Elastic branch
sr[mu <= 1.0] = mu[mu <= 1.0]
sr16[mu <= 1.0] = mu[mu <= 1.0]
sr84[mu <= 1.0] = mu[mu <= 1.0]
return to_json_serializable({
"status": "success",
"data": {
"strength-ratios": sr,
"strength-ratios-16%": sr16,
"strength-ratios-84%": sr84,
"ductilities": mu,
"dispersion": 0,
},
"message": "Predictions success"
})
return {"status": "error", "message": "IM type not provided or incorrect"}
def _hard_branch_infill(period):
a = np.array([0.8628, 0.9235, 0.9195, 0.9632, 0.4745, 0.0654, 0.04461])
b = np.array([0.7624, 0.5041, 0.1785, 1.0220, 0.3253, 0.4064, 0.4479])
c = np.array([0.1643, 0.1701, 0.1147, 0.1694, 0.09403, 0.02054, 0.01584])
exponent_terms = - ((period - b) / c) ** 2
alpha50 = np.sum(a * np.exp(exponent_terms))
# Constants for the first table
a1 = np.array([0.1460, 0.5926, 0.07312, 0.2965, 0.02688, 1.0630, 0.3127])
b1 = np.array([0.5335, 0.4161, 0.4495, 0.2215, 0.3699, 1.0030, 0.1462])
c1 = np.array([0.03444, 0.3194, 0.01667, 0.1087, 0.0158, 0.6460, 0.07181])
exponent_terms = - ((period - b1) / c1) ** 2
alpha16 = np.sum(a1 * np.exp(exponent_terms))
# Constants for the second table
a2 = np.array([1.024, 0.6034, 0.2466, 0.06141, 0.2511, 0.0001, 0.07086])
b2 = np.array([0.9018, 0.1928, 0.4758, 0.6903, 0.3254, 0.9390, 0.3948])
c2 = np.array([0.6555, 0.1072, 0.1232, 0.05664, 0.07067, 0.00132, 0.02287])
exponent_terms = - ((period - b2) / c2) ** 2
alpha84 = np.sum(a2 * np.exp(exponent_terms))
# Constants for the first table
a_b1 = np.array([-0.1334, 0.3312, 0.7985, 0.0001, 0.1543, 0.9252, 0.2809])
b_b1 = np.array([0.7771, 0.7647, 0.04284, 0.5721, 0.4788, 0.8165, 0.3003])
c_b1 = np.array([0.04907, 0.000986, 0.09365,
0.0001, 0.1050, 0.5100, 0.1216])
exponent_terms = - ((period - b_b1) / c_b1) ** 2
beta50 = np.sum(a_b1 * np.exp(exponent_terms))
# Constants for the second table
a_b2 = np.array([0.2008, 0.1790, 0.1425, 0.1533,
3.623e12, 0.09451, 0.1964])
b_b2 = np.array([1.0930, 0.7169, 0.4876, 0.5709, 97.610, 0.4424, 0.3345])
c_b2 = np.array([0.5405, 0.08836, 0.04956,
0.07256, 17.940, 0.06262, 0.09522])
exponent_terms = - ((period - b_b2) / c_b2) ** 2
beta16 = np.sum(a_b2 * np.exp(exponent_terms))
# Constants for the third table
a_b3 = np.array([0.7182, 0.1320, 0.1233, 0.09805, 0.1429, 0.6547, 0.0001])
b_b3 = np.array([0.04151, 0.6058, 0.4904, 0.5448, 0.3652, 0.8431, 0.7115])
c_b3 = np.array([0.09018, 0.04845, 0.04392,
0.01778, 0.09815, 0.71260, 0.0001803])
exponent_terms = - ((period - b_b3) / c_b3) ** 2
beta84 = np.sum(a_b3 * np.exp(exponent_terms))
return [alpha16, alpha50, alpha84], [beta16, beta50, beta84]
def _soft_branch_infill(period):
aa2 = np.array([0.0395, 0.01833, 0.009508])
ba2 = np.array([-0.03069, -0.01481, -0.007821])
ab2 = np.array([1.049, 0.8237, 0.4175])
bb2 = np.array([0.2494, 0.04082, 0.03164])
ag2 = np.array([-0.7326, -0.7208, -0.0375])
bg2 = np.array([1.116, 1.279, 1.079])
a2 = aa2 * period + ba2
b2 = ab2 * period + bb2
g2 = ag2 * period + bg2
return a2, b2, g2
def _res_branch_infill(period):
aa = np.array([-5.075, -2.099, -0.382])
ba = np.array([7.112, 3.182, 0.6334])
ca = np.array([-1.572, -0.6989, -0.051])
da = np.array([0.1049, 0.0481, 0.002])
ab = np.array([16.16, 8.417, -0.027])
bb = np.array([-26.5, -14.51, -1.8])
cb = np.array([10.92, 6.75, 2.036])
db = np.array([1.055, 0.9061, 1.067])
a = aa * period ** 3 + ba * period ** 2 + ca * period + da
b = ab * period ** 3 + bb * period ** 2 + cb * period + db
return a, b
def _sd_branch_infill(period):
aa = np.array([-1.564, -0.5954, -0.06693])
ba = np.array([2.193, 0.817, 0.1418])
ca = np.array([-0.352, -0.09191, 0.0124])
da = np.array([0.0149, 0.001819, -0.002012])
ab = np.array([1.756, 0.7315, -0.408])
bb = np.array([-8.719, -3.703, -1.333])
cb = np.array([8.285, 4.391, 2.521])
db = np.array([1.198, 1.116, 1.058])
a = aa * period ** 3 + ba * period ** 2 + ca * period + da
b = ab * period ** 3 + bb * period ** 2 + cb * period + db
return a, b
[docs]
def edp_im_batch(data: List[EDPIMModel]):
if not data:
return {"status": "error", "message": "Empty batch"}
results = [None] * len(data)
# Validate all inputs and group by method/hysteresis
from collections import defaultdict
grouped = defaultdict(list)
for idx, _data in enumerate(data):
try:
from ..vulnerability_modeller.backbone import (
Backbone, get_infill_spo)
backbone = _data["backbone"]
hysteresis = _data.get('hysteresis', 'bilin')
method = _data.get('method', "")
im_type = _data.get("im_type", "sa")
b = Backbone(
backbone,
_data.get('backboneMethod', 'backbone'),
hysteresis,
)
_data['backbone'] = b.backbone
if method == "shahnazaryan-oreilly" and \
hysteresis == "bilin":
backbone = BackboneModel.model_validate(b.backbone)
elif hysteresis == "infill":
b.backbone['im_type'] = im_type
b.backbone["r-plot"], b.backbone["mu-plot"] = get_infill_spo(
b.backbone)
_data["backbone"] = b.backbone
if hysteresis != "infill":
validated = EDPIMModel.model_validate(_data)
key = (validated.method, validated.hysteresis,
validated.im_type)
grouped[key].append((idx, validated))
else:
key = (method, hysteresis, im_type)
# Validation will be performed later
grouped[key].append((idx, _data))
except Exception as e:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Validation failed: {str(e)}",
"error_type": "validation"
}
# Process each group
for (method, hysteresis, im_type), items in grouped.items():
try:
if hysteresis == "bilin":
_process_group_bilin(
items, method, hysteresis, im_type, results)
elif hysteresis == "infill":
_process_group_infill(items, results)
else:
raise ValueError(
f"Unsupported hysteresis type: '{hysteresis}'. "
f"Expected 'bilin' or 'infill'."
)
except Exception as e:
# Group processing failed - mark all items in group as errors
for idx, _ in items:
# Only if not already marked as error
if results[idx] is None:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Group processing failed: {str(e)}",
"error_type": "group_processing",
"traceback": traceback.format_exc()
}
for idx, result in enumerate(results):
# Catch all (unexpected error)
if result is None:
results[idx] = {
"index": idx,
"status": "error",
"error": "Unexpected: result not set",
"error_type": "unexpected"
}
# Calculate statistics
successes = [r for r in results if r["status"] == "success"]
failures = [r for r in results if r["status"] == "error"]
success_ids = [r["index"] for r in successes]
failure_ids = [r["index"] for r in failures]
# Group errors by type
error_types = {}
for failure in failures:
error_type = failure.get("error_type", "unknown")
error_types[error_type] = error_types.get(error_type, 0) + 1
return to_json_serializable({
"status": "success",
"total": len(data),
"successful": len(successes),
"failed": len(failures),
"error_summary": error_types,
"successes": successes,
"failures": failures,
"success_ids": success_ids,
"failure_ids": failure_ids,
"message": f"Batch processing complete: {len(successes)} succeeded,"
f"{len(failures)} failed"
})
def _process_group_bilin(items, method, hysteresis, im_type, results):
"""Process a group of items with the same method/hysteresis/im_type"""
# XGBoost batch processing
if method == "shahnazaryan-oreilly":
try:
from .XGBPredict import XGBPredict
module = XGBPredict(im_type=im_type, collapse=False)
func = module.make_prediction
scaler, model, dispersions_info = module.model
except Exception as e:
# Model loading failed - mark all items as errors
for idx, _ in items:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Model loading failed: {str(e)}",
"error_type": "model_loading"
}
return
# Process each item using make_prediction
# (model loaded once for entire group)
for idx, data in items:
try:
backbone = data.backbone.model_dump()
backbone["dynamic_ductility"] = backbone['ductility_f']
backbone["strength_ratio"] = None
# Add scaler, model, dispersions to backbone
# (required by make_prediction)
backbone["scaler"] = scaler
backbone["model"] = model
backbone["dispersions"] = dispersions_info
# Used in output preparation
# Processed input is returned with outputs as metadata
backbone["method"] = method
backbone["hysteresis"] = hysteresis
backbone["im_type"] = im_type
# Filter args for make_prediction
filtered_data = filter_args(func, backbone)
# Call the actual make_prediction method
outs = _xgb(filtered_data, func)
# Process dispersions
outs = _get_sr_fractiles(outs)
# Remove xgboost scaler
if 'scaler' in backbone:
# Not JSON-serializable
del backbone['scaler']
del backbone['model']
# Entire XGB dispersion predictions, unnecessary
del backbone['dispersions']
inputs = data.model_dump(by_alias=True)
results[idx] = {
"index": idx,
"status": "success",
"out": outs.get('data', {}),
"input": inputs,
}
except Exception as e:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Prediction failed: {str(e)}",
"error_type": "prediction",
"traceback": traceback.format_exc()
}
# Other methods (r_mu_t based) - process individually within group
elif method != "shahnazaryan-oreilly" and im_type == "sa":
# Only SA (R)
try:
module = importlib.import_module(f"djura.edp_im.r_mu_t.{method}")
func = getattr(module, 'make_prediction', None)
if not func:
raise ValueError(
f"make_prediction function not found for method {method}")
except Exception as e:
# Module loading failed - mark all items as errors
for idx, _ in items:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Module loading failed: {str(e)}",
"error_type": "module_loading"
}
return
# Process each item individually (these methods don't support batching)
for idx, data in items:
try:
backbone = data.backbone.model_dump()
backbone["dynamic_ductility"] = backbone['ductility_f']
backbone["strength_ratio"] = None
filtered_data = filter_args(func, backbone)
if method == "guerrini":
outs = _strength_ratio_based(filtered_data, func)
else:
outs = _ductility_based(filtered_data, func)
processed = _get_sr_fractiles(outs)
results[idx] = {
"index": idx,
"status": "success",
"out": processed.get('data', {}),
"input": backbone,
}
except Exception as e:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Individual prediction failed: {str(e)}",
"error_type": "prediction",
"traceback": traceback.format_exc()
}
else:
# Unknown method/hysteresis combination
for idx, _ in items:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Unsupported method/hysteresis/IM type combination: "
f"{method}/{hysteresis}/{im_type}",
"error_type": "unsupported_method"
}
def _process_group_infill(items, results):
for idx, data in items:
try:
outs = edp_im_infill(data['backbone'])
results[idx] = {
"index": idx,
"status": "success",
"out": outs.get("data", {}),
"input": data
}
except Exception as e:
results[idx] = {
"index": idx,
"status": "error",
"error": f"Prediction failed: {str(e)}",
"error_type": "prediction",
"traceback": traceback.format_exc()
}
def _get_sr_fractiles(outs):
try:
dispersions = outs.get("dispersions")
sr = outs.get("strength-ratios")
if isinstance(dispersions, float):
pass
elif dispersions is not None:
sr16 = lognorm.ppf(0.16, s=dispersions, scale=sr)
sr84 = lognorm.ppf(0.84, s=dispersions, scale=sr)
sr16[dispersions == 0] = sr[dispersions == 0]
sr84[dispersions == 0] = sr[dispersions == 0]
outs["strength-ratios-16%"] = sr16
outs["strength-ratios-84%"] = sr84
return to_json_serializable({
"status": "success",
"data": outs,
"message": "Prediction success"
})
except Exception as e:
return {
"status": "error",
"data": {},
"message": f"Dispersion calculation failed: {str(e)}"
}