Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace mode pipeline #892

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
16 changes: 16 additions & 0 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,19 @@ def predict_cluster_confidence_discounting(trip, max_confidence=None, first_conf
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels

def predict_gradient_boosted_decision_tree(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
# load application config
model_type = eamtc.get_model_type()
Comment on lines +160 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it is just a copy/paste of the previous predict_cluster_confidence_discounting
Why does this have to be in the labels directory anyway?
labels is for predicting labels based on other labels
replaced_mode is for predicting the replaced mode based on other characteristics (e.g. demographics).

So while it is appropriate to have this be inspired by the label assist algorithm, it is its own algorithm/model, and for clarity, it should be in its own directory. Its scaffolding can be similar to the label assist, but it is not a label assist.

model_storage = eamtc.get_model_storage()
labels, n = eamur.predict_labels_with_gbdt(trip, model_type, model_storage)
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_gradient_boosted_decision_tree: n={n}; returning as-is")
return labels

# confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
# logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels
Comment on lines +172 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concretely, this is also wrong because there will not be a label array or probabilities.
Note that this code as written does not work because confidence_coeff is not defined.

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Standard imports
import logging
import random
import copy

# Our imports
import emission.storage.pipeline_queries as epq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.wrapper.labelprediction as ecwl
import emission.core.wrapper.entry as ecwe
import emission.analysis.classification.inference.labels.inferrers as eacili
import emission.analysis.classification.inference.labels.ensembles as eacile


# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
# runs on the results of other algorithms), primary_algorithms specifies a corresponding
# function in eacili to run. This makes it easy to plug in additional algorithms later.
primary_algorithms = {
ecwl.AlgorithmTypes.GRADIENT_BOOSTED_DECISION_TREE: eacili.predict_gradient_boosted_decision_tree
}

# ensemble specifies which algorithm in eacile to run.
# This makes it easy to test various ways of combining various algorithms.
ensemble = eacile.ensemble_first_prediction


# Does all the work necessary for a given user
def infer_labels(user_id):
time_query = epq.get_time_range_for_label_inference(user_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this is not the right time range to query: `epq.get_time_range_for_label_inference` returns the time range for the label inference algorithm. This is its own algorithm, so it needs its own time range.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break the pipeline unless changed.

try:
lip = LabelInferencePipeline()
lip.user_id = user_id
lip.run_prediction_pipeline(user_id, time_query)
if lip.last_trip_done is None:
logging.debug("After run, last_trip_done == None, must be early return")
epq.mark_label_inference_done(user_id, lip.last_trip_done)
except:
logging.exception("Error while inferring labels, timestamp is unchanged")
epq.mark_label_inference_failed(user_id)

# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, this needs to change for clarity

# Set up the pipeline with no trip processed yet; _last_trip_done is
# replaced by run_prediction_pipeline as trips are handled.
def __init__(self):
self._last_trip_done = None

# Read-only access to the last trip recorded as done; None until
# run_prediction_pipeline has handled at least one trip.
@property
def last_trip_done(self):
return self._last_trip_done

# For a given user and time range, runs all the primary algorithms and ensemble, saves results
# to the database, and records progress
def run_prediction_pipeline(self, user_id, time_range):
self.ts = esta.TimeSeries.get_time_series(user_id)
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
# Create an inferred trip
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

Comment on lines +60 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have basically copy-pasted the other pipeline.py; you need to understand how it works and adapt it to be a separate step.

# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)

# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
self.ts.insert(inferred_trip)

if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
self._last_trip_done = cleaned_trip

# This is where the labels for a given trip are actually predicted.
# Though the only information passed in is the trip object, the trip object can provide the
# user_id and other potentially useful information.
def compute_and_save_algorithms(self, trip):
    """Run every entry of primary_algorithms on the trip and store each result.

    For each (algorithm id, function) pair, the function is called with the
    trip, the result is wrapped in an ecwl.Labelprediction carrying the
    trip id and the trip's start/end timestamps, and the wrapper is inserted
    into the timeseries under "inference/labels".

    :param trip: the trip entry to run the algorithms on
    :return: list of the Labelprediction objects that were stored, in the
        iteration order of primary_algorithms
    """
    stored_predictions = []
    for algo_id, algo_fn in primary_algorithms.items():
        algo_output = algo_fn(trip)
        label_prediction = ecwl.Labelprediction()
        label_prediction.trip_id = trip.get_id()
        label_prediction.algorithm_id = algo_id
        label_prediction.prediction = algo_output
        label_prediction.start_ts = trip["data"]["start_ts"]
        label_prediction.end_ts = trip["data"]["end_ts"]
        self.ts.insert_data(self.user_id, "inference/labels", label_prediction)
        stored_predictions.append(label_prediction)
    return stored_predictions

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip, predictions):
    """Combine the per-algorithm predictions into one and store it.

    The module-level ``ensemble`` function picks the algorithm id and the
    prediction; the result is wrapped in an ecwl.Labelprediction for the
    trip and inserted into the timeseries under "analysis/inferred_labels".

    :param trip: the trip entry the predictions belong to
    :param predictions: the predictions produced by the primary algorithms
    :return: the Labelprediction that was stored
    """
    combined = ecwl.Labelprediction()
    combined.trip_id = trip.get_id()
    combined.start_ts = trip["data"]["start_ts"]
    combined.end_ts = trip["data"]["end_ts"]
    chosen_algorithm_id, chosen_prediction = ensemble(trip, predictions)
    combined.algorithm_id = chosen_algorithm_id
    combined.prediction = chosen_prediction
    self.ts.insert_data(self.user_id, "analysis/inferred_labels", combined)
    return combined
51 changes: 51 additions & 0 deletions emission/analysis/modelling/trip_model/model_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,57 @@ def from_str(cls, str):
msg = f"{str} is not a known ModelStorage, must be one of {names}"
raise KeyError(msg)

def load_model_all_users(model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]:
    """load the model shared by all users from a model storage location

    :param model_type: expected type of model stored
    :param model_storage: storage format
    :return: the model representation as a Python Dict or None if no model
             has been stored
    :raises: TypeError if the model storage type is not supported, if the
             stored model does not record a model type, or if the stored
             model has a different type than expected
             KeyError if the ModelType is not known
    """
    # reject unsupported storage types up front so the main path stays flat
    if model_storage != ModelStorage.DOCUMENT_DATABASE:
        storage_types_str = ",".join(ModelStorage.names())
        msg = (
            f"unknown model storage type {model_storage}, must be one of "
            f"{{{storage_types_str}}}"
        )
        raise TypeError(msg)

    # the model is not tied to a single user, so it is looked up through the
    # model storage created for the id 0, under the replace-model key
    # NOTE(review): presumably 0 is a placeholder "all users" id - confirm
    ms = esma.ModelStorage.get_model_storage(0)
    latest_model_entry = ms.get_current_model(key=esda.REPLACE_MODEL_STORE_KEY)

    if latest_model_entry is None:
        logging.debug(f'no {model_type.name} model found')
        return None

    write_ts = latest_model_entry['metadata']['write_ts']
    logging.debug(f'retrieved latest trip model recorded at timestamp {write_ts}')
    logging.debug(latest_model_entry)

    # parse str to enum for ModelType
    latest_model_type_str = latest_model_entry.get('data', {}).get('model_type')
    if latest_model_type_str is None:
        raise TypeError('stored model does not have a model type')
    latest_model_type = eamum.ModelType.from_str(latest_model_type_str)

    # validate and return; the entry is known to be present at this point,
    # so only the model type still needs checking
    if latest_model_type != model_type:
        msg = (
            f"loading model has model type '{latest_model_type.name}' "
            f"but was expected to have model type {model_type.name}"
        )
        raise TypeError(msg)
    return latest_model_entry['data']['model']

def load_model(user_id, model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]:
"""load a user label model from a model storage location

Expand Down
40 changes: 40 additions & 0 deletions emission/analysis/modelling/trip_model/run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,27 @@ def predict_labels_with_n(
predictions, n = model.predict(trip)
return predictions, n

def predict_labels_with_gbdt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this called from?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the list of algorithms (but pipeline_replace_mode.py will do it later).

trip: ecwc.Confirmedtrip,
model_type = eamumt.ModelType.GRADIENT_BOOSTED_DECISION_TREE,
model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
model_config = None):
"""
invoke the user label prediction model to predict labels for a trip.

:param trip: the trip to predict labels for
:param model_type: type of prediction model to run
:param model_storage: location to read/write models
:param model_config: optional configuration for model, for debugging purposes
:return: a list of predictions
"""
user_id = trip['user_id']
model = _load_stored_trip_model_all_users(model_type, model_storage, model_config)
if model is None:
return [], -1
else:
predictions, n = model.predict(trip)
return predictions, n

def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]):
"""
Expand Down Expand Up @@ -159,6 +180,25 @@ def _load_stored_trip_model(
model.from_dict(model_dict)
return model

def _load_stored_trip_model_all_users(
    model_type: eamumt.ModelType,
    model_storage: eamums.ModelStorage,
    model_config = None) -> Optional[eamuu.TripModel]:
    """helper to rebuild a prediction model instance from the stored model
    that is shared across all users.

    :param model_type: TripModel type to load and build
    :param model_storage: storage type to read the model from
    :param model_config: optional configuration passed to the model builder
    :return: the model populated from storage, or None if nothing is stored
    """
    stored_model = eamums.load_model_all_users(model_type, model_storage)
    if stored_model is None:
        return None

    # build an empty model of the requested type, then fill it from storage
    trip_model = model_type.build(model_config)
    trip_model.from_dict(stored_model)
    return trip_model

def _latest_timestamp(trips: List[ecwc.Confirmedtrip]) -> float:
"""extract the latest timestamp observed from a list of trips
Expand Down
1 change: 1 addition & 0 deletions emission/core/wrapper/labelprediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class AlgorithmTypes(enum.Enum):
TWO_STAGE_BIN_CLUSTER = 5
PLACEHOLDER_PREDICTOR_DEMO = 6
CONFIDENCE_DISCOUNTED_CLUSTER = 7
GRADIENT_BOOSTED_DECISION_TREE = 8


class Labelprediction(ecwb.WrapperBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
METRICS_DAILY_MEAN_MEDIAN_SPEED = "metrics/daily_mean_median_speed"
INFERRED_LABELS_KEY = "inference/labels"
TRIP_MODEL_STORE_KEY = "inference/trip_model"
REPLACE_MODEL_STORE_KEY = "inference/replace_model"

# General methods

Expand Down