Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator Code Bug Fixes & Example In Go Code #31

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions activity/aggregate/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/stream/activity/aggregate/window"
"github.com/project-flogo/stream/pipeline/support"
Expand Down Expand Up @@ -63,7 +64,8 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
return nil, err
}

act := &Activity{settings: s, additionalSettings: additionalSettings}
sharedData := make(map[string]interface{})
act := &Activity{settings: s, additionalSettings: additionalSettings, sharedData: sharedData}

return act, nil
}
Expand All @@ -73,6 +75,7 @@ type Activity struct {
settings *Settings
additionalSettings map[string]string
mutex sync.Mutex
sharedData map[string]interface{}
}

// Metadata returns the activity's metadata
Expand All @@ -83,7 +86,7 @@ func (a *Activity) Metadata() *activity.Metadata {
// Eval implements api.Activity.Eval - Aggregates the Message
func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {

sharedData := ctx.GetSharedTempData()
sharedData := a.sharedData

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.GetSharedTempData() should work. Do you have the panic that this causes?

This call should end up in state.go#107 which should create a map if one doesn't exist.

wv, defined := sharedData[sdWindow]

timerSupport, timerSupported := support.GetTimerSupport(ctx)
Expand Down Expand Up @@ -179,7 +182,7 @@ func (a *Activity) PostEval(ctx activity.Context, userData interface{}) (done bo

func (a *Activity) moveWindow(ctx activity.Context) bool {

sharedData := ctx.GetSharedTempData()
sharedData := a.sharedData

wv, _ := sharedData[sdWindow]

Expand Down Expand Up @@ -220,3 +223,35 @@ func toParams(values string) (map[string]string, error) {

return params, nil
}

func (o *Output) ToMap() map[string]interface{} {
return map[string]interface{}{
"report": o.Report,
"result": o.Result,
}
}

func (r *Input) ToMap() map[string]interface{} {
return map[string]interface{}{
"value": r.Value,
}
}

func (i *Input) FromMap(values map[string]interface{}) error {

i.Value = values["value"]

return nil
}

func (o *Output) FromMap(values map[string]interface{}) error {

var err error
o.Report, err = coerce.ToBool(values["report"])
if err != nil {
return err
}
o.Result = values["result"]

return nil
}
22 changes: 22 additions & 0 deletions examples/aggregator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"fmt"

"github.com/project-flogo/core/api"
"github.com/project-flogo/core/engine"
)

func main() {

app := StreamTest()

e, err := api.NewEngine(app)

if err != nil {
fmt.Println("Error:", err)
return
}

engine.RunEngine(e)
}
132 changes: 132 additions & 0 deletions examples/aggregator/streamTest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

import (
"context"
"encoding/json"
"fmt"

"github.com/project-flogo/contrib/activity/log"
"github.com/project-flogo/contrib/activity/rest"
restTrig "github.com/project-flogo/contrib/trigger/rest"
"github.com/project-flogo/contrib/trigger/timer"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/api"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/stream/activity/aggregate"
)

// Stores all the activities of this app
var actStream map[string]activity.Activity

func StreamTest() *api.App {
app := api.NewApp()

// REST Trigger to receive HTTP message
trg := app.NewTrigger(&restTrig.Trigger{}, &restTrig.Settings{Port: 9090})
h, _ := trg.NewHandler(&restTrig.HandlerSettings{Method: "POST", Path: "/stream"})
h.NewAction(runActivitiesStream)

// Timer Trigger to send HTTP message repeatedly
tmrTrg := app.NewTrigger(&timer.Trigger{}, nil)
tmrHandler, _ := tmrTrg.NewHandler(&timer.HandlerSettings{StartInterval: "2s", RepeatInterval: "1s"})
tmrHandler.NewAction(runTimerActivitiesStream)

// A REST Activity to send data to Uri
stng := &rest.Settings{Method: "POST", Uri: "http://localhost:9090/stream",
Headers: map[string]string{"Accept": "application/json"}}
restAct, _ := api.NewActivity(&rest.Activity{}, stng)

// A log Activity for logging
logAct, _ := api.NewActivity(&log.Activity{})

// Aggregate Activities to aggregate data obtained at 9090 port
aggStng1 := &aggregate.Settings{Function: "accumulate", WindowType: "tumbling",
WindowSize: 3, ProceedOnlyOnEmit: true}
aggAct1, _ := api.NewActivity(&aggregate.Activity{}, aggStng1)

aggStng2 := &aggregate.Settings{Function: "avg", WindowType: "tumbling",
WindowSize: 3, ProceedOnlyOnEmit: false}
aggAct2, _ := api.NewActivity(&aggregate.Activity{}, aggStng2)

//Store in map to avoid activity instance recreation
actStream = map[string]activity.Activity{}
actStream["log"] = logAct
actStream["rest"] = restAct
actStream["agg1"] = aggAct1
actStream["agg2"] = aggAct2

return app
}

func runActivitiesStream(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error) {

// Get REST Trigger Output
trgOut := &restTrig.Output{}
trgOut.FromMap(inputs)

// Coerce the required outputs to string
content, _ := coerce.ToString(trgOut.Content)

response := handleStreamInput(content)

reply := &restTrig.Reply{Code: 200, Data: response}
return reply.ToMap(), nil
}

type inputStreamData struct {
Value float64 `json:"value"`
}

func handleStreamInput(input string) map[string]interface{} {

var in inputStreamData
err := json.Unmarshal([]byte(input), &in)

if err != nil {
fmt.Println("Hello, Some problem occured during json unmarshaling")
return nil
}

response := make(map[string]interface{})
response["value"] = in.Value

output, err := api.EvalActivity(actStream["agg1"], &aggregate.Input{Value: in.Value})

if err != nil {
return nil
}

if output["report"] == true {
fmt.Println("[@9090]$ Accumulator Output : ", output["result"])
}

output, err = api.EvalActivity(actStream["agg2"], &aggregate.Input{Value: in.Value})

if err != nil {
return nil
}

if output["report"] == true {
fmt.Printf("[@9090]$ Average Output : %0.4f\n", output["result"])
fmt.Println()
}

return response
}

var num float64 = 0

func runTimerActivitiesStream(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error) {

num += 0.4
input := fmt.Sprintf("{\"value\": %f }", num)

_, err := api.EvalActivity(actStream["rest"],
&rest.Input{Content: input})

if err != nil {
return nil, err
}

return nil, nil
}