Skip to content

Commit

Permalink
function documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
justdeko committed Sep 25, 2021
1 parent f35a9ae commit 2eef27e
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 6 deletions.
8 changes: 8 additions & 0 deletions functions/syncmesh-fn/db_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (db mongoDB) getSensorsInTimeRange(startTime time.Time, endTime time.Time,
return sensors, nil
}

// aggregateSensorsInTimeRange using averages and optional time ranges
func (db mongoDB) aggregateSensorsInTimeRange(startTime interface{}, endTime interface{}) (interface{}, error) {
ctx, _ := context.WithTimeout(context.Background(), 90*time.Second)
var averagesCursor *mongo.Cursor
Expand Down Expand Up @@ -77,6 +78,7 @@ func (db mongoDB) aggregateSensorsInTimeRange(startTime interface{}, endTime int
return averages[0], nil
}

// getSensor for a given id
func (db mongoDB) getSensor(_id string) (interface{}, error) {
var sensor SensorModel
var err error
Expand All @@ -94,6 +96,7 @@ func (db mongoDB) getSensor(_id string) (interface{}, error) {
return sensor, nil
}

// deleteSensorById with a given id
func (db mongoDB) deleteSensorById(_id string) (interface{}, error) {
var sensor SensorModel
var err error
Expand All @@ -111,6 +114,7 @@ func (db mongoDB) deleteSensorById(_id string) (interface{}, error) {
return sensor, nil
}

// deleteSensorByReplicaId with a given replica ID
func (db mongoDB) deleteSensorByReplicaId(replicaID string) (interface{}, error) {
var sensor SensorModel
var err error
Expand All @@ -123,6 +127,7 @@ func (db mongoDB) deleteSensorByReplicaId(replicaID string) (interface{}, error)
return sensor, nil
}

// createSensors using a given list of sensors to add into the db
func (db mongoDB) createSensors(sensors []interface{}) (interface{}, error) {
ctx, _ := context.WithTimeout(context.Background(), 90*time.Second)
res, err := db.collection.InsertMany(ctx, sensors, options.InsertMany().SetOrdered(false))
Expand All @@ -132,6 +137,7 @@ func (db mongoDB) createSensors(sensors []interface{}) (interface{}, error) {
return res.InsertedIDs, nil
}

// update or create (if id does not exist) a sensor with an id using a new body
func (db mongoDB) update(_id string, sensor interface{}, replicaID string) (interface{}, error) {
var err error
var updatedSensor SensorModel
Expand All @@ -154,6 +160,7 @@ func (db mongoDB) update(_id string, sensor interface{}, replicaID string) (inte
return updatedSensor, nil
}

// deleteInTimeRange for deleting sensors in a given time range
func (db mongoDB) deleteInTimeRange(startTime time.Time, endTime time.Time) (interface{}, error) {
ctx, _ := context.WithTimeout(context.Background(), 90*time.Second)
res, err := db.collection.DeleteMany(ctx, bson.M{
Expand All @@ -168,6 +175,7 @@ func (db mongoDB) deleteInTimeRange(startTime time.Time, endTime time.Time) (int
return res.DeletedCount, nil
}

// getDocEstimate approximates the current number of documents in the collection
func (db mongoDB) getDocEstimate() (interface{}, error) {
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
opts := options.EstimatedDocumentCount().SetMaxTime(5 * time.Second)
Expand Down
2 changes: 2 additions & 0 deletions functions/syncmesh-fn/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
)

// handleStreamEvent handles the stream event request type
// it sends queries to subscribed external nodes to maintain data replication
func handleStreamEvent(ctx context.Context, event StreamEvent) (interface{}, error) {
db := getSyncmeshDB(ctx)
defer db.closeDB()
Expand Down
10 changes: 10 additions & 0 deletions functions/syncmesh-fn/gql_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"
)

// getSensors with a limit and optional end or start time
func getSensors(p graphql.ResolveParams) (interface{}, error) {
limit := p.Args["limit"]
if limit == nil {
Expand All @@ -21,36 +22,43 @@ func getSensors(p graphql.ResolveParams) (interface{}, error) {
return response(db.getSensorsInTimeRange(startTime.(time.Time), endTime.(time.Time), limit.(int)))
}

// aggregateSensors for sensor data averages with an optional end or start time
func aggregateSensors(p graphql.ResolveParams) (interface{}, error) {
start := p.Args["start_time"]
end := p.Args["end_time"]
return response(db.aggregateSensorsInTimeRange(start, end))
}

// getSensor using an id
func getSensor(p graphql.ResolveParams) (interface{}, error) {
id := p.Args["_id"].(string)
return response(db.getSensor(id))
}

// deleteSensor using an id
func deleteSensor(p graphql.ResolveParams) (interface{}, error) {
id := p.Args["_id"].(string)
return response(db.deleteSensorById(id))
}

// deleteReplicaSensor using a replica id
func deleteReplicaSensor(p graphql.ResolveParams) (interface{}, error) {
id := p.Args["replicaID"].(string)
return response(db.deleteSensorByReplicaId(id))
}

// createSensors using a sensor list
func createSensors(p graphql.ResolveParams) (interface{}, error) {
sensors := p.Args["sensors"].([]interface{})
return response(db.createSensors(sensors))
}

// getDocEstimate to get the estimated number of documents
func getDocEstimate(_ graphql.ResolveParams) (interface{}, error) {
return response(db.getDocEstimate())
}

// deleteInTimeRange for deleting sensors in a given time range
func deleteInTimeRange(p graphql.ResolveParams) (interface{}, error) {
startTime := p.Args["start_time"].(time.Time)
endTime := p.Args["end_time"].(time.Time)
Expand All @@ -61,13 +69,15 @@ type SensorInput struct {
replicaID string
}

// update or create a syncmesh node with optional replica ID
func update(p graphql.ResolveParams) (interface{}, error) {
id := p.Args["_id"].(string)
replicaID := p.Args["sensor"].(SensorInput).replicaID
sensor := p.Args["sensor"].(interface{})
return response(db.update(id, sensor, replicaID))
}

// response handles a generic db operation response with error check
func response(result interface{}, err error) (interface{}, error) {
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions functions/syncmesh-fn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func handleSpecialRequest(req handler.Request, isMeta bool) (handler.Response, e
return functionResponse(b.String(), err)
}

// combineExternalNodes by applying range filtering and sorting by distance
// appends locally stored nodes to those specified in the request
func combineExternalNodes(request *SyncMeshRequest, ctx context.Context) {
var filteredNodes []SyncmeshNode

Expand Down
1 change: 1 addition & 0 deletions functions/syncmesh-fn/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
)

// TestHandlerError by not passing anything in the request and forcing an error
func TestHandlerError(t *testing.T) {
req := handler.Request{}
resp, err := Handle(req)
Expand Down
4 changes: 2 additions & 2 deletions functions/syncmesh-fn/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

// connect to the database
// connectDB for connecting to the database with a specified db name and collection
func connectDB(ctx context.Context, db string, collection string) mongoDB {
if len(db) == 0 || len(collection) == 0 {
log.Fatal("Database and collection need to be specified")
Expand Down Expand Up @@ -41,7 +41,7 @@ func connectDB(ctx context.Context, db string, collection string) mongoDB {
}
}

// disconnect and close the session
// closeDB to disconnect and close the session
func (db mongoDB) closeDB() {
err := db.session.Disconnect(context.Background())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions functions/syncmesh-fn/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const MetaDB = "syncmesh_meta"
const NodeCollection = "nodes"

// handleMetaRequest to manage the querying, updating and deletion of locally stored nodes
func handleMetaRequest(ctx context.Context, request SyncmeshMetaRequest) (interface{}, error) {
db := getSyncmeshDB(ctx)
defer db.closeDB()
Expand Down
2 changes: 1 addition & 1 deletion functions/syncmesh-fn/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/graphql-go/graphql"
)

// Init the schema of Sensor data in GraphQL
// initSchema of Sensor data in GraphQL
func initSchema() graphql.Schema {
sensorInputType := graphql.NewInputObject(graphql.InputObjectConfig{
Name: "SensorInputType",
Expand Down
4 changes: 3 additions & 1 deletion functions/syncmesh-fn/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func findOwnNode(nodes []SyncmeshNode) (error, SyncmeshNode, []SyncmeshNode) {
return errors.New("no own node found"), SyncmeshNode{}, nodes
}

// calculateSensorAverages of all "sensor averages" responses by combining them
func calculateSensorAverages(sensors []SensorModelNoId) AveragesResponse {
final := AveragesResponse{AveragePressure: 0, AverageTemperature: 0, AverageHumidity: 0}
size := float64(len(sensors))
Expand All @@ -54,7 +55,7 @@ func calculateSensorAverages(sensors []SensorModelNoId) AveragesResponse {
return final
}

//zip a body using gzip and write it into a byte buffer
// zip a body using gzip and write it into a byte buffer
func zip(body []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer
var err error
Expand Down Expand Up @@ -98,6 +99,7 @@ func unzipResponse(resp *http.Response) ([]byte, error) {
}
}

// filterExternalNodes by a given radius in km and sort them in ascending order by distance
func filterExternalNodes(externalNodes []SyncmeshNode, ownNode SyncmeshNode, radius float64) []SyncmeshNode {
var filteredNodes []SyncmeshNode
for _, node := range externalNodes {
Expand Down
1 change: 0 additions & 1 deletion functions/syncmesh-fn/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestUnzipResponse(t *testing.T) {
}

// TestFilterNodes tests the radius node filtering algorithm.
// uncomment the mongodb update lines to test properly
func TestFilterNodes(t *testing.T) {
radius := 30 // 30km
nodeList := []SyncmeshNode{{
Expand Down
3 changes: 2 additions & 1 deletion mongo_event_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type StreamEvent struct {
DocumentKey DocKey `bson:"documentKey" json:"documentKey"`
}

// main is a running change stream listener for mongodb change events in the default collection (syncmesh/sensor_data)
func main() {
log.Println("Starting mongoDB change stream listener...")
ctx := context.Background()
Expand Down Expand Up @@ -76,7 +77,7 @@ func main() {
}
}

// logPanic is a fatal log if the error is not nil
// stopFatal is a fatal log if the error is not nil
func stopFatal(err error) {
if err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 2eef27e

Please sign in to comment.