diff --git a/api.go b/api.go index b515977..fc8d74f 100644 --- a/api.go +++ b/api.go @@ -1,25 +1,10 @@ package main import ( - "github.com/feedlabs/elasticfeed/service" - "github.com/feedlabs/elasticfeed/plugin" - "github.com/feedlabs/elasticfeed/workflow" - "github.com/feedlabs/elasticfeed/event" - "github.com/feedlabs/elasticfeed/resource" "github.com/feedlabs/elasticfeed/elasticfeed" ) -var ( - ServerEngine *elasticfeed.Elasticfeed -) - func main() { - rm := resource.NewResourceManager() - em := event.NewEventManager() - pm := plugin.NewPluginManager(rm) - wm := workflow.NewWorkflowManager(nil, pm, em) - sm := service.NewServiceManager() - - ServerEngine = elasticfeed.NewElasticfeed(rm, em, sm, pm, wm) - ServerEngine.Run() + engine := elasticfeed.NewElasticfeed() + engine.Run() } diff --git a/elasticfeed/elasticfeed.go b/elasticfeed/elasticfeed.go index b7c51a4..7a2a9f0 100644 --- a/elasticfeed/elasticfeed.go +++ b/elasticfeed/elasticfeed.go @@ -1,6 +1,8 @@ package elasticfeed import ( + "github.com/feedlabs/elasticfeed/elasticfeed/model" + "github.com/feedlabs/elasticfeed/plugin" "github.com/feedlabs/elasticfeed/workflow" "github.com/feedlabs/elasticfeed/service" @@ -11,41 +13,55 @@ import ( ) type Elasticfeed struct { - rm *resource.ResourceManager - em *event.EventManager - sm *service.ServiceManager - pm *plugin.PluginManager - wm *workflow.WorkflowManager + R model.ResourceManager + E model.EventManager + S model.ServiceManager + P model.PluginManager + W model.WorkflowManager +} + +func (this *Elasticfeed) GetEventManager() model.EventManager { + return this.E } -func (this *Elasticfeed) GetEventManager() *event.EventManager { - return this.em +func (this *Elasticfeed) GetResourceManager() model.ResourceManager { + return this.R } -func (this *Elasticfeed) GetResourceManager() *resource.ResourceManager { - return this.rm +func (this *Elasticfeed) GetServiceManager() model.ServiceManager { + return this.S } -func (this *Elasticfeed) GetServiceManager() *service.ServiceManager { - return this.sm +func (this *Elasticfeed) GetPluginManager() model.PluginManager { + return this.P } -func (this *Elasticfeed) GetPluginManager() *plugin.PluginManager { - return this.pm +func (this *Elasticfeed) GetWorkflowManager() model.WorkflowManager { + return this.W } -func (this *Elasticfeed) GetWorkflowManager() *workflow.WorkflowManager { - return this.wm +func (this *Elasticfeed) GetConfig() map[string]interface {} { + return make(map[string]interface {}) } func (this *Elasticfeed) Run() { this.GetResourceManager().Init() this.GetServiceManager().Init() + this.GetWorkflowManager().Init() feedify.SetStaticPath("/static", "public") feedify.Run() } -func NewElasticfeed(rm *resource.ResourceManager, em *event.EventManager, sm *service.ServiceManager, pm *plugin.PluginManager, wm *workflow.WorkflowManager) *Elasticfeed { - return &Elasticfeed{rm, em, sm, pm, wm} +func NewElasticfeed() model.Elasticfeed { + + engine := &Elasticfeed{} + + engine.R = resource.NewResourceManager(engine) + engine.E = event.NewEventManager(engine) + engine.P = plugin.NewPluginManager(engine) + engine.W = workflow.NewWorkflowManager(engine) + engine.S = service.NewServiceManager(engine) + + return engine } diff --git a/elasticfeed/model/elasticfeed.go b/elasticfeed/model/elasticfeed.go new file mode 100644 index 0000000..c11e032 --- /dev/null +++ b/elasticfeed/model/elasticfeed.go @@ -0,0 +1,18 @@ +package model + +type Elasticfeed interface { + + GetEventManager() EventManager + + GetResourceManager() ResourceManager + + GetServiceManager() ServiceManager + + GetPluginManager() PluginManager + + GetWorkflowManager() WorkflowManager + + GetConfig() map[string]interface{} + + Run() +} diff --git a/elasticfeed/model/event.go b/elasticfeed/model/event.go new file mode 100644 index 0000000..6915a8d --- /dev/null +++ b/elasticfeed/model/event.go @@ -0,0 +1,4 @@ +package model + +type EventManager interface { +} diff --git a/elasticfeed/model/plugin.go b/elasticfeed/model/plugin.go new file mode 100644 index 0000000..d0eaad1 --- /dev/null +++ b/elasticfeed/model/plugin.go @@ -0,0 +1,10 @@ +package model + +import ( + pmodel "github.com/feedlabs/elasticfeed/plugin/model" +) + +type PluginManager interface { + + LoadPipeline(name string) (pmodel.Pipeline, error) +} diff --git a/elasticfeed/model/resource.go b/elasticfeed/model/resource.go new file mode 100644 index 0000000..5cae164 --- /dev/null +++ b/elasticfeed/model/resource.go @@ -0,0 +1,6 @@ +package model + +type ResourceManager interface { + + Init() +} diff --git a/elasticfeed/model/service.go b/elasticfeed/model/service.go new file mode 100644 index 0000000..15a45b1 --- /dev/null +++ b/elasticfeed/model/service.go @@ -0,0 +1,12 @@ +package model + +import ( + "github.com/feedlabs/elasticfeed/service/stream" +) + +type ServiceManager interface { + + GetStreamService() *stream.StreamService + + Init() +} diff --git a/elasticfeed/model/workflow.go b/elasticfeed/model/workflow.go new file mode 100644 index 0000000..f22b148 --- /dev/null +++ b/elasticfeed/model/workflow.go @@ -0,0 +1,6 @@ +package model + +type WorkflowManager interface { + + Init() +} diff --git a/event/manager.go b/event/manager.go index 5e8fd45..ea7762f 100644 --- a/event/manager.go +++ b/event/manager.go @@ -1,5 +1,9 @@ package event +import ( + "github.com/feedlabs/elasticfeed/elasticfeed/model" +) + const ( EVENT_STORING = "storing" EVENT_PROCESSING = "processing" @@ -7,13 +11,32 @@ const ( EVENT_LEARNING = "learning" EVENT_STORING_CREATE_ENTRY = "create-entry" + EVENT_STORING_CREATE_VIEWER = "create-viewer" EVENT_PROCESSING_FEED_MAINTAINER = "feed-maintainer" + EVENT_PROCESSING_SENSOR_UPDATE = "sensor-update" EVENT_DISTRIBUTING_PUSH_ENTRY = "push-entry" EVENT_LEARNING_CREATE_METRIC = "create-metric" ) +/** + + - COULD DEFINE EVENTS + - COULD TRIGGER ON BIND-ED LISTENERS + + - COULD DEFINE ALARM CLOCK + - COULD DEFINE INTERRUPTS + + EVENT + - SHOULD HAVE DATA/CALLBACK + - SHOULD HAVE TYPE + - SHOULD HAVE PARENT + - SHOULD HAVE DESTINATION + + */ + type EventManager struct { - events map[string]interface {} + engine model.Elasticfeed + events map[string]interface{} } func (this *EventManager) On(name string, callback func(event *Event)) { @@ -33,14 +56,14 @@ func (this *EventManager) Trigger(name string, data interface{}) { func (this *EventManager) GetEventsMap() map[string]interface{} { return map[string]interface{}{ - EVENT_STORING: []string{EVENT_STORING_CREATE_ENTRY}, - EVENT_PROCESSING: []string{EVENT_PROCESSING_FEED_MAINTAINER}, + EVENT_STORING: []string{EVENT_STORING_CREATE_ENTRY, EVENT_STORING_CREATE_VIEWER}, + EVENT_PROCESSING: []string{EVENT_PROCESSING_FEED_MAINTAINER, EVENT_PROCESSING_SENSOR_UPDATE}, EVENT_DISTRIBUTING: []string{EVENT_DISTRIBUTING_PUSH_ENTRY}, EVENT_LEARNING: []string{EVENT_LEARNING_CREATE_METRIC}, } } -func NewEventManager() *EventManager { +func NewEventManager(engine model.Elasticfeed) model.EventManager { e := make(map[string]interface{}) - return &EventManager{e} + return &EventManager{engine, e} } diff --git a/plugin/cmd_pipeline.go b/plugin/cmd_pipeline.go index 1074a06..03d807a 100644 --- a/plugin/cmd_pipeline.go +++ b/plugin/cmd_pipeline.go @@ -19,13 +19,13 @@ func (b *cmdPipeline) Prepare(config ...interface{}) ([]string, error) { return b.pipeline.Prepare(config...) } -func (b *cmdPipeline) Run(cache model.Cache) (model.Artifact, error) { +func (b *cmdPipeline) Run(data interface {}) (interface {}, error) { defer func() { r := recover() b.checkExit(r, nil) }() - return b.pipeline.Run(cache) + return b.pipeline.Run(data) } func (b *cmdPipeline) Cancel() { diff --git a/plugin/manager.go b/plugin/manager.go index b4c2076..6bd61c6 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -1,23 +1,22 @@ package plugin import ( - "strconv" - - "fmt" - "log" "os/exec" "path/filepath" "strings" - "github.com/feedlabs/elasticfeed/resource" "github.com/feedlabs/elasticfeed/plugin/model" "github.com/feedlabs/elasticfeed/common/config" "github.com/mitchellh/osext" + + emodel "github.com/feedlabs/elasticfeed/elasticfeed/model" ) type PluginManager struct { + engine emodel.Elasticfeed + Indexers map[string]string Crawlers map[string]string Sensors map[string]string @@ -25,33 +24,10 @@ type PluginManager struct { Scenarios map[string]string Helpers map[string]string - Store map[int]map[string]interface{} - - api *model.ResourceApi - PluginMinPort uint PluginMaxPort uint } -func (this *PluginManager) GetResourceApi() interface{} { - return this.api -} - -func (this *PluginManager) InitPlugin(name string, profiler *model.Profiler) *Plugin { - - // _p := resource.FindPluginByName(name) - // findIsRunningWithProfiler(_p, profiler) - _p := resource.NewPlugin("", "", "", "", "", "") - p := NewPlugin(_p, this, this.api, profiler) - - p.Init() - - _group, _ := strconv.Atoi(_p.Group) - this.Store[_group][_p.Id] = p - - return p -} - func (this *PluginManager) FindPlugin(name string, profiler *model.Profiler) *interface{} { return nil } @@ -66,10 +42,6 @@ func (this *PluginManager) RunPlugin(p Plugin) (err error) { return nil } -func (this *PluginManager) GetIndexers() map[string]interface{} { - return this.Store[resource.PLUGIN_INDEXER] -} - // Discover discovers plugins. // // This looks in the directory of the executable and the CWD, in that @@ -237,26 +209,11 @@ func (c *PluginManager) pluginClient(path string) *Client { return NewClient(&config) } +func NewPluginManager(engine emodel.Elasticfeed) emodel.PluginManager { -func NewPluginManager(resourceManager interface{}) *PluginManager { - pm := &PluginManager{} - - pm.api = model.NewResourceApi(resourceManager) - - pm.discover(filepath.Join(config.GetHomeAbsolutePath(), "plugins/pipeline-ann")) - - list := []string{"ann", "ann1", "ann2"} - - for _, name := range(list) { - ann, _ := pm.LoadPipeline(name) - - ann.Prepare() - a, b := ann.Run(nil) - - fmt.Println(a) - fmt.Println(b) - } + pm := &PluginManager{engine, nil, nil, nil, nil, nil, nil, 40000, 41000} + pm.discover(filepath.Join(config.GetHomeAbsolutePath(), "public/userfiles/plugin/imports")) return pm } diff --git a/plugin/model/pipeline.go b/plugin/model/pipeline.go index 7e60411..83ec15c 100644 --- a/plugin/model/pipeline.go +++ b/plugin/model/pipeline.go @@ -4,7 +4,7 @@ type Pipeline interface { Prepare(...interface{}) ([]string, error) - Run(cache Cache) (Artifact, error) + Run(data interface {}) (interface {}, error) Cancel() } diff --git a/plugin/pipeline/ann/artifact.go b/plugin/pipeline/ann/artifact.go deleted file mode 100644 index 2be8b04..0000000 --- a/plugin/pipeline/ann/artifact.go +++ /dev/null @@ -1,37 +0,0 @@ -package ann - -import ( - "fmt" -) - -const BuilderId = "packer.post-processor.atlas" - -type Artifact struct { - Name string - Type string - Version int -} - -func (*Artifact) BuilderId() string { - return BuilderId -} - -func (a *Artifact) Files() []string { - return nil -} - -func (a *Artifact) Id() string { - return fmt.Sprintf("%s/%s/%d", a.Name, a.Type, a.Version) -} - -func (a *Artifact) String() string { - return fmt.Sprintf("%s/%s (v%d)", a.Name, a.Type, a.Version) -} - -func (*Artifact) State(name string) interface{} { - return nil -} - -func (a *Artifact) Destroy() error { - return nil -} diff --git a/plugin/pipeline/ann/pipeline.go b/plugin/pipeline/ann/pipeline.go deleted file mode 100644 index 7ea9f84..0000000 --- a/plugin/pipeline/ann/pipeline.go +++ /dev/null @@ -1,28 +0,0 @@ -package ann - -import ( - "github.com/feedlabs/elasticfeed/common" - "github.com/feedlabs/elasticfeed/plugin" - "github.com/feedlabs/elasticfeed/plugin/model" -) - -type config struct { - common.ElasticfeedConfig `mapstructure:",squash"` - - tpl *plugin.ConfigTemplate -} - -type Pipeline struct { - config config -} - -func (p *Pipeline) Prepare(raws ...interface{}) ([]string, error) { - return nil, nil -} - -func (p *Pipeline) Run(cache model.Cache) (model.Artifact, error) { - return &Artifact{"aaa", "bbb", 123}, nil -} - -func (p *Pipeline) Cancel() { -} diff --git a/plugin/plugin.go b/plugin/plugin.go index 45fb8fc..15c10a6 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -1,12 +1,11 @@ package plugin import ( - "github.com/feedlabs/elasticfeed/resource" "github.com/feedlabs/elasticfeed/plugin/model" ) type Plugin struct { - plugin *resource.Plugin + plugin interface {} pluginManager *PluginManager resourceApi *model.ResourceApi @@ -27,6 +26,6 @@ func (this *Plugin) GetPid() int { return this.pid } -func NewPlugin(p *resource.Plugin, pm *PluginManager, api *model.ResourceApi, profiler *model.Profiler) *Plugin { +func NewPlugin(p *interface {}, pm *PluginManager, api *model.ResourceApi, profiler *model.Profiler) *Plugin { return &Plugin{p, pm, api, profiler, "", -1} } diff --git a/plugin/rpc_init.go b/plugin/rpc_init.go index 96d677c..f543a27 100644 --- a/plugin/rpc_init.go +++ b/plugin/rpc_init.go @@ -3,8 +3,6 @@ package plugin import "encoding/gob" func init() { -// gob.Register(new(map[string]interface{})) -// gob.Register(new(map[string]string)) gob.Register(make([]interface{}, 0)) gob.Register(new(BasicError)) } diff --git a/plugin/rpc_pipeline.go b/plugin/rpc_pipeline.go index a0ee302..0c8f0d3 100644 --- a/plugin/rpc_pipeline.go +++ b/plugin/rpc_pipeline.go @@ -22,12 +22,13 @@ type PipelineRpcServer struct { } type PipelinePrepareArgs struct { - Configs []interface{} + Data interface{} } type PipelinePrepareResponse struct { Warnings []string Error *BasicError + Data interface {} } func (b *RpcPipeline) Prepare(config ...interface{}) ([]string, error) { @@ -44,27 +45,17 @@ func (b *RpcPipeline) Prepare(config ...interface{}) ([]string, error) { return resp.Warnings, err } -func (b *RpcPipeline) Run(cache model.Cache) (model.Artifact, error) { - nextId := b.mux.NextId() - server := newRpcServerWithMux(b.mux, nextId) - server.RegisterCache(cache) - go server.Serve() +func (h *RpcPipeline) Run(data interface{}) (interface{}, error) { - var responseId uint32 - if err := b.client.Call("Pipeline.Run", nextId, &responseId); err != nil { - return nil, err + args := PipelinePrepareArgs{ + Data: data, } - if responseId == 0 { - return nil, nil - } + var response PipelinePrepareResponse - client, err := newRpcClientWithMux(b.mux, responseId) - if err != nil { - return nil, err - } + _ = h.client.Call("Pipeline.Run", &args, &response) - return client.Artifact(), nil + return response.Data, nil } func (b *RpcPipeline) Cancel() { @@ -74,7 +65,7 @@ func (b *RpcPipeline) Cancel() { } func (b *PipelineRpcServer) Prepare(args *PipelinePrepareArgs, reply *PipelinePrepareResponse) error { - warnings, err := b.pipeline.Prepare(args.Configs...) + warnings, err := b.pipeline.Prepare(nil) *reply = PipelinePrepareResponse{ Warnings: warnings, Error: NewBasicError(err), @@ -82,25 +73,14 @@ func (b *PipelineRpcServer) Prepare(args *PipelinePrepareArgs, reply *PipelinePr return nil } -func (b *PipelineRpcServer) Run(streamId uint32, reply *uint32) error { - client, err := newRpcClientWithMux(b.mux, streamId) - if err != nil { - return NewBasicError(err) - } - defer client.Close() +func (b *PipelineRpcServer) Run(args *PipelinePrepareArgs, reply *PipelinePrepareResponse) (err error) { - artifact, err := b.pipeline.Run(client.Cache()) - if err != nil { - return NewBasicError(err) - } + data, err := b.pipeline.Run(args.Data) - *reply = 0 - if artifact != nil { - streamId = b.mux.NextId() - server := newRpcServerWithMux(b.mux, streamId) - server.RegisterArtifact(artifact) - go server.Serve() - *reply = streamId + *reply = PipelinePrepareResponse{ + Warnings: nil, + Error: nil, + Data: data, } return nil diff --git a/plugins/pipeline-ann/main.go b/plugins/pipeline-ann/main.go deleted file mode 100644 index 5d6432b..0000000 --- a/plugins/pipeline-ann/main.go +++ /dev/null @@ -1,15 +0,0 @@ -package main - -import ( - "github.com/feedlabs/elasticfeed/plugin/pipeline/ann" - "github.com/feedlabs/elasticfeed/plugin" -) - -func main() { - server, err := plugin.Server() - if err != nil { - panic(err) - } - server.RegisterPipeline(new(ann.Pipeline)) - server.Serve() -} diff --git a/plugins/pipeline/ai_neural_network.go b/plugins/pipeline/ai_neural_network.go deleted file mode 100644 index 0fc612c..0000000 --- a/plugins/pipeline/ai_neural_network.go +++ /dev/null @@ -1,11 +0,0 @@ -package pipeline - -func AINeuralNetworkAnimator(data interface{}, viewerBrain interface{}) interface{} { - - // PIPELINE data through the Neural Netowrk - // of current connected Viewer/Audience - - return data -} - -func init() {} diff --git a/plugins/pipeline/pipeline.go b/plugins/pipeline/pipeline.go deleted file mode 100644 index b4ef5ec..0000000 --- a/plugins/pipeline/pipeline.go +++ /dev/null @@ -1,8 +0,0 @@ -package pipeline - -func Filter(data interface{}) interface{} { - // should call plugins of type PIPELINE - return RandomAnimator(data) -} - -func init() {} diff --git a/plugins/pipeline/random_animator.go b/plugins/pipeline/random_animator.go deleted file mode 100644 index 0b0a892..0000000 --- a/plugins/pipeline/random_animator.go +++ /dev/null @@ -1,18 +0,0 @@ -package pipeline - -import ( - "time" - "math/rand" -) - -func RandomAnimator(data interface {}) interface {} { - - // PIPE DELAY SIMULATION - - amt := time.Duration(rand.Intn(200)) - time.Sleep(amt * time.Millisecond) - - return data -} - -func init() {} diff --git a/plugins/scenario/ai_neural_network_trainer.go b/plugins/scenario/ai_neural_network_trainer.go deleted file mode 100644 index e1b1d3f..0000000 --- a/plugins/scenario/ai_neural_network_trainer.go +++ /dev/null @@ -1,17 +0,0 @@ -package scenario - -import ( - _ "github.com/feedlabs/elasticfeed/elasticfeed/ai" -) - -func AINeuralNetworkTrainer(data interface{}, viewerBrain interface{}) interface{} { - - //----------------------------------------------------- - // Train ViewerBrain using Viewer specific metrics from - // front-end behaviours - //----------------------------------------------------- - - return viewerBrain -} - -func init() {} diff --git a/plugins/scenario/rpc_client.go b/plugins/scenario/rpc_client.go deleted file mode 100644 index 418e15b..0000000 --- a/plugins/scenario/rpc_client.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "fmt" - "net/rpc" - "log" -) - -type Args struct { - X, Y int -} - -func main(){ - - client, err := rpc.Dial("tcp", "127.0.0.1:1234") - if err != nil { - log.Fatal("dialing:", err) - } - // Synchronous call - args := &Args{1,2} - var reply int - err = client.Call("bbb.Add", args, &reply) - if err != nil { - log.Fatal("arith error:", err) - } - fmt.Printf("Result: %d+%d=%d", args.X, args.Y, reply) -} diff --git a/plugins/sensor/sensors.go b/plugins/sensor/sensors.go deleted file mode 100644 index 74e2319..0000000 --- a/plugins/sensor/sensors.go +++ /dev/null @@ -1,5 +0,0 @@ -package sensor - -func Weather() {} -func StockIndices() {} -func SunActivity() {} diff --git a/resource/entry.go b/resource/entry.go index df6fbee..4142c49 100644 --- a/resource/entry.go +++ b/resource/entry.go @@ -87,7 +87,9 @@ func AddEntry(feedEntry Entry, FeedId string, ApplicationId string, OrgId string // notify d, _ := json.Marshal(feedEntry) - room.Publish <- room.NewFeedEvent(room.FEED_ENTRY_NEW, feed.Id, string(d)) + // SHOULD CREATE AND TRIGGER EVENT VIA SYSTEM EVENT MANAGER + // STREAM SERVICE SHOUD LISTEN FOR IT AND STREAM TO CONNECTED CLIENTS + room.FeedRoom.Publish <- room.NewFeedEvent(room.FEED_ENTRY_NEW, feed.Id, string(d)) return feedEntry.Id, nil } @@ -104,7 +106,9 @@ func UpdateEntry(id string, FeedId string, ApplicationId string, OrgId string, d // notify d, _ := json.Marshal(entry) - room.Publish <- room.NewEntryEvent(room.ENTRY_UPDATE, entry.Id, string(d)) + // SHOULD CREATE AND TRIGGER EVENT VIA SYSTEM EVENT MANAGER + // STREAM SERVICE SHOUD LISTEN FOR IT AND STREAM TO CONNECTED CLIENTS + room.FeedRoom.Publish <- room.NewEntryEvent(room.ENTRY_UPDATE, entry.Id, string(d)) _id, _ := strconv.Atoi(entry.Id) return storage.SetPropertyNode(_id, "data", data) @@ -126,7 +130,9 @@ func DeleteEntry(id string, FeedId string, ApplicationId string, OrgId string) ( // notify d, _ := json.Marshal(entry) - room.Publish <- room.NewEntryEvent(room.ENTRY_DELETE, entry.Id, string(d)) + // SHOULD CREATE AND TRIGGER EVENT VIA SYSTEM EVENT MANAGER + // STREAM SERVICE SHOUD LISTEN FOR IT AND STREAM TO CONNECTED CLIENTS + room.FeedRoom.Publish <- room.NewEntryEvent(room.ENTRY_DELETE, entry.Id, string(d)) return storage.DeleteNode(_id) } diff --git a/resource/feed.go b/resource/feed.go index 46f5d00..e04e45f 100644 --- a/resource/feed.go +++ b/resource/feed.go @@ -124,11 +124,15 @@ func DeleteFeed(id string) (error) { } func ActionReloadFeed(id string) { - room.Publish <- room.NewFeedEvent(room.FEED_RELOAD, id, "reload") + // SHOULD CREATE AND TRIGGER EVENT VIA SYSTEM EVENT MANAGER + // STREAM SERVICE SHOUD LISTEN FOR IT AND STREAM TO CONNECTED CLIENTS + room.FeedRoom.Publish <- room.NewFeedEvent(room.FEED_RELOAD, id, "reload") } func ActionEmptyFeed(id string) { - room.Publish <- room.NewFeedEvent(room.FEED_EMPTY, id, "empty") + // SHOULD CREATE AND TRIGGER EVENT VIA SYSTEM EVENT MANAGER + // STREAM SERVICE SHOUD LISTEN FOR IT AND STREAM TO CONNECTED CLIENTS + room.FeedRoom.Publish <- room.NewFeedEvent(room.FEED_EMPTY, id, "empty") } func NewFeed(id string, app *Application, data string, entries int, workflows int) *Feed { diff --git a/resource/manager.go b/resource/manager.go index 236ef2f..3c55a6f 100644 --- a/resource/manager.go +++ b/resource/manager.go @@ -1,13 +1,24 @@ package resource -type ResourceManager struct {} +import ( + emodel "github.com/feedlabs/elasticfeed/elasticfeed/model" + "github.com/feedlabs/elasticfeed/service/stream" +) -func (this * ResourceManager) Init() { - InitStorage() - InitResources() - InitStreamCommunicator() +type ResourceManager struct { + engine emodel.Elasticfeed } -func NewResourceManager() *ResourceManager { - return &ResourceManager{} +func (this * ResourceManager) Init() {} + +func (this * ResourceManager) GetStreamService() *stream.StreamService { + return this.GetEngine().GetServiceManager().GetStreamService() +} + +func (this * ResourceManager) GetEngine() emodel.Elasticfeed { + return this.engine +} + +func NewResourceManager(engine emodel.Elasticfeed) emodel.ResourceManager { + return &ResourceManager{engine} } diff --git a/resource/resource.go b/resource/resource.go index 327351c..ed6db86 100644 --- a/resource/resource.go +++ b/resource/resource.go @@ -1,25 +1,11 @@ package resource -/* - TO DO - - - resource should trigger system event via EventManager - */ - - import ( "errors" - "encoding/json" - "time" - "math/rand" "github.com/feedlabs/feedify/service" "github.com/feedlabs/feedify/graph" "github.com/feedlabs/feedify/stream" - - "github.com/feedlabs/elasticfeed/service/stream/controller/room" - "github.com/feedlabs/elasticfeed/service/stream/model" - "github.com/feedlabs/elasticfeed/plugins/pipeline" ) const ( @@ -36,16 +22,16 @@ const ( ) var ( - Orgs map[string]*Org + Orgs map[string]*Org Admins map[string]*Admin Tokens map[string]*Token - Applications map[string]*Application - Feeds map[string]*Feed - Entries map[string]*Entry - Metrics map[string]*Metric - Viewers map[string]*Viewer - Workflows map[string]*Workflow - Plugins map[string]*Plugin + Applications map[string]*Application + Feeds map[string]*Feed + Entries map[string]*Entry + Metrics map[string]*Metric + Viewers map[string]*Viewer + Workflows map[string]*Workflow + Plugins map[string]*Plugin message *stream.StreamMessage storage *graph.GraphStorage @@ -121,102 +107,6 @@ type Plugin struct { License string } -func ResourceStreamManager() { - for { - select { - case socketEvent := <-room.ResourceEvent: - - go ResourceStreamRequest(socketEvent) - } - } -} - -func ResourceStreamRequest(socketEvent model.SocketEvent) { - - // ******************************************************************* - // here should be implemented REAL CONTENT IMPROVEMENT - // based on connected user (viewer) or users (audience)!: habits, behaviours, stats etc. - // PIPE: filtering, customization - // SCENARIO-ENGINE: scenarios - // ******************************************************************* - - // ******************************************************************* - // SCENARIO AND RULES/METRICS - // should use go routine with time limit to query filter rules - // if in specific time there is no rules the results should be sent - // client feed. After this the next package should be sent with - // rules which entries should be remove/hidden from the view! - // ******************************************************************* - - timeout := make(chan bool, 1) - results := make(chan []*Entry, 1) - - list, _ := GetEntryList(socketEvent.FeedId, socketEvent.AppId, socketEvent.OrgId) - - // PIPE TIMEOUT - go func() { - amt := time.Duration(rand.Intn(100)) - time.Sleep(amt * time.Millisecond) - timeout <- true - }() - - // SHOULD BE A FILTER IMPLEMENTATION - go func(list []*Entry, socketEvent model.SocketEvent) { - list = pipeline.Filter(list).([]*Entry) - results <- list - }(list, socketEvent) - - select { - - // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED - case <-timeout: - - event := room.NewFeedEvent(room.FEED_ENTRY_NEW, socketEvent.FeedId, "{Content:\"tiemout\"}") - data, _ := json.Marshal(event) - - if socketEvent.Ws != nil { - amt := time.Duration(rand.Intn(500)) * 1000 - time.Sleep(amt * time.Microsecond) - socketEvent.Ws.WriteMessage(1, data) - } - - if socketEvent.Ch != nil { - socketEvent.Ch <- data - } - - // IF DATA ARRIVES WITHOUT DELAY - case list := <-results: - - // ********************************************************************* - // register socket handler - // needs to send notiffication to long pooling + ws - // join should generate uniqe ID and client should use it - // maybe sessionID could be as uniqeID ? - // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent - // ********************************************************************* - // - // timeout with channels and routines? - // http://blog.golang.org/go-concurrency-patterns-timing-out-and - - // amt := time.Duration(rand.Intn(500)) * 10000 - // time.Sleep(amt * time.Microsecond) - - d, _ := json.Marshal(list) - event := room.NewFeedEvent(room.FEED_ENTRY_INIT, socketEvent.FeedId, string(d)) - data, _ := json.Marshal(event) - - if socketEvent.Ws != nil { - socketEvent.Ws.WriteMessage(1, data) - } - - if socketEvent.Ch != nil { - socketEvent.Ch <- data - } - - } - -} - func Contains(s []string, e string) bool { for _, a := range s { if a == e { return true } } return false @@ -251,6 +141,7 @@ func InitStorage() { storage = graph_service.Storage } -func InitStreamCommunicator() { - go ResourceStreamManager() +func init() { + InitStorage() + InitResources() } diff --git a/service/service.go b/service/service.go index 02da517..582d5dd 100644 --- a/service/service.go +++ b/service/service.go @@ -4,14 +4,18 @@ import ( "github.com/feedlabs/elasticfeed/service/store" "github.com/feedlabs/elasticfeed/service/stream" "github.com/feedlabs/elasticfeed/service/system" + + "github.com/feedlabs/elasticfeed/elasticfeed/model" ) type Service struct {} type ServiceManager struct { - store *store.DbService - stream *stream.StreamService - system *system.SystemService + engine model.Elasticfeed + + store *store.DbService + stream *stream.StreamService + system *system.SystemService } func (this *ServiceManager) Init() { @@ -32,6 +36,11 @@ func (this *ServiceManager) GetSystemService() *system.SystemService { return this.system } -func NewServiceManager() *ServiceManager { - return &ServiceManager{} +func NewServiceManager(engine model.Elasticfeed) *ServiceManager { + + store := store.NewDbService() + stream := stream.NewStreamService() + system := system.NewSystemService() + + return &ServiceManager{engine, store, stream, system} } diff --git a/service/store/store.go b/service/store/store.go index ff25023..17acda3 100644 --- a/service/store/store.go +++ b/service/store/store.go @@ -12,6 +12,6 @@ func (this *DbService) Init() { controller.InitService() } -func NewMetricService() *DbService { +func NewDbService() *DbService { return &DbService{} } diff --git a/service/stream/controller/channel/default.go b/service/stream/controller/channel/default.go new file mode 100644 index 0000000..3b7d420 --- /dev/null +++ b/service/stream/controller/channel/default.go @@ -0,0 +1,9 @@ +package channel + +import ( + "github.com/feedlabs/feedify" +) + +type DefaultController struct { + feedify.Controller +} diff --git a/service/stream/controller/channel/long_pooling.go b/service/stream/controller/channel/long_pooling.go index 0ea3a4f..a0ffbb5 100644 --- a/service/stream/controller/channel/long_pooling.go +++ b/service/stream/controller/channel/long_pooling.go @@ -1,14 +1,12 @@ package channel import ( - "github.com/feedlabs/feedify" - "github.com/feedlabs/elasticfeed/service/stream/model" "github.com/feedlabs/elasticfeed/service/stream/controller/room" ) type LongPollingController struct { - feedify.Controller + DefaultController } func (this *LongPollingController) Join() { @@ -22,7 +20,7 @@ func (this *LongPollingController) Join() { sess := room.GlobalSessions.SessionStart(w, r) defer sess.SessionRelease(w) - room.Join(chid, nil) + room.FeedRoom.Join(chid, nil) list := make(map[string]interface {}) list["response"] = room.NewChannelEvent(room.CHANNEL_JOIN, "system", "join") @@ -47,7 +45,7 @@ func (this *LongPollingController) Post() { ch := make(chan []byte) - room.ResourceEvent <- room.NewSocketEvent([]byte(data), nil, ch) + room.FeedRoom.ResourceEvent <- room.NewSocketEvent([]byte(data), nil, ch) response := <-ch @@ -73,7 +71,7 @@ func (this *LongPollingController) Fetch() { // Wait for new message(s). ch := make(chan bool) - room.WaitingList.PushBack(ch) + room.FeedRoom.WaitingList.PushBack(ch) <-ch this.Data["json"] = model.GetEvents(int(lastReceived)) diff --git a/service/stream/controller/channel/sse.go b/service/stream/controller/channel/sse.go index 17f57a8..7b9cb0d 100644 --- a/service/stream/controller/channel/sse.go +++ b/service/stream/controller/channel/sse.go @@ -1,13 +1,11 @@ package channel import ( - "github.com/feedlabs/feedify" - // "github.com/mroth/sseserver" ) type SSEController struct { - feedify.Controller + DefaultController } func (this *SSEController) Join() { diff --git a/service/stream/controller/channel/websocket.go b/service/stream/controller/channel/websocket.go index c23ab62..f0e070f 100644 --- a/service/stream/controller/channel/websocket.go +++ b/service/stream/controller/channel/websocket.go @@ -11,7 +11,7 @@ import ( ) type WebSocketController struct { - feedify.Controller + DefaultController } func (this *WebSocketController) Join() { @@ -34,8 +34,8 @@ func (this *WebSocketController) Join() { return } - room.Join(chid, ws) - defer room.Leave(chid) + room.FeedRoom.Join(chid, ws) + defer room.FeedRoom.Leave(chid) data, _ := json.Marshal(room.NewChannelEvent(room.CHANNEL_JOIN, chid, "join")) ws.WriteMessage(websocket.TextMessage, data) @@ -46,6 +46,6 @@ func (this *WebSocketController) Join() { return } - room.ResourceEvent <- room.NewSocketEvent(p, ws, nil) + room.FeedRoom.ResourceEvent <- room.NewSocketEvent(p, ws, nil) } } diff --git a/service/stream/controller/controller.go b/service/stream/controller/controller.go index 533e71a..5a0018f 100644 --- a/service/stream/controller/controller.go +++ b/service/stream/controller/controller.go @@ -4,6 +4,6 @@ import ( "github.com/feedlabs/elasticfeed/service/stream/controller/room" ) -func InitRooms() { - room.InitFeedRoom() +func InitSession() { + room.InitSessionManager() } diff --git a/service/stream/controller/room/feed.go b/service/stream/controller/room/feed.go index c839d4b..c3015b8 100644 --- a/service/stream/controller/room/feed.go +++ b/service/stream/controller/room/feed.go @@ -37,16 +37,7 @@ const ( ) var ( - Subscribe = make(chan Subscriber, 10) - Unsubscribe = make(chan string, 10) - Publish = make(chan model.Event, 10) - ResourceEvent = make(chan model.SocketEvent, 10) - - WaitingList = list.New() - Subscribers = list.New() - - FeedSubscribers = make(map[string]interface{}) - + FeedRoom *FeedRoomManager GlobalSessions *session.Manager ) @@ -70,7 +61,7 @@ func NewSocketEvent(msg []byte, ws *websocket.Conn, ch chan []byte) model.Socket json.Unmarshal(msg, &data) - return model.SocketEvent{ws, ch, data["feedId"].(string), data["appId"].(string), data["orgId"].(string)} + return model.SocketEvent{ws, ch, 4, data["feedId"].(string), data["appId"].(string), data["orgId"].(string)} } func NewChannelEvent(ep model.EventType, user, msg string) model.Event { @@ -102,78 +93,103 @@ func NewEntryEvent(ep model.EventType, user, msg string) model.Event { return NewFeedEvent(FEED_ENTRY_MESSAGE, "*", string(data)) } -func Join(user string, ws *websocket.Conn) { - Subscribe <- Subscriber{Name: user, Conn: ws} +type FeedRoomManager struct { + Subscribe chan Subscriber + Unsubscribe chan string + Publish chan model.Event + + ResourceEvent chan model.SocketEvent + + WaitingList *list.List + Subscribers *list.List +} + +func (this *FeedRoomManager) Join(user string, ws *websocket.Conn) { + this.Subscribe <- Subscriber{Name: user, Conn: ws} } -func Leave(user string) { - Unsubscribe <- user +func (this *FeedRoomManager) Leave(user string) { + this.Unsubscribe <- user } -func FeedManager() { - for { - select { +func (this *FeedRoomManager) Run() { + go func() { + for { + select { - case sub := <-Subscribe: - Subscribers.PushBack(sub) + case sub := <-this.Subscribe: + this.Subscribers.PushBack(sub) - case event := <-Publish: + case event := <-this.Publish: - // here must be handled where to send notification - // - or to all sockets - // - or to specific client/feed (single socket) - // - or to public feed (multiple sockets) - // - // could be setup by resource manager go routine - // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent + // here must be handled where to send notification + // - or to all sockets + // - or to specific client/feed (single socket) + // - or to public feed (multiple sockets) + // + // could be setup by resource manager go routine + // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent - model.NewArchive(event) + model.NewArchive(event) - for ch := WaitingList.Back(); ch != nil; ch = ch.Prev() { - ch.Value.(chan bool) <- true - WaitingList.Remove(ch) - } + for ch := this.WaitingList.Back(); ch != nil; ch = ch.Prev() { + ch.Value.(chan bool) <- true + this.WaitingList.Remove(ch) + } - broadcastWebSocket(event) + this.BroadcastWebSocket(event) - case unsub := <-Unsubscribe: - for sub := Subscribers.Front(); sub != nil; sub = sub.Next() { - if sub.Value.(Subscriber).Name == unsub { - Subscribers.Remove(sub) + case unsub := <-this.Unsubscribe: + for sub := this.Subscribers.Front(); sub != nil; sub = sub.Next() { + if sub.Value.(Subscriber).Name == unsub { + this.Subscribers.Remove(sub) - ws := sub.Value.(Subscriber).Conn - if ws != nil { - ws.Close() - feedify.Error("WebSocket closed:", unsub) + ws := sub.Value.(Subscriber).Conn + if ws != nil { + ws.Close() + feedify.Error("WebSocket closed:", unsub) + } + this.Publish <- NewChannelEvent(CHANNEL_LEAVE, unsub, "") + break } - Publish <- NewChannelEvent(CHANNEL_LEAVE, unsub, "") - break } } } - } + }() } -func broadcastWebSocket(event model.Event) { +func (this *FeedRoomManager) BroadcastWebSocket(event model.Event) { data, err := json.Marshal(event) if err != nil { feedify.Error("Fail to marshal event:", err) return } - for sub := Subscribers.Front(); sub != nil; sub = sub.Next() { + for sub := this.Subscribers.Front(); sub != nil; sub = sub.Next() { ws := sub.Value.(Subscriber).Conn if ws != nil { if ws.WriteMessage(websocket.TextMessage, data) != nil { - Unsubscribe <- sub.Value.(Subscriber).Name + this.Unsubscribe <- sub.Value.(Subscriber).Name } } } } -func InitFeedRoom() { - GlobalSessions, _ = session.NewManager("memory", `{"cookieName":"elasticfeedsessid","gclifetime":3600}`) +func NewFeedRoomManager() *FeedRoomManager { + subscribe := make(chan Subscriber, 10) + unsubscribe := make(chan string, 10) + publish := make(chan model.Event, 10) + resourceEvent := make(chan model.SocketEvent, 10) + + waitingList := list.New() + subscribers := list.New() + FeedRoom = &FeedRoomManager{subscribe, unsubscribe, publish, resourceEvent, waitingList, subscribers} + + return FeedRoom +} + +func InitSessionManager() { + GlobalSessions, _ = session.NewManager("memory", `{"cookieName":"elasticfeedsessid","gclifetime":3600}`) go GlobalSessions.GC() - go FeedManager() } diff --git a/service/stream/model/event.go b/service/stream/model/event.go index d3312c9..aac46b5 100644 --- a/service/stream/model/event.go +++ b/service/stream/model/event.go @@ -19,6 +19,9 @@ type Event struct { type SocketEvent struct { Ws *websocket.Conn Ch chan []byte + + ActionId int + FeedId string AppId string OrgId string diff --git a/service/stream/stream.go b/service/stream/stream.go index 56469c0..214b10b 100644 --- a/service/stream/stream.go +++ b/service/stream/stream.go @@ -3,15 +3,34 @@ package stream import ( "github.com/feedlabs/elasticfeed/service/stream/router" "github.com/feedlabs/elasticfeed/service/stream/controller" + + "github.com/feedlabs/elasticfeed/service/stream/controller/room" ) -type StreamService struct {} +type StreamService struct { + feedRoomManager *room.FeedRoomManager +} + +func (this *StreamService) GetFeedRoomManager() *room.FeedRoomManager { + return this.feedRoomManager +} + +func (this *StreamService) InitRooms() { +} func (this *StreamService) Init() { + // should pass controller from here + // should not creates new one in InitRouters router.InitRouters() - controller.InitRooms() + + // should pass Feed Room for controllers to have access to CHANNELS + controller.InitSession() + + this.GetFeedRoomManager().Run() } -func NewMetricService() *StreamService { - return &StreamService{} +func NewStreamService() *StreamService { + frm := room.NewFeedRoomManager() + + return &StreamService{frm} } diff --git a/service/system/system.go b/service/system/system.go index 1087cb6..a847824 100644 --- a/service/system/system.go +++ b/service/system/system.go @@ -11,6 +11,6 @@ func (this *SystemService) Init() { router.InitPluginRouters() } -func NewMetricService() *SystemService { +func NewSystemService() *SystemService { return &SystemService{} } diff --git a/service/system/v1/controller/status.go b/service/system/v1/controller/status.go index 582e939..c806bda 100644 --- a/service/system/v1/controller/status.go +++ b/service/system/v1/controller/status.go @@ -35,8 +35,8 @@ func (this *StatusController) Get() { "mem_sys": strconv.Itoa(int(memstats.Sys)), }, "stream": map[string]interface{} { - "subscribers": strconv.Itoa(room.Subscribers.Len()), - "waitinglist": strconv.Itoa(room.WaitingList.Len()), + "subscribers": strconv.Itoa(room.FeedRoom.Subscribers.Len()), + "waitinglist": strconv.Itoa(room.FeedRoom.WaitingList.Len()), "archived_queue": strconv.Itoa(model.Archive.Len()), }, } diff --git a/plugin/config_template.go b/workflow/config_template.go similarity index 99% rename from plugin/config_template.go rename to workflow/config_template.go index 31860e3..2f5a194 100644 --- a/plugin/config_template.go +++ b/workflow/config_template.go @@ -1,4 +1,4 @@ -package plugin +package workflow import ( "bytes" diff --git a/workflow/hook.go b/workflow/hook.go deleted file mode 100644 index f2afc6f..0000000 --- a/workflow/hook.go +++ /dev/null @@ -1,3 +0,0 @@ -package workflow - -type Hook struct {} diff --git a/workflow/manager.go b/workflow/manager.go index cf3cdd1..62dce53 100644 --- a/workflow/manager.go +++ b/workflow/manager.go @@ -1,19 +1,73 @@ package workflow import ( - "github.com/feedlabs/elasticfeed/plugin" - "github.com/feedlabs/elasticfeed/event" + "encoding/json" + "time" + "math/rand" + + "github.com/feedlabs/elasticfeed/elasticfeed/model" "github.com/feedlabs/elasticfeed/resource" + + emodel "github.com/feedlabs/elasticfeed/elasticfeed/model" + smodel "github.com/feedlabs/elasticfeed/service/stream/model" + pmodel "github.com/feedlabs/elasticfeed/plugin/model" + + "github.com/feedlabs/elasticfeed/service/stream/controller/room" + "github.com/feedlabs/elasticfeed/service/stream" +) + + +var ( + pluginManagerAnn pmodel.Pipeline + entryListCache map[string][]*resource.Entry ) type WorkflowManager struct { - pManager *plugin.PluginManager - eManager *event.EventManager + engine emodel.Elasticfeed + + pManager model.PluginManager + eManager model.EventManager workflows []*WorkflowController template interface{} } +func (this *WorkflowManager) Init() { + this.BindServiceEvents() +} + +/** + MAYBE COULD BIND TO "SYSTEM EVENT MANAGER" + - COULD BIND TO RESOURCE EVENTS: NEW ENTRY, NEW METRIC, NEW VIEWER + - COULD BIND TO CRON JOBS: FEED MAINTAINER, SENSORS UPDATE + + ** OVERALL THE WORKFLOW MANAGER COULD BIND TO EVENTS AND ALSO CREATE OWN ALARMS/INTERRUPTS EVENTS + */ + +/** + - IMPLEMENT EVENTS TRIGGERS + + - IMPLEMENT STREAM SERVICE EVENT/HOOKS BINDING (LISTEN TO EVENTS AND HOOKS ON STREAM SERVICE) + - IMPLEMENT STORE SERVICE EVENT/HOOKS BINDING (SHOULD BE DONE BY "SYSTEM EVENTS MANAGER") + + - IMPLEMENT LOCAL CRON JOB FOR ("SYSTEM EVENTS MANAGER") + - SENSOR REFRESH EVENT + - FEED MAINTAINER EVENT + + - IMPLEMENT RESOURCE API WHICH + - CAN BE PASSED TO PLUGINS + - CAN PROVIDE/CREATE DATA + + */ + +func (this *WorkflowManager) GetStreamService() *stream.StreamService { + return this.GetEngine().GetServiceManager().GetStreamService() +} + +func (this *WorkflowManager) GetEngine() emodel.Elasticfeed { + return this.engine +} + func (this *WorkflowManager) InitTemplate(t interface{}) { // verify event availability into EventsManger // verify hooks workflows @@ -27,13 +81,151 @@ func (this *WorkflowManager) CreateFeedWorkflow(feed *resource.Feed) *WorkflowCo return w } -func NewWorkflowManager(tpl interface{}, pm *plugin.PluginManager, em *event.EventManager) *WorkflowManager { - // load template if not passed - if tpl == nil { - tpl = make(map[string]interface {}) +func (this *WorkflowManager) BindServiceEvents() { + // should bind service-stream-controllers to get handler to channel + // should pass it down to listen for events on streaming controllers + go this.BindStreamServiceEvents() +} + +func (this *WorkflowManager) BindStreamServiceEvents() { + + for { + select { + case socketEvent := <-this.GetStreamService().GetFeedRoomManager().ResourceEvent: + + action := socketEvent.ActionId + + switch { + case action == room.FEED_ENTRY_INIT || action == room.FEED_ENTRY_MORE: + go this.ResourcePipelineRound(socketEvent) + } + + } } +} + +func (this *WorkflowManager) ResourcePipelineRound(socketEvent smodel.SocketEvent) { + + // will run WorkflowManager with Pipeline plugins + + // ******************************************************************* + // REAL CONTENT IMPROVEMENT + // based on connected user (viewer) or users (audience)!: habits, behaviours, stats etc. + // WORKFLOW PIPE: filtering, customization + // WORKFLOW SCENARIO-ENGINE: scenarios SHOULD BE IMPLEMENTED ON METRIC SERVICE + // ******************************************************************* + + // ******************************************************************* + // SCENARIO AND RULES/METRICS + // should use go routine with time limit to query filter rules + // if in specific time there is no rules the results should be sent + // client feed. After this the next package should be sent with + // rules which entries should be remove/hidden from the view! + // ******************************************************************* + + timeout := make(chan bool, 1) + results := make(chan []*resource.Entry, 1) + + // COLLECTING ENTRIES + if entryListCache == nil { + entryListCache = make(map[string][]*resource.Entry) + } + + if entryListCache[socketEvent.FeedId] == nil { + entryListCache[socketEvent.FeedId], _ = resource.GetEntryList(socketEvent.FeedId, socketEvent.AppId, socketEvent.OrgId) + } + + // WORKFLOW TIMEOUT + // !! SHOULD BE CONFIGURABLE OVER RUNTIME SETTING + // !! DEFAULT VALUE SHOULD BE IN CONFIG FILE + go func() { + amt := time.Duration(100) + time.Sleep(amt * time.Millisecond) + timeout <- true + }() + + // WORKFLOW PIPELINE + go func(list []*resource.Entry, socketEvent smodel.SocketEvent) { + + if pluginManagerAnn == nil { + pluginManagerAnn, _ = this.engine.GetPluginManager().LoadPipeline("ann") + pluginManagerAnn.Prepare() + } + + newList, _ := pluginManagerAnn.Run(list) + + var newEntryList []*resource.Entry + + for _, v := range newList.([]interface{}) { + Id := "" + Data := "" + for k, vv := range v.(map[interface{}]interface{}) { + if k == "Id" { + Id = vv.(string) + } + if k == "Data" { + Data = vv.(string) + } + } + if Id != "" && Data != "" { + newEntryList = append(newEntryList, &resource.Entry{Id, nil, Data}) + } + } + + list = newEntryList + + results <- list + }(entryListCache[socketEvent.FeedId], socketEvent) + + select { + + // IF PIPE TAKES TOO MUCH TIME, DATA DELAYED + case <-timeout: + + event := room.NewFeedEvent(room.FEED_ENTRY_NEW, socketEvent.FeedId, "{Content:\"tiemout\"}") + data, _ := json.Marshal(event) + + if socketEvent.Ws != nil { + amt := time.Duration(rand.Intn(500)) * 1000 + time.Sleep(amt * time.Microsecond) + socketEvent.Ws.WriteMessage(1, data) + } + + if socketEvent.Ch != nil { + socketEvent.Ch <- data + } + + // IF DATA ARRIVES WITHOUT DELAY + case list := <-results: + + // ********************************************************************* + // register socket handler + // needs to send notiffication to long pooling + ws + // join should generate uniqe ID and client should use it + // maybe sessionID could be as uniqeID ? + // room.FeedSubscribers[socketEvent.FeedId][channelID] = socketEvent + // ********************************************************************* + + d, _ := json.Marshal(list) + event := room.NewFeedEvent(room.FEED_ENTRY_INIT, socketEvent.FeedId, string(d)) + data, _ := json.Marshal(event) + + if socketEvent.Ws != nil { + socketEvent.Ws.WriteMessage(1, data) + } + + if socketEvent.Ch != nil { + socketEvent.Ch <- data + } + + } + +} + +func NewWorkflowManager(engine emodel.Elasticfeed) *WorkflowManager { + tpl := engine.GetConfig() - wm := &WorkflowManager{pm, em, nil, nil} + wm := &WorkflowManager{engine, engine.GetPluginManager(), engine.GetEventManager(), nil, nil} wm.InitTemplate(tpl) return wm diff --git a/workflow/multi_error.go b/workflow/multi_error.go new file mode 100644 index 0000000..86896e6 --- /dev/null +++ b/workflow/multi_error.go @@ -0,0 +1,50 @@ +package workflow + +import ( + "fmt" + "strings" +) + +// MultiError is an error type to track multiple errors. This is used to +// accumulate errors in cases such as configuration parsing, and returning +// them as a single error. +type MultiError struct { + Errors []error +} + +func (e *MultiError) Error() string { + points := make([]string, len(e.Errors)) + for i, err := range e.Errors { + points[i] = fmt.Sprintf("* %s", err) + } + + return fmt.Sprintf( + "%d error(s) occurred:\n\n%s", + len(e.Errors), strings.Join(points, "\n")) +} + +// MultiErrorAppend is a helper function that will append more errors +// onto a MultiError in order to create a larger multi-error. If the +// original error is not a MultiError, it will be turned into one. +func MultiErrorAppend(err error, errs ...error) *MultiError { + if err == nil { + err = new(MultiError) + } + + switch err := err.(type) { + case *MultiError: + if err == nil { + err = new(MultiError) + } + + err.Errors = append(err.Errors, errs...) + return err + default: + newErrs := make([]error, len(errs)+1) + newErrs[0] = err + copy(newErrs[1:], errs) + return &MultiError{ + Errors: newErrs, + } + } +} diff --git a/workflow/template.go b/workflow/template.go new file mode 100644 index 0000000..d59bb24 --- /dev/null +++ b/workflow/template.go @@ -0,0 +1,744 @@ +package workflow + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "sort" + "text/template" + "time" + + "github.com/hashicorp/go-version" + "github.com/mitchellh/mapstructure" + jsonutil "github.com/mitchellh/packer/common/json" +) + +// !!!! wrong place for debuging only +type ComponentFinder struct { +// Builder BuilderFunc +// Hook HookFunc +// PostProcessor PostProcessorFunc +// Provisioner ProvisionerFunc +} + +// The rawTemplate struct represents the structure of a template read +// directly from a file. The builders and other components map just to +// "interface{}" pointers since we actually don't know what their contents +// are until we read the "type" field. +type rawTemplate struct { + MinimumPackerVersion string `mapstructure:"min_packer_version"` + + Description string + Builders []map[string]interface{} + Hooks map[string][]string + Push PushConfig + PostProcessors []interface{} `mapstructure:"post-processors"` + Provisioners []map[string]interface{} + Variables map[string]interface{} +} + +// The Template struct represents a parsed template, parsed into the most +// completed form it can be without additional processing by the caller. +type Template struct { + RawContents []byte + Description string + Variables map[string]RawVariable + Builders map[string]RawBuilderConfig + Hooks map[string][]string + Push *PushConfig + PostProcessors [][]RawPostProcessorConfig + Provisioners []RawProvisionerConfig +} + +// PushConfig is the configuration structure for the push settings. +type PushConfig struct { + Name string + Address string + BaseDir string `mapstructure:"base_dir"` + Include []string + Exclude []string + Token string + VCS bool +} + +// The RawBuilderConfig struct represents a raw, unprocessed builder +// configuration. It contains the name of the builder as well as the +// raw configuration. If requested, this is used to compile into a full +// builder configuration at some point. +type RawBuilderConfig struct { + Name string + Type string + + RawConfig interface{} +} + +// RawPostProcessorConfig represents a raw, unprocessed post-processor +// configuration. It contains the type of the post processor as well as the +// raw configuration that is handed to the post-processor for it to process. +type RawPostProcessorConfig struct { + TemplateOnlyExcept `mapstructure:",squash"` + + Type string + KeepInputArtifact bool `mapstructure:"keep_input_artifact"` + RawConfig map[string]interface{} +} + +// RawProvisionerConfig represents a raw, unprocessed provisioner configuration. +// It contains the type of the provisioner as well as the raw configuration +// that is handed to the provisioner for it to process. +type RawProvisionerConfig struct { + TemplateOnlyExcept `mapstructure:",squash"` + + Type string + Override map[string]interface{} + RawPauseBefore string `mapstructure:"pause_before"` + + RawConfig interface{} + + pauseBefore time.Duration +} + +// RawVariable represents a variable configuration within a template. +type RawVariable struct { + Default string // The default value for this variable + Required bool // If the variable is required or not + Value string // The set value for this variable + HasValue bool // True if the value was set +} + +// ParseTemplate takes a byte slice and parses a Template from it, returning +// the template and possibly errors while loading the template. The error +// could potentially be a MultiError, representing multiple errors. Knowing +// and checking for this can be useful, if you wish to format it in a certain +// way. +// +// The second parameter, vars, are the values for a set of user variables. +func ParseTemplate(data []byte, vars map[string]string) (t *Template, err error) { + var rawTplInterface interface{} + err = jsonutil.Unmarshal(data, &rawTplInterface) + if err != nil { + return + } + + // Decode the raw template interface into the actual rawTemplate + // structure, checking for any extranneous keys along the way. + var md mapstructure.Metadata + var rawTpl rawTemplate + decoderConfig := &mapstructure.DecoderConfig{ + Metadata: &md, + Result: &rawTpl, + } + + decoder, err := mapstructure.NewDecoder(decoderConfig) + if err != nil { + return + } + + err = decoder.Decode(rawTplInterface) + if err != nil { + return + } + + if rawTpl.MinimumPackerVersion != "" { + // TODO: NOPE! Replace this + Version := "1.0" + vCur, err := version.NewVersion(Version) + if err != nil { + panic(err) + } + vReq, err := version.NewVersion(rawTpl.MinimumPackerVersion) + if err != nil { + return nil, fmt.Errorf( + "'minimum_packer_version' error: %s", err) + } + + if vCur.LessThan(vReq) { + return nil, fmt.Errorf( + "Template requires Packer version %s. "+ + "Running version is %s.", + vReq, vCur) + } + } + + errors := make([]error, 0) + + if len(md.Unused) > 0 { + sort.Strings(md.Unused) + for _, unused := range md.Unused { + errors = append( + errors, fmt.Errorf("Unknown root level key in template: '%s'", unused)) + } + } + + t = &Template{} + t.RawContents = data + t.Description = rawTpl.Description + t.Variables = make(map[string]RawVariable) + t.Builders = make(map[string]RawBuilderConfig) + t.Hooks = rawTpl.Hooks + t.Push = &rawTpl.Push + t.PostProcessors = make([][]RawPostProcessorConfig, len(rawTpl.PostProcessors)) + t.Provisioners = make([]RawProvisionerConfig, len(rawTpl.Provisioners)) + + // Gather all the variables + for k, v := range rawTpl.Variables { + var variable RawVariable + variable.Required = v == nil + + // Create a new mapstructure decoder in order to decode the default + // value since this is the only value in the regular template that + // can be weakly typed. + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &variable.Default, + WeaklyTypedInput: true, + }) + if err != nil { + // This should never happen. + panic(err) + } + + err = decoder.Decode(v) + if err != nil { + errors = append(errors, + fmt.Errorf("Error decoding default value for user var '%s': %s", k, err)) + continue + } + + // Set the value of this variable if we have it + if val, ok := vars[k]; ok { + variable.HasValue = true + variable.Value = val + delete(vars, k) + } + + t.Variables[k] = variable + } + + // Gather all the builders + for i, v := range rawTpl.Builders { + var raw RawBuilderConfig + if err := mapstructure.Decode(v, &raw); err != nil { + if merr, ok := err.(*mapstructure.Error); ok { + for _, err := range merr.Errors { + errors = append(errors, fmt.Errorf("builder %d: %s", i+1, err)) + } + } else { + errors = append(errors, fmt.Errorf("builder %d: %s", i+1, err)) + } + + continue + } + + if raw.Type == "" { + errors = append(errors, fmt.Errorf("builder %d: missing 'type'", i+1)) + continue + } + + // Attempt to get the name of the builder. If the "name" key + // missing, use the "type" field, which is guaranteed to exist + // at this point. + if raw.Name == "" { + raw.Name = raw.Type + } + + // Check if we already have a builder with this name and error if so + if _, ok := t.Builders[raw.Name]; ok { + errors = append(errors, fmt.Errorf("builder with name '%s' already exists", raw.Name)) + continue + } + + // Now that we have the name, remove it from the config - as the builder + // itself doesn't know about, and it will cause a validation error. + delete(v, "name") + + raw.RawConfig = v + + t.Builders[raw.Name] = raw + } + + // Gather all the post-processors. This is a complicated process since there + // are actually three different formats that the user can use to define + // a post-processor. + for i, rawV := range rawTpl.PostProcessors { + rawPP, err := parsePostProcessor(i, rawV) + if err != nil { + errors = append(errors, err...) + continue + } + + configs := make([]RawPostProcessorConfig, 0, len(rawPP)) + for j, pp := range rawPP { + var config RawPostProcessorConfig + if err := mapstructure.Decode(pp, &config); err != nil { + if merr, ok := err.(*mapstructure.Error); ok { + for _, err := range merr.Errors { + errors = append(errors, + fmt.Errorf("Post-processor #%d.%d: %s", i+1, j+1, err)) + } + } else { + errors = append(errors, + fmt.Errorf("Post-processor %d.%d: %s", i+1, j+1, err)) + } + + continue + } + + if config.Type == "" { + errors = append(errors, + fmt.Errorf("Post-processor %d.%d: missing 'type'", i+1, j+1)) + continue + } + + // Remove the input keep_input_artifact option + config.TemplateOnlyExcept.Prune(pp) + delete(pp, "keep_input_artifact") + + // Verify that the only settings are good + if errs := config.TemplateOnlyExcept.Validate(t.Builders); len(errs) > 0 { + for _, err := range errs { + errors = append(errors, + fmt.Errorf("Post-processor %d.%d: %s", i+1, j+1, err)) + } + + continue + } + + config.RawConfig = pp + + // Add it to the list of configs + configs = append(configs, config) + } + + t.PostProcessors[i] = configs + } + + // Gather all the provisioners + for i, v := range rawTpl.Provisioners { + raw := &t.Provisioners[i] + if err := mapstructure.Decode(v, raw); err != nil { + if merr, ok := err.(*mapstructure.Error); ok { + for _, err := range merr.Errors { + errors = append(errors, fmt.Errorf("provisioner %d: %s", i+1, err)) + } + } else { + errors = append(errors, fmt.Errorf("provisioner %d: %s", i+1, err)) + } + + continue + } + + if raw.Type == "" { + errors = append(errors, fmt.Errorf("provisioner %d: missing 'type'", i+1)) + continue + } + + // Delete the keys that we used + raw.TemplateOnlyExcept.Prune(v) + delete(v, "override") + + // Verify that the override keys exist... + for name, _ := range raw.Override { + if _, ok := t.Builders[name]; !ok { + errors = append( + errors, fmt.Errorf("provisioner %d: build '%s' not found for override", i+1, name)) + } + } + + // Verify that the only settings are good + if errs := raw.TemplateOnlyExcept.Validate(t.Builders); len(errs) > 0 { + for _, err := range errs { + errors = append(errors, + fmt.Errorf("provisioner %d: %s", i+1, err)) + } + } + + // Setup the pause settings + if raw.RawPauseBefore != "" { + duration, err := time.ParseDuration(raw.RawPauseBefore) + if err != nil { + errors = append( + errors, fmt.Errorf( + "provisioner %d: pause_before invalid: %s", + i+1, err)) + } + + raw.pauseBefore = duration + } + + // Remove the pause_before setting if it is there so that we don't + // get template validation errors later. + delete(v, "pause_before") + + raw.RawConfig = v + } + + if len(t.Builders) == 0 { + errors = append(errors, fmt.Errorf("No builders are defined in the template.")) + } + + // Verify that all the variable sets were for real variables. + for k, _ := range vars { + errors = append(errors, fmt.Errorf("Unknown user variables: %s", k)) + } + + // If there were errors, we put it into a MultiError and return + if len(errors) > 0 { + err = &MultiError{errors} + t = nil + return + } + + return +} + +// ParseTemplateFile takes the given template file and parses it into +// a single template. +func ParseTemplateFile(path string, vars map[string]string) (*Template, error) { + var data []byte + + if path == "-" { + // Read from stdin... + buf := new(bytes.Buffer) + _, err := io.Copy(buf, os.Stdin) + if err != nil { + return nil, err + } + + data = buf.Bytes() + } else { + var err error + data, err = ioutil.ReadFile(path) + if err != nil { + return nil, err + } + } + + return ParseTemplate(data, vars) +} + +func parsePostProcessor(i int, rawV interface{}) (result []map[string]interface{}, errors []error) { + switch v := rawV.(type) { + case string: + result = []map[string]interface{}{ + {"type": v}, + } + case map[string]interface{}: + result = []map[string]interface{}{v} + case []interface{}: + result = make([]map[string]interface{}, len(v)) + errors = make([]error, 0) + for j, innerRawV := range v { + switch innerV := innerRawV.(type) { + case string: + result[j] = map[string]interface{}{"type": innerV} + case map[string]interface{}: + result[j] = innerV + case []interface{}: + errors = append( + errors, + fmt.Errorf("Post-processor %d.%d: sequences not allowed to be nested in sequences", i+1, j+1)) + default: + errors = append(errors, fmt.Errorf("Post-processor %d.%d is in a bad format.", i+1, j+1)) + } + } + + if len(errors) == 0 { + errors = nil + } + default: + result = nil + errors = []error{fmt.Errorf("Post-processor %d is in a bad format.", i+1)} + } + + return +} + +// BuildNames returns a slice of the available names of builds that +// this template represents. +func (t *Template) BuildNames() []string { + names := make([]string, 0, len(t.Builders)) + for name, _ := range t.Builders { + names = append(names, name) + } + + return names +} + +// Build returns a Build for the given name. +// +// If the build does not exist as part of this template, an error is +// returned. +//func (t *Template) Build(name string, components *ComponentFinder) (b Build, err error) { +func (t *Template) Build(name string, components *ComponentFinder) (b interface {}, err error) { +// // Setup the Builder +// builderConfig, ok := t.Builders[name] +// if !ok { +// err = fmt.Errorf("No such build found in template: %s", name) +// return +// } +// +// // We panic if there is no builder function because this is really +// // an internal bug that always needs to be fixed, not an error. +// if components.Builder == nil { +// panic("no builder function") +// } +// +// // Panic if there are provisioners on the template but no provisioner +// // component finder. This is always an internal error, so we panic. +// if len(t.Provisioners) > 0 && components.Provisioner == nil { +// panic("no provisioner function") +// } +// +// builder, err := components.Builder(builderConfig.Type) +// if err != nil { +// return +// } +// +// if builder == nil { +// err = fmt.Errorf("Builder type not found: %s", builderConfig.Type) +// return +// } +// +// // Process the name +// tpl, variables, err := t.NewConfigTemplate() +// if err != nil { +// return nil, err +// } +// +// rawName := name +// name, err = tpl.Process(name, nil) +// if err != nil { +// return nil, err +// } +// +// // Gather the Hooks +// hooks := make(map[string][]Hook) +// for tplEvent, tplHooks := range t.Hooks { +// curHooks := make([]Hook, 0, len(tplHooks)) +// +// for _, hookName := range tplHooks { +// var hook Hook +// hook, err = components.Hook(hookName) +// if err != nil { +// return +// } +// +// if hook == nil { +// err = fmt.Errorf("Hook not found: %s", hookName) +// return +// } +// +// curHooks = append(curHooks, hook) +// } +// +// hooks[tplEvent] = curHooks +// } +// +// // Prepare the post-processors +// postProcessors := make([][]coreBuildPostProcessor, 0, len(t.PostProcessors)) +// for _, rawPPs := range t.PostProcessors { +// current := make([]coreBuildPostProcessor, 0, len(rawPPs)) +// for _, rawPP := range rawPPs { +// if rawPP.TemplateOnlyExcept.Skip(rawName) { +// continue +// } +// +// pp, err := components.PostProcessor(rawPP.Type) +// if err != nil { +// return nil, err +// } +// +// if pp == nil { +// return nil, fmt.Errorf("PostProcessor type not found: %s", rawPP.Type) +// } +// +// current = append(current, coreBuildPostProcessor{ +// processor: pp, +// processorType: rawPP.Type, +// config: rawPP.RawConfig, +// keepInputArtifact: rawPP.KeepInputArtifact, +// }) +// } +// +// // If we have no post-processors in this chain, just continue. +// // This can happen if the post-processors skip certain builds. +// if len(current) == 0 { +// continue +// } +// +// postProcessors = append(postProcessors, current) +// } +// +// // Prepare the provisioners +// provisioners := make([]coreBuildProvisioner, 0, len(t.Provisioners)) +// for _, rawProvisioner := range t.Provisioners { +// if rawProvisioner.TemplateOnlyExcept.Skip(rawName) { +// continue +// } +// +// var provisioner Provisioner +// provisioner, err = components.Provisioner(rawProvisioner.Type) +// if err != nil { +// return +// } +// +// if provisioner == nil { +// err = fmt.Errorf("Provisioner type not found: %s", rawProvisioner.Type) +// return +// } +// +// configs := make([]interface{}, 1, 2) +// configs[0] = rawProvisioner.RawConfig +// +// if rawProvisioner.Override != nil { +// if override, ok := rawProvisioner.Override[name]; ok { +// configs = append(configs, override) +// } +// } +// +// if rawProvisioner.pauseBefore > 0 { +// provisioner = &PausedProvisioner{ +// PauseBefore: rawProvisioner.pauseBefore, +// Provisioner: provisioner, +// } +// } +// +// coreProv := coreBuildProvisioner{provisioner, configs} +// provisioners = append(provisioners, coreProv) +// } +// +// b = &coreBuild{ +// name: name, +// builder: builder, +// builderConfig: builderConfig.RawConfig, +// builderType: builderConfig.Type, +// hooks: hooks, +// postProcessors: postProcessors, +// provisioners: provisioners, +// variables: variables, +// } +// +// return + return nil, nil +} + +//Build a ConfigTemplate object populated by the values within a +//parsed template +func (t *Template) NewConfigTemplate() (c *ConfigTemplate, variables map[string]string, err error) { + + // Prepare the variable template processor, which is a bit unique + // because we don't allow user variable usage and we add a function + // to read from the environment. + varTpl, err := NewConfigTemplate() + if err != nil { + return nil, nil, err + } + varTpl.Funcs(template.FuncMap{ + "env": templateEnv, + "user": templateDisableUser, + }) + + // Prepare the variables + var varErrors []error + variables = make(map[string]string) + for k, v := range t.Variables { + if v.Required && !v.HasValue { + varErrors = append(varErrors, + fmt.Errorf("Required user variable '%s' not set", k)) + } + + var val string + if v.HasValue { + val = v.Value + } else { + val, err = varTpl.Process(v.Default, nil) + if err != nil { + varErrors = append(varErrors, + fmt.Errorf("Error processing user variable '%s': %s'", k, err)) + } + } + + variables[k] = val + } + + if len(varErrors) > 0 { + return nil, variables, &MultiError{varErrors} + } + + // Process the name + tpl, err := NewConfigTemplate() + if err != nil { + return nil, variables, err + } + tpl.UserVars = variables + + return tpl, variables, nil +} + +// TemplateOnlyExcept contains the logic required for "only" and "except" +// meta-parameters. +type TemplateOnlyExcept struct { + Only []string + Except []string +} + +// Prune will prune out the used values from the raw map. +func (t *TemplateOnlyExcept) Prune(raw map[string]interface{}) { + delete(raw, "except") + delete(raw, "only") +} + +// Skip tests if we should skip putting this item onto a build. +func (t *TemplateOnlyExcept) Skip(name string) bool { + if len(t.Only) > 0 { + onlyFound := false + for _, n := range t.Only { + if n == name { + onlyFound = true + break + } + } + + if !onlyFound { + // Skip this provisioner + return true + } + } + + // If the name is in the except list, then skip that + for _, n := range t.Except { + if n == name { + return true + } + } + + return false +} + +// Validates the only/except parameters. +func (t *TemplateOnlyExcept) Validate(b map[string]RawBuilderConfig) (e []error) { + if len(t.Only) > 0 && len(t.Except) > 0 { + e = append(e, + fmt.Errorf("Only one of 'only' or 'except' may be specified.")) + } + + if len(t.Only) > 0 { + for _, n := range t.Only { + if _, ok := b[n]; !ok { + e = append(e, + fmt.Errorf("'only' specified builder '%s' not found", n)) + } + } + } + + for _, n := range t.Except { + if _, ok := b[n]; !ok { + e = append(e, + fmt.Errorf("'except' specified builder '%s' not found", n)) + } + } + + return +} diff --git a/workflow/template/default.json b/workflow/template/default.json deleted file mode 100644 index 4087d72..0000000 --- a/workflow/template/default.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "profiler": { - "default-profiler": true, - "mem": 256, - "cpus": 4, - "bandwidth": 1000 - }, - "storing": { - "create-entry": { - "indexer": [], - "crawler": [] - } - }, - "processing": { - "feed-maintainer": { - "scenario": [] - } - }, - "distributing": { - "push-entry-to-ui": { - "sensor": [], - "pipeline": [] - } - }, - "learning": { - "create-metric": { - "sensor": [], - "scenario": [] - } - } -} diff --git a/workflow/workflow.go b/workflow/workflow.go index 4d914f0..13957a1 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -10,6 +10,12 @@ type WorkflowController struct { manager *WorkflowManager profiler *model.Profiler + + IndexerTimeout int + CrawlerTimeout int + SensorTimeout int + PipelineTimeout int + ScenarioTimeout int } func (this *WorkflowController) GetManager() *WorkflowManager { @@ -42,7 +48,7 @@ func (this *WorkflowController) DispatchPipelineHook(data interface{}) interface func NewWorkflowController(feed *resource.Feed, wm *WorkflowManager) *WorkflowController { data := feed.GetWorkflow().GetProfilerRawData() p := model.NewProfiler(data) - w := &WorkflowController{feed, wm, p} + w := &WorkflowController{feed, wm, p, 100, 100, 100, 100, 100} w.Init()