diff --git a/src/trace/listener.spec.ts b/src/trace/listener.spec.ts index 246a907f..6e970eee 100644 --- a/src/trace/listener.spec.ts +++ b/src/trace/listener.spec.ts @@ -569,6 +569,62 @@ describe("TraceListener", () => { } }); + it("parents aws.lambda to the durable execution init inferred span on first invocation", async () => { + const startSpanSpy = TracerWrapper.prototype.startSpan as jest.Mock; + startSpanSpy.mockClear(); + + const listener = new TraceListener(defaultConfig); + mockController.mockTraceSource = TraceSource.Event; + mockController.mockSpanContext = { + toTraceId: () => "4110911582297405551", + toSpanId: () => "797643193680388251", + _sampling: { + priority: "2", + }, + }; + mockController.mockSpanContextWrapper = { + spanContext: mockController.mockSpanContext, + }; + + const durableEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + CheckpointToken: "checkpoint-token", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + }, + ], + }, + }; + + await listener.onStartInvocation(durableEvent, context as any); + const unwrappedFunc = () => {}; + listener.onWrap(unwrappedFunc); + + expect(startSpanSpy).toHaveBeenCalledWith( + "aws.durable.execution_init", + expect.objectContaining({ + childOf: mockController.mockSpanContext, + startTime: 1710000000000, + }), + ); + expect(wrapSpy).toHaveBeenCalledWith( + "aws.lambda", + expect.objectContaining({ + childOf: expect.objectContaining({ + toSpanId: expect.any(Function), + toTraceId: expect.any(Function), + }), + }), + unwrappedFunc, + ); + }); + it("sets execution_status tag on the aws.lambda span when result.Status is valid", async () => { const mockSetTag = jest.fn(); const mockSpan = { setTag: mockSetTag }; diff --git a/src/trace/listener.ts b/src/trace/listener.ts index f068c535..7917aa01 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -143,12 +143,14 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } + this.durableFunctionContext = extractDurableFunctionContext(event); if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, parentSpanContext, this.config.encodeAuthorizerContext, + this.durableFunctionContext, ); } @@ -157,7 +159,6 @@ export class TraceListener { const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); this.stepFunctionContext = StepFunctionContextService.instance().context; - this.durableFunctionContext = extractDurableFunctionContext(event); if (this.config.addSpanPointers) { this.spanPointerAttributesList = getSpanPointerAttributes(eventSource, event); diff --git a/src/trace/span-inferrer.spec.ts b/src/trace/span-inferrer.spec.ts index 17332044..d4b4975d 100644 --- a/src/trace/span-inferrer.spec.ts +++ b/src/trace/span-inferrer.spec.ts @@ -24,6 +24,53 @@ const apiGatewayWSSRequestAuthorizerConnect = require("../../event_samples/api-g const apiGatewayWSSRequestAuthorizerMessage = require("../../event_samples/api-gateway-traced-authorizer-request-websocket-message.json"); const s3Event = require("../../event_samples/s3.json"); const functionUrlEvent = require("../../event_samples/lambda-function-urls.json"); + +const durableFirstInvocationEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + CheckpointToken: "checkpoint-token", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + }, + ], + }, +}; + +const durableReplayInvocationEvent = { + ...durableFirstInvocationEvent, + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "SUCCEEDED", + StartTimestamp: 1710000000000, + }, + { + Id: "op-2", + Name: "step_1", + Status: "RUNNING", + }, + ], + }, +}; + +const durableFirstInvocationContext = { + "aws_lambda.durable_function.execution_name": "my-execution", + "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004", + "aws_lambda.durable_function.first_invocation": "true", +}; + +const durableReplayInvocationContext = { + ...durableFirstInvocationContext, + "aws_lambda.durable_function.first_invocation": "false", +}; + const mockWrapper = { startSpan: jest.fn(), }; @@ -876,6 +923,51 @@ describe("SpanInferrer", () => { ]); }); + it("creates an inferred execution init span for first durable execution invocation", () => { + const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); + inferrer.createInferredSpan( + durableFirstInvocationEvent, + {} as any, + {} as SpanContext, + true, + durableFirstInvocationContext as any, + ); + + expect(mockWrapper.startSpan).toBeCalledWith("aws.durable.execution_init", { + childOf: {}, + startTime: 1710000000000, + tags: { + _inferred_span: { synchronicity: "async", tag_source: "self" }, + "durable.execution_arn": + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + "durable.execution_id": "550e8400-e29b-41d4-a716-446655440004", + "durable.execution_name": "my-execution", + operation_name: "aws.durable.execution_init", + "peer.service": "mock-lambda-service", + request_id: undefined, + "resource.name": "my-execution", + resource_names: "my-execution", + service: "aws.durable-execution", + "service.name": "aws.durable-execution", + "span.kind": "server", + "span.type": "serverless", + }, + }); + }); + + it("does not create an execution init span for replay durable invocations", () => { + const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); + inferrer.createInferredSpan( + durableReplayInvocationEvent, + {} as any, + {} as SpanContext, + true, + durableReplayInvocationContext as any, + ); + + expect(mockWrapper.startSpan).not.toBeCalled(); + }); + it("creates an inferred span for websocket events", () => { const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); inferrer.createInferredSpan(webSocketEvent, {} as any, {} as SpanContext); diff --git a/src/trace/span-inferrer.ts b/src/trace/span-inferrer.ts index d6c4ae8f..7844b88d 100644 --- a/src/trace/span-inferrer.ts +++ b/src/trace/span-inferrer.ts @@ -16,6 +16,7 @@ import { logDebug } from "../utils"; import { parseLambdaARN } from "../utils/arn"; import { HTTPEventTraceExtractor } from "./context/extractors"; import { HTTPEventSubType } from "./context/extractors/http"; +import { DurableFunctionContext } from "./durable-function-context"; export class SpanInferrer { private static serviceMapping: Record = {}; @@ -47,7 +48,16 @@ export class SpanInferrer { context: Context | undefined, parentSpanContext: SpanContext | undefined, decodeAuthorizerContext: boolean = true, + durableFunctionContext?: DurableFunctionContext, ): any { + if (durableFunctionContext) { + return this.createInferredSpanForDurableExecutionInit( + event, + context, + parentSpanContext, + durableFunctionContext, + ); + } const eventSource = parseEventSource(event); if (eventSource === eventTypes.lambdaUrl) { return this.createInferredSpanForLambdaUrl(event, context, parentSpanContext); @@ -75,6 +85,54 @@ export class SpanInferrer { } } + createInferredSpanForDurableExecutionInit( + event: any, + context: Context | undefined, + parentSpanContext: SpanContext | undefined, + durableFunctionContext: DurableFunctionContext, + ): SpanWrapper | undefined { + + if (durableFunctionContext["aws_lambda.durable_function.first_invocation"] !== "true") return; + + const durableExecutionArn = event?.DurableExecutionArn; + const parsedStartTime = Number(event?.InitialExecutionState?.Operations?.[0]?.StartTimestamp); + if (!Number.isFinite(parsedStartTime)) { + return; + } + + const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; + const executionName = durableFunctionContext["aws_lambda.durable_function.execution_name"]; + const executionId = durableFunctionContext["aws_lambda.durable_function.execution_id"]; + const resourceName = executionName; + + const options: SpanOptions = { + startTime: parsedStartTime, + tags: { + operation_name: "aws.durable.execution_init", + resource_names: resourceName, + request_id: context?.awsRequestId, + service: serviceName, + "service.name": serviceName, + "span.type": "serverless", + "resource.name": resourceName, + "peer.service": this.service, + "span.kind": "server", + "durable.execution_arn": durableExecutionArn, + "durable.execution_name": executionName, + "durable.execution_id": executionId, + _inferred_span: { + tag_source: "self", + synchronicity: "async", + }, + }, + }; + if (parentSpanContext) { + options.childOf = parentSpanContext; + } + + return new SpanWrapper(this.traceWrapper.startSpan("aws.durable.execution_init", options), { isAsync: true }); + } + isApiGatewayAsync(event: any): string { if (event.headers && event.headers["X-Amz-Invocation-Type"] && event.headers["X-Amz-Invocation-Type"] === "Event") { return "async";