Feat/catalog improvements #208

Merged · 4 commits · Feb 27, 2025
1 change: 1 addition & 0 deletions pyproject.toml

@@ -36,6 +36,7 @@ doc = [
 openquake = [
     "openquake-engine @ git+https://github.com/gem/oq-engine.git",
     "numpy < 2",
+    "fiona",
 ]
 jupyter = ["notebook"]

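With this addition, installing the extra, e.g. `pip install "seismostats[openquake]"` (assuming the distribution name `seismostats`), pulls in fiona alongside the pinned numpy and the engine itself.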
237 changes: 120 additions & 117 deletions seismostats/catalogs/catalog.py
@@ -27,8 +27,8 @@
 else:
     _openquake_available = True

-REQUIRED_COLS_CATALOG = ['longitude', 'latitude', 'depth',
-                         'time', 'magnitude']
+CATALOG_COLUMNS = ['longitude', 'latitude', 'depth',
+                   'time', 'magnitude', 'magnitude_type']

 QML_TEMPLATE = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             'catalog_templates', 'quakeml.j2')
@@ -37,32 +37,45 @@
              'hour', 'minute', 'second', 'microsecond']


-def _catalog_constructor_with_fallback(*args, **kwargs):
-    df = Catalog(*args, **kwargs)
-    if not _check_required_cols(df, REQUIRED_COLS_CATALOG):
-        return pd.DataFrame(*args, **kwargs)
-    if not _check_required_cols(df, required_cols=['catalog_id']):
+def _catalog_constructor_with_fallback(df, **kwargs):
+    if not _check_required_cols(df, ['magnitude']):
         return df
-    return ForecastCatalog(*args, **kwargs)
+    if not _check_required_cols(df, ['catalog_id']):
+        return Catalog(df, **kwargs)
+    return ForecastCatalog(df, **kwargs)


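The reworked fallback is what pandas calls through `_constructor` after every slice or copy: keep `magnitude` and you keep a Catalog, add `catalog_id` and you get a ForecastCatalog, drop `magnitude` and the result quietly degrades to a plain DataFrame. A rough sketch of that behavior (hypothetical data; assumes `Catalog` is exported from the package root):

    import pandas as pd
    from seismostats import Catalog

    cat = Catalog({'longitude': [8.55], 'latitude': [47.37], 'depth': [7.0],
                   'time': [pd.Timestamp('2025-01-02')], 'magnitude': [2.1]})

    mags = cat[['magnitude', 'time']]        # 'magnitude' kept -> still a Catalog
    coords = cat[['longitude', 'latitude']]  # 'magnitude' gone -> plain DataFrame
    print(type(mags).__name__, type(coords).__name__)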
 class Catalog(pd.DataFrame):
     """
     A subclass of pandas DataFrame that represents a catalog of earthquakes.

-    To be a valid Catalog object, the DataFrame must have the following
-    columns: longitude, latitude, depth, time, and magnitude.
+    To be a valid Catalog object, the DataFrame must have at least a
+    `magnitude` column. Depending on the method, the columns
+    `longitude`, `latitude`, `depth`, and `time` are also required.

+    data: Any | None = None,
+    name: str | None = None,
+    starttime: pd.Timestamp | None = None,
+    endtime: pd.Timestamp | None = None,
+    mc: float | None = None,
+    delta_m: float | None = None,
+    b_value: float | None = None,
+    bounding_polygon: Polygon | str | None = None,
+    depth_min: float | None = None,
+    depth_max: float | None = None,

     Args:
-        data: array-like, Iterable, dict, or DataFrame, optional
-            Data to initialize the catalog with.
+        data: Data to initialize the catalog with.
         name: Name of the catalog.
-        args: Additional arguments to pass to pandas
-            DataFrame constructor.
         starttime: Start time of the catalog.
         endtime: End time of the catalog.
         mc: Completeness magnitude of the catalog.
         delta_m: Magnitude binning of the catalog.
+        b_value: Gutenberg-Richter b-value of the catalog.
+        bounding_polygon: 2D boundary of the catalog.
+        depth_min: Minimum depth for which events are included in the catalog.
+        depth_max: Maximum depth for which events are included in the catalog.
         kwargs: Additional keyword arguments to pass to pandas
             DataFrame constructor.
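The docstring above mirrors the new constructor: every metadata argument is stored on the instance and listed in `_metadata`, so it survives pandas operations. A minimal construction sketch (hypothetical values):

    import pandas as pd
    from seismostats import Catalog

    cat = Catalog(
        {'longitude': [8.55, 8.60], 'latitude': [47.37, 47.41],
         'depth': [7.0, 11.5], 'magnitude': [2.1, 3.4],
         'time': pd.to_datetime(['2025-01-02', '2025-01-05'])},
        name='demo', mc=1.0, delta_m=0.1, b_value=1.0)

    print(cat.name, cat.mc, cat.delta_m, cat.b_value)  # metadata on the instance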
@@ -92,50 +105,82 @@ class Catalog(pd.DataFrame):

     _metadata = ['name', '_required_cols', 'mc',
                  'delta_m', 'b_value', 'starttime', 'endtime',
-                 'bounding_polygon', 'depth_min', 'depth_max']
-    _required_cols = REQUIRED_COLS_CATALOG
+                 'bounding_polygon', 'depth_min', 'depth_max',
+                 'logger']
+
+    _required_cols = CATALOG_COLUMNS
+
+    @property
+    def _constructor(self):
+        """
+        Required for subclassing Pandas DataFrame.
+        """
+        return _catalog_constructor_with_fallback

     def __init__(
-        self,
-        data: Any | None = None,
-        *args,
-        name: str | None = None,
-        starttime: pd.Timestamp | None = None,
-        endtime: pd.Timestamp | None = None,
-        mc: float | None = None,
-        delta_m: float | None = None,
-        b_value: float | None = None,
-        bounding_polygon: Polygon | str | None = None,
-        depth_min: float | None = None,
-        depth_max: float | None = None,
-        **kwargs
-    ):
-        if data is None and 'columns' not in kwargs:
-            super().__init__(columns=REQUIRED_COLS_CATALOG, *args, **kwargs)
-        else:
-            super().__init__(data, *args, **kwargs)
+            self,
+            data: Any | None = None,
+            name: str | None = None,
+            starttime: pd.Timestamp | None = None,
+            endtime: pd.Timestamp | None = None,
+            mc: float | None = None,
+            delta_m: float | None = None,
+            b_value: float | None = None,
+            bounding_polygon: Polygon | str | None = None,
+            depth_min: float | None = None,
+            depth_max: float | None = None,
+            **kwargs):

-        if self.columns.empty:
-            self = self.reindex(self.columns.union(
-                REQUIRED_COLS_CATALOG), axis=1)
+        self.logger = logging.getLogger(__name__)
+
+        # should be able to create a dataframe
+        if data is not None or 'columns' in kwargs:
+            super().__init__(data, **kwargs)
+        # if this dataframe is empty however, set some default columns
+        if data is None or self.columns.empty:
+            super().__init__(columns=CATALOG_COLUMNS, **kwargs)

         self.name = name
         self.mc = mc
         self.b_value = b_value
         self.delta_m = delta_m

-        self.starttime = starttime if isinstance(
-            starttime, pd.Timestamp) else pd.to_datetime(starttime)
-
-        self.endtime = endtime if isinstance(
-            endtime, pd.Timestamp) else pd.to_datetime(endtime)
+        self.starttime = pd.to_datetime(starttime)
+        self.endtime = pd.to_datetime(endtime)

         self.bounding_polygon = bounding_polygon
         self.depth_min = depth_min
         self.depth_max = depth_max

+        numeric_cols = ['magnitude', 'latitude', 'longitude', 'depth',
+                        'associatedphasecount', 'usedphasecount',
+                        'associatedstationcount', 'usedstationcount',
+                        'standarderror', 'azimuthalgap',
+                        'secondaryazimuthalgap', 'maximumdistance',
+                        'minimumdistance', 'mediandistance']
+        string_cols = ['magnitude_type', 'event_type']
+        time_cols = ['time']
+
+        for num in numeric_cols:
+            if num in self.columns:
+                self[num] = pd.to_numeric(self[num], errors='coerce')
+
+        for tc in time_cols:
+            if tc in self.columns:
+                self[tc] = pd.to_datetime(self[tc]).dt.tz_localize(None)
+
+        # make sure empty rows in string columns are NoneType
+        for strc in string_cols:
+            if strc in self.columns:
+                self[strc] = self[strc].replace(
+                    to_replace=['',
+                                'nan', 'NaN',
+                                'none', 'None',
+                                'na', 'Na', 'NA',
+                                'null', 'Null', 'NULL'],
+                    value=None)

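Moving the dtype cleanup into `__init__` means every construction path normalizes columns, not just `from_dict`. A sketch of the effect on deliberately messy input (hypothetical values):

    from seismostats import Catalog

    cat = Catalog({'magnitude': ['2.4', 'bad'],       # strings, one invalid
                   'magnitude_type': ['Ml', 'NaN'],   # placeholder text
                   'time': ['2025-01-01T00:00:00+00:00',
                            '2025-01-02T12:00:00+00:00']})

    print(cat['magnitude'].dtype)    # float64, invalid entry coerced to NaN
    print(cat['magnitude_type'][1])  # missing value instead of the string 'NaN'
    print(cat['time'].dt.tz)         # None, timezone info stripped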
     @classmethod
-    def from_quakeml(cls, quakeml: str,
+    def from_quakeml(cls,
+                     quakeml: str,
                      include_all_magnitudes: bool = True,
                      include_uncertainties: bool = False,
                      include_ids: bool = False,
@@ -145,18 +190,18 @@ def from_quakeml(cls, quakeml: str,

         Args:
             quakeml: Path to a QuakeML file or QuakeML
                 as a string.
             include_all_magnitudes: Whether all available magnitude types
                 should be included.
             include_uncertainties: Whether value columns with uncertainties
                 should be included.
             include_ids: Whether event, origin, and magnitude IDs
                 should be included.
             include_quality: Whether columns with quality information
                 should be included.

         Returns:
-            Catalog
+            catalog: Catalog object
         """
         if os.path.isfile(quakeml):
             catalog = parse_quakeml_file(
@@ -165,79 +210,41 @@ def from_quakeml(cls, quakeml: str,
             catalog = parse_quakeml(
                 quakeml, include_all_magnitudes, include_quality)

-        df = cls.from_dict(catalog, include_uncertainties, include_ids)
+        df = cls.from_dict(catalog,
+                           include_uncertainties,
+                           include_ids)

         return df

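A reading sketch (hypothetical path; per the docstring, a QuakeML document passed as a string works the same way):

    from seismostats import Catalog

    cat = Catalog.from_quakeml('data/events.xml',   # hypothetical file
                               include_uncertainties=False,
                               include_ids=False)
    print(len(cat), 'events parsed')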
     @classmethod
     def from_dict(cls,
                   data: list[dict],
                   include_uncertainties: bool = True,
-                  include_ids: bool = True, *args, **kwargs) -> Catalog:
+                  include_ids: bool = True,
+                  **kwargs) -> Catalog:
         """
         Create a Catalog from a list of dictionaries.

         Args:
             data: A list of earthquake event information
                 dictionaries.
             include_uncertainties: Whether value columns with uncertainties
                 should be included.
             include_ids: Whether event, origin, and magnitude IDs
                 should be included.

         Returns:
             Catalog
         """

-        df = super().from_dict(data, *args, **kwargs)
+        df = pd.DataFrame.from_dict(data, **kwargs)
+        df = cls(df)

-        numeric_cols = ['magnitude', 'latitude', 'longitude', 'depth',
-                        'associatedphasecount', 'usedphasecount',
-                        'associatedstationcount', 'usedstationcount',
-                        'standarderror', 'azimuthalgap',
-                        'secondaryazimuthalgap', 'maximumdistance',
-                        'minimumdistance', 'mediandistance']
-
-        string_cols = ['magnitude_type', 'event_type']
-
-        for num in numeric_cols:
-            if num in df.columns:
-                df[num] = pd.to_numeric(df[num], errors='coerce')
-
-        # make sure empty rows in string columns are NoneType
-        for strc in string_cols:
-            if strc in df.columns:
-                df[strc] = df[strc].replace(
-                    to_replace=['',
-                                'nan', 'NaN',
-                                'none', 'None',
-                                'na', 'Na', 'NA',
-                                'null', 'Null', 'NULL'],
-                    value=None)
-
-        if 'time' in df.columns:
-            df['time'] = pd.to_datetime(df['time']).dt.tz_localize(None)
-
-        if not include_uncertainties and isinstance(df, Catalog):
+        if not include_uncertainties:  # and isinstance(df, Catalog):
             df = df.drop_uncertainties()
-        if not include_ids and isinstance(df, Catalog):
+        if not include_ids:  # and isinstance(df, Catalog):
             df = df.drop_ids()

-        if not isinstance(df, Catalog):
-            df = Catalog(df)
-
-        if df.empty:
-            df = Catalog(columns=REQUIRED_COLS_CATALOG + ['magnitude_type'])
-
-        full_len = len(df)
-
-        df = df.dropna(subset=['latitude', 'longitude', 'time'])
-
-        if len(df) < full_len:
-            df.logger.info(
-                f"Dropped {full_len - len(df)} rows with missing values")
-
         return df

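Since the column cleanup now happens in the constructor, `from_dict` reduces to building the frame and honoring the two drop flags; rows are no longer silently dropped for missing coordinates. A usage sketch (hypothetical event dictionaries):

    from seismostats import Catalog

    events = [{'magnitude': 2.3, 'magnitude_type': 'Ml',
               'time': '2025-01-01T10:00:00'},
              {'magnitude': 3.1, 'magnitude_type': 'Mw',
               'time': '2025-01-03T04:30:00'}]

    cat = Catalog.from_dict(events, include_uncertainties=False,
                            include_ids=False)
    print(cat.dtypes)  # 'magnitude' numeric, 'time' datetime, via __init__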
@classmethod
@@ -288,7 +295,8 @@ def _convert_to_datetime(row):
         cat.drop(columns=_PD_TIME_COLS, inplace=True)
         return cat

-    @require_cols(require=REQUIRED_COLS_CATALOG)
+    @require_cols(require=[
+        'longitude', 'latitude', 'depth', 'time', 'magnitude'])
     def to_openquake(self) -> OQCatalogue:
         """
         Converts the Catalog to an openquake Catalogue
@@ -355,10 +363,6 @@ def drop_ids(self) -> Catalog:
         df = self.drop(columns=cols)
         return df

-    @property
-    def _constructor(self):
-        return _catalog_constructor_with_fallback
-
     @require_cols(require=_required_cols)
def strip(self, inplace: bool = False) -> Catalog | None:
"""
@@ -589,7 +593,7 @@ def _create_ids(self) -> Catalog:

         return df

-    @require_cols(require=_required_cols + ['magnitude_type'])
+    @require_cols(require=_required_cols)
     def to_quakeml(self, agencyID=' ', author=' ') -> str:
         """
         Convert the catalog to QuakeML format.
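Since `magnitude_type` is now part of `_required_cols`, the old `+ ['magnitude_type']` addition in the decorator became redundant. A round-trip sketch (hypothetical file and identifiers):

    from seismostats import Catalog

    cat = Catalog.from_quakeml('data/events.xml')           # hypothetical file
    xml = cat.to_quakeml(agencyID='XX', author='jane.doe')  # QuakeML as a string
    print(xml[:60])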
@@ -665,26 +669,25 @@ class ForecastCatalog(Catalog):
     catalog_id.

     Args:
-        data: array-like, Iterable, dict, or DataFrame, optional.
-            Data to initialize the catalog with.
+        data: Data to initialize the catalog with.
         name: Name of the catalog.
         n_catalogs: Total number of catalogs represented,
             including empty catalogs.
         args: Additional arguments to pass to pandas
             DataFrame constructor.
         starttime: Start time of the catalog.
         endtime: End time of the catalog.
         mc: Completeness magnitude of the catalog.
         delta_m: Magnitude binning of the catalog.
         kwargs: Additional keyword arguments to pass to pandas
             DataFrame constructor.

Notes:
The Catalog class is a subclass of pandas DataFrame, and inherits
all of its methods and attributes.
"""

-    _required_cols = REQUIRED_COLS_CATALOG + ['catalog_id']
+    _required_cols = CATALOG_COLUMNS + ['catalog_id']
_metadata = Catalog._metadata + ['n_catalogs']

def __init__(self, data=None, *args, n_catalogs=None, **kwargs):
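A construction sketch (hypothetical values; assumes `ForecastCatalog` is exported alongside `Catalog`). Note that `n_catalogs` deliberately counts simulated catalogs that produced no events:

    from seismostats import ForecastCatalog

    rows = {'magnitude': [2.0, 2.7, 2.2],
            'catalog_id': [0, 0, 3]}     # catalogs 1, 2 and 4 are empty
    fc = ForecastCatalog(rows, n_catalogs=5)
    print(fc.n_catalogs, len(fc))        # 5 catalogs in total, 3 events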