A lightweight module for analyzing GPS activity. The package is designed as a trade-off solution for both researchers and developers at Waste Labs. Using gps_activity
you can:
- Cluster your time-series GPS records to extract activity points
- Join activity points with original plan or operation report
- Estimate clustering performance
Using pip:
pip3 install gps_activity
- extraction: clusters GPS records and extracts cluster activities (check out the module structure)
- linker: joins the route plan with clustered GPS records
- metrics: estimates clustering performance based on:
  - internal source: metrics based on inter- and intra-cluster distances
  - external source: the joined route plan and clustered GPS records (output of the linker module)
The extraction module is organized into preprocessing, fragmentation and clustering steps, which are ultimately packed into an ActivityExtractionSession
object that orchestrates them.
- 1 vehicle = 1 session run: avoids overlapping clusters
- No duplicated GPS records per vehicle-timestamp pair: avoids division by zero when computing
velocity
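Both constraints can be checked before a session is run. The sketch below is not part of gps_activity: it works on plain Python records, borrows the column names plate_no and datetime from the examples in this README, and the helper names drop_duplicate_records and split_by_vehicle are hypothetical.

```python
from collections import defaultdict


def drop_duplicate_records(records, keys=("plate_no", "datetime")):
    """Keep only the first record seen for each vehicle-timestamp pair."""
    seen = set()
    unique = []
    for record in records:
        key = tuple(record[k] for k in keys)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


def split_by_vehicle(records, vehicle_key="plate_no"):
    """Group records per vehicle so each group can be fed to its own session run."""
    groups = defaultdict(list)
    for record in records:
        groups[record[vehicle_key]].append(record)
    return dict(groups)


# Illustrative data only; the second record duplicates the first.
records = [
    {"plate_no": "AB123", "datetime": "2022-01-01 08:00:00", "lat": 22.30, "lon": 114.17},
    {"plate_no": "AB123", "datetime": "2022-01-01 08:00:00", "lat": 22.30, "lon": 114.17},
    {"plate_no": "AB123", "datetime": "2022-01-01 08:00:10", "lat": 22.31, "lon": 114.17},
    {"plate_no": "CD456", "datetime": "2022-01-01 08:00:00", "lat": 22.28, "lon": 114.16},
]

per_vehicle = split_by_vehicle(drop_duplicate_records(records))
```

With tabular data, the same effect is usually achieved with pandas DataFrame.drop_duplicates(subset=[...]) followed by a groupby on the vehicle column.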
VHFDBSCAN: Velocity hard-limit fragmentation density-based spatial clustering of applications with noise
- Fragmentation is performed by applying a hard limit to the velocity computed from the lat, lon and datetime columns
- Clustering is performed by classical DBSCAN, which treats non-cluster candidates as noise
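The velocity hard limit can be illustrated with a minimal sketch. It is not the package's implementation: it assumes coordinates already projected to a metric CRS, timestamps sorted per vehicle, and a limit expressed in metres per second (the README does not state the unit). The function name velocity_flags and the tuple layout are hypothetical.

```python
from datetime import datetime
from math import hypot


def velocity_flags(points, max_velocity_hard_limit):
    """Flag each point whose velocity from the previous point is within the limit.

    points: list of (x_m, y_m, timestamp) tuples sorted by timestamp.
    """
    flags = [False]  # the first record has no predecessor, so no velocity
    for (x0, y0, t0), (x1, y1, t1) in zip(points, points[1:]):
        dt = (t1 - t0).total_seconds()
        if dt == 0:
            # This is the division by zero that duplicated timestamps would cause.
            raise ValueError("duplicated timestamp: velocity is undefined")
        velocity = hypot(x1 - x0, y1 - y0) / dt
        flags.append(velocity <= max_velocity_hard_limit)
    return flags


# Illustrative data only.
points = [
    (0.0, 0.0, datetime(2022, 1, 1, 8, 0, 0)),
    (100.0, 0.0, datetime(2022, 1, 1, 8, 0, 10)),  # 10 m/s: moving
    (102.0, 0.0, datetime(2022, 1, 1, 8, 0, 20)),  # 0.2 m/s: slow
    (103.0, 1.0, datetime(2022, 1, 1, 8, 0, 30)),  # about 0.14 m/s: slow
]
flags = velocity_flags(points, max_velocity_hard_limit=4)
```

Only the flagged (slow) records would then be passed on as clustering candidates.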
from gps_activity import ActivityExtractionSession
from gps_activity.extraction.factory.preprocessing import PreprocessingFactory
from gps_activity.extraction.factory.fragmentation import VelocityFragmentationFactory
from gps_activity.extraction.factory.clustering import FDBSCANFactory

preprocessing = PreprocessingFactory.factory_pipeline(
    source_lat_column="lat",
    source_lon_column="lon",
    source_datetime="datetime",
    source_vehicle_id="plate_no",
    source_crs="EPSG:4326",
    target_crs="EPSG:2326",
)
fragmentation = VelocityFragmentationFactory.factory_pipeline(max_velocity_hard_limit=4)
clustering = FDBSCANFactory.factory_pipeline(eps=30, min_samples=3)
activity_extraction = ActivityExtractionSession(
    preprocessing=preprocessing,
    fragmentation=fragmentation,
    clustering=clustering,
)
# gps: the GPS records of one vehicle, with the source columns configured above
activity_extraction.predict(gps)
- Fragmentation is performed by applying a hard limit to the velocity computed from the lat, lon and datetime columns
- Clustering is performed in the following steps:
  - An adjacent proximity mask is generated (true if cluster pair distance <= eps)
  - Cluster IDs are generated from the proximity mask and the fragmentation flag
  - GPS records are grouped by cluster_id and the cluster time span is aggregated
  - A cluster is validated if its time span >= min_duration_sec
  - Validated cluster IDs are assigned to the original GPS records
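The clustering steps listed above can be sketched in plain Python. This is one reading of those steps under stated assumptions, not the package's code: points are projected to metres and sorted by time, the fragmentation flag marks slow records, and -1 is used here as a hypothetical label for records outside validated clusters. The function name stcm_like_labels is also hypothetical.

```python
from datetime import datetime
from math import hypot


def stcm_like_labels(points, slow_flags, eps, min_duration_sec):
    """Return one cluster id per point, or -1 when the point is not in a validated cluster.

    points: list of (x_m, y_m, timestamp) sorted by timestamp.
    slow_flags: fragmentation flag per point (True = activity candidate).
    """
    # Proximity mask + cluster ids: a flagged record continues the current
    # cluster only if its predecessor is flagged too and lies within eps.
    candidate_ids = []
    current = -1
    for i, (x, y, _) in enumerate(points):
        if not slow_flags[i]:
            candidate_ids.append(-1)
            continue
        adjacent = (
            i > 0
            and slow_flags[i - 1]
            and hypot(x - points[i - 1][0], y - points[i - 1][1]) <= eps
        )
        if not adjacent:
            current += 1
        candidate_ids.append(current)

    # Group by cluster id and aggregate the time span of each candidate.
    spans = {}
    for cid, (_, _, t) in zip(candidate_ids, points):
        if cid == -1:
            continue
        first, last = spans.get(cid, (t, t))
        spans[cid] = (min(first, t), max(last, t))

    # Validate by duration, then write validated ids back to the records.
    valid = {
        cid
        for cid, (first, last) in spans.items()
        if (last - first).total_seconds() >= min_duration_sec
    }
    return [cid if cid in valid else -1 for cid in candidate_ids]


# Illustrative data only: a 60-second stop, a move, then a 10-second stop.
points = [
    (0.0, 0.0, datetime(2022, 1, 1, 8, 0, 0)),
    (5.0, 0.0, datetime(2022, 1, 1, 8, 0, 30)),
    (10.0, 0.0, datetime(2022, 1, 1, 8, 1, 0)),
    (500.0, 0.0, datetime(2022, 1, 1, 8, 1, 30)),
    (1000.0, 0.0, datetime(2022, 1, 1, 8, 2, 0)),
    (1005.0, 0.0, datetime(2022, 1, 1, 8, 2, 10)),
]
slow_flags = [True, True, True, False, True, True]
labels = stcm_like_labels(points, slow_flags, eps=30, min_duration_sec=60)
```

In this example the first stop lasts 60 seconds and is kept, while the second lasts only 10 seconds and is discarded.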
from gps_activity import ActivityExtractionSession
from gps_activity.extraction.factory.preprocessing import PreprocessingFactory
from gps_activity.extraction.factory.fragmentation import VelocityFragmentationFactory
from gps_activity.extraction.factory.clustering import STCMFactory

preprocessing = PreprocessingFactory.factory_pipeline(
    source_lat_column="lat",
    source_lon_column="lon",
    source_datetime="datetime",
    source_vehicle_id="plate_no",
    source_crs="EPSG:4326",
    target_crs="EPSG:2326",
)
fragmentation = VelocityFragmentationFactory.factory_pipeline(max_velocity_hard_limit=4)
clustering = STCMFactory.factory_pipeline(
    source_vehicle_id_column="plate_no",
    eps=30,
    min_duration_sec=60,
)
stcm = ActivityExtractionSession(
    preprocessing=preprocessing,
    fragmentation=fragmentation,
    clustering=clustering,
)
Overview of linker module components
# Initialize linkage components
from gps_activity import ActivityLinkageSession
from gps_activity.linker.factory import PreprocessingFactory
from gps_activity.linker.factory import ClusterAggregationFactory
from gps_activity.linker.factory import JoinValidatorFactory
from gps_activity.linker.factory import SpatialJoinerFactory
from gps_activity.linker.factory import CoverageStatisticsFactory

MAX_DISTANCE = 100
MAX_DAYS_DISTANCE = 1
# CRS constants, using the same values as the extraction examples
WSG_84 = "EPSG:4326"
HK_CRS = "EPSG:2326"

gps_link_preprocess_pipeline = PreprocessingFactory.factory_pipeline(
    source_lat_column="lat",
    source_lon_column="lon",
    source_datetime="datetime",
    source_vehicle_id="plate_no",
    source_crs=WSG_84,
    target_crs=HK_CRS,
    generate_primary_key_for="gps",
    source_composite_keys=["plate_no", "datetime", "lat", "lon"],
)
plans_link_preprocess_pipeline = PreprocessingFactory.factory_pipeline(
    source_lat_column="lat",
    source_lon_column="lng",
    source_datetime="datetime",
    source_vehicle_id="re-assigned by Ricky",
    source_crs=WSG_84,
    target_crs=HK_CRS,
    generate_primary_key_for="plan",
    source_composite_keys=["CRN#"],
)
cluster_agg_pipeline = ClusterAggregationFactory.factory_pipeline(
    source_lat_column="lat",
    source_lon_column="lon",
    source_datetime="datetime",
    source_vehicle_id="plate_no",
    source_crs=WSG_84,
    target_crs=HK_CRS,
)
spatial_joiner = SpatialJoinerFactory.factory_pipeline(how="inner", max_distance=MAX_DISTANCE)
spatial_validator = JoinValidatorFactory.factory_pipeline(
    max_days_distance=MAX_DAYS_DISTANCE,
    ensure_vehicle_overlap=True,
)
coverage_stats_extractor = CoverageStatisticsFactory.factory_pipeline()
gps_linker_session = ActivityLinkageSession(
    gps_preprocessor=gps_link_preprocess_pipeline,
    plan_preprocessor=plans_link_preprocess_pipeline,
    cluster_aggregator=cluster_agg_pipeline,
    spatial_joiner=spatial_joiner,
    spatial_validator=spatial_validator,
    coverage_stats_extractor=coverage_stats_extractor,
)
# clustered_gps: output of the extraction session; plans: the route plan records
linker_results = gps_linker_session.transform({
    "gps": clustered_gps,
    "plan": plans,
})
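To make the roles of max_distance, max_days_distance and ensure_vehicle_overlap more concrete, here is a minimal sketch of a distance-bounded join followed by validation. It is an illustration under assumptions, not the linker's implementation: the record fields (x, y, date, plan_id, cluster_id), the function name link_clusters_to_plan, and the coverage definition (share of plan points with at least one link) are all hypothetical.

```python
from datetime import date
from math import hypot


def link_clusters_to_plan(clusters, plan, max_distance, max_days_distance):
    """Pair each cluster with plan points that are close in space and date and share a vehicle."""
    links = []
    for cluster in clusters:
        for stop in plan:
            close = hypot(cluster["x"] - stop["x"], cluster["y"] - stop["y"]) <= max_distance
            same_vehicle = cluster["plate_no"] == stop["plate_no"]
            days_apart = abs((cluster["date"] - stop["date"]).days)
            if close and same_vehicle and days_apart <= max_days_distance:
                links.append((cluster["cluster_id"], stop["plan_id"]))
    return links


# Illustrative data only; coordinates are in metres in a projected CRS.
clusters = [
    {"cluster_id": 0, "plate_no": "AB123", "x": 0.0, "y": 0.0, "date": date(2022, 1, 1)},
    {"cluster_id": 1, "plate_no": "AB123", "x": 5000.0, "y": 0.0, "date": date(2022, 1, 1)},
]
plan = [
    {"plan_id": "P1", "plate_no": "AB123", "x": 30.0, "y": 40.0, "date": date(2022, 1, 2)},
    {"plan_id": "P2", "plate_no": "CD456", "x": 10.0, "y": 0.0, "date": date(2022, 1, 1)},
    {"plan_id": "P3", "plate_no": "AB123", "x": 5010.0, "y": 0.0, "date": date(2022, 1, 5)},
]

links = link_clusters_to_plan(clusters, plan, max_distance=100, max_days_distance=1)
# Assumed coverage definition: fraction of plan points that received a link.
coverage = len({plan_id for _, plan_id in links}) / len(plan)
```

Here P2 is rejected because it belongs to another vehicle and P3 because it is four days away, so only P1 is linked.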
- NOTE: This module is highly experimental
- NOTE: This module depends on the linker module
from gps_activity.metrics import ActivityMetricsSession
from gps_activity.metrics.models import Metrics

metrics_session = ActivityMetricsSession(beta=2)
metrics = metrics_session.transform(linker_results)
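The beta argument resembles the weighting parameter of an F-beta score. This README does not state how ActivityMetricsSession uses it, so the following is only an assumption about its meaning: a standalone sketch of the standard F-beta formula, where beta > 1 weights recall more heavily than precision. The function name fbeta_score is chosen here for illustration.

```python
def fbeta_score(precision, recall, beta):
    """Standard F-beta: weighted harmonic mean of precision and recall."""
    if precision == 0 and recall == 0:
        return 0.0
    beta_sq = beta ** 2
    return (1 + beta_sq) * precision * recall / (beta_sq * precision + recall)


# Illustrative values only, not results from the package.
f2 = fbeta_score(precision=0.5, recall=1.0, beta=2)
f1 = fbeta_score(precision=0.5, recall=1.0, beta=1)
```

With the same precision and recall, the beta=2 score is higher than the beta=1 score because recall is the stronger of the two inputs.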