Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type graphNode struct {
dependents []*graphNode
dependentsByKey map[ID][]*graphNode
tracer trace.Tracer
logger Logger
}

const (
Expand Down Expand Up @@ -150,8 +151,8 @@ func (gn *graphNode) execute(ctx context.Context, rs *runState) (err error) {
tCtx, span := gn.tracer.Start(ctx, gn.task.Name())
defer span.End()

log.Debugf("Starting task %s", gn.task.Name())
defer log.Debugf("Finished task %s", gn.task.Name())
gn.logger.Debugf("Starting task %s", gn.task.Name())
defer gn.logger.Debugf("Finished task %s", gn.task.Name())

bindings, err := gn.task.Execute(tCtx, rs)
if err != nil {
Expand All @@ -169,20 +170,28 @@ func (gn *graphNode) execute(ctx context.Context, rs *runState) (err error) {
}
}
var extra []string
errors := []string{}
providesSet := set.NewSet[ID](gn.task.Provides()...)
for _, binding := range bindings {
if !providesSet.Contains(binding.ID()) {
extra = append(extra, binding.ID().String())
}

if binding.Status() == Absent {
err := binding.Error()
if err != nil {
errors = append(errors, fmt.Sprintf("[%s: %s]", binding.ID().String(), err))
}

span.SetAttributes(
attribute.String(
traceTaskgraphAbsentKeysPrefix+binding.ID().String(),
fmt.Sprintf("%v", binding.Error()),
fmt.Sprintf("%v", err),
),
)
}
}

if len(extra) > 0 || len(missing) > 0 {
return wrapStackErrorf(
"task %s: mismatch between task Provides declaration and returned bindings: missing bindings [%s], got extra bindings [%s]",
Expand All @@ -192,8 +201,16 @@ func (gn *graphNode) execute(ctx context.Context, rs *runState) (err error) {
)
}

if len(errors) > 0 {
gn.logger.Debugf(
"task %s has binding errors: %s",
gn.task.Name(),
strings.Join(errors, ", "),
)
}

for _, dependent := range gn.dependents {
log.Debugf("task %s signalling dependent %s\n", gn.task.Name(), dependent.task.Name())
gn.logger.Debugf("task %s signalling dependent %s\n", gn.task.Name(), dependent.task.Name())
if err := rs.signal(tCtx, dependent.id); err != nil {
return err
}
Expand All @@ -212,10 +229,10 @@ func (gn *graphNode) canExecute(b Binder) bool {
func (gn *graphNode) runFunc(ctx context.Context, rs *runState) func() error {
return func() error {
if gn.canExecute(rs) {
log.Debugf("task %s starting immediately\n", gn.task.Name())
gn.logger.Debugf("task %s starting immediately\n", gn.task.Name())
return gn.execute(ctx, rs)
}
log.Debugf("task %s has dependencies missing; cannot start immediately\n", gn.task.Name())
gn.logger.Debugf("task %s has dependencies missing; cannot start immediately\n", gn.task.Name())
signal, ok := rs.signals[gn.id]
if !ok {
return wrapStackErrorf("signal channel missing for id %q", gn.id)
Expand All @@ -224,10 +241,10 @@ func (gn *graphNode) runFunc(ctx context.Context, rs *runState) func() error {
select {
case <-signal:
if gn.canExecute(rs) {
log.Debugf("task %s starting\n", gn.task.Name())
gn.logger.Debugf("task %s starting\n", gn.task.Name())
return gn.execute(ctx, rs)
}
log.Debugf("task %s still has dependencies missing\n", gn.task.Name())
gn.logger.Debugf("task %s still has dependencies missing\n", gn.task.Name())
case <-ctx.Done():
return nil
}
Expand All @@ -241,6 +258,7 @@ type graph struct {
allDependencies, allProvided set.Set[ID]
nodes []*graphNode
tracer trace.Tracer
logger Logger
}

func (g *graph) buildInputBinder(inputs ...Binding) (Binder, error) {
Expand Down Expand Up @@ -429,9 +447,15 @@ func (g *graph) Graphviz(includeInputs bool) string {
return buf.String()
}

// Logger logger interface for the graph.
type Logger interface {
Debugf(format string, args ...interface{})
}

type graphOptions struct {
tasks []Task
tracer trace.Tracer
logger Logger
}

// A GraphOption is used to configure a new Graph.
Expand Down Expand Up @@ -459,6 +483,15 @@ func WithTracer(tracer trace.Tracer) GraphOption {
}
}

// WithLogger sets a logger for the graph.
func WithLogger(logger Logger) GraphOption {
return func(opts *graphOptions) error {
opts.logger = logger

return nil
}
}

// New creates a new Graph. Exactly one WithTasks option should be passed.
//
// Ideally, Graphs should be created on program startup, rather than creating them dynamically.
Expand All @@ -473,12 +506,17 @@ func New(name string, opts ...GraphOption) (Graph, error) {
}
}

if o.logger == nil {
o.logger = log
}

g := &graph{
name: name,
tasks: o.tasks,
allDependencies: set.NewSet[ID](),
allProvided: set.NewSet[ID](),
tracer: o.tracer,
logger: o.logger,
}

provideTasks := map[string][]string{}
Expand All @@ -498,6 +536,7 @@ func New(name string, opts ...GraphOption) (Graph, error) {
task: t,
dependentsByKey: map[ID][]*graphNode{},
tracer: g.tracer,
logger: g.logger,
}
g.nodes = append(g.nodes, node)

Expand Down