Skip to content

Commit

Permalink
Code & documentation for demo app
Browse files Browse the repository at this point in the history
  • Loading branch information
davideanastasia committed Oct 27, 2019
1 parent e5eac8c commit c66630f
Show file tree
Hide file tree
Showing 23 changed files with 982 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
target/
*.iml
data/*
dependency-reduced-pom.xml
dependency-reduced-pom.xml
ignore.txt
32 changes: 20 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
# Apache Beam Spatial

Example on how to use a tile system (Uber H3) to generate 2D-aware sessions in Apache Beam
This project contains a modified session algorithm for the calculation of time/space sessions using Apache Beam. The algorithm leverages
Uber H3 (an hex-based tiling system), and works in either batch or streaming mode. With little modification, it can be extended to work using Google S2 or the classic Geohash algorithm.

Full explanation for the code in this repo is available in the article [Time/Space Sessions in Apache Beam](https://medium.com/@davide.anastasia/time-space-sessions-in-apache-beam-b402cdf8470) on Medium
The code is support material for the article [Time/Space Sessions in Apache Beam](https://medium.com/@davide.anastasia/time-space-sessions-in-apache-beam-b402cdf8470) on Medium.

## Examples

### Reference dataset

Dataset used in the examples/ folder is the [Microsoft T-Drive](https://www.microsoft.com/en-us/research/publication/t-drive-trajectory-data-sample/).
Most of the code in the project (including the demo app) is tested with the [Microsoft T-Drive](https://www.microsoft.com/en-us/research/publication/t-drive-trajectory-data-sample/) dataset.

### How to run?
### On Google Dataflow

Execution of the batch job on Google Dataflow can be performed using:

```bash
export PROJECT_ID=...
export REGION=...
export GCP_PROJECT=...
export GCP_REGION=...
export GOOGLE_APPLICATION_CREDENTIALS=...

java -cp ../target/beam-spatial-examples-bundled-0.0.1.jar com.davideanastasia.beam.spatial.Batch \
--runner=DataflowRunner \
--project=${PROJECT_ID} \
--project=${GCP_PROJECT} \
--region=${GCP_REGION} \
--maxNumWorkers=6 \
--region=${REGION} \
--tempLocation=gs://${PROJECT_ID}-data/temp/ \
--inputFile=gs://${PROJECT_ID}-data/raw-data/*.txt \
--outputFile=gs://${PROJECT_ID}-data/sessions-3/out
```
--tempLocation=gs://${GCP_PROJECT}-data/temp/ \
--inputFile=gs://${GCP_PROJECT}-data/raw-data/*.txt \
--outputFile=gs://${GCP_PROJECT}-data/sessions-3/out
```

# Demo

The folder `demo/` contains a combination of dataflow + python to create a real-time mapping visualization of the taxi data. For further info, please refer to the README.md file in the same folder.

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public TiledSessions<T> withStrategy(SortingStrategy sortingStrategy) {

@Override
public Collection<TiledIntervalWindow> assignWindows(AssignContext c) throws Exception {
// TODO: expensive to compute at every assignWindows... can it be moved somewhere else?
H3Core h3 = H3Core.newInstance();

Point point = c.element().getValue().getPoint();
Expand Down
3 changes: 3 additions & 0 deletions demo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__
socketio
setenv
19 changes: 19 additions & 0 deletions demo/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
flask = "*"
flask-socketio = "*"
eventlet = "*"
gunicorn = "*"
gevent = "*"
redis = "*"
google-cloud-pubsub = "*"
requests = "*"

[requires]
python_version = "3.7"
380 changes: 380 additions & 0 deletions demo/Pipfile.lock

Large diffs are not rendered by default.

88 changes: 88 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Demo App

The demo app uses:
- a Google Dataflow pipeline which outputs sessions at minute intervals
- a python script that reads from a file and publishes message on Pubsub
- a python script that reads sessions from Pubsub and publishes them to a REST API
- a web service that publishes messages into a websocket and serves a static map

## Architecture

The whole architecture hinges around Google Dataflow, that is in charge of emitting sessions every one minute for each of the taxi in the dataset. The input of the Dataflow pipeline is Pubsub, as well as the output.

### Input

The input dataset will be the Microsoft T-Drive (one week of taxi location in Beijing), which can be downloaded for free.
In a real-world scenario, every taxis would be equipped with a tracker that sends data to an API. The API (after authorize/authenticate the tracker) would then forward the payload to a Pubsub queue.
In our dummy example, the input will be simulated by a simple Python script, publishing the payload directly into the Pubsub queue.

### Output

The aim is to display the data on a map in realtime over multiple client (ideally having them in sync), hence I decided to use a Websocket to build a continuous stream of data in the front-end: in this way, the webserver is not overloaded by multiple front-end performing polling, as Websockets are event-driven.

### Diagram

Final diagram of the demo is below:

![Architecture Diagram](doc/architecture-diagram.png)

## How to run?

### Assets

The required assets are:
- GCP project (set two environment variable, `GCP_PROJECT` and `GCP_REGION`)
- Two Pubsub topics: `t-drive-data` and `t-drive-session`)
- One Pubsub subscription for `t-drive-session`, called `t-drive-session-sub`
- One BigQuery table `t_drive_data.session_data` with 5 columns: `taxi_id, start_ts, end_ts, hex_addr, count`

An additional requirement is the creation of a service account and the download of the json file, as well as the setup of the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.

### Input dataset and Pubsub producer

To download the dataset, follow the link to the [Microsoft T-Drive](https://www.microsoft.com/en-us/research/publication/t-drive-trajectory-data-sample/) page.

Once downloaded, the dataset needs to be merged into a single file with a couple of simple bash commands:

```bash
less input/*.txt | awk -F, '{print $2,$1,$3,$4}' OFS=, > output.txt
sort output.txt | awk -F, '{print $2,$1,$3,$4}' OFS=, > output.sorted.txt
```

where `input/` folder contains the original dataset. The final file is used as input of the Pubsub producer.

Once the input dataset is ready, you can run the producer with:

```bash
python pubsub/producer.py ../data/output.sorted.txt
```

### Dataflow Streaming Job

The streaming job can be launched with:

```bash
./scripts/run-stream.sh
```

### Run Webapp

The webapp can be launched with:

```bash
python app/main.py
```

Once running, you will be to see the map at: http://127.0.0.1:5000/

### Pubsub consumer

The consumer can be launched with:

```bash
python pubsub/consumer.py
```

The consumer will receive in-flight sessions


Empty file added demo/app/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions demo/app/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import eventlet

# ref: https://github.com/miguelgrinberg/Flask-SocketIO/issues/235
eventlet.monkey_patch()

import logging

logging.basicConfig(level=logging.DEBUG)

import json

from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit

NAMESPACE = '/tdrive'
LOGGER = logging.getLogger(__name__)

app = Flask(__name__)
# ref: https://github.com/miguelgrinberg/Flask-SocketIO/issues/618
# remember to pass 'app' in the init of SocketIO, or it won't work...
socketio = SocketIO(app,
logger=True,
engineio_logger=True)


@app.route('/')
def index():
return render_template("index.html")


@app.route('/insert', methods=['POST'])
def insert():
if request.json is None:
print('empty push')
else:
# LOGGER.info(request.json)
if 'id' in request.json:
socketio.emit('add',
json.dumps(request.json),
namespace=NAMESPACE,
broadcast=True)

return "OK"


@app.route('/remove', methods=['POST'])
def remove():
# LOGGER.info(request.json)
if 'id' in request.json:
socketio.emit('rem',
json.dumps(request.json),
namespace=NAMESPACE,
broadcast=True)

return "OK"


@socketio.on('connect', namespace=NAMESPACE)
def test_connect():
LOGGER.info('Client connected')
emit('connect_response', {'data': 'Connected'})


@socketio.on('disconnect', namespace=NAMESPACE)
def test_disconnect():
LOGGER.info('Client disconnected')


if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', debug=False)
79 changes: 79 additions & 0 deletions demo/app/static/js/main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
var map = L.map('map').setView([39.916667, 116.383333], 12);
var colors = {
0: '#800026',
1: '#BD0026',
2: '#E31A1C',
3: '#FC4E2A',
4: '#FD8D3C',
5: '#FEB24C',
6: '#FED976',
7: '#FFEDA0'
}

var tileLayer = L.tileLayer('https://stamen-tiles-{s}.a.ssl.fastly.net/toner/{z}/{x}/{y}{r}.{ext}', {
attribution: 'Map tiles by <a href="http://stamen.com">Stamen Design</a>, <a href="http://creativecommons.org/licenses/by/3.0">CC BY 3.0</a> &mdash; Map data &copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors',
subdomains: 'abcd',
minZoom: 0,
maxZoom: 20,
ext: 'png'
}).addTo(map);

var taxis = {}

var socket = io('/tdrive');
socket.on('connect', function() {
socket.emit('my event', {data: 'Connected!'});
});
socket.on('add', function(data) {
var obj = JSON.parse(data);

var taxiId = obj['id'] + '/' + obj['hexAddr']
console.log()

var polygon = taxis[taxiId];
var color = colors[parseInt(obj['id']) % 8]
if (polygon === undefined) {
console.log('taxiId = ' + taxiId + ', add to map');
// add to map
var hexBoundary = h3.h3ToGeoBoundary(obj['hexAddr']);
var polygon = L.polygon(hexBoundary, {color: color, opacity: 0.5, weight: 2}).bindPopup("Taxi " + obj['id'] + "</br>Inserted At " + obj['startTs']);
var poly = polygon.addTo(map);

taxis[taxiId] = {
'hexAddr': obj['hexAddr'],
'polygon': polygon
};
} else if (polygon['hexAddr'] != obj['hexAddr']) {
// NOTE: this case shouldn't really happen...
console.log('taxiId = ' + taxiId + ', different location, previous = ' + polygon['hexAddr']);
map.removeLayer(polygon['polygon']);

var hexBoundary = h3.h3ToGeoBoundary(obj['hexAddr']);
var polygon = L.polygon(hexBoundary, {color: color, opacity: 0.5, weight: 2}).bindPopup("Taxi " + obj['id']);
var poly = polygon.addTo(map);

// overwrite key in cache
taxis[taxiId] = {
'hexAddr': obj['hexAddr'],
'polygon': polygon
};
} else {
// if we get here, the key in the cache is the same of the current object and we do nothing
console.log('taxiId = ' + taxiId + ', same location');
}
});
socket.on('rem', function(data) {
var obj = JSON.parse(data);

var taxiId = obj['id'] + '/' + obj['hexAddr']

var polygon = taxis[taxiId];
if (polygon === undefined) {
return;
}

console.log('taxiId = ' + taxiId + ', remove');
map.removeLayer(polygon['polygon']);

delete taxis[taxiId];
});
31 changes: 31 additions & 0 deletions demo/app/templates/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<html>
<head>
<!-- https://leafletjs.com/examples/quick-start/ -->
<!-- https://maptimeboston.github.io/leaflet-intro/ -->
<title>T-Drive Realtime</title>
<meta name="viewport" content="width=device-width, initial-scale=1, user-scalable=no">
<link rel="stylesheet" href="https://unpkg.com/[email protected]/dist/leaflet.css"
integrity="sha512-xwE/Az9zrjBIphAcBb3F6JVqxf46+CDLwfLMHloNu6KEQCAWi6HcDUbeOfBIptF7tcCzusKFjFw2yuvEpDL9wQ=="
crossorigin=""/>
<script src="https://unpkg.com/[email protected]/dist/leaflet.js"
integrity="sha512-GffPMF3RvMeYyc1LWMHtK8EbPv0iNZ8/oTtHPx9/cc2ILxQ+u905qIwdpULaqDkyBKgOaB57QTMg7ztg8Jm2Og=="
crossorigin=""></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/2.2.0/socket.io.js"
integrity="sha256-yr4fRk/GU1ehYJPAs8P4JlTgu0Hdsp4ZKrx8bDEDC3I="
crossorigin="anonymous"></script>
<script src="https://unpkg.com/h3-js"></script>
<style>
#map {
position: absolute;
top: 0;
left: 0;
bottom: 0;
right: 0;
}
</style>
</head>
<body>
<div id="map"></div>
<script src="{{ url_for('static', filename='js/main.js') }}"></script>
</body>
</html>
Binary file added demo/datasomething-meetup-20191029.pdf
Binary file not shown.
Binary file added demo/doc/architecture-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit c66630f

Please sign in to comment.