Skip to content

Commit

Permalink
Merge pull request #1014 from JGreenlee/segmentation_optimization
Browse files Browse the repository at this point in the history
Trip Segmentation Optimization
  • Loading branch information
shankari authored Jan 31, 2025
2 parents 47ab8be + 11fd09e commit c941e5d
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 35 deletions.
36 changes: 24 additions & 12 deletions emission/analysis/intake/segmentation/restart_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,46 @@
import emission.core.wrapper.transition as ecwt
import emission.storage.timeseries.timequery as estt

def is_tracking_restarted_in_range(start_ts, end_ts, timeseries, transition_df=None):
    """
    Check to see if tracking was restarted between the times specified.

    :param start_ts: the start of the time range to check
    :param end_ts: the end of the time range to check
    :param timeseries: the timeseries to use for checking; it is only queried
        when ``transition_df`` is None
    :param transition_df: dataframe of transitions to use (if None, will be
        fetched from timeseries). When supplied, it is narrowed to the rows
        with ``start_ts <= ts <= end_ts``; the caller's frame is not modified.
    :return: False if there are no transitions in the range; otherwise
        whether the android or the ios restart check matches those transitions
    """
    if transition_df is not None:
        # Callers that check many consecutive point pairs can load the
        # transitions once and pass them in; boolean indexing returns a new
        # frame, so the cached frame stays intact for the next call.
        in_range = (transition_df['ts'] >= start_ts) & (transition_df['ts'] <= end_ts)
        transition_df = transition_df[in_range]
    else:
        tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                            endTs=end_ts)
        transition_df = timeseries.get_data_df("statemachine/transition", tq)

    # Log the requested bounds directly: no TimeQuery exists when the
    # transitions were passed in by the caller.
    if len(transition_df) == 0:
        logging.debug("In range %s -> %s found no transitions",
                      start_ts, end_ts)
        return False
    logging.debug("In range %s -> %s found transitions %s",
                  start_ts, end_ts,
                  transition_df[["fmt_time", "curr_state", "transition"]])
    return _is_tracking_restarted_android(transition_df) or \
        _is_tracking_restarted_ios(transition_df)

def get_ongoing_motion_in_range(start_ts, end_ts, timeseries, motion_list=None):
    """
    Return the motion activity entries between the times specified.

    :param start_ts: the start of the time range to check
    :param end_ts: the end of the time range to check
    :param timeseries: the timeseries to read "background/motion_activity"
        entries from; it is only queried when ``motion_list`` is None
    :param motion_list: pre-fetched motion activity entries to use (if None,
        will be fetched from timeseries). When supplied, it is narrowed to the
        entries whose ``['data']['ts']`` satisfies ``start_ts <= ts <= end_ts``;
        the caller's list is not modified.
    :return: list of the motion activity entries in the range
    """
    if motion_list is not None:
        # Build a new list so a caller-side cache can be reused across calls.
        motion_list = [
            m for m in motion_list
            if start_ts <= m['data']['ts'] <= end_ts
        ]
    else:
        tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
                            endTs=end_ts)
        motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))

    # Log the requested bounds directly: no TimeQuery exists when the
    # entries were passed in by the caller.
    logging.debug("Found %s motion_activity entries in range %s -> %s",
                  len(motion_list), start_ts, end_ts)
    logging.debug("sample activities are %s", motion_list[0:5])
    return motion_list

Expand Down
6 changes: 4 additions & 2 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def segment_current_trips(user_id):
if len(filters_in_df) == 1:
# Common case - let's make it easy
with ect.Timer() as t_segment_trips:
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query)
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query, loc_df)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips", time.time(), t_segment_trips.elapsed)
else:
with ect.Timer() as t_get_combined_segmentation:
Expand Down Expand Up @@ -165,7 +165,9 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt
time_query.endTs = loc_df.iloc[endIndex+1].ts
logging.debug("for filter %s, startTs = %d and endTs = %d" %
(curr_filter, time_query.startTs, time_query.endTs))
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query)
curr_filter_loc_df = loc_df.loc[startIndex:endIndex]
curr_filter_loc_df.reset_index(drop=True, inplace=True)
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query, curr_filter_loc_df)
logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
sortedStartTsList = sorted(segmentation_map.keys())
segmentation_points = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, timeseries, time_query):
def segment_into_trips(self, timeseries, time_query, filtered_points_df):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand All @@ -51,7 +51,7 @@ def segment_into_trips(self, timeseries, time_query):
segmentation points.
"""
with ect.Timer() as t_get_filtered_points:
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
self.filtered_points_df = filtered_points_df
user_id = self.filtered_points_df["user_id"].iloc[0]
esds.store_pipeline_time(
user_id,
Expand All @@ -63,6 +63,7 @@ def segment_into_trips(self, timeseries, time_query):
self.filtered_points_df.loc[:, "valid"] = True

self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query))

if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
Expand Down Expand Up @@ -207,14 +208,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
# for this kind of test
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))

if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries, self.transition_df):
logging.debug("tracking was restarted, ending trip")
return True

# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
# between two location points, and there is a large gap between the last location and the first
# motion activity as well, let us just assume that there was a restart
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries, self.motion_list)
ongoing_motion_check = len(ongoing_motion_in_range) > 0
if timeDelta > self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, timeseries, time_query):
def segment_into_trips(self, timeseries, time_query, filtered_points_df):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
the time range. This allows algorithms to use whatever combination of
data that they want from the sensor streams in order to determine the
segmentation points.
"""
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0]
user_id = filtered_points_df["user_id"].iloc[0]
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
filtered_points_df = filtered_points_df[
(filtered_points_df.metadata_write_ts - filtered_points_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.motion_list = list(timeseries.find_entries(["background/motion_activity"], time_query))

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
logging.debug("no transitions found. This can happen for continuous sensing")

Expand Down Expand Up @@ -186,11 +186,11 @@ def segment_into_trips(self, timeseries, time_query):
t_loop.elapsed
)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[
(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)
logging.debug("Iterated over all points, just_ended = %s, len(self.transition_df) = %s" %
(just_ended, len(self.transition_df)))
if not just_ended and len(self.transition_df) > 0:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
Expand Down Expand Up @@ -253,11 +253,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)

if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries, self.transition_df):
logging.debug("tracking was restarted, ending trip")
return True

ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries, self.motion_list)) > 0
if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self):
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstfsm.segment_into_trips(ts, tq)
loc_df = ts.get_data_df("background/filtered_location", tq)
segmentation_points = dstfsm.segment_into_trips(ts, tq, loc_df)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand All @@ -88,7 +89,8 @@ def testSegmentationPointsDwellSegmentationDistFilter(self):
dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstdsm.segment_into_trips(ts, tq)
loc_df = ts.get_data_df("background/filtered_location", tq)
segmentation_points = dstdsm.segment_into_trips(ts, tq, loc_df)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand Down

0 comments on commit c941e5d

Please sign in to comment.