diff --git a/execution/evm/engine_rpc_client.go b/execution/evm/engine_rpc_client.go new file mode 100644 index 000000000..ec04564aa --- /dev/null +++ b/execution/evm/engine_rpc_client.go @@ -0,0 +1,47 @@ +package evm + +import ( + "context" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/rpc" +) + +var _ EngineRPCClient = (*engineRPCClient)(nil) + +// engineRPCClient is the concrete implementation wrapping *rpc.Client. +type engineRPCClient struct { + client *rpc.Client +} + +// NewEngineRPCClient creates a new Engine API client. +func NewEngineRPCClient(client *rpc.Client) EngineRPCClient { + return &engineRPCClient{client: client} +} + +func (e *engineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) { + var result engine.ForkChoiceResponse + err := e.client.CallContext(ctx, &result, "engine_forkchoiceUpdatedV3", state, args) + if err != nil { + return nil, err + } + return &result, nil +} + +func (e *engineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) { + var result engine.ExecutionPayloadEnvelope + err := e.client.CallContext(ctx, &result, "engine_getPayloadV4", payloadID) + if err != nil { + return nil, err + } + return &result, nil +} + +func (e *engineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) { + var result engine.PayloadStatusV1 + err := e.client.CallContext(ctx, &result, "engine_newPayloadV4", payload, blobHashes, parentBeaconBlockRoot, executionRequests) + if err != nil { + return nil, err + } + return &result, nil +} diff --git a/execution/evm/engine_rpc_tracing.go b/execution/evm/engine_rpc_tracing.go new file mode 100644 index 000000000..f5bf09e4b --- /dev/null +++ b/execution/evm/engine_rpc_tracing.go @@ -0,0 +1,122 @@ +package evm + +import ( + "context" + + "github.com/ethereum/go-ethereum/beacon/engine" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +var _ EngineRPCClient = (*tracedEngineRPCClient)(nil) + +// tracedEngineRPCClient wraps an EngineRPCClient and records spans. +type tracedEngineRPCClient struct { + inner EngineRPCClient + tracer trace.Tracer +} + +// withTracingEngineRPCClient decorates an EngineRPCClient with OpenTelemetry spans. +func withTracingEngineRPCClient(inner EngineRPCClient) EngineRPCClient { + return &tracedEngineRPCClient{ + inner: inner, + tracer: otel.Tracer("ev-node/execution/engine-rpc"), + } +} + +func (t *tracedEngineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) { + ctx, span := t.tracer.Start(ctx, "Engine.ForkchoiceUpdated", + trace.WithAttributes( + attribute.String("method", "engine_forkchoiceUpdatedV3"), + attribute.String("head_block_hash", state.HeadBlockHash.Hex()), + attribute.String("safe_block_hash", state.SafeBlockHash.Hex()), + attribute.String("finalized_block_hash", state.FinalizedBlockHash.Hex()), + ), + ) + defer span.End() + + result, err := t.inner.ForkchoiceUpdated(ctx, state, args) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + attributes := []attribute.KeyValue{ + attribute.String("payload_status", result.PayloadStatus.Status), + } + + if result.PayloadID != nil { + attributes = append(attributes, attribute.String("payload_id", result.PayloadID.String())) + } + + if result.PayloadStatus.LatestValidHash != nil { + attributes = append(attributes, attribute.String("latest_valid_hash", result.PayloadStatus.LatestValidHash.Hex())) + } + + span.SetAttributes( + attributes..., + ) + + return result, nil +} + +func (t *tracedEngineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) { + ctx, span := t.tracer.Start(ctx, "Engine.GetPayload", + trace.WithAttributes( + attribute.String("method", "engine_getPayloadV4"), + attribute.String("payload_id", payloadID.String()), + ), + ) + defer span.End() + + result, err := t.inner.GetPayload(ctx, payloadID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int64("block_number", int64(result.ExecutionPayload.Number)), + attribute.String("block_hash", result.ExecutionPayload.BlockHash.Hex()), + attribute.String("state_root", result.ExecutionPayload.StateRoot.Hex()), + attribute.Int("tx_count", len(result.ExecutionPayload.Transactions)), + attribute.Int64("gas_used", int64(result.ExecutionPayload.GasUsed)), + ) + + return result, nil +} + +func (t *tracedEngineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) { + ctx, span := t.tracer.Start(ctx, "Engine.NewPayload", + trace.WithAttributes( + attribute.String("method", "engine_newPayloadV4"), + attribute.Int64("block_number", int64(payload.Number)), + attribute.String("block_hash", payload.BlockHash.Hex()), + attribute.String("parent_hash", payload.ParentHash.Hex()), + attribute.Int("tx_count", len(payload.Transactions)), + attribute.Int64("gas_used", int64(payload.GasUsed)), + ), + ) + defer span.End() + + result, err := t.inner.NewPayload(ctx, payload, blobHashes, parentBeaconBlockRoot, executionRequests) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + attributes := []attribute.KeyValue{attribute.String("payload_status", result.Status)} + + if result.LatestValidHash != nil { + attributes = append(attributes, attribute.String("latest_valid_hash", result.LatestValidHash.Hex())) + } + + span.SetAttributes(attributes...) + + return result, nil +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index 20fc59e51..e360867b5 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -130,11 +130,23 @@ func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error, maxRe return fmt.Errorf("max retries (%d) exceeded for %s", maxRetries, operation) } +// EngineRPCClient abstracts Engine API RPC calls for tracing and testing. +type EngineRPCClient interface { + // ForkchoiceUpdated updates the forkchoice state and optionally starts payload building. + ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) + + // GetPayload retrieves a previously requested execution payload. + GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) + + // NewPayload submits a new execution payload for validation. + NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) +} + // EngineClient represents a client that interacts with an Ethereum execution engine // through the Engine API. It manages connections to both the engine and standard Ethereum // APIs, and maintains state related to block processing. type EngineClient struct { - engineClient *rpc.Client // Client for Engine API calls + engineClient EngineRPCClient // Client for Engine API calls ethClient *ethclient.Client // Client for standard Ethereum API calls genesisHash common.Hash // Hash of the genesis block initialHeight uint64 @@ -206,11 +218,19 @@ func NewEngineExecutionClient( } return nil })) - engineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...) + rawEngineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...) if err != nil { return nil, err } + // raw engine client + engineClient := NewEngineRPCClient(rawEngineClient) + + // if tracing enabled, wrap with traced decorator + if tracingEnabled { + engineClient = withTracingEngineRPCClient(engineClient) + } + return &EngineClient{ engineClient: engineClient, ethClient: ethClient, @@ -238,8 +258,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini // Acknowledge the genesis block with retry logic for SYNCING status err := retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, engine.ForkchoiceStateV1{ HeadBlockHash: c.genesisHash, SafeBlockHash: c.genesisHash, @@ -372,8 +391,7 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight // 3. Call forkchoice update to get PayloadID var newPayloadID *engine.PayloadID err = retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, evPayloadAttrs) + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, evPayloadAttrs) if err != nil { return fmt.Errorf("forkchoice update failed: %w", err) } @@ -522,8 +540,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common. func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error { // Call forkchoice update with retry logic for SYNCING status err := retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, nil) + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil) if err != nil { return fmt.Errorf("forkchoice update failed: %w", err) } @@ -774,8 +791,7 @@ func (c *EngineClient) filterTransactions(ctx context.Context, txs [][]byte, blo // processPayload handles the common logic of getting, submitting, and finalizing a payload. func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.PayloadID, txs [][]byte) ([]byte, uint64, error) { // 1. Get Payload - var payloadResult engine.ExecutionPayloadEnvelope - err := c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", payloadID) + payloadResult, err := c.engineClient.GetPayload(ctx, payloadID) if err != nil { return nil, 0, fmt.Errorf("get payload failed: %w", err) } @@ -784,9 +800,8 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl blockTimestamp := int64(payloadResult.ExecutionPayload.Timestamp) // 2. Submit Payload (newPayload) - var newPayloadResult engine.PayloadStatusV1 err = retryWithBackoffOnPayloadStatus(ctx, func() error { - err := c.engineClient.CallContext(ctx, &newPayloadResult, "engine_newPayloadV4", + newPayloadResult, err := c.engineClient.NewPayload(ctx, payloadResult.ExecutionPayload, []string{}, // No blob hashes common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot @@ -796,7 +811,7 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl return fmt.Errorf("new payload submission failed: %w", err) } - if err := validatePayloadStatus(newPayloadResult); err != nil { + if err := validatePayloadStatus(*newPayloadResult); err != nil { c.logger.Warn(). Str("status", newPayloadResult.Status). Str("latestValidHash", latestValidHashHex(newPayloadResult.LatestValidHash)). diff --git a/execution/evm/go.mod b/execution/evm/go.mod index 8197f36a6..d77618007 100644 --- a/execution/evm/go.mod +++ b/execution/evm/go.mod @@ -10,9 +10,20 @@ require ( github.com/ipfs/go-datastore v0.9.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 google.golang.org/protobuf v1.36.10 ) +require ( + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.1 // indirect +) + require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -78,14 +89,6 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect - go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.38.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect - go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.46.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect