Skip to content

Commit

Permalink
Merge pull request #9763 from gem/gh
Browse files Browse the repository at this point in the history
`Refactored build_global_exposure`
  • Loading branch information
micheles authored Jun 7, 2024
2 parents 87397b4 + 54da8dd commit 37dfe61
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 182 deletions.
7 changes: 6 additions & 1 deletion openquake/commands/plot_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ def main(calc_id: int = -1, site_model=False,
from openquake.hmtk.plotting.patch import PolygonPatch

dstore = datastore.read(calc_id)
oq = dstore['oqparam']
try:
region = dstore['oqparam'].region
region = oq.region
except KeyError:
region = None
sitecol = dstore['sitecol']
Expand Down Expand Up @@ -72,6 +73,10 @@ def main(calc_id: int = -1, site_model=False,
disc = numpy.unique(dstore['discarded']['lon', 'lat'])
p.scatter(disc['lon'], disc['lat'], marker='x', color='red',
label='discarded', s=markersize_discarded)
if oq.rupture_xml or oq.rupture_dict:
lon, lat, dep = dstore['ruptures'][0]['hypo']
print('rupture(%s, %s)' % (lon, lat))
p.scatter([lon], [lat], marker='o', color='red', label='rupture')

ax = add_borders(ax)

Expand Down
213 changes: 213 additions & 0 deletions openquake/commonlib/expo_to_hdf5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2024, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import logging
import operator
import pandas
import numpy
import h5py
from openquake.baselib import hdf5, sap, general
from openquake.baselib.parallel import Starmap
from openquake.hazardlib.geo.utils import geohash3
from openquake.commonlib.datastore import build_dstore_log
from openquake.risklib.asset import _get_exposure

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
# columns of the global exposure CSVs read as 32 bit floats
CONV = {n: F32 for n in '''
BUILDINGS COST_CONTENTS_USD COST_NONSTRUCTURAL_USD
COST_STRUCTURAL_USD LATITUDE LONGITUDE OCCUPANTS_PER_ASSET
OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY
OCCUPANTS_PER_ASSET_NIGHT OCCUPANTS_PER_ASSET_TRANSIT
TOTAL_AREA_SQM'''.split()}
# ASSET_ID is read as fixed-size bytes
CONV['ASSET_ID'] = (numpy.bytes_, 24)
# NOTE(review): the None key presumably acts as the default dtype for
# columns not listed above — confirm against hdf5.build_dt
for f in (None, 'ID_1'):
    CONV[f] = str
# tag columns; the lists accumulate arrays across tasks (see store/store_tagcol)
TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
# CSV columns to be skipped entirely
IGNORE = set('NAME_0 NAME_1 SETTLEMENT TOTAL_REPL_COST_USD COST_PER_AREA_USD'
             .split())
# fields stored in the datastore under assets/<name>
FIELDS = {'TAXONOMY', 'COST_NONSTRUCTURAL_USD', 'LONGITUDE',
          'COST_CONTENTS_USD', 'ASSET_ID', 'OCCUPANCY',
          'OCCUPANTS_PER_ASSET', 'OCCUPANTS_PER_ASSET_AVERAGE',
          'OCCUPANTS_PER_ASSET_DAY', 'OCCUPANTS_PER_ASSET_NIGHT',
          'OCCUPANTS_PER_ASSET_TRANSIT', 'TOTAL_AREA_SQM',
          'BUILDINGS', 'COST_STRUCTURAL_USD',
          'LATITUDE', 'ID_0', 'ID_1'}


def add_geohash3(array):
    """
    Return a copy of `array` with a new field "geohash3" prepended,
    computed from the LONGITUDE and LATITUDE fields.

    :param array: a structured array with LONGITUDE and LATITUDE fields
    :returns: a structured array with the same fields plus geohash3
    """
    dt = array.dtype
    dtlist = [('geohash3', U16)] + [(n, dt[n]) for n in dt.names]
    out = numpy.zeros(len(array), dtlist)
    if len(array) == 0:
        # fix: return an empty array with the right dtype instead of a
        # bare tuple (), so that callers indexing by field name (such as
        # exposure_by_geohash) do not crash on empty input
        return out
    for n in dt.names:
        out[n] = array[n]
    out['geohash3'] = geohash3(array['LONGITUDE'], array['LATITUDE'])
    return out


def fix(arr):
    """
    Prepend the country code (field ID_0) to the fields ASSET_ID and
    ID_1, modifying `arr` in place.
    """
    countries = arr['ID_0']
    subdivisions = arr['ID_1']
    arr['ASSET_ID'] = numpy.char.add(
        numpy.array(countries, 'S3'), arr['ASSET_ID'])
    for idx, country in enumerate(countries):
        if not subdivisions[idx].startswith(country):
            subdivisions[idx] = '%s-%s' % (country, subdivisions[idx])


def exposure_by_geohash(array, monitor):
    """
    Add a geohash3 field to `array`, fix the IDs and group by geohash.

    Yields pairs (geohash, subarray).
    """
    arr = add_geohash3(array)
    fix(arr)
    for code in numpy.unique(arr['geohash3']):
        sub = arr[arr['geohash3'] == code]
        yield code, sub


def store_tagcol(dstore):
    """
    Store the global TAGS as a TagCollection, i.e. as arrays of unique
    values prepended with "?" (for instance taxonomy = ["?", "Adobe",
    "Concrete", "Stone-Masonry", "Unreinforced-Brick-Masonry", "Wood"])
    plus the attributes __pyclass__, tagnames, tagsizes.
    """
    sizes = []
    names = []
    for tagname, chunks in TAGS.items():
        name = 'taxonomy' if tagname == 'TAXONOMY' else tagname
        names.append(name)
        values = numpy.concatenate(chunks)
        uniq, inv, counts = numpy.unique(
            values, return_inverse=1, return_counts=1)
        size = len(uniq) + 1
        sizes.append(size)
        logging.info('Storing %s[%d/%d]', tagname, size, len(inv))
        # index 0 is reserved for the "?" value, hence the shift by one
        hdf5.extend(dstore[f'assets/{tagname}'], inv + 1)
        dstore['tagcol/' + name] = numpy.concatenate([['?'], uniq])
        if name == 'ID_0':
            # also store the number of assets per country
            cdt = [('country', (numpy.bytes_, 3)), ('counts', int)]
            by_country = numpy.empty(len(uniq), cdt)
            by_country['country'] = uniq
            by_country['counts'] = counts
            dstore['assets_by_country'] = by_country

    attrs = dict(__pyclass__='openquake.risklib.asset.TagCollection',
                 tagnames=numpy.array(names, hdf5.vstr),
                 tagsizes=sizes)
    dstore.getitem('tagcol').attrs.update(attrs)


def gen_tasks(files, monitor):
    """
    Read the given CSV files in chunks of one million rows; the first
    chunk of each file is processed immediately, while the other chunks
    are yielded as subtasks of kind exposure_by_geohash.

    :param files: objects with .fname, .header, .fields attributes
                  (as returned by hdf5.sniff)
    :param monitor: a Monitor instance passed to the subtasks
    """
    for file in files:
        # read CSV in chunks
        dfs = pandas.read_csv(
            file.fname, names=file.header, dtype=CONV,
            usecols=file.fields, skiprows=1, chunksize=1_000_000)
        for i, df in enumerate(dfs):
            if len(df) == 0:
                continue
            if 'ID_1' not in df.columns:  # happens for many islands
                df['ID_1'] = '???'
            dt = hdf5.build_dt(CONV, df.columns, file.fname)
            array = numpy.zeros(len(df), dt)
            for col in df.columns:
                array[col] = df[col].to_numpy()
            if i == 0:
                # process the first chunk in the current task
                yield from exposure_by_geohash(array, monitor)
            else:
                # fix: use logging instead of a leftover debug print
                logging.info('Submitting a chunk of %s', file.fname)
                # defer the other chunks to parallel subtasks
                yield exposure_by_geohash, array


def store(exposures_xml, dstore):
    """
    Store the given exposures in the datastore.

    :param exposures_xml: list of pathnames to exposure XML files
    :param dstore: the DataStore instance where the assets are stored
    """
    csvfiles = []
    for xml in exposures_xml:
        # parse the XML header and collect the associated CSV datafiles
        exposure, _ = _get_exposure(xml)
        csvfiles.extend(exposure.datafiles)
    # inspect the CSV headers without reading the full data
    files = hdf5.sniff(csvfiles, ',', IGNORE)
    # dtype of the assets dataframe: U32 indices for the tag columns,
    # F32 for the numeric columns, fixed-size bytes for ASSET_ID
    dtlist = [(t, U32) for t in TAGS] + \
        [(f, F32) for f in set(CONV)-set(TAGS)-{'ASSET_ID', None}] + \
        [('ASSET_ID', h5py.string_dtype('ascii', 25))]
    for name, dt in dtlist:
        logging.info('Creating assets/%s', name)
    # NOTE(review): only the exposure parsed from the last XML is stored
    # here — presumably all input exposures share the same metadata; verify
    dstore['exposure'] = exposure
    dstore.create_df('assets', dtlist, 'gzip')
    # dataset mapping each geohash3 to the slice of assets it contains
    slc_dt = numpy.dtype([('gh3', U16), ('start', U32), ('stop', U32)])
    dstore.create_dset('assets/slice_by_gh3', slc_dt, fillvalue=None)
    dstore.swmr_on()
    smap = Starmap.apply(gen_tasks, (files,),
                         weight=operator.attrgetter('size'), h5=dstore.hdf5)
    num_assets = 0
    # NB: we need to keep everything in memory to make gzip efficient
    acc = general.AccumDict(accum=[])
    for gh3, arr in smap:
        for name in FIELDS:
            if name in TAGS:
                # tag values are stored later by store_tagcol
                TAGS[name].append(arr[name])
            else:
                acc[name].append(arr[name])
        n = len(arr)
        slc = numpy.array([(gh3, num_assets, num_assets + n)], slc_dt)
        hdf5.extend(dstore['assets/slice_by_gh3'], slc)
        num_assets += n
    Starmap.shutdown()
    for name in sorted(acc):
        lst = acc.pop(name)
        arr = numpy.concatenate(lst, dtype=lst[0].dtype)
        logging.info(f'Storing assets/{name}')
        hdf5.extend(dstore['assets/' + name], arr)
    store_tagcol(dstore)

    # sanity check
    for name in FIELDS:
        n = len(dstore['assets/' + name])
        assert n == num_assets, (name, n, num_assets)

    # here n == num_assets thanks to the assertions above
    logging.info('Stored {:_d} assets in {}'.
                 format(n, dstore.filename))


def main(exposures_xml):
    """
    A utility to convert an exposure from XML+CSV format into HDF5.
    NB: works only for the exposures of the global risk model, having
    field names like LONGITUDE, LATITUDE, etc

    :param exposures_xml: list of pathnames to exposure XML files
    :returns: the pathname of the generated HDF5 file
    """
    dstore, log = build_dstore_log()
    with dstore, log:
        store(exposures_xml, dstore)
    return dstore.filename

# fix: the attribute name must match the parameter name `exposures_xml`,
# otherwise the help/nargs annotations are not associated to the parameter
main.exposures_xml = dict(help='Exposure pathnames', nargs='+')


if __name__ == '__main__':
    # python -m openquake.commonlib.expo_to_hdf5 exposure.xml
    sap.run(main)
5 changes: 1 addition & 4 deletions openquake/commonlib/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,9 @@ def __init__(self, job_ini, calc_id, log_level='info', log_file=None,
if os.path.exists(path): # sanity check on the calculation ID
raise RuntimeError('There is a pre-existing file %s' % path)
self.usedb = True
elif calc_id == -1:
# only works in single-user situations
self.calc_id = get_last_calc_id() + 1
self.usedb = False
else:
# assume the calc_id was alreay created in the db
assert calc_id > 0, calc_id
self.calc_id = calc_id
self.usedb = True

Expand Down
11 changes: 11 additions & 0 deletions openquake/commonlib/tests/data/grm_exposure.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
OCCUPANTS_PER_ASSET_AVERAGE,COST_NONSTRUCTURAL_USD,ID_1,OCCUPANTS_PER_ASSET_DAY,OCCUPANTS_PER_ASSET_TRANSIT,NAME_0,OCCUPANTS_PER_ASSET,COST_PER_AREA_USD,SETTLEMENT,OCCUPANTS_PER_ASSET_NIGHT,ID_2,TOTAL_REPL_COST_USD,TOTAL_AREA_SQM,COST_STRUCTURAL_USD,TAXONOMY,COST_CONTENTS_USD,BUILDINGS,LONGITUDE,NAME_1,OCCUPANCY,ID_0,ASSET_ID,LATITUDE,NAME_1_CHINESE,NAME_1_PINYIN,NAME_2_CHINESE,NAME_2_PINYIN
68.4,2544048.0,V,21.6,59.1,Taiwan,134.4,780.0,Urban,124.5,V02,5653440.0,5436.0,1696032.0,CR/LFINF+DUH/HBET:2-5/RES,1413360.0,8.0,121.3270828794676,Taitung County,Res,TWN,Res_0,23.011250091869623,No_tag,No_tag,No_tag,No_tag
2.9,106002.0,V,0.9,2.5,Taiwan,5.6,780.1,Urban,5.2,V02,235560.0,226.5,70668.0,CR/LFINF+DUL/H:1/RES,58890.0,2.0,121.3270828794676,Taitung County,Res,TWN,Res_1,23.011250091869623,No_tag,No_tag,No_tag,No_tag
538.8,20034378.0,V,169.9,465.7,Taiwan,1059.1,780.0,Urban,980.8,V02,44520840.0,42808.6,13356252.0,CR/LFINF+DUL/HBET:2-5/RES,11130210.0,63.0,121.3270828794676,Taitung County,Res,TWN,Res_2,23.011250091869623,No_tag,No_tag,No_tag,No_tag
68.4,2544048.0,V,21.6,59.1,Taiwan,134.5,780.0,Urban,124.5,V02,5653440.0,5436.1,1696032.0,CR/LFINF+DUM/HBET:2-5/RES,1413360.0,8.0,121.3270828794676,Taitung County,Res,TWN,Res_3,23.011250091869623,No_tag,No_tag,No_tag,No_tag
57.0,2500560.0,V,18.0,49.2,Taiwan,112.0,920.0,Urban,103.7,V02,5556800.0,4530.0,1667040.0,CR/LWAL+DUH/HBET:6-12/RES,1389200.0,2.0,121.3270828794676,Taitung County,Res,TWN,Res_4,23.011250091869623,No_tag,No_tag,No_tag,No_tag
199.6,8751960.0,V,62.9,172.5,Taiwan,392.4,920.0,Urban,363.4,V02,19448800.0,15855.0,5834640.0,CR/LWAL+DUL/HBET:6-12/RES,4862200.0,7.0,121.3270828794676,Taitung County,Res,TWN,Res_5,23.011250091869623,No_tag,No_tag,No_tag,No_tag
28.5,1250280.0,V,9.0,24.6,Taiwan,56.0,920.0,Urban,51.8,V02,2778400.0,2265.0,833520.0,CR/LWAL+DUM/HBET:6-12/RES,694600.0,1.0,121.3270828794676,Taitung County,Res,TWN,Res_6,23.011250091869623,No_tag,No_tag,No_tag,No_tag
48.4,792000.7,V,15.3,41.8,Taiwan,95.1,520.0,Urban,88.1,V02,3168003.5,4264.5,1425601.6,MCF/LWAL+DUL/H:1/RES,950400.9,34.0,121.3270828794676,Taitung County,Res,TWN,Res_7,23.011250091869623,No_tag,No_tag,No_tag,No_tag
94.1,1537413.4,V,29.7,81.3,Taiwan,184.9,520.0,Urban,171.3,V02,6149653.8,8278.4,2767344.2,MCF/LWAL+DUL/HBET:2-5/RES,1844896.2,11.0,121.3270828794676,Taitung County,Res,TWN,Res_8,23.011250091869623,No_tag,No_tag,No_tag,No_tag
10.0,163058.8,V,3.2,8.7,Taiwan,19.8,520.1,Urban,18.3,V02,652236.1,877.9,293506.3,MCF/LWAL+DUM/H:1/RES,195671.0,7.0,121.3270828794676,Taitung County,Res,TWN,Res_9,23.011250091869623,No_tag,No_tag,No_tag,No_tag
37 changes: 37 additions & 0 deletions openquake/commonlib/tests/data/grm_exposure.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<nrml xmlns="http://openquake.org/xmlns/nrml/0.4" xmlns:gml="http://www.opengis.net/gml">

<exposureModel category="buildings" id="exposure" taxonomySource="GEM taxonomy">
<description>exposure model</description>

<conversions>
<costTypes>
<costType name="structural" type="aggregated" unit="USD"/>
<costType name="nonstructural" type="aggregated" unit="USD"/>
<costType name="contents" type="aggregated" unit="USD"/>
</costTypes>
</conversions>

<exposureFields>
<field oq="id" input="ASSET_ID" />
<field oq="lon" input="LONGITUDE" />
<field oq="lat" input="LATITUDE" />
<field oq="taxonomy" input="TAXONOMY" />
<field oq="number" input="BUILDINGS" />
<field oq="area" input="TOTAL_AREA_SQM" />
<field oq="structural" input="COST_STRUCTURAL_USD" />
<field oq="nonstructural" input="COST_NONSTRUCTURAL_USD" />
<field oq="contents" input="COST_CONTENTS_USD" />
<field oq="day" input="OCCUPANTS_PER_ASSET_DAY" />
<field oq="night" input="OCCUPANTS_PER_ASSET_NIGHT" />
<field oq="transit" input="OCCUPANTS_PER_ASSET_TRANSIT" />
<field oq="residents" input="OCCUPANTS_PER_ASSET" />
</exposureFields>

<occupancyPeriods>day night transit</occupancyPeriods>

<tagNames>OCCUPANCY ID_0 ID_1 ID_2 NAME_0 NAME_1</tagNames>
<assets>grm_exposure.csv</assets>
</exposureModel>

</nrml>
35 changes: 35 additions & 0 deletions openquake/commonlib/tests/expo_to_hdf5_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2024, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import unittest
from openquake.baselib import hdf5
from openquake.commonlib.expo_to_hdf5 import main


def test_main():
    raise unittest.SkipTest('Avoid error MRD01TestCase: There is a pre-existing file')
    # smoke test: only checks that the conversion runs end to end
    xml_path = os.path.join(os.path.dirname(__file__),
                            'data', 'grm_exposure.xml')
    fname = main([xml_path])
    with hdf5.File(fname) as dstore:
        expected = [b'TWNRes_%d' % i for i in range(10)]
        assert list(dstore['assets/ASSET_ID']) == expected

Loading

0 comments on commit 37dfe61

Please sign in to comment.