Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

set "github.com/deckarep/golang-set/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -123,6 +124,10 @@ type graphNode struct {
tracer trace.Tracer
}

const (
traceTaskgraphAbsentKeysPrefix = "taskgraph.absent_keys."
)

// Execute the task against the binder provided in the runState.
//
// This assumes that all of the task's dependencies have been bound; it is the responsibility of the
Expand Down Expand Up @@ -169,6 +174,14 @@ func (gn *graphNode) execute(ctx context.Context, rs *runState) (err error) {
if !providesSet.Contains(binding.ID()) {
extra = append(extra, binding.ID().String())
}
if binding.Status() == Absent {
span.SetAttributes(
attribute.String(
traceTaskgraphAbsentKeysPrefix+binding.ID().String(),
fmt.Sprintf("%v", binding.Error()),
),
)
}
}
if len(extra) > 0 || len(missing) > 0 {
return wrapStackErrorf(
Expand Down
37 changes: 37 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

set "github.com/deckarep/golang-set/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// A TaskSet defines a nestable collection of tasks. A Task fulfils the TaskSet interface, acting as
Expand Down Expand Up @@ -192,6 +194,8 @@ type Condition interface {
Evaluate(ctx context.Context, b Binder) (bool, error)
// Deps should return the IDs of the keys used by the Evaluate function.
Deps() []ID
// Keys returns the keys in the conditional
Keys() []ReadOnlyKey[bool]
}

// ConditionAnd evaluates to true if and only if all of the keys it contains are bound to true.
Expand Down Expand Up @@ -220,6 +224,11 @@ func (ca ConditionAnd) Deps() []ID {
return deps
}

// Keys returns the keys in the conditional
func (ca ConditionAnd) Keys() []ReadOnlyKey[bool] {
return ca
}

// ConditionOr evaluates to true if any of the keys it contains are bound to true.
type ConditionOr []ReadOnlyKey[bool]

Expand All @@ -246,6 +255,11 @@ func (co ConditionOr) Deps() []ID {
return deps
}

// Keys returns the keys in the conditional
func (co ConditionOr) Keys() []ReadOnlyKey[bool] {
return co
}

// Conditional wraps tasks such that they are only run if given Condition evaluates to true. If it
// evaluates to false, the bindings in DefaultBindings are used, with any missing keys provided by
// the wrapped tasks bound as absent.
Expand All @@ -270,6 +284,10 @@ func (c Conditional) Locate() Conditional {
return c
}

const (
traceTaskgraphConditionalPrefix = "taskgraph.conditional."
)

// Tasks satisfies TaskSet.Tasks.
func (c Conditional) Tasks() []Task {
defaultBindingsMap := map[ID]Binding{}
Expand All @@ -291,6 +309,25 @@ func (c Conditional) Tasks() []Task {
if err != nil {
return nil, err
}
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Bool(traceTaskgraphConditionalPrefix+"execute", shouldExecute),
)
for _, k := range c.Condition.Keys() {
v, err := k.Get(b)
if err != nil {
span.SetAttributes(
attribute.String(
traceTaskgraphConditionalPrefix+k.ID().String(),
err.Error(),
),
)
} else {
span.SetAttributes(
attribute.Bool(traceTaskgraphConditionalPrefix+k.ID().String(), v),
)
}
}
if shouldExecute {
return t.Execute(ctx, b)
}
Expand Down