Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions execution/evm/engine_rpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package evm

import (
"context"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/rpc"
)

var _ EngineRPCClient = (*engineRPCClient)(nil)

// engineRPCClient is the concrete implementation wrapping *rpc.Client.
type engineRPCClient struct {
client *rpc.Client
}

// NewEngineRPCClient creates a new Engine API client.
func NewEngineRPCClient(client *rpc.Client) EngineRPCClient {
return &engineRPCClient{client: client}
}

func (e *engineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) {
var result engine.ForkChoiceResponse
err := e.client.CallContext(ctx, &result, "engine_forkchoiceUpdatedV3", state, args)
if err != nil {
return nil, err
}
return &result, nil
}

func (e *engineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) {
var result engine.ExecutionPayloadEnvelope
err := e.client.CallContext(ctx, &result, "engine_getPayloadV4", payloadID)
if err != nil {
return nil, err
}
return &result, nil
}

func (e *engineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) {
var result engine.PayloadStatusV1
err := e.client.CallContext(ctx, &result, "engine_newPayloadV4", payload, blobHashes, parentBeaconBlockRoot, executionRequests)
if err != nil {
return nil, err
}
return &result, nil
}
122 changes: 122 additions & 0 deletions execution/evm/engine_rpc_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package evm

import (
"context"

"github.com/ethereum/go-ethereum/beacon/engine"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

var _ EngineRPCClient = (*tracedEngineRPCClient)(nil)

// tracedEngineRPCClient wraps an EngineRPCClient and records spans.
type tracedEngineRPCClient struct {
inner EngineRPCClient
tracer trace.Tracer
}

// withTracingEngineRPCClient decorates an EngineRPCClient with OpenTelemetry spans.
func withTracingEngineRPCClient(inner EngineRPCClient) EngineRPCClient {
return &tracedEngineRPCClient{
inner: inner,
tracer: otel.Tracer("ev-node/execution/engine-rpc"),
}
}

func (t *tracedEngineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) {
ctx, span := t.tracer.Start(ctx, "Engine.ForkchoiceUpdated",
trace.WithAttributes(
attribute.String("method", "engine_forkchoiceUpdatedV3"),
attribute.String("head_block_hash", state.HeadBlockHash.Hex()),
attribute.String("safe_block_hash", state.SafeBlockHash.Hex()),
attribute.String("finalized_block_hash", state.FinalizedBlockHash.Hex()),
),
)
defer span.End()

result, err := t.inner.ForkchoiceUpdated(ctx, state, args)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

attributes := []attribute.KeyValue{
attribute.String("payload_status", result.PayloadStatus.Status),
}

if result.PayloadID != nil {
attributes = append(attributes, attribute.String("payload_id", result.PayloadID.String()))
}

if result.PayloadStatus.LatestValidHash != nil {
attributes = append(attributes, attribute.String("latest_valid_hash", result.PayloadStatus.LatestValidHash.Hex()))
}

span.SetAttributes(
attributes...,
)

return result, nil
}

func (t *tracedEngineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) {
ctx, span := t.tracer.Start(ctx, "Engine.GetPayload",
trace.WithAttributes(
attribute.String("method", "engine_getPayloadV4"),
attribute.String("payload_id", payloadID.String()),
),
)
defer span.End()

result, err := t.inner.GetPayload(ctx, payloadID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.SetAttributes(
attribute.Int64("block_number", int64(result.ExecutionPayload.Number)),
attribute.String("block_hash", result.ExecutionPayload.BlockHash.Hex()),
attribute.String("state_root", result.ExecutionPayload.StateRoot.Hex()),
attribute.Int("tx_count", len(result.ExecutionPayload.Transactions)),
attribute.Int64("gas_used", int64(result.ExecutionPayload.GasUsed)),
)

return result, nil
}

func (t *tracedEngineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) {
ctx, span := t.tracer.Start(ctx, "Engine.NewPayload",
trace.WithAttributes(
attribute.String("method", "engine_newPayloadV4"),
attribute.Int64("block_number", int64(payload.Number)),
attribute.String("block_hash", payload.BlockHash.Hex()),
attribute.String("parent_hash", payload.ParentHash.Hex()),
attribute.Int("tx_count", len(payload.Transactions)),
attribute.Int64("gas_used", int64(payload.GasUsed)),
),
)
defer span.End()

result, err := t.inner.NewPayload(ctx, payload, blobHashes, parentBeaconBlockRoot, executionRequests)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

attributes := []attribute.KeyValue{attribute.String("payload_status", result.Status)}

if result.LatestValidHash != nil {
attributes = append(attributes, attribute.String("latest_valid_hash", result.LatestValidHash.Hex()))
}

span.SetAttributes(attributes...)

return result, nil
}
41 changes: 28 additions & 13 deletions execution/evm/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,23 @@ func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error, maxRe
return fmt.Errorf("max retries (%d) exceeded for %s", maxRetries, operation)
}

// EngineRPCClient abstracts Engine API RPC calls for tracing and testing.
type EngineRPCClient interface {
// ForkchoiceUpdated updates the forkchoice state and optionally starts payload building.
ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error)

// GetPayload retrieves a previously requested execution payload.
GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)

// NewPayload submits a new execution payload for validation.
NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error)
}

// EngineClient represents a client that interacts with an Ethereum execution engine
// through the Engine API. It manages connections to both the engine and standard Ethereum
// APIs, and maintains state related to block processing.
type EngineClient struct {
engineClient *rpc.Client // Client for Engine API calls
engineClient EngineRPCClient // Client for Engine API calls
ethClient *ethclient.Client // Client for standard Ethereum API calls
genesisHash common.Hash // Hash of the genesis block
initialHeight uint64
Expand Down Expand Up @@ -206,11 +218,19 @@ func NewEngineExecutionClient(
}
return nil
}))
engineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...)
rawEngineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...)
if err != nil {
return nil, err
}

// raw engine client
engineClient := NewEngineRPCClient(rawEngineClient)

// if tracing enabled, wrap with traced decorator
if tracingEnabled {
engineClient = withTracingEngineRPCClient(engineClient)
}

return &EngineClient{
engineClient: engineClient,
ethClient: ethClient,
Expand Down Expand Up @@ -238,8 +258,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini

// Acknowledge the genesis block with retry logic for SYNCING status
err := retryWithBackoffOnPayloadStatus(ctx, func() error {
var forkchoiceResult engine.ForkChoiceResponse
err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3",
forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx,
engine.ForkchoiceStateV1{
HeadBlockHash: c.genesisHash,
SafeBlockHash: c.genesisHash,
Expand Down Expand Up @@ -372,8 +391,7 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight
// 3. Call forkchoice update to get PayloadID
var newPayloadID *engine.PayloadID
err = retryWithBackoffOnPayloadStatus(ctx, func() error {
var forkchoiceResult engine.ForkChoiceResponse
err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, evPayloadAttrs)
forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, evPayloadAttrs)
if err != nil {
return fmt.Errorf("forkchoice update failed: %w", err)
}
Expand Down Expand Up @@ -522,8 +540,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common.
func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error {
// Call forkchoice update with retry logic for SYNCING status
err := retryWithBackoffOnPayloadStatus(ctx, func() error {
var forkchoiceResult engine.ForkChoiceResponse
err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, nil)
forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil)
if err != nil {
return fmt.Errorf("forkchoice update failed: %w", err)
}
Expand Down Expand Up @@ -774,8 +791,7 @@ func (c *EngineClient) filterTransactions(ctx context.Context, txs [][]byte, blo
// processPayload handles the common logic of getting, submitting, and finalizing a payload.
func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.PayloadID, txs [][]byte) ([]byte, uint64, error) {
// 1. Get Payload
var payloadResult engine.ExecutionPayloadEnvelope
err := c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", payloadID)
payloadResult, err := c.engineClient.GetPayload(ctx, payloadID)
if err != nil {
return nil, 0, fmt.Errorf("get payload failed: %w", err)
}
Expand All @@ -784,9 +800,8 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl
blockTimestamp := int64(payloadResult.ExecutionPayload.Timestamp)

// 2. Submit Payload (newPayload)
var newPayloadResult engine.PayloadStatusV1
err = retryWithBackoffOnPayloadStatus(ctx, func() error {
err := c.engineClient.CallContext(ctx, &newPayloadResult, "engine_newPayloadV4",
newPayloadResult, err := c.engineClient.NewPayload(ctx,
payloadResult.ExecutionPayload,
[]string{}, // No blob hashes
common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot
Expand All @@ -796,7 +811,7 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl
return fmt.Errorf("new payload submission failed: %w", err)
}

if err := validatePayloadStatus(newPayloadResult); err != nil {
if err := validatePayloadStatus(*newPayloadResult); err != nil {
c.logger.Warn().
Str("status", newPayloadResult.Status).
Str("latestValidHash", latestValidHashHex(newPayloadResult.LatestValidHash)).
Expand Down
19 changes: 11 additions & 8 deletions execution/evm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ require (
github.com/ipfs/go-datastore v0.9.0
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
google.golang.org/protobuf v1.36.10
)

require (
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
)

require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
Expand Down Expand Up @@ -78,14 +89,6 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
Expand Down
Loading