A workflow orchestration engine for Go applications. Provides DAG-based execution, distributed task processing, and durable state management.
go get github.com/parevo/flow

All dependencies (database drivers, Redis client, etc.) are automatically installed.
- Go 1.23 or higher
- MySQL 5.7+, PostgreSQL 12+, or Redis 6+ (optional - in-memory storage available)
package main
import (
"context"
"github.com/parevo/flow"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
func main() {
// Initialize storage
db, _ := sqlx.Connect("mysql", "user:pass@tcp(localhost:3306)/db?parseTime=true")
storage, _ := flow.NewMySQLStorage(db)
// Create engine and registry
registry := flow.NewRegistry()
engine := flow.NewEngine(storage, registry)
// Register function
registry.RegisterFunction("ProcessData", func(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
// Your logic here
return map[string]interface{}{"status": "processed"}, nil
})
// Define workflow
wf := flow.NewWorkflow("process-workflow", "Data Processing").
AddNode("process", flow.NodeTypeFunction).
WithConfig("function", "ProcessData").
Build()
// Register and execute
ctx := context.Background()
engine.RegisterWorkflow(ctx, "default", wf)
// Start worker
go engine.StartWorker(ctx, "default", "worker-1")
// Execute workflow
execID, _ := engine.Execute(ctx, "default", "process-workflow", []byte(`{"data":"value"}`))
// Query status
execution, _ := engine.GetExecution(ctx, "default", execID)
}

db, _ := sqlx.Connect("mysql", "user:pass@tcp(host:3306)/db?parseTime=true")
storage, _ := flow.NewMySQLStorage(db)

Database schema is created automatically on first connection.
db, _ := sqlx.Connect("postgres", "postgres://user:pass@host/db?sslmode=disable")
storage, _ := flow.NewPostgreSQLStorage(db)

storage := flow.NewRedisStorage("localhost:6379", "", 0)

storage := flow.NewMemoryStorage()

Suitable for development and testing.
engine := flow.NewEngine(storage, registry)
engine.WithLogger(logger)
engine.RegisterWorkflow(ctx, namespace, workflow)
engine.Execute(ctx, namespace, workflowID, input)
engine.GetExecution(ctx, namespace, executionID)
engine.GetExecutionSteps(ctx, namespace, executionID)
engine.CancelExecution(ctx, namespace, executionID)
engine.FailExecution(ctx, namespace, executionID, message)
engine.SignalExecution(ctx, namespace, executionID, nodeID, data)
engine.StartWorker(ctx, namespace, workerID)

registry := flow.NewRegistry()
registry.RegisterFunction(name, handlerFunc)
registry.Register(nodeType, executor)

wf := flow.NewWorkflow(id, name).
AddNode(nodeID, nodeType).
WithConfig(key, value).
WithRetry(count).
WithSaga(compensateNodeID).
Connect(sourceID, targetID).
ConnectIf(sourceID, targetID, condition).
Build()

worker := flow.NewWorker(workerID, engine, registry, interval)
worker.SetNamespace(namespace)
worker.Start(ctx)

Built-in node types are automatically registered:
- function - Execute registered Go functions
- http - HTTP requests
- condition - Conditional branching
- log - Logging
- transform - Data transformation
- signal - Wait for external signals
- subworkflow - Nested workflows
- ai - LLM API calls
- notify - Notifications
- switch - Multi-way branching
- wait - Delay execution
- setvariable - Context manipulation
node := flow.Node{
ID: "task",
Type: flow.NodeTypeFunction,
RetryPolicy: &flow.RetryPolicy{
MaxAttempts: 5,
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
},
}

builder.AddNode("charge", flow.NodeTypeFunction).
WithSaga("refund")

crypto, _ := flow.NewCrypto("your-32-byte-encryption-key-here")
storage.(*sql.SQLStorage).SetEncryption(crypto)

cronMgr := flow.NewCronManager(engine, logger)
cronMgr.Start()
cronMgr.AddSchedule(namespace, workflowID, "0 2 * * *", input)

webhookMgr := flow.NewWebhookManager(engine)
http.Handle("/webhooks/", webhookMgr)

apiMgr := flow.NewAPIManager(webhookMgr)
http.Handle("/", apiMgr)

Endpoints:
- GET /health - Health check
- GET /metrics - Prometheus metrics
- GET /api/{namespace}/executions/{id} - Get execution
- POST /api/{namespace}/executions/{id}/cancel - Cancel execution
- POST /api/{namespace}/executions/{id}/signal/{nodeID} - Send signal
eventBus := flow.NewEventBus()
eventBus.RegisterHandler(flow.EventWorkflowCompleted, handler)
eventBus.RegisterGlobalHandler(handler)

Event types:
- EventWorkflowStarted
- EventWorkflowCompleted
- EventWorkflowFailed
- EventStepStarted
- EventStepCompleted
- EventStepFailed
Parevo Flow provides a flexible, pluggable authorization system. Bring your own auth!
type AuthProvider interface {
CheckAccess(ctx context.Context, resource string, action string) error
}

engine := flow.NewEngine(storage, registry)
// No auth provider set - everything is allowed

type MyAuth struct{}
func (a *MyAuth) CheckAccess(ctx context.Context, resource string, action string) error {
userID := ctx.Value("user_id").(string)
// Your custom logic
if userID == "" {
return errors.New("unauthorized")
}
// Check permissions in your database, Firebase, Auth0, etc.
return nil
}
// Set auth provider
engine := flow.NewEngine(storage, registry)
engine.SetAuthProvider(&MyAuth{})

type MultiTenantAuth struct{}
func (a *MultiTenantAuth) CheckAccess(ctx context.Context, resource string, action string) error {
customerID := ctx.Value("customer_id").(string)
userID := ctx.Value("user_id").(string)
role := ctx.Value("role").(string)
// Get workflow metadata
workflow := getWorkflowFromDB(resource)
// CRITICAL: Tenant isolation
if workflow.Metadata["customer_id"] != customerID {
return errors.New("forbidden: wrong tenant")
}
// Admin can do anything
if role == "admin" {
return nil
}
// Owner can do anything
if workflow.Metadata["owner_id"] == userID {
return nil
}
// Check visibility
if workflow.Metadata["visibility"] == "organization" {
if action == "view" || action == "execute" {
return nil
}
}
return errors.New("forbidden")
}
engine.SetAuthProvider(&MultiTenantAuth{})

wf := &flow.Workflow{
ID: "my-workflow",
Name: "My Workflow",
Metadata: map[string]interface{}{
// Use ANY field names you want - we don't enforce structure
"customer_id": "acme-corp", // Multi-tenant isolation
"user_id": "user-123", // Owner
"owner": "john@acme.com", // Email-based
"slug": "acme/my-wf", // Slug-based
"team_id": "eng-team", // Team access
"visibility": "organization", // Visibility level
// Or use your own auth system fields
"firebase_uid": "abc123",
"auth0_org": "org_xyz",
},
}
// Add auth context when registering
ctx = context.WithValue(ctx, "customer_id", "acme-corp")
ctx = context.WithValue(ctx, "user_id", "user-123")
engine.RegisterWorkflow(ctx, "default", wf)

Common actions (you can define your own):
- "create" - Create/register workflow
- "view" - View workflow definition
- "execute" - Trigger execution
- "edit" - Modify workflow
- "delete" - Delete workflow
- Fully pluggable - Use any auth system (Firebase, Auth0, custom DB, etc.)
- No prescribed structure - Use user_id, userId, email, slug, or whatever you want
- Context-based - Pass auth info via context.Context
- Metadata is flexible - Store any auth-related data in workflow.Metadata
See examples/auth/ for a full multi-tenant implementation with:
- Customer/tenant isolation
- Role-based access control (admin, user, viewer)
- Team-based visibility
- Organization-wide workflows
import "github.com/prometheus/client_golang/prometheus/promhttp"
http.Handle("/metrics", promhttp.Handler())

Available metrics:
- workflows_started_total
- workflows_completed_total
- workflows_failed_total
- steps_processed_total
- step_duration_seconds
- active_workers
import "log/slog"
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
engine.WithLogger(logger)

# Run tests
go test
# Run with coverage
go test -cover
# Run with race detector
go test -race
# Run benchmark
go test -bench=.

See examples/ directory for complete working examples:
- examples/visual_builder/ - Web-based workflow builder with drag-and-drop interface
Tables are created automatically when using SQL storage backends:
- workflows - Workflow definitions
- executions - Workflow execution instances
- execution_steps - Individual task states
Indexes are optimized for:
- Namespace-based queries
- Status filtering
- Worker task claims
- Execution lookups
- Workers claim tasks using database-level locking (SKIP LOCKED)
- Multiple workers can run concurrently across different processes/hosts
- Tasks are automatically reassigned if a worker crashes (zombie detection)
- No message broker required - coordination through storage backend
- Failed tasks can be retried with configurable policies
- Saga pattern supported for compensation logic
- Workflow status tracked: PENDING, RUNNING, COMPLETED, FAILED, CANCELLED
- Task status tracked: PENDING, RUNNING, COMPLETED, FAILED, SKIPPED, WAITING, CANCELLED
MIT License. See LICENSE file.
See CONTRIBUTING.md for guidelines.
Full API documentation: https://pkg.go.dev/github.com/parevo/flow