import inspect
import json
import logging
import os
import warnings
from collections.abc import Callable, Hashable, Sequence
from dataclasses import dataclass, replace
from dataclasses import field as dataclass_field
from typing import Any, Literal
import dill
import fsspec
import numpy as np
import numpy.typing as npt
import pandas as pd
import shap
import tqdm
import xarray as xr
from fsspec import AbstractFileSystem, get_filesystem_class
from fsspec.utils import get_protocol
from scipy.stats import kendalltau
from shap import Explanation
from shap.maskers import Independent
from sklearn.base import BaseEstimator, clone
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import (
average_precision_score,
brier_score_loss,
explained_variance_score,
f1_score,
log_loss,
matthews_corrcoef,
mean_absolute_error,
mean_squared_error,
ndcg_score,
precision_score,
r2_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import GroupKFold, StratifiedGroupKFold, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.utils.multiclass import type_of_target
from xarray import DataArray, Dataset
from pandas import DataFrame as PandasDataFrame
logger = logging.getLogger(__name__)
@dataclass
class Model:
slug: str
""" Human-readable unique model identifier, e.g. 'logreg' """
name: str
""" Display name, e.g. 'Logistic Regression' """
estimator: BaseEstimator
""" Estimator instance """
properties: dict[str, Any] = dataclass_field(default_factory=lambda: {})
""" Arbitrary properties/metadata for the model """
#######################
# Xarray CV Functions #
#######################
def split(ds: Dataset) -> tuple[DataArray | None, DataArray | None]:
"""Break dataset into features and outcomes for modeling"""
X = None
if "feature" in ds:
if ds.dims["features"] < 1:
raise ValueError("Must provide at least one feature")
X = ds["feature"].transpose("index", "features")
assert X.ndim == 2
Y = None
if "outcome" in ds:
if ds.dims["outcomes"] < 1:
raise ValueError("Must provide at least one outcome")
Y = ds["outcome"].transpose("index", "outcomes")
assert Y.ndim == 2
return X, Y
def _split(
ds: Dataset, squeeze_y: bool
) -> tuple[pd.DataFrame, pd.DataFrame | pd.Series]:
X, Y = split(ds)
assert X is not None and Y is not None
assert X.shape[0] == Y.shape[0]
X, Y = X.to_pandas(), Y.to_pandas()
# Squeeze Y if desired to avoid scikit-learn warnings about
# single-task outcomes in 2D arrays
if squeeze_y and Y.shape[1] == 1:
Y = Y.squeeze()
return X, Y
def fit(
ds: Dataset,
groups: Hashable | None = None,
ignore_convergence_warnings: bool = True,
squeeze_y: bool = True,
) -> Dataset:
"""Fit estimators attached to a dataset."""
X, Y = _split(ds, squeeze_y=squeeze_y)
res = []
estimators = np.ravel(ds.estimator.values).tolist()
for estimator in estimators:
logger.info(f"Fitting estimator '{estimator.name}'")
est = estimator.estimator
# Only pass a groups param to .fit if the estimator supports it, noting that there
# might be a mixture of models that do and do not support it in the same training run
# (e.g. if dummy regressors are included that don't actually need inner CV)
fit_params = {}
if groups is not None:
if isinstance(est, Pipeline):
if "groups" in inspect.signature(est.steps[-1][1].fit).parameters:
fit_params = {
f"{est.steps[-1][0]}__groups": get_split_groups(
ds, groups
).values
}
else:
if "groups" in inspect.signature(est.fit).parameters:
fit_params = {"groups": get_split_groups(ds, groups).values}
with warnings.catch_warnings():
if ignore_convergence_warnings:
# Ignore convergence warnings, noting that this will not work when
# using subprocesses (e.g. as is common with `n_jobs` != 1 in many estimators)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
est = est.fit(X, Y, **fit_params)
model = replace(estimator, estimator=est)
res.append(model)
return ds.assign(model=("models", res)).assign(models=[m.slug for m in res])
def predict_proba(ds: Dataset) -> Dataset:
"""Predict binary class probabilities for all models in a dataset."""
X, _ = split(ds.drop_vars("outcomes") if "outcomes" in ds else ds)
assert X is not None
X = X.to_pandas()
res = []
for model in ds.model.values:
Y = model.estimator.predict_proba(X)
if Y.ndim == 1:
Y = np.stack([1 - Y, Y], axis=1)
if Y.ndim > 2:
raise ValueError(f"Predictions of shape {Y.shape} not supported")
res.append(Y[..., np.newaxis, np.newaxis])
return ds.assign(
prediction=(
["index", "classes", "models", "outcomes"],
np.concatenate(res, axis=2),
)
).assign(classes=["negative", "positive"])
def predict(ds: Dataset) -> Dataset:
"""Predict class labels or continuous outcomes for all models in a dataset."""
X, _ = split(ds.drop_vars("outcomes") if "outcomes" in ds else ds)
assert X is not None
X = X.to_pandas()
res = []
for model in ds.model.values:
Y = model.estimator.predict(X)
if Y.ndim == 1:
Y = np.expand_dims(Y, axis=1)
if Y.ndim > 2:
raise ValueError(f"Predictions of shape {Y.shape} not supported")
res.append(Y[:, np.newaxis, :])
return ds.assign(
prediction=(
["index", "models", "outcomes"],
np.concatenate(res, axis=1),
)
)
def add_single_split(ds: Dataset, split: str = "test") -> Dataset:
"""Add dummy split to a dataset.
This is useful for maintaining compatibility with functions that require
folds to iterate over in cases where real splits are unnecessary (e.g.
when refitting a model on all data).
"""
return ds.assign(
fold=(("index", "folds"), np.full((ds.dims["index"], 1), split, dtype="O"))
).assign_coords(folds=np.arange(1))
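# Usage sketch (illustrative, not part of the original module): refitting estimators on
# all rows with a single dummy split. `df`, the "sample_id"/"label" column names, and
# `estimator_fn` are assumptions for this example.
#
#     ds = create_dataset(df, index=["sample_id"], outcomes=["label"])
#     ds = add_single_split(ds, split="train")  # one fold, every row marked "train"
#     ds = add_models(ds, estimator_fn)         # see add_models below
#     ds = fit(ds.sel(folds=0))                 # refit each estimator on all rows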
def get_split_groups(ds: Dataset, groups: Hashable) -> DataArray:
if groups in ds:
group_values = ds[groups]
elif groups in ds.indexes["index"].names:
group_values = ds[groups]
elif "descriptor" in ds and groups in ds.descriptors:
group_values = ds.descriptor.sel(descriptors=groups)
else:
raise ValueError(f"Failed to find variable for groups '{groups}'")
if group_values.ndim != 1:
raise ValueError(f"Groups array must be 1D, found shape {group_values.shape}")
return group_values
def add_group_splits(ds: Dataset, n_splits: int, groups: Hashable) -> Dataset:
"""Add splits/folds to a dataset based on groupings (typically identifiers)."""
folds = np.full((ds.dims["index"], n_splits), "", dtype="O")
for i, (train, test) in enumerate(
GroupKFold(n_splits=n_splits).split(
X=ds["outcome"].values,
y=ds["outcome"].values,
groups=get_split_groups(ds, groups).values,
)
):
folds[train, i] = "train"
folds[test, i] = "test"
return ds.assign(fold=(("index", "folds"), folds)).assign_coords(
folds=np.arange(n_splits)
)
def add_stratified_splits(
ds: Dataset,
n_splits: int,
seed: int = 0,
groups: Hashable | None = None,
map_label: Callable[[DataArray], DataArray] = lambda x: x,
) -> Dataset:
"""
Add splits/folds to a dataset based on stratified outcome.
When groups is specified, StratifiedGroupKFold is used, which attempts to create
folds that preserve the percentage of samples for each class as much as possible
given the constraint of non-overlapping groups between splits. If groups is None,
StratifiedKFold is used. You can also transform the label via map_label to encode
it into something friendlier for stratification, e.g. binning a continuous value.
"""
folds = np.full((ds.dims["index"], n_splits), "", dtype="O")
cv_klass = StratifiedGroupKFold if groups is not None else StratifiedKFold
for i, (train, test) in enumerate(
cv_klass(n_splits=n_splits, random_state=seed, shuffle=True).split(
X=ds["outcome"].values,
y=np.asarray(map_label(ds["outcome"])),
groups=get_split_groups(ds, groups).values if groups is not None else None,
)
):
folds[train, i] = "train"
folds[test, i] = "test"
return ds.assign(fold=(("index", "folds"), folds)).assign_coords(
folds=np.arange(n_splits)
)
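# Usage sketch (illustrative only): stratifying folds on a thresholded continuous outcome
# via `map_label`. The median-threshold encoding below is an assumption for this example.
#
#     ds = add_stratified_splits(
#         ds,
#         n_splits=5,
#         map_label=lambda y: (y > float(y.median())).astype(int),
#     )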
def add_unlabeled_data(
ds: Dataset,
*,
df: PandasDataFrame,
index: Sequence[Hashable],
fold_label: str,
) -> Dataset:
"""
Adds out-of-sample examples. These examples won't be used for training, but will
be used during the test phase. They are expected to have missing outcomes,
which makes them ineligible for scoring, though they can still be explained.
"""
if "folds" not in ds.dims:
raise ValueError("Splits/folds must be added before out of sample examples")
assert (
df[ds["outcomes"]].isna().all().all()
), "Outcomes should be missing/NA for out-of-sample examples"
ds_oos = create_dataset(
df,
index=index,
outcomes=ds["outcomes"].values.tolist(),
descriptors=ds["descriptors"].values.tolist(),
outcome_type=ds.attrs["outcome_type"],
)
return xr.merge([ds, ds_oos]).pipe(
lambda ds: ds.assign(fold=ds["fold"].fillna(fold_label))
)
def explain(
ds: Dataset,
*,
model: str,
method: Literal["tree-shap", "linear-shap", "linear-hamard"] = "tree-shap",
shap_mode: Literal["true-to-model", "true-to-data"] | None = None,
tree_shap_check_additivity: bool = True,
**kwargs: Any,
) -> Dataset:
return (
ds
# Assign a variable with shape (index,) containing the fold index
# for which each sample was held out
.assign(test_fold=lambda ds: (ds.fold == "test").argmax(dim="folds"))
# Run explanations for each group of held-out samples
.groupby("test_fold")
.map(
lambda ds: ds.pipe(
_explain,
model=model,
method=method,
shap_mode=shap_mode,
tree_shap_check_additivity=tree_shap_check_additivity,
**kwargs,
)
)
.pipe(lambda ex_ds: ds.merge(ex_ds))
)
def extract_estimator(pipeline: Pipeline) -> BaseEstimator:
"""
Extract an estimator from a pipeline, assuming it is the final step.
"""
est = pipeline[-1]
if hasattr(est, "best_estimator_"):
est = est.best_estimator_
return est
def explain_tree_prediction(
est: BaseEstimator,
features: PandasDataFrame,
shap_mode: Literal["true-to-model", "true-to-data"] | None,
tree_shap_check_additivity: bool,
labels: npt.ArrayLike | None = None,
**kwargs: Any,
) -> tuple[npt.NDArray[np.float_], npt.NDArray[np.float_]]:
if shap_mode is None or shap_mode == "true-to-model":
data = shap.sample(features, nsamples=1000)
feature_perturbation = "interventional"
else:
data = None
feature_perturbation = "tree_path_dependent"
explanation = shap.TreeExplainer(
est, data=data, feature_perturbation=feature_perturbation, **kwargs
)(features, y=labels, check_additivity=tree_shap_check_additivity)
shap_values, shap_base_values = explanation.values, explanation.base_values
# The shape of `shap_values` depends on the model output used for the explanation
assert shap_values.ndim in (2, 3)
if shap_values.ndim == 3:
if shap_values.shape[2] != 2:
raise ValueError("Only binary outcomes currently supported")
shap_values = shap_values[..., 1]
assert (
shap_values.shape == features.shape
), f"Expected shape {features.shape}, got {shap_values.shape} instead"
# The shape of `shap_base_values` also depends on the model output used for the explanation
if shap_base_values.ndim == 2:
if shap_base_values.shape[1] != 2:
raise ValueError("Only binary outcomes currently supported")
shap_base_values = shap_base_values[..., 1]
assert shap_base_values.shape == (
features.shape[0],
), f"Expected shape {(features.shape[0],)}, got {shap_base_values.shape} instead"
return shap_values, shap_base_values
def explain_linear_prediction(
est: BaseEstimator,
features: PandasDataFrame,
shap_mode: Literal["true-to-model", "true-to-data"] | None,
**kwargs: Any,
) -> tuple[npt.NDArray[np.float_], npt.NDArray[np.float_]]:
if shap_mode is None or shap_mode == "true-to-model":
masker = Independent(features, max_samples=1000)
else:
masker = features
explanation = shap.LinearExplainer(est, masker=masker, **kwargs)(features)
shap_values, shap_base_values = explanation.values, explanation.base_values
assert (
shap_values.ndim == 2
), f"Expected 2D shap values, got {shap_values.shape} instead"
return shap_values, shap_base_values
def explain_linear_hamard_prediction(
features: PandasDataFrame,
coefficients: npt.NDArray[np.float_],
intercept: float,
) -> tuple[npt.NDArray[np.float_], npt.NDArray[np.float_]]:
if np.squeeze(coefficients).ndim != 1:
raise ValueError(
f"Could not coerce coefficients array of shape {coefficients.shape} to 1D"
)
coefficients = np.squeeze(coefficients)
if coefficients.shape[0] != features.shape[1]:
raise ValueError(
f"Number of coefficients ({coefficients.shape[0]}) does not equal number of features ({features.shape[1]})"
)
shap_values = features.values * coefficients[np.newaxis, :]
shap_base_values = np.full(shap_values.shape[0], intercept)
assert (
shap_values.ndim == 2
), f"Expected 2D shap values, got {shap_values.shape} instead"
return shap_values, shap_base_values
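# Worked example (illustrative only): for coefficients [2.0, -1.0], intercept 0.5, and a
# feature row [3.0, 4.0], the per-feature contributions are [6.0, -4.0] with base value
# 0.5; contributions sum with the base value to the model's linear predictor (for a
# classifier, the decision function rather than a probability).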
def _explain(
ds: Dataset,
*,
model: str,
method: Literal["tree-shap", "linear-shap", "linear-hamard"],
shap_mode: Literal["true-to-model", "true-to-data"] | None,
tree_shap_check_additivity: bool,
**kwargs: Any,
) -> Dataset:
"""
Add per-sample, per-feature, model-specific weights to dataset (e.g. SHAP values)
:param model: model to explain. Can be a sklearn pipeline. Final step can be a CV,
in which case explain will use the best estimator.
:param shap_mode: see: https://github.com/related-sciences/facets/issues/938.
Intuition: with `true-to-data`, feature contributions are shared across correlated
features (even if some features are not used by the particular model), which is useful
when you care more about insights within the data than about a particular model.
`true-to-model` tries to break correlations and attributes contributions to the
features actually used by the model, which is useful when you care about this
particular model.
:param tree_shap_check_additivity: controls the validation check that the sum of the
SHAP values equals the output of the model.
See: https://github.com/slundberg/shap/issues/1098#issuecomment-599622440 and
other issues related to check_additivity.
"""
def get_shap_values(ds: Dataset) -> Dataset:
# NOTE: labels can be NaN here, and will be included
X, Y = _split(ds, squeeze_y=True)
if Y.ndim > 1:
raise NotImplementedError(
f"Explanations not supported for multi-task outcomes (Y.shape={Y.shape})"
)
# Transform the predictors manually so that shap can be provided the
# underlying model directly (i.e. the final pipeline step)
pipeline = ds.model.item(0).estimator
shap_features = X
if len(pipeline.steps) > 1:
transform = Pipeline(pipeline.steps[:-1])
transformed_features = transform.transform(shap_features)
if not isinstance(transformed_features, pd.DataFrame):
raise ValueError(
f"Pipeline for model {model} did not produce a DataFrame after transform. "
"This is necessary for explanations; model pipeline:\n{pipeline}}"
)
if (
invalid_features := transformed_features.columns.difference(
shap_features.columns
)
).size > 0:
logger.warning(
f"Pipeline for model {model} produced {len(invalid_features)} invalid features: {invalid_features}"
)
shap_features = transformed_features
# Resolve final estimator in pipeline
est = extract_estimator(pipeline)
# Branch based on method for generating explanations
if method == "tree-shap":
shap_values, shap_base_values = explain_tree_prediction(
est, shap_features, shap_mode, tree_shap_check_additivity, Y, **kwargs
)
elif method == "linear-shap":
shap_values, shap_base_values = explain_linear_prediction(
est, shap_features, shap_mode, **kwargs
)
elif method == "linear-hamard":
# Only supports single outcome ATM
shap_values, shap_base_values = explain_linear_hamard_prediction(
shap_features, est.coef_, est.intercept_
)
else:
raise ValueError(f'Method "{method}" not valid')
# Reindex SHAP features and values to align to original features, filling in
# any features dropped by the current estimator with NaNs
shap_values = (
pd.DataFrame(
shap_values, index=shap_features.index, columns=shap_features.columns
)
.reindex_like(X)
.values
)
shap_features = shap_features.reindex_like(X).values
assert shap_values.shape == shap_features.shape == X.shape
if not np.all((a := np.asarray(shap_base_values)) == a[0]):
raise NotImplementedError(
f"Multiple SHAP intercepts/base_values not implemented (base_values={shap_base_values})"
)
return xr.Dataset(
data_vars={
"shap_feature": (("index", "features"), shap_features),
"shap_value": (("index", "features"), shap_values),
"shap_base_value": (("index",), shap_base_values),
},
coords=ds.feature.coords,
)
return get_shap_values(
ds.sel(folds=ds.test_fold.item(0), models=model)
).expand_dims("models")
def shap_explanation(ds: Dataset, index_col: str | None = "index") -> Explanation:
"""
Creates SHAP Explanation object based on the given model Dataset. If you
need to create Explanation per fold or model, do all the necessary filters
on the dataset object before passing it in this function.
"""
if not all(i in ds for i in ["shap_base_value", "shap_feature", "shap_value"]):
raise ValueError(
"Dataset must contain shap_base_value, shap_feature and shap_value,"
" which are produced by the `explain` function."
)
return Explanation(
values=ds["shap_value"].values,
base_values=ds["shap_base_value"].values,
data=ds["shap_feature"].values,
feature_names=ds["features"].values,
# Certain shap functions can break when this is not a single array of strings
# (as would be the case with a multi-index), so passing None here avoids
# using sample labels at all.
display_data=None if index_col is None else ds[index_col].values,
)
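# Usage sketch (illustrative only): building an Explanation for one model and fold and
# plotting it. The "logreg" slug and fold 0 are assumptions for this example.
#
#     ds = explain(ds, model="logreg", method="linear-shap")
#     ex = shap_explanation(ds.sel(models="logreg", folds=0))
#     shap.plots.beeswarm(ex)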
def add_models(
ds: Dataset,
estimator_fn: Callable[..., list[Model]],
**kwargs: Any,
) -> Dataset:
"""
Add models to fit to a dataset. Any kwargs, if present, are passed to the
estimator_fn function.
"""
if "folds" not in ds.dims:
raise ValueError("Splits/folds must be added before models")
if "feature_names" in inspect.signature(estimator_fn).parameters:
kwargs = {**{"feature_names": ds.features.values.tolist()}, **kwargs}
estimators = [estimator_fn(**kwargs) for _ in range(ds.dims["folds"])]
return ds.assign(estimator=(("folds", "estimators"), estimators)).assign(
estimators=[est.slug for est in estimators[0]]
)
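# Usage sketch (illustrative only): an `estimator_fn` returning one Model per candidate
# estimator. The pipeline composition below is an assumption for this example.
#
#     from sklearn.linear_model import LogisticRegression
#     from sklearn.preprocessing import StandardScaler
#
#     def estimator_fn() -> list[Model]:
#         return [
#             Model(
#                 slug="logreg",
#                 name="Logistic Regression",
#                 estimator=Pipeline([
#                     ("scaler", StandardScaler()),
#                     ("clf", LogisticRegression(max_iter=1000)),
#                 ]),
#             )
#         ]
#
#     ds = add_models(ds, estimator_fn)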
def prepend_common_transformers(
ds: Dataset,
common_transformers: list[BaseEstimator] | Callable[..., list[BaseEstimator]],
**kwargs: Any,
) -> Dataset:
if callable(common_transformers):
transformers = common_transformers(**kwargs)
elif isinstance(common_transformers, list):
transformers = common_transformers
else:
raise TypeError(
f"common_transformers must be a list or callable, got {type(common_transformers)}"
)
for fold in ds.folds:
for estimator in ds.estimators:
if not isinstance(
(
est := ds.estimator.loc[
{"estimators": estimator, "folds": fold}
].item()
).estimator,
Pipeline,
):
raise TypeError(
f"Can only prepend steps to a scikit-learn pipeline model; received type {type(est)}"
)
# iterate backwards through list to maintain order after prepending
for j, transformer in enumerate(transformers[::-1]):
est.estimator.steps.insert(
0, (f"commontransformer{len(transformers) - j}", clone(transformer))
)
ds.estimator.loc[{"estimators": estimator, "folds": fold}] = est
return ds
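# Usage sketch (illustrative only): prepending a shared imputation step to every model
# pipeline after models have been added. SimpleImputer is an assumption for this example.
#
#     from sklearn.impute import SimpleImputer
#
#     ds = prepend_common_transformers(ds, [SimpleImputer(strategy="median")])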
def score_regression(ds: Dataset) -> Dataset:
"""Compute continuous outcome metrics."""
scores = get_score_names(get_regression_scores)
def _score(
y_true: npt.NDArray[np.int_], y_pred: npt.NDArray[np.float_]
) -> npt.NDArray[np.float_]:
mask = ~np.isnan(y_true)
y_true, y_pred = y_true[mask], y_pred[mask]
values = get_regression_scores(y_true, y_pred)
return np.array([values[m] for m in scores])
return (
# Use the rows as the input core dims to make sure that each
# call to `score` is provided predictions for a single
# model + fold + outcome prediction
xr.apply_ufunc(
_score,
ds.outcome,
ds.prediction,
vectorize=True,
input_core_dims=[["index"], ["index"]],
output_core_dims=[["scores"]],
)
.rename("score")
.to_dataset()
.assign(scores=scores)
)
def score_classification(
ds: Dataset,
k: Sequence[int] | None = None,
threshold: float | None = None,
score_fn: Any | None = None,
) -> Dataset:
"""Compute binary classifier metrics."""
if score_fn is None:
score_fn = get_classification_scores
scores = get_score_names(
score_fn,
k=k,
threshold=threshold,
)
def _score(
y_true: npt.NDArray[np.int_], y_pred: npt.NDArray[np.float_]
) -> npt.NDArray[np.float_]:
mask = ~np.isnan(y_true)
y_true, y_pred = y_true[mask], y_pred[mask]
values = score_fn(
y_true, y_pred, k=k, threshold=threshold
) # type: ignore[misc]
return np.array([values[m] for m in scores])
return (
# Use the rows as the input core dims to make sure that each
# call to `score` is provided predictions for a single
# model + fold + outcome prediction
xr.apply_ufunc(
_score,
ds.outcome,
ds.prediction.sel(classes="positive"),
vectorize=True,
input_core_dims=[["index"], ["index"]],
output_core_dims=[["scores"]],
)
.rename("score")
.to_dataset()
.assign(scores=scores)
)
def score_ranker(ds: Dataset) -> Dataset:
"""Compute ranker metrics."""
score_names = get_score_names(
get_ranker_scores,
test_y_true=np.array([1, 2, 0]),
test_y_pred=np.array([1, 2, 0]),
)
def _score(
y_true: npt.NDArray[np.int_], y_pred: npt.NDArray[np.float_]
) -> npt.NDArray[np.float_]:
mask = ~np.isnan(y_true)
y_true, y_pred = y_true[mask], y_pred[mask]
values = get_ranker_scores(y_true, y_pred)
return np.array([values[m] for m in score_names])
all_scores_ds = (
# Use the rows as the input core dims to make sure that each
# call to `score` is provided predictions for a single
# model + fold + outcome prediction
ds.sel(descriptors="query_id")
.rename({"descriptor": "query_id"})
.groupby("query_id")
.map(
lambda group: xr.apply_ufunc(
_score,
group.outcome,
group.prediction,
vectorize=True,
input_core_dims=[["index"], ["index"]],
output_core_dims=[["scores"]],
)
)
.rename("score")
.to_dataset()
.assign(scores=score_names)
.assign(score_mean=lambda ds: ds.score.mean("query_id"))
.assign(score_sum=lambda ds: ds.score.sum("query_id"))
.drop_vars("score")
)
# NOTE: scores that start with "n_" can be summed across queries, others should be averaged
return xr.merge(
[
all_scores_ds.sel(scores=[s for s in score_names if s.startswith("n_")])
.drop_vars("score_mean")
.rename({"score_sum": "score"}),
all_scores_ds.sel(scores=[s for s in score_names if not s.startswith("n_")])
.drop_vars("score_sum")
.rename({"score_mean": "score"}),
]
)
def run_cv(
ds: Dataset,
predict_fn: Callable[[Dataset], Dataset],
score_fn: Callable[[Dataset], Dataset],
groups: Hashable | None = None,
) -> Dataset:
"""Run cross-validation for models and folds already defined in a dataset."""
res = []
for fold in tqdm.tqdm(sorted(ds.coords["folds"])):
# Select training data, fit, and predict
split = ds.fold.sel(folds=fold).values
train, test, nans = split == "train", split == "test", pd.isnull(split)
assert train.sum() + test.sum() + nans.sum() == ds.dims["index"]
# Subset rows to those for training split and fit
with warnings.catch_warnings():
# Ignore convergence warnings from lbfgs
warnings.filterwarnings("ignore", category=ConvergenceWarning)
ds_train = (
ds.sel(folds=fold)
.sel(index=train)
.pipe(fit, groups=groups)[["model", "outcome"]]
)
# Subset rows to those for test split and make predictions
ds_test = (
ds.sel(folds=fold)
.sel(index=test)
.assign(model=ds_train["model"])
.pipe(predict_fn)
)
# NOTE: if descriptors are available, they will be included in the scoring logic
if "descriptor" in ds_test.variables:
scoring_var = ["prediction", "model", "outcome", "descriptor"]
else:
scoring_var = ["prediction", "model", "outcome"]
# Evaluate test set predictions
ds_score = score_fn(ds_test[scoring_var])
# Save test split results and scores
res.append(
# Note: scores do not contain row dim
ds_test.merge(ds_score)
)
return ds.merge(xr.concat([r[["prediction"]] for r in res], dim="index")).merge(
xr.concat([r[["model", "score"]] for r in res], dim="folds")
)
def run_classification_cv(ds: Dataset, groups: Hashable | None = None) -> Dataset:
"""Run cross-validation for classification models and folds already defined in a dataset."""
return run_cv(
ds, predict_fn=predict_proba, score_fn=score_classification, groups=groups
)
def run_regression_cv(ds: Dataset, groups: Hashable | None = None) -> Dataset:
"""Run cross-validation for regression models and folds already defined in a dataset."""
return run_cv(ds, predict_fn=predict, score_fn=score_regression, groups=groups)
def run_ranker_cv(ds: Dataset, groups: Hashable | None = None) -> Dataset:
"""Run cross-validation for ranker models and folds already defined in a dataset."""
return run_cv(ds, predict_fn=predict, score_fn=score_ranker, groups=groups)
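# Usage sketch (illustrative only): a typical end-to-end binary classification run.
# `df`, its column names, and `estimator_fn` are assumptions for this example.
#
#     ds = create_dataset(df, index=["sample_id"], outcomes=["label"])  # defined below
#     ds = add_stratified_splits(ds, n_splits=5)
#     ds = add_models(ds, estimator_fn)
#     ds = run_classification_cv(ds)
#     print(ds["score"].to_dataframe())  # per-model, per-fold, per-outcome metrics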
def create_dataset(
df: PandasDataFrame,
index: Sequence[Hashable],
outcomes: Sequence[Hashable],
descriptors: Sequence[Hashable] | None = None,
outcome_type: str | None = None,
) -> Dataset:
"""Create a dataset from a feature DataFrame using a partitioning of column names.
Parameters
----------
df
Data frame to split into separate DataArray instances in one Dataset
index
List of column names to use as row index
outcomes
List of column names to use as separate outcomes (one outcome for a single-task regression)
descriptors
Optional list of column names with ancillary information; defaults to anything
not in one of the above lists
outcome_type
Optional outcome type; defaults to the result of `sklearn.utils.multiclass.type_of_target`.
Returns
-------
ds : Dataset
A dataset containing all features as one numeric DataArray (indexed by the provided
index fields), an outcomes array, and a descriptors array.
"""
if len(index) == 0:
raise ValueError("Index fields list cannot be empty")
if len(outcomes) == 0:
raise ValueError("Outcomes field list cannot be empty")
descriptors = descriptors or []
# Set features as any numeric field not otherwise classified
features = [
c
for c in df.select_dtypes(include=np.number).columns.tolist()
if c not in (list(outcomes) + list(index) + list(descriptors))
]
# Include all descriptor fields specified as well as those that are not numeric
descriptors = list(
set(
list(descriptors)
+ [c for c in df if c not in list(features) + list(outcomes) + list(index)]
)
)
if len(descriptors) == 0:
# If there are no descriptors, we cook one up and delete it later,
# which makes for cleaner code. Surprisingly, some xarray functions do not
# work on "empty" inputs, including concat and transpose.
del_descriptor_dim = True
assert "__create_dataset_deleteme__" not in df
df = df.assign(__create_dataset_deleteme__=np.nan)
descriptors = ["__create_dataset_deleteme__"]
else:
del_descriptor_dim = False
# Ensure that no one column is present in multiple groups
variables = pd.Series(
list(index) + list(features) + list(outcomes) + list(descriptors)
)
if variables.duplicated().any():
raise ValueError(
f"Field partitioning invalid, found duplicated variables: {variables[variables.duplicated()].tolist()}"
)
ds = (
df
# Convert raw feature dataframe to xarray dataset with target index
.to_xarray().set_index(index=index)
# Group features, outcomes, and everything else into separate, individual data arrays
.assign(
# Group all numeric feature values into one 64-bit float array
feature=lambda ds: xr.concat(
[ds[c] for c in features], dim="features"
).astype("f8"),
# Group all the regression outcomes into one array
outcome=lambda ds: xr.concat([ds[c] for c in outcomes], dim="outcomes"),
# Group all textual or otherwise descriptive fields into one array
descriptor=lambda ds: xr.concat(
[ds[c] for c in descriptors], dim="descriptors"
),
)
# Add additional indexes for new dimensions
.assign(
{"features": features, "outcomes": outcomes, "descriptors": descriptors}
)
# Subset to remove original feature fields
.pipe(lambda ds: ds[["feature", "outcome", "descriptor"]])
# Re-orient for consistency
.transpose("index", "features", "outcomes", "descriptors")
)
if outcome_type is None:
target_type = type_of_target(ds.outcome.squeeze().values)
if target_type in ["binary", "multilabel-indicator"]:
outcome_type = f"classification:{target_type}"
elif target_type in ["continuous", "continuous-multioutput"]:
outcome_type = f"regression:{target_type}"
if outcome_type is None:
raise ValueError(f"Outcome type {target_type} not supported")
if del_descriptor_dim:
ds = ds.drop_dims("descriptors")
return ds.assign_attrs(outcome_type=outcome_type, index_columns=index)
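# Usage sketch (illustrative only): how columns are partitioned. Given a frame with
# columns ["sample_id", "age", "bmi", "site", "label"] (names assumed for this example):
#
#     ds = create_dataset(df, index=["sample_id"], outcomes=["label"])
#
# "age" and "bmi" become features (numeric and otherwise unclassified), "site" becomes
# a descriptor (non-numeric), and "label" becomes the single outcome.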
#########################
# Performance Functions #
#########################
def average_precision_at_k(y_true: Any, y_score: Any, k: int) -> float:
"""
Calculate the true average precision at k.
Implementation details https://github.com/related-sciences/facets/issues/3802#issuecomment-1921815610
:param y_true: array-like, true binary labels in range {0, 1}
:param y_score: array-like, predicted scores or probabilities for each sample
:param k: int, number of top-scored items to consider for average precision calculation
:return: average precision at k
"""
y_true, y_score = _check_y_true_score(y_true, y_score)
if k > len(y_true):
k = len(y_true)
indices = np.argsort(-y_score)
sorted_labels = y_true[indices][:k]
precisions = np.cumsum(sorted_labels) / (np.arange(k) + 1)
n_pos = np.sum(sorted_labels)
return 0.0 if n_pos == 0 else float(np.sum(precisions * sorted_labels) / n_pos)
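# Worked example (illustrative only):
#
#     average_precision_at_k(
#         y_true=np.array([1, 0, 1, 0]),
#         y_score=np.array([0.9, 0.8, 0.7, 0.1]),
#         k=3,
#     )
#
# The top-3 labels by score are [1, 0, 1], the running precisions are [1, 1/2, 2/3], and
# only positions with a positive label contribute: (1 + 2/3) / 2 = 5/6 ≈ 0.833.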
def precision_at_k(y_true: Any, y_score: Any, k: int) -> float:
"""
Compute precision at rank `k`
This is defined as the fraction of examples that are positive
amongst the top k predictions.
"""
y_true, y_score = _check_y_true_score(y_true, y_score)
# Sort, reverse, limit to first k
o = np.argsort(y_score)[::-1][: min(k, len(y_true))]
# Find fraction true
return float(y_true[o].mean())
def recall_at_k(y_true: Any, y_score: Any, k: int) -> float:
"""
Compute recall at rank `k`
This is defined as the ratio of positive examples in the top k
predictions to the number of positive examples total.
"""
y_true, y_score = _check_y_true_score(y_true, y_score)
# Sort, reverse, limit to first k
o = np.argsort(y_score)[::-1][: min(k, len(y_true))]
return float(y_true[o].sum() / y_true.sum())
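# Worked example (illustrative only): with y_true=[1, 0, 1, 0], y_score=[0.9, 0.8, 0.7, 0.1]
# and k=2, the top-2 labels by score are [1, 0], so precision_at_k = 1/2; with 2 positives
# in total, recall_at_k = 1/2.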
def _check_y_true_score(y_true: Any, y_score: Any) -> tuple[Any, Any]:
"""
Allows y_true to be: [0, 1], [-1, 1], or [True, False].
If y_true is [-1, 1], it is converted to [0, 1].
"""
accepted_values = [-1, 0, 1]
if len(y_true) != len(y_score):
raise ValueError("y_true and y_score must have the same length")
# Booleans are automatically converted to integers when making comparisons.
if not np.isin(y_true, accepted_values).all():
raise ValueError(f"y_true must be in {accepted_values}, got {np.unique(y_true)}")
y_true[y_true == -1] = 0
return y_true, y_score
def _try_get_score(
score_func: Callable[[Any, Any, int], float],
y_true: Any,
y_score: Any,
raise_on_single_class: bool = False,
*args: Any,
**kwargs: Any,
) -> float:
try:
return score_func(y_true, y_score, *args, **kwargs)
except ValueError as e:
if raise_on_single_class or (
str(e)
!= "Only one class present in y_true. Score is not defined in that case."
):
raise e
return np.nan
def get_classification_scores(
y_true: Any,
y_score: Any,
k: Sequence[int] | None = None,
threshold: float | None = None,
raise_on_single_class: bool = False,
) -> dict[str, int | float]:
"""
Calculate performance of a classifier for a variety of metrics.