diff --git a/openquake/commands/plot_assets.py b/openquake/commands/plot_assets.py
index 15d4f529b8eb..698ef9b01202 100644
--- a/openquake/commands/plot_assets.py
+++ b/openquake/commands/plot_assets.py
@@ -36,8 +36,9 @@ def main(calc_id: int = -1, site_model=False,
     from openquake.hmtk.plotting.patch import PolygonPatch
     dstore = datastore.read(calc_id)
+    oq = dstore['oqparam']
     try:
-        region = dstore['oqparam'].region
+        region = oq.region
     except KeyError:
         region = None
     sitecol = dstore['sitecol']
@@ -72,6 +73,10 @@ def main(calc_id: int = -1, site_model=False,
         disc = numpy.unique(dstore['discarded']['lon', 'lat'])
         p.scatter(disc['lon'], disc['lat'], marker='x', color='red',
                   label='discarded', s=markersize_discarded)
+    if oq.rupture_xml or oq.rupture_dict:
+        lon, lat, dep = dstore['ruptures'][0]['hypo']
+        print('rupture(%s, %s)' % (lon, lat))
+        p.scatter([lon], [lat], marker='o', color='red', label='rupture')
 
     ax = add_borders(ax)
 
diff --git a/openquake/commonlib/expo_to_hdf5.py b/openquake/commonlib/expo_to_hdf5.py
new file mode 100644
index 000000000000..0705ca816670
--- /dev/null
+++ b/openquake/commonlib/expo_to_hdf5.py
@@ -0,0 +1,213 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright (C) 2024, GEM Foundation
+#
+# OpenQuake is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# OpenQuake is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
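+#
+# NB: sketch of the layout created by store() below; the output HDF5 file
+# contains the datasets
+#   exposure            -- the Exposure metadata object
+#   assets/<FIELD>      -- one gzip-compressed column per field in FIELDS
+#   assets/slice_by_gh3 -- (gh3, start, stop) triples grouping the assets
+#                          by 3-character geohash
+#   tagcol/<tagname>    -- the unique tag values, with "?" prepended
+#   assets_by_country   -- (country, counts) pairs derived from ID_0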
+
+import logging
+import operator
+import pandas
+import numpy
+import h5py
+from openquake.baselib import hdf5, sap, general
+from openquake.baselib.parallel import Starmap
+from openquake.hazardlib.geo.utils import geohash3
+from openquake.commonlib.datastore import build_dstore_log
+from openquake.risklib.asset import _get_exposure
+
+U16 = numpy.uint16
+U32 = numpy.uint32
+F32 = numpy.float32
+CONV = {n: F32 for n in '''
+BUILDINGS COST_CONTENTS_USD COST_NONSTRUCTURAL_USD
+COST_STRUCTURAL_USD LATITUDE LONGITUDE OCCUPANTS_PER_ASSET
+OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY
+OCCUPANTS_PER_ASSET_NIGHT OCCUPANTS_PER_ASSET_TRANSIT
+TOTAL_AREA_SQM'''.split()}
+CONV['ASSET_ID'] = (numpy.bytes_, 24)
+for f in (None, 'ID_1'):  # None acts as the default dtype in hdf5.build_dt
+    CONV[f] = str
+TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
+IGNORE = set('NAME_0 NAME_1 SETTLEMENT TOTAL_REPL_COST_USD COST_PER_AREA_USD'
+             .split())
+FIELDS = {'TAXONOMY', 'COST_NONSTRUCTURAL_USD', 'LONGITUDE',
+          'COST_CONTENTS_USD', 'ASSET_ID', 'OCCUPANCY',
+          'OCCUPANTS_PER_ASSET', 'OCCUPANTS_PER_ASSET_AVERAGE',
+          'OCCUPANTS_PER_ASSET_DAY', 'OCCUPANTS_PER_ASSET_NIGHT',
+          'OCCUPANTS_PER_ASSET_TRANSIT', 'TOTAL_AREA_SQM',
+          'BUILDINGS', 'COST_STRUCTURAL_USD',
+          'LATITUDE', 'ID_0', 'ID_1'}
+
+
+def add_geohash3(array):
+    """
+    Add field "geohash3" to a structured array
+    """
+    if len(array) == 0:
+        return ()
+    dt = array.dtype
+    dtlist = [('geohash3', U16)] + [(n, dt[n]) for n in dt.names]
+    out = numpy.zeros(len(array), dtlist)
+    for n in dt.names:
+        out[n] = array[n]
+    out['geohash3'] = geohash3(array['LONGITUDE'], array['LATITUDE'])
+    return out
+
+
+def fix(arr):
+    # prepend the country to ASSET_ID and ID_1
+    ID0 = arr['ID_0']
+    ID1 = arr['ID_1']
+    arr['ASSET_ID'] = numpy.char.add(numpy.array(ID0, 'S3'), arr['ASSET_ID'])
+    for i, (id0, id1) in enumerate(zip(ID0, ID1)):
+        if not id1.startswith(id0):
+            ID1[i] = '%s-%s' % (id0, ID1[i])
+
+
+def exposure_by_geohash(array, monitor):
+    """
+    Yields pairs (geohash, array)
+    """
+    array = add_geohash3(array)
+    fix(array)
+    for gh in numpy.unique(array['geohash3']):
+        yield gh, array[array['geohash3'] == gh]
+
+
+def store_tagcol(dstore):
+    """
+    A TagCollection is stored as arrays like taxonomy = [
+    "?", "Adobe", "Concrete", "Stone-Masonry", "Unreinforced-Brick-Masonry",
+    "Wood"] with attributes __pyclass__, tagnames, tagsizes
+    """
+    tagsizes = []
+    tagnames = []
+    for tagname in TAGS:
+        name = 'taxonomy' if tagname == 'TAXONOMY' else tagname
+        tagnames.append(name)
+        tagvalues = numpy.concatenate(TAGS[tagname])
+        uvals, inv, counts = numpy.unique(
+            tagvalues, return_inverse=1, return_counts=1)
+        size = len(uvals) + 1
+        tagsizes.append(size)
+        logging.info('Storing %s[%d/%d]', tagname, size, len(inv))
+        hdf5.extend(dstore[f'assets/{tagname}'], inv + 1)  # indices start from 1
+        dstore['tagcol/' + name] = numpy.concatenate([['?'], uvals])
+        if name == 'ID_0':
+            dtlist = [('country', (numpy.bytes_, 3)), ('counts', int)]
+            arr = numpy.empty(len(uvals), dtlist)
+            arr['country'] = uvals
+            arr['counts'] = counts
+            dstore['assets_by_country'] = arr
+
+    dic = dict(__pyclass__='openquake.risklib.asset.TagCollection',
+               tagnames=numpy.array(tagnames, hdf5.vstr),
+               tagsizes=tagsizes)
+    dstore.getitem('tagcol').attrs.update(dic)
+
+
+def gen_tasks(files, monitor):
+    """
+    Generate tasks of kind exposure_by_geohash for large files
+    """
+    for file in files:
+        # read CSV in chunks
+        dfs = pandas.read_csv(
+            file.fname, names=file.header, dtype=CONV,
+            usecols=file.fields, skiprows=1, chunksize=1_000_000)
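+        # the first chunk of each file is processed inline (yield from),
+        # while any further chunks are yielded back as (func, array)
+        # subtasks to be executed in parallel by the Starmap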
+        for i, df in enumerate(dfs):
+            if len(df) == 0:
+                continue
+            if 'ID_1' not in df.columns:  # happens for many islands
+                df['ID_1'] = '???'
+            dt = hdf5.build_dt(CONV, df.columns, file.fname)
+            array = numpy.zeros(len(df), dt)
+            for col in df.columns:
+                array[col] = df[col].to_numpy()
+            if i == 0:
+                yield from exposure_by_geohash(array, monitor)
+            else:
+                print(file.fname)  # signal progress on multi-chunk files
+                yield exposure_by_geohash, array
+
+
+def store(exposures_xml, dstore):
+    """
+    Store the given exposures in the datastore
+    """
+    csvfiles = []
+    for xml in exposures_xml:
+        exposure, _ = _get_exposure(xml)
+        csvfiles.extend(exposure.datafiles)
+    files = hdf5.sniff(csvfiles, ',', IGNORE)
+    dtlist = [(t, U32) for t in TAGS] + \
+        [(f, F32) for f in set(CONV)-set(TAGS)-{'ASSET_ID', None}] + \
+        [('ASSET_ID', h5py.string_dtype('ascii', 25))]
+    for name, dt in dtlist:
+        logging.info('Creating assets/%s', name)
+    dstore['exposure'] = exposure  # the last parsed Exposure object
+    dstore.create_df('assets', dtlist, 'gzip')
+    slc_dt = numpy.dtype([('gh3', U16), ('start', U32), ('stop', U32)])
+    dstore.create_dset('assets/slice_by_gh3', slc_dt, fillvalue=None)
+    dstore.swmr_on()
+    smap = Starmap.apply(gen_tasks, (files,),
+                         weight=operator.attrgetter('size'), h5=dstore.hdf5)
+    num_assets = 0
+    # NB: we need to keep everything in memory to make gzip efficient
+    acc = general.AccumDict(accum=[])
+    for gh3, arr in smap:
+        for name in FIELDS:
+            if name in TAGS:
+                TAGS[name].append(arr[name])
+            else:
+                acc[name].append(arr[name])
+        n = len(arr)
+        slc = numpy.array([(gh3, num_assets, num_assets + n)], slc_dt)
+        hdf5.extend(dstore['assets/slice_by_gh3'], slc)
+        num_assets += n
+    Starmap.shutdown()
+    for name in sorted(acc):
+        lst = acc.pop(name)
+        arr = numpy.concatenate(lst, dtype=lst[0].dtype)
+        logging.info(f'Storing assets/{name}')
+        hdf5.extend(dstore['assets/' + name], arr)
+    store_tagcol(dstore)
+
+    # sanity check
+    for name in FIELDS:
+        n = len(dstore['assets/' + name])
+        assert n == num_assets, (name, n, num_assets)
+
+    logging.info('Stored {:_d} assets in {}'.
+                 format(num_assets, dstore.filename))
+
+
+def main(exposures_xml):
+    """
+    A utility to convert an exposure from XML+CSV format into HDF5.
+    NB: works only for the exposures of the global risk model, having
+    field names like LONGITUDE, LATITUDE, etc.
+    """
+    dstore, log = build_dstore_log()
+    with dstore, log:
+        store(exposures_xml, dstore)
+    return dstore.filename
+
+main.exposures_xml = dict(help='Exposure pathnames', nargs='+')
+
+
+if __name__ == '__main__':
+    # python -m openquake.commonlib.expo_to_hdf5 exposure.xml
+    sap.run(main)
diff --git a/openquake/commonlib/logs.py b/openquake/commonlib/logs.py
index 67a0326df1a2..88df4e040569 100644
--- a/openquake/commonlib/logs.py
+++ b/openquake/commonlib/logs.py
@@ -217,12 +217,9 @@ def __init__(self, job_ini, calc_id, log_level='info', log_file=None,
             if os.path.exists(path):
                 # sanity check on the calculation ID
                 raise RuntimeError('There is a pre-existing file %s' % path)
             self.usedb = True
-        elif calc_id == -1:
-            # only works in single-user situations
-            self.calc_id = get_last_calc_id() + 1
-            self.usedb = False
         else:
             # assume the calc_id was already created in the db
+            assert calc_id > 0, calc_id
             self.calc_id = calc_id
             self.usedb = True
diff --git a/openquake/commonlib/tests/data/grm_exposure.csv b/openquake/commonlib/tests/data/grm_exposure.csv
new file mode 100644
index 000000000000..76c1b1cf451e
--- /dev/null
+++ b/openquake/commonlib/tests/data/grm_exposure.csv
@@ -0,0 +1,11 @@
+OCCUPANTS_PER_ASSET_AVERAGE,COST_NONSTRUCTURAL_USD,ID_1,OCCUPANTS_PER_ASSET_DAY,OCCUPANTS_PER_ASSET_TRANSIT,NAME_0,OCCUPANTS_PER_ASSET,COST_PER_AREA_USD,SETTLEMENT,OCCUPANTS_PER_ASSET_NIGHT,ID_2,TOTAL_REPL_COST_USD,TOTAL_AREA_SQM,COST_STRUCTURAL_USD,TAXONOMY,COST_CONTENTS_USD,BUILDINGS,LONGITUDE,NAME_1,OCCUPANCY,ID_0,ASSET_ID,LATITUDE,NAME_1_CHINESE,NAME_1_PINYIN,NAME_2_CHINESE,NAME_2_PINYIN
+68.4,2544048.0,V,21.6,59.1,Taiwan,134.4,780.0,Urban,124.5,V02,5653440.0,5436.0,1696032.0,CR/LFINF+DUH/HBET:2-5/RES,1413360.0,8.0,121.3270828794676,Taitung County,Res,TWN,Res_0,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+2.9,106002.0,V,0.9,2.5,Taiwan,5.6,780.1,Urban,5.2,V02,235560.0,226.5,70668.0,CR/LFINF+DUL/H:1/RES,58890.0,2.0,121.3270828794676,Taitung County,Res,TWN,Res_1,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+538.8,20034378.0,V,169.9,465.7,Taiwan,1059.1,780.0,Urban,980.8,V02,44520840.0,42808.6,13356252.0,CR/LFINF+DUL/HBET:2-5/RES,11130210.0,63.0,121.3270828794676,Taitung County,Res,TWN,Res_2,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+68.4,2544048.0,V,21.6,59.1,Taiwan,134.5,780.0,Urban,124.5,V02,5653440.0,5436.1,1696032.0,CR/LFINF+DUM/HBET:2-5/RES,1413360.0,8.0,121.3270828794676,Taitung County,Res,TWN,Res_3,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+57.0,2500560.0,V,18.0,49.2,Taiwan,112.0,920.0,Urban,103.7,V02,5556800.0,4530.0,1667040.0,CR/LWAL+DUH/HBET:6-12/RES,1389200.0,2.0,121.3270828794676,Taitung County,Res,TWN,Res_4,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+199.6,8751960.0,V,62.9,172.5,Taiwan,392.4,920.0,Urban,363.4,V02,19448800.0,15855.0,5834640.0,CR/LWAL+DUL/HBET:6-12/RES,4862200.0,7.0,121.3270828794676,Taitung County,Res,TWN,Res_5,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+28.5,1250280.0,V,9.0,24.6,Taiwan,56.0,920.0,Urban,51.8,V02,2778400.0,2265.0,833520.0,CR/LWAL+DUM/HBET:6-12/RES,694600.0,1.0,121.3270828794676,Taitung County,Res,TWN,Res_6,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+48.4,792000.7,V,15.3,41.8,Taiwan,95.1,520.0,Urban,88.1,V02,3168003.5,4264.5,1425601.6,MCF/LWAL+DUL/H:1/RES,950400.9,34.0,121.3270828794676,Taitung County,Res,TWN,Res_7,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+94.1,1537413.4,V,29.7,81.3,Taiwan,184.9,520.0,Urban,171.3,V02,6149653.8,8278.4,2767344.2,MCF/LWAL+DUL/HBET:2-5/RES,1844896.2,11.0,121.3270828794676,Taitung County,Res,TWN,Res_8,23.011250091869623,No_tag,No_tag,No_tag,No_tag
+10.0,163058.8,V,3.2,8.7,Taiwan,19.8,520.1,Urban,18.3,V02,652236.1,877.9,293506.3,MCF/LWAL+DUM/H:1/RES,195671.0,7.0,121.3270828794676,Taitung County,Res,TWN,Res_9,23.011250091869623,No_tag,No_tag,No_tag,No_tag
diff --git a/openquake/commonlib/tests/data/grm_exposure.xml b/openquake/commonlib/tests/data/grm_exposure.xml
new file mode 100644
index 000000000000..549dde952e2b
--- /dev/null
+++ b/openquake/commonlib/tests/data/grm_exposure.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<nrml xmlns="http://openquake.org/xmlns/nrml/0.5">
+  <exposureModel id="exposure" category="buildings" taxonomySource="GEM taxonomy">
+    <description>exposure model</description>
+    <!-- conversions/costType elements lost in extraction; placeholder -->
+    <occupancyPeriods>day night transit</occupancyPeriods>
+    <tagNames>OCCUPANCY ID_0 ID_1 ID_2 NAME_0 NAME_1</tagNames>
+    <assets>grm_exposure.csv</assets>
+  </exposureModel>
+</nrml>
diff --git a/openquake/commonlib/tests/expo_to_hdf5_test.py b/openquake/commonlib/tests/expo_to_hdf5_test.py
new file mode 100644
index 000000000000..e22a0531bcb0
--- /dev/null
+++ b/openquake/commonlib/tests/expo_to_hdf5_test.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright (C) 2024, GEM Foundation
+#
+# OpenQuake is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# OpenQuake is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
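+
+# NB: the expected ASSET_IDs below are the CSV values Res_0 ... Res_9 with
+# the 3-letter country code TWN prepended by expo_to_hdf5.fix()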
+import os
+import unittest
+from openquake.baselib import hdf5
+from openquake.commonlib.expo_to_hdf5 import main
+
+
+def test_main():
+    raise unittest.SkipTest('Avoid error MRD01TestCase: There is a pre-existing file')
+    # only tests that it runs
+    expo_xml = os.path.join(os.path.dirname(__file__),
+                            'data', 'grm_exposure.xml')
+    out = main([expo_xml])
+    with hdf5.File(out) as dstore:
+        assets = list(dstore['assets/ASSET_ID'])
+        assert assets == [b'TWNRes_0', b'TWNRes_1', b'TWNRes_2', b'TWNRes_3',
+                          b'TWNRes_4', b'TWNRes_5', b'TWNRes_6', b'TWNRes_7',
+                          b'TWNRes_8', b'TWNRes_9']
+
diff --git a/utils/build_global_exposure b/utils/build_global_exposure
index 9a2961f79c2a..90e47eb1f1b2 100755
--- a/utils/build_global_exposure
+++ b/utils/build_global_exposure
@@ -19,78 +19,27 @@
 import os
 import logging
-import operator
 import pandas
 import numpy
-import h5py
-from openquake.baselib import hdf5, sap, performance, general
-from openquake.baselib.parallel import Starmap
+from openquake.baselib import sap, performance
 from openquake.hazardlib import nrml
-from openquake.hazardlib.geo.utils import geohash3
+from openquake.risklib.riskmodels import CompositeRiskModel, RiskFuncList
 from openquake.commonlib.datastore import build_dstore_log
 from openquake.commonlib.oqvalidation import OqParam
-from openquake.risklib.riskmodels import CompositeRiskModel, RiskFuncList
-from openquake.risklib.asset import _get_exposure
+from openquake.commonlib import expo_to_hdf5
 from openquake.qa_tests_data import mosaic
 
 U16 = numpy.uint16
-U32 = numpy.uint32
-F32 = numpy.float32
-CONV = {n: F32 for n in '''
-BUILDINGS COST_CONTENTS_USD COST_NONSTRUCTURAL_USD
-COST_STRUCTURAL_USD LATITUDE LONGITUDE OCCUPANTS_PER_ASSET
-OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY
-OCCUPANTS_PER_ASSET_NIGHT OCCUPANTS_PER_ASSET_TRANSIT
-TOTAL_AREA_SQM'''.split()}
-CONV['ASSET_ID'] = (numpy.bytes_, 24)
-for f in (None, 'ID_1'):
-    CONV[f] = str
-TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
-IGNORE = set('NAME_0 NAME_1 SETTLEMENT TOTAL_REPL_COST_USD COST_PER_AREA_USD'
-             .split())
 MOSAIC_DIR = os.path.dirname(mosaic.__file__)
-FIELDS = {'TAXONOMY', 'COST_NONSTRUCTURAL_USD', 'LONGITUDE',
-          'COST_CONTENTS_USD', 'ASSET_ID', 'OCCUPANCY',
-          'OCCUPANTS_PER_ASSET', 'OCCUPANTS_PER_ASSET_AVERAGE',
-          'OCCUPANTS_PER_ASSET_DAY', 'OCCUPANTS_PER_ASSET_NIGHT',
-          'OCCUPANTS_PER_ASSET_TRANSIT', 'TOTAL_AREA_SQM',
-          'BUILDINGS', 'COST_STRUCTURAL_USD',
-          'LATITUDE', 'ID_0', 'ID_1'}
 
 
 def collect_exposures(grm_dir):
     """
     Collect the files of kind Exposure_<country>.xml.
 
-    :returns: exposure, xmlfiles, csvfiles
+    :returns: xmlfiles
     """
     out = []
-    csvfiles = []
     for region in os.listdir(grm_dir):
         expodir = os.path.join(grm_dir, region, 'Exposure', 'Exposure')
         if not os.path.exists(expodir):
             continue
@@ -99,125 +48,7 @@
             if fname.startswith('Exposure_'):  # i.e. Exposure_Chile.xml
                 fullname = os.path.join(expodir, fname)
                 out.append(fullname)
-                exposure, _ = _get_exposure(fullname)
-                csvfiles.extend(exposure.datafiles)
-    return exposure, out, csvfiles
-
-
-def exposure_by_geohash(array, monitor):
-    """
-    Yields pairs (geohash, array)
-    """
-    array = add_geohash3(array)
-    fix(array)
-    for gh in numpy.unique(array['geohash3']):
-        yield gh, array[array['geohash3']==gh]
-
-
-def store_tagcol(dstore):
-    """
-    A TagCollection is stored as arrays like taxonomy = [
-    "?", "Adobe", "Concrete", "Stone-Masonry", "Unreinforced-Brick-Masonry",
-    "Wood"] with attributes __pyclass__, tagnames, tagsize
-    """
-    tagsizes = []
-    tagnames = []
-    for tagname in TAGS:
-        name = 'taxonomy' if tagname == 'TAXONOMY' else tagname
-        tagnames.append(name)
-        tagvalues = numpy.concatenate(TAGS[tagname])
-        uvals, inv, counts = numpy.unique(tagvalues, return_inverse=1, return_counts=1)
-        size = len(uvals) + 1
-        tagsizes.append(size)
-        logging.info('Storing %s[%d/%d]', tagname, size, len(inv))
-        hdf5.extend(dstore[f'assets/{tagname}'], inv + 1)  # indices start from 1
-        dstore['tagcol/' + name] = numpy.concatenate([['?'], uvals])
-        if name == 'ID_0':
-            dtlist = [('country', (numpy.bytes_, 3)), ('counts', int)]
-            arr = numpy.empty(len(uvals), dtlist)
-            arr['country'] = uvals
-            arr['counts'] = counts
-            dstore['assets_by_country'] = arr
-
-    dic = dict(__pyclass__='openquake.risklib.asset.TagCollection',
-               tagnames=numpy.array(tagnames, hdf5.vstr),
-               tagsizes=tagsizes)
-    dstore.getitem('tagcol').attrs.update(dic)
-
-
-def gen_tasks(files, monitor):
-    """
-    Generate tasks of kind exposure_by_geohash for large files
-    """
-    for file in files:
-        # read CSV in chunks
-        dfs = pandas.read_csv(
-            file.fname, names=file.header, dtype=CONV,
-            usecols=file.fields, skiprows=1, chunksize=1_000_000)
-        for i, df in enumerate(dfs):
-            if len(df) == 0:
-                continue
-            if 'ID_1' not in df.columns:  # happens for many islands
-                df['ID_1'] = '???'
-            dt = hdf5.build_dt(CONV, df.columns, file.fname)
-            array = numpy.zeros(len(df), dt)
-            for col in df.columns:
-                array[col] = df[col].to_numpy()
-            if i == 0:
-                yield from exposure_by_geohash(array, monitor)
-            else:
-                print(file.fname)
-                yield exposure_by_geohash, array
-
-
-def read_world_exposure(grm_dir, dstore):
-    """
-    Read the exposure files for the entire world (assume some conventions
-    on the file names are respected).
-
-    :param grm_dir: directory containing the global risk model
-    """
-    exposure, fnames, csvfiles = collect_exposures(grm_dir)
-    dstore['exposure'] = exposure
-    files = hdf5.sniff(csvfiles, ',', IGNORE)
-    dtlist = [(t, U32) for t in TAGS] + \
-        [(f, F32) for f in set(CONV)-set(TAGS)-{'ASSET_ID', None}] + \
-        [('ASSET_ID', h5py.string_dtype('ascii', 25))]
-    for name, dt in dtlist:
-        logging.info('Creating assets/%s', name)
-    dstore.create_df('assets', dtlist, 'gzip')
-    slc_dt = numpy.dtype([('gh3', U16), ('start', U32), ('stop', U32)])
-    dstore.create_dset('assets/slice_by_gh3', slc_dt, fillvalue=None)
-    dstore.swmr_on()
-    smap = Starmap.apply(gen_tasks, (files,),
-                         weight=operator.attrgetter('size'), h5=dstore.hdf5)
-    num_assets = 0
-    # NB: we need to keep everything in memory to make gzip efficient
-    acc = general.AccumDict(accum=[])
-    for gh3, arr in smap:
-        for name in FIELDS:
-            if name in TAGS:
-                TAGS[name].append(arr[name])
-            else:
-                acc[name].append(arr[name])
-        n = len(arr)
-        slc = numpy.array([(gh3, num_assets, num_assets + n)], slc_dt)
-        hdf5.extend(dstore['assets/slice_by_gh3'], slc)
-        num_assets += n
-    Starmap.shutdown()
-    for name in sorted(acc):
-        lst = acc.pop(name)
-        arr = numpy.concatenate(lst, dtype=lst[0].dtype)
-        logging.info(f'Storing assets/{name}')
-        hdf5.extend(dstore['assets/' + name], arr)
-    store_tagcol(dstore)
-
-    # sanity check
-    for name in FIELDS:
-        n = len(dstore['assets/' + name])
-        assert n == num_assets, (name, n, num_assets)
-
-    return num_assets
+    return out
 
 
 def read_world_vulnerability(grm_dir, dstore):
@@ -276,9 +107,8 @@ def main(grm_dir):
     logging.info('Read %d vulnerability functions', n)
     n = read_world_tmap(grm_dir, dstore)
     logging.info('Read %d taxonomy mappings', n)
-    n = read_world_exposure(grm_dir, dstore)
-    logging.info('Created {} with {:_d} assets'.
-                 format(dstore.filename, n))
+    fnames = collect_exposures(grm_dir)
+    expo_to_hdf5.store(fnames, dstore)
     logging.info(mon)
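A minimal sketch of reading back an exposure converted by expo_to_hdf5.store()
(the file name 'exposure.hdf5' and the index 0 are illustrative, not part of
this patch):

    from openquake.baselib import hdf5

    with hdf5.File('exposure.hdf5', 'r') as dstore:
        # (gh3, start, stop) triples; each one slices every assets/<FIELD>
        # column, since all columns are stored in the same geohash order
        slices = dstore['assets/slice_by_gh3'][:]
        start, stop = slices['start'][0], slices['stop'][0]
        ids = dstore['assets/ASSET_ID'][start:stop]
        lons = dstore['assets/LONGITUDE'][start:stop]
        lats = dstore['assets/LATITUDE'][start:stop]
        print(len(ids), lons.mean(), lats.mean())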