Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions api.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,10 @@
package main

import (
"github.com/feedlabs/elasticfeed/service"
"github.com/feedlabs/elasticfeed/plugin"
"github.com/feedlabs/elasticfeed/workflow"
"github.com/feedlabs/elasticfeed/event"
"github.com/feedlabs/elasticfeed/resource"
"github.com/feedlabs/elasticfeed/elasticfeed"
)

var (
ServerEngine *elasticfeed.Elasticfeed
)

func main() {
rm := resource.NewResourceManager()
em := event.NewEventManager()
pm := plugin.NewPluginManager(rm)
wm := workflow.NewWorkflowManager(nil, pm, em)
sm := service.NewServiceManager()

ServerEngine = elasticfeed.NewElasticfeed(rm, em, sm, pm, wm)
ServerEngine.Run()
engine := elasticfeed.NewElasticfeed()
engine.Run()
}
50 changes: 33 additions & 17 deletions elasticfeed/elasticfeed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package elasticfeed

import (
"github.com/feedlabs/elasticfeed/elasticfeed/model"

"github.com/feedlabs/elasticfeed/plugin"
"github.com/feedlabs/elasticfeed/workflow"
"github.com/feedlabs/elasticfeed/service"
Expand All @@ -11,41 +13,55 @@ import (
)

type Elasticfeed struct {
rm *resource.ResourceManager
em *event.EventManager
sm *service.ServiceManager
pm *plugin.PluginManager
wm *workflow.WorkflowManager
R model.ResourceManager
E model.EventManager
S model.ServiceManager
P model.PluginManager
W model.WorkflowManager
}

func (this *Elasticfeed) GetEventManager() model.EventManager {
return this.E
}

func (this *Elasticfeed) GetEventManager() *event.EventManager {
return this.em
func (this *Elasticfeed) GetResourceManager() model.ResourceManager {
return this.R
}

func (this *Elasticfeed) GetResourceManager() *resource.ResourceManager {
return this.rm
func (this *Elasticfeed) GetServiceManager() model.ServiceManager {
return this.S
}

func (this *Elasticfeed) GetServiceManager() *service.ServiceManager {
return this.sm
func (this *Elasticfeed) GetPluginManager() model.PluginManager {
return this.P
}

func (this *Elasticfeed) GetPluginManager() *plugin.PluginManager {
return this.pm
func (this *Elasticfeed) GetWorkflowManager() model.WorkflowManager {
return this.W
}

func (this *Elasticfeed) GetWorkflowManager() *workflow.WorkflowManager {
return this.wm
func (this *Elasticfeed) GetConfig() map[string]interface {} {
return make(map[string]interface {})
}

func (this *Elasticfeed) Run() {
this.GetResourceManager().Init()
this.GetServiceManager().Init()
this.GetWorkflowManager().Init()

feedify.SetStaticPath("/static", "public")
feedify.Run()
}

func NewElasticfeed(rm *resource.ResourceManager, em *event.EventManager, sm *service.ServiceManager, pm *plugin.PluginManager, wm *workflow.WorkflowManager) *Elasticfeed {
return &Elasticfeed{rm, em, sm, pm, wm}
func NewElasticfeed() model.Elasticfeed {

engine := &Elasticfeed{}

engine.R = resource.NewResourceManager(engine)
engine.E = event.NewEventManager(engine)
engine.P = plugin.NewPluginManager(engine)
engine.W = workflow.NewWorkflowManager(engine)
engine.S = service.NewServiceManager(engine)

return engine
}
18 changes: 18 additions & 0 deletions elasticfeed/model/elasticfeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package model

type Elasticfeed interface {

GetEventManager() EventManager

GetResourceManager() ResourceManager

GetServiceManager() ServiceManager

GetPluginManager() PluginManager

GetWorkflowManager() WorkflowManager

GetConfig() map[string]interface{}

Run()
}
4 changes: 4 additions & 0 deletions elasticfeed/model/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package model

type EventManager interface {
}
10 changes: 10 additions & 0 deletions elasticfeed/model/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package model

import (
pmodel "github.com/feedlabs/elasticfeed/plugin/model"
)

type PluginManager interface {

LoadPipeline(name string) (pmodel.Pipeline, error)
}
6 changes: 6 additions & 0 deletions elasticfeed/model/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package model

type ResourceManager interface {

Init()
}
12 changes: 12 additions & 0 deletions elasticfeed/model/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package model

import (
"github.com/feedlabs/elasticfeed/service/stream"
)

type ServiceManager interface {

GetStreamService() *stream.StreamService

Init()
}
6 changes: 6 additions & 0 deletions elasticfeed/model/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package model

type WorkflowManager interface {

Init()
}
33 changes: 28 additions & 5 deletions event/manager.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
package event

import (
"github.com/feedlabs/elasticfeed/elasticfeed/model"
)

const (
EVENT_STORING = "storing"
EVENT_PROCESSING = "processing"
EVENT_DISTRIBUTING = "distributing"
EVENT_LEARNING = "learning"

EVENT_STORING_CREATE_ENTRY = "create-entry"
EVENT_STORING_CREATE_VIEWER = "create-viewer"
EVENT_PROCESSING_FEED_MAINTAINER = "feed-maintainer"
EVENT_PROCESSING_SENSOR_UPDATE = "sensor-update"
EVENT_DISTRIBUTING_PUSH_ENTRY = "push-entry"
EVENT_LEARNING_CREATE_METRIC = "create-metric"
)

/**

- COULD DEFINE EVENTS
- COULD TRIGGER ON BIND-ED LISTENERS

- COULD DEFINE ALARM CLOCK
- COULD DEFINE INTERRUPTS

EVENT
- SHOULD HAVE DATA/CALLBACK
- SHOULD HAVE TYPE
- SHOULD HAVE PARENT
- SHOULD HAVE DESTINATION

*/

type EventManager struct {
events map[string]interface {}
engine model.Elasticfeed
events map[string]interface{}
}

func (this *EventManager) On(name string, callback func(event *Event)) {
Expand All @@ -33,14 +56,14 @@ func (this *EventManager) Trigger(name string, data interface{}) {

func (this *EventManager) GetEventsMap() map[string]interface{} {
return map[string]interface{}{
EVENT_STORING: []string{EVENT_STORING_CREATE_ENTRY},
EVENT_PROCESSING: []string{EVENT_PROCESSING_FEED_MAINTAINER},
EVENT_STORING: []string{EVENT_STORING_CREATE_ENTRY, EVENT_STORING_CREATE_VIEWER},
EVENT_PROCESSING: []string{EVENT_PROCESSING_FEED_MAINTAINER, EVENT_PROCESSING_SENSOR_UPDATE},
EVENT_DISTRIBUTING: []string{EVENT_DISTRIBUTING_PUSH_ENTRY},
EVENT_LEARNING: []string{EVENT_LEARNING_CREATE_METRIC},
}
}

func NewEventManager() *EventManager {
func NewEventManager(engine model.Elasticfeed) model.EventManager {
e := make(map[string]interface{})
return &EventManager{e}
return &EventManager{engine, e}
}
4 changes: 2 additions & 2 deletions plugin/cmd_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func (b *cmdPipeline) Prepare(config ...interface{}) ([]string, error) {
return b.pipeline.Prepare(config...)
}

func (b *cmdPipeline) Run(cache model.Cache) (model.Artifact, error) {
func (b *cmdPipeline) Run(data interface {}) (interface {}, error) {
defer func() {
r := recover()
b.checkExit(r, nil)
}()

return b.pipeline.Run(cache)
return b.pipeline.Run(data)
}

func (b *cmdPipeline) Cancel() {
Expand Down
57 changes: 7 additions & 50 deletions plugin/manager.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,33 @@
package plugin

import (
"strconv"

"fmt"

"log"
"os/exec"
"path/filepath"
"strings"

"github.com/feedlabs/elasticfeed/resource"
"github.com/feedlabs/elasticfeed/plugin/model"
"github.com/feedlabs/elasticfeed/common/config"

"github.com/mitchellh/osext"

emodel "github.com/feedlabs/elasticfeed/elasticfeed/model"
)

type PluginManager struct {
engine emodel.Elasticfeed

Indexers map[string]string
Crawlers map[string]string
Sensors map[string]string
Pipelines map[string]string
Scenarios map[string]string
Helpers map[string]string

Store map[int]map[string]interface{}

api *model.ResourceApi

PluginMinPort uint
PluginMaxPort uint
}

func (this *PluginManager) GetResourceApi() interface{} {
return this.api
}

func (this *PluginManager) InitPlugin(name string, profiler *model.Profiler) *Plugin {

// _p := resource.FindPluginByName(name)
// findIsRunningWithProfiler(_p, profiler)
_p := resource.NewPlugin("", "", "", "", "", "")
p := NewPlugin(_p, this, this.api, profiler)

p.Init()

_group, _ := strconv.Atoi(_p.Group)
this.Store[_group][_p.Id] = p

return p
}

func (this *PluginManager) FindPlugin(name string, profiler *model.Profiler) *interface{} {
return nil
}
Expand All @@ -66,10 +42,6 @@ func (this *PluginManager) RunPlugin(p Plugin) (err error) {
return nil
}

func (this *PluginManager) GetIndexers() map[string]interface{} {
return this.Store[resource.PLUGIN_INDEXER]
}

// Discover discovers plugins.
//
// This looks in the directory of the executable and the CWD, in that
Expand Down Expand Up @@ -237,26 +209,11 @@ func (c *PluginManager) pluginClient(path string) *Client {
return NewClient(&config)
}

func NewPluginManager(engine emodel.Elasticfeed) emodel.PluginManager {

func NewPluginManager(resourceManager interface{}) *PluginManager {
pm := &PluginManager{}

pm.api = model.NewResourceApi(resourceManager)

pm.discover(filepath.Join(config.GetHomeAbsolutePath(), "plugins/pipeline-ann"))

list := []string{"ann", "ann1", "ann2"}

for _, name := range(list) {
ann, _ := pm.LoadPipeline(name)

ann.Prepare()
a, b := ann.Run(nil)

fmt.Println(a)
fmt.Println(b)
}
pm := &PluginManager{engine, nil, nil, nil, nil, nil, nil, 40000, 41000}

pm.discover(filepath.Join(config.GetHomeAbsolutePath(), "public/userfiles/plugin/imports"))

return pm
}
2 changes: 1 addition & 1 deletion plugin/model/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type Pipeline interface {

Prepare(...interface{}) ([]string, error)

Run(cache Cache) (Artifact, error)
Run(data interface {}) (interface {}, error)

Cancel()
}
Loading