Djura-Tools companion scripts¶
djura-tools
is a companion repository containing helper functions and ready-to-run
scripts that exercise the djura package end-to-end. It bundles example
PSHA datastores, OQ Engine hazard and disaggregation outputs, and selected
record sets, together with scripts that turn them into inputs for the
Djura web apps.
Clone the repository alongside an installed djura to run the scripts:
git clone https://github.com/djura-risk-data-engineering/djura-tools
cd djura-tools
pip install djura
The scripts live under scripts/ and read their sample inputs from
scripts/data/. The sections below reproduce each script and describe
what it does.
Processing disaggregation (approximate)¶
scripts/process_disagg_approx.py parses OQ Engine disaggregation and
hazard curve outputs, then builds the conditional record-selection input
expected by the
conditional record selector.
"""
Process Disaggregation outputs from Oq-engine
This is an approximate approach
"""
# flake8: noqa
from pathlib import Path
import sys
import json
path = Path(__file__).resolve().parent
sys.path.insert(0, str(path.parent))
from djura.hazard.psha import proc_oq_disaggregation, proc_oq_hazard_curve
# POEs of interest
# Those are probability of exceedances (POEs) associated with intensity levels
# used during record selection
poes = [0.4, 0.2, 0.1, 0.05, 0.02, 0.01, 0.005, 0.0025, 0.001]
# Process the disaggregation excel file from OQ-Engine
disagg_all = proc_oq_disaggregation(
path / 'data/psha', poes,
# out_file=path / 'data/psha/disaggregation.json'
)
hz = proc_oq_hazard_curve(poes, path / 'data/psha')
# SA(0.5) to be used as conditional IM
imt = "SA(0.5)"
imls = hz['cond_imls'][imt]
rs_input = json.load(open(path / "data/djura-conditional-all-input.json"))
rs_input["imi"] = [
'SA(0.05)', 'SA(0.075)', 'SA(0.1)', 'SA(0.15)', 'SA(0.2)',
'SA(0.25)', 'SA(0.3)', 'SA(0.4)', 'SA(0.5)', 'SA(0.59)', 'SA(0.6)',
'SA(0.7)', 'SA(0.8)', 'SA(0.9)', 'SA(1.0)', 'SA(1.25)',
'SA(1.5)', 'SA(2.0)', 'SA(2.5)', 'SA(3.0)'
]
rs_input["im_weights"] = [1 / len(rs_input["imi"])] * len(rs_input["imi"])
rs_input["gmms"] = [
{"ID": 0, "SA": {"names": ['KothaEtAl2020ESHM20'], "weights": [1]}}
]
rs_input["site-parameters"] = {
"z2pt5": "1.38",
"z1pt0": "391.34",
"vs30": "370",
"xvf": "150",
"region": 0
}
rs_input["poes"] = {}
for idx, poe in enumerate(poes):
disagg = disagg_all["imt_disagg"][imt][
'mag_dist_hazard_contributions'][f'poe_{poe}']
ruptures = []
for i in range(len(disagg['mag'])):
ruptures.append(
{
"ID": i,
"gmms": [0],
"rjb": disagg['dist'][i],
"mag": disagg['mag'][i],
"rake": 0.0,
"weight": disagg['hz_cont_occ'][i],
# "weight": disagg['hz_cont_exc'][i],
}
)
rs_input["poes"][poe] = {
"ruptures": ruptures,
"im-star": {"type": imt, "value": imls[idx]},
}
# You may save the rs_input variable to a file for each POE
# This input then may be directly uploaded in
# https://apps.djura.it/hazard/record-selector/conditional
# NOTE: however, be careful as browsers might not easily display
# large amount of data
# If you encounter issues, feel free to contact us for questions
# or to help run the API without a UI
Processing disaggregation with a datastore¶
scripts/process_disagg_approx_dstore.py extends the approximate
workflow by reading rupture context parameters directly from the OQ
Engine datastore (calc_<id>.hdf5) via get_context_from_dstore,
matching each disaggregation scenario to its closest magnitude/distance
context and keeping only the most contributing ruptures.
"""
Process Disaggregation outputs from Oq-engine
This is an approximate approach
"""
# flake8: noqa
from pathlib import Path
import sys
import json
import heapq
import numpy as np
path = Path(__file__).resolve().parent
sys.path.insert(0, str(path.parent))
from djura.hazard.dstore import get_context_from_dstore
from djura.hazard.psha import proc_oq_disaggregation, proc_oq_hazard_curve
# POEs of interest
# Those are probability of exceedances (POEs) associated with intensity levels
# used during record selection
poes = [0.4, 0.2, 0.1, 0.05, 0.02, 0.01, 0.005, 0.0025, 0.001]
# Process the disaggregation excel file from OQ-Engine
disagg_all = proc_oq_disaggregation(
path / 'data/psha', poes,
# out_file=path / 'data/psha/disaggregation.json'
)
hz = proc_oq_hazard_curve(poes, path / 'data/psha')
# SA(0.5) to be used as conditional IM
imt = "SA(0.5)"
imls = hz['cond_imls'][imt]
rs_input = json.load(open(path / "data/djura-conditional-all-input.json"))
# Some of the following input parameters are taken from the job.ini file
rs_input["imi"] = [
'SA(0.05)', 'SA(0.075)', 'SA(0.1)', 'SA(0.15)', 'SA(0.2)',
'SA(0.25)', 'SA(0.3)', 'SA(0.4)', 'SA(0.5)', 'SA(0.6)',
'SA(0.7)', 'SA(0.8)', 'SA(0.9)', 'SA(1.0)', 'SA(1.25)',
'SA(1.5)', 'SA(2.0)', 'SA(2.5)', 'SA(3.0)'
]
rs_input["im_weights"] = [1 / len(rs_input["imi"])] * len(rs_input["imi"])
rs_input["gmms"] = [
{"ID": 0, "SA": {"names": ['KothaEtAl2020ESHM20'], "weights": [1]}}
]
# can be read directly from the datastore too
rs_input["site-parameters"] = {
"z2pt5": "1.38",
"z1pt0": "391.34",
"vs30": "370",
"xvf": "150",
"region": 0
}
rs_input["poes"] = {}
for idx, poe in enumerate(poes):
disagg = disagg_all["imt_disagg"][imt][
'mag_dist_hazard_contributions'][f'poe_{poe}']
ruptures = []
for i in range(len(disagg['mag'])):
ruptures.append(
{
"ID": i,
"gmms": [0],
"rjb": disagg['dist'][i],
"mag": disagg['mag'][i],
"rake": 0.0,
"weight": disagg['hz_cont_occ'][i],
# "weight": disagg['hz_cont_exc'][i],
}
)
rs_input["poes"][poe] = {
"ruptures": ruptures,
"im-star": {"type": imt, "value": imls[idx]},
}
# You may save the rs_input variable to a file for each POE
# This input then may be directly uploaded in
# https://apps.djura.it/hazard/record-selector/conditional
# NOTE: however, be careful as browsers might not easily display
# large amount of data
# If you encounter issues, feel free to contact us for questions
# or to help run the API without a UI
# Most contributing scenarios to select
# Leave None for all
n = 10
if n is not None:
for poe in poes:
ruptures = rs_input['poes'][poe]['ruptures']
# select the most contributing scenarios (unsorted)
ruptures = heapq.nlargest(n, ruptures, key=lambda x: x['weight'])
rs_input['poes'][poe]['ruptures'] = ruptures
# Match the mag and rjb to required parameters
# Get datastore
dstore = "2"
hdf_path = path.parents[3] / f"oqdata/calc_{dstore}.hdf5"
ctx, oq = get_context_from_dstore(hdf_path, im_ref=imt)
site_params = {}
# Based on OQ assignment
params = np.rec.array(np.concatenate(list(ctx['ctx_by_grp'].values())))
for param in ctx['site-parameters']:
site_params[param] = params[param][0]
rs_input['site-parameters'] = site_params
for poe in poes:
ruptures = rs_input["poes"][poe]['ruptures']
for i, rup in enumerate(ruptures):
mag = rup['mag']
rjb = rup['rjb']
print(f"Target Magnitude: {mag}, and Rjb: {rjb}")
# Euclidean distance for each recarray row
mag_normalized = (params['mag'] - params['mag'].min()) / \
(params['mag'].max() - params['mag'].min())
rjb_normalized = (params['rjb'] - params['rjb'].min()) / \
(params['rjb'].max() - params['rjb'].min())
target_mag_norm = (mag - params['mag'].min()) / \
(params['mag'].max() - params['mag'].min())
target_rjb_norm = (rjb - params['rjb'].min()) / \
(params['rjb'].max() - params['rjb'].min())
distances = np.sqrt((mag_normalized - target_mag_norm) **
2 + (rjb_normalized - target_rjb_norm)**2)
closest_idx = np.argmin(distances)
closest_row = params[closest_idx]
print(f"Closest mag: {closest_row['mag']}")
print(f"Closest rjb: {closest_row['rjb']}")
# Required parameters
req_params = ctx['required-parameters']
req_params_values = {}
for param in req_params:
req_params_values[param] = closest_row[param]
rs_input["poes"][poe]['ruptures'][i] = {
**rs_input["poes"][poe]['ruptures'][i], **req_params_values}
def to_json_serializable(data):
from numpy import float32, int32, ndarray
if isinstance(data, dict):
for key, value in data.items():
data[key] = to_json_serializable(value)
elif isinstance(data, list):
return [to_json_serializable(item) for item in data]
elif isinstance(data, ndarray):
return data.tolist()
elif isinstance(data, float32):
return float(data)
elif isinstance(data, int32):
return float(data)
return data
rs_input = to_json_serializable(rs_input)
with open("filename.json", "w") as json_file:
json.dump(rs_input['num-components'], json_file)
Processing disaggregation (exact)¶
scripts/process_disagg_exact.py shows the exact approach, reading the
full rupture context directly from a datastore. This method is only
available through the API, not the web UI.
"""
Process Disaggregation outputs from OQ-engine
Datastore
This method is not available within the UI of Djura
But it can be used through the API.
If you are willing to utilise the API directly without the
UI, feel free to contact us!
This is an exact approach
"""
# flake8: noqa
from pathlib import Path
import sys
import json
path = Path(__file__).resolve().parent
sys.path.insert(0, str(path.parent))
from djura.hazard.dstore import get_context_from_dstore
# OQ Datastores (.hdf5) are typically located in:
# C:\Users\<your-username>\oqdata or
# C:\Users\<your-username>\Documents\oqdata
# Conditional IM, does not have to match the conditional
# IM used during conditional selection of OQ engine
im_ref = "SA(0.5)"
dstore = "85"
hdf_path = path.parents[5] / f"oqdata/calc_{dstore}.hdf5"
ctx, oq = get_context_from_dstore(hdf_path, im_ref=im_ref)
# You may choose to save the context into a pickle
# from djura.utilities import export_results
# export_results(
# path / f"ctx_{dstore}",
# ctx,
# "pickle"
# )
Checking hazard consistency¶
scripts/check_hazard_consistency.py parses hazard curves and selected
record response spectra, then assembles the payload expected by the
hazard consistency app.
# flake8: noqa
from pathlib import Path
import sys
path = Path(__file__).resolve().parent
sys.path.insert(0, str(path.parent))
from djura.utilities import to_json_serializable
from djura.record_selector.hzc import prepare_rs_for_hzc
from djura.hazard.psha import proc_oq_hazard_curve
# POEs of interest
# Those are probability of exceedances (POEs) associated with intensity levels
# used during record selection
poes = [0.4, 0.2, 0.1, 0.05, 0.02, 0.01, 0.005, 0.0025, 0.001]
# IMs of interest
imts = ["SA(0.5)"]
# Conditional IM (for visualisations)
cond_im = "SA(0.5)"
# Set the hazard-curves
hz = proc_oq_hazard_curve(poes, path / 'data/psha',
out_file=path / 'data/psha/hazard.json')
curves = hz['hazard_curves']
rs = prepare_rs_for_hzc(path / f'data/records', hz["cond_poes"], imts)
# Finally prepare input that can be used inside hazard-consistency application
# at https://apps.djura.it/hazard/hazard-consistency
data = {
"imts": imts,
"response_spectra": rs,
"conditional_poes": hz["cond_poes"],
"conditional_intensities": hz["cond_imls"][cond_im],
"investigation_time": hz["investigation_time"],
"num_im": 500,
"hazard_curves": curves,
}
data = to_json_serializable(data)
Splitting record-selection input per PoE¶
scripts/rs_input_poe.py is a small utility that splits a combined
conditional record-selection input into one file per probability of
exceedance.
import json
from pathlib import Path
path = Path(__file__).parent
imt = "SA(0.59)"
# Load JSON as a Python dict
with open(path / f"data/{imt}.json", "r") as f:
data = json.load(f)
poes = data['poes']
for poe in poes:
_data = data.copy()
ruptures = data['poes'][str(poe)]['ruptures']
im_star = data['poes'][str(poe)]['im-star'] # adjust if key name differs
_data['ruptures'] = ruptures
_data['im-star'] = im_star
output_file = path / f"{imt}_{poe}.json"
# with open(output_file, 'w') as f_out:
# json.dump(_data, f_out, indent=4)
# print(f"Saved {output_file}")