Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/trace/listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,62 @@ describe("TraceListener", () => {
}
});

it("parents aws.lambda to the durable execution init inferred span on first invocation", async () => {
const startSpanSpy = TracerWrapper.prototype.startSpan as jest.Mock;
startSpanSpy.mockClear();

const listener = new TraceListener(defaultConfig);
mockController.mockTraceSource = TraceSource.Event;
mockController.mockSpanContext = {
toTraceId: () => "4110911582297405551",
toSpanId: () => "797643193680388251",
_sampling: {
priority: "2",
},
};
mockController.mockSpanContextWrapper = {
spanContext: mockController.mockSpanContext,
};

const durableEvent = {
DurableExecutionArn:
"arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
CheckpointToken: "checkpoint-token",
InitialExecutionState: {
Operations: [
{
Id: "op-1",
Name: "input",
Status: "RUNNING",
StartTimestamp: 1710000000000,
},
],
},
};

await listener.onStartInvocation(durableEvent, context as any);
const unwrappedFunc = () => {};
listener.onWrap(unwrappedFunc);

expect(startSpanSpy).toHaveBeenCalledWith(
"aws.durable.execution_init",
expect.objectContaining({
childOf: mockController.mockSpanContext,
startTime: 1710000000000,
}),
);
expect(wrapSpy).toHaveBeenCalledWith(
"aws.lambda",
expect.objectContaining({
childOf: expect.objectContaining({
toSpanId: expect.any(Function),
toTraceId: expect.any(Function),
}),
}),
unwrappedFunc,
);
});

it("sets execution_status tag on the aws.lambda span when result.Status is valid", async () => {
const mockSetTag = jest.fn();
const mockSpan = { setTag: mockSetTag };
Expand Down
3 changes: 2 additions & 1 deletion src/trace/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,14 @@ export class TraceListener {
traceSource: this.contextService.traceSource,
});
}
this.durableFunctionContext = extractDurableFunctionContext(event);
if (this.config.createInferredSpan) {
this.inferredSpan = this.inferrer.createInferredSpan(
event,
context,
parentSpanContext,
this.config.encodeAuthorizerContext,
this.durableFunctionContext,
);
}

Expand All @@ -157,7 +159,6 @@ export class TraceListener {
const eventSource = parseEventSource(event);
this.triggerTags = extractTriggerTags(event, context, eventSource);
this.stepFunctionContext = StepFunctionContextService.instance().context;
this.durableFunctionContext = extractDurableFunctionContext(event);

if (this.config.addSpanPointers) {
this.spanPointerAttributesList = getSpanPointerAttributes(eventSource, event);
Expand Down
92 changes: 92 additions & 0 deletions src/trace/span-inferrer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,53 @@ const apiGatewayWSSRequestAuthorizerConnect = require("../../event_samples/api-g
const apiGatewayWSSRequestAuthorizerMessage = require("../../event_samples/api-gateway-traced-authorizer-request-websocket-message.json");
const s3Event = require("../../event_samples/s3.json");
const functionUrlEvent = require("../../event_samples/lambda-function-urls.json");

const durableFirstInvocationEvent = {
DurableExecutionArn:
"arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
CheckpointToken: "checkpoint-token",
InitialExecutionState: {
Operations: [
{
Id: "op-1",
Name: "input",
Status: "RUNNING",
StartTimestamp: 1710000000000,
},
],
},
};

const durableReplayInvocationEvent = {
...durableFirstInvocationEvent,
InitialExecutionState: {
Operations: [
{
Id: "op-1",
Name: "input",
Status: "SUCCEEDED",
StartTimestamp: 1710000000000,
},
{
Id: "op-2",
Name: "step_1",
Status: "RUNNING",
},
],
},
};

const durableFirstInvocationContext = {
"aws_lambda.durable_function.execution_name": "my-execution",
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"aws_lambda.durable_function.first_invocation": "true",
};

const durableReplayInvocationContext = {
...durableFirstInvocationContext,
"aws_lambda.durable_function.first_invocation": "false",
};

const mockWrapper = {
startSpan: jest.fn(),
};
Expand Down Expand Up @@ -876,6 +923,51 @@ describe("SpanInferrer", () => {
]);
});

it("creates an inferred execution init span for first durable execution invocation", () => {
const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper);
inferrer.createInferredSpan(
durableFirstInvocationEvent,
{} as any,
{} as SpanContext,
true,
durableFirstInvocationContext as any,
);

expect(mockWrapper.startSpan).toBeCalledWith("aws.durable.execution_init", {
childOf: {},
startTime: 1710000000000,
tags: {
_inferred_span: { synchronicity: "async", tag_source: "self" },
"durable.execution_arn":
"arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
"durable.execution_id": "550e8400-e29b-41d4-a716-446655440004",
"durable.execution_name": "my-execution",
operation_name: "aws.durable.execution_init",
"peer.service": "mock-lambda-service",
request_id: undefined,
"resource.name": "my-execution",
resource_names: "my-execution",
service: "aws.durable-execution",
"service.name": "aws.durable-execution",
"span.kind": "server",
"span.type": "serverless",
},
});
});

it("does not create an execution init span for replay durable invocations", () => {
const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper);
inferrer.createInferredSpan(
durableReplayInvocationEvent,
{} as any,
{} as SpanContext,
true,
durableReplayInvocationContext as any,
);

expect(mockWrapper.startSpan).not.toBeCalled();
});

it("creates an inferred span for websocket events", () => {
const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper);
inferrer.createInferredSpan(webSocketEvent, {} as any, {} as SpanContext);
Expand Down
58 changes: 58 additions & 0 deletions src/trace/span-inferrer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { logDebug } from "../utils";
import { parseLambdaARN } from "../utils/arn";
import { HTTPEventTraceExtractor } from "./context/extractors";
import { HTTPEventSubType } from "./context/extractors/http";
import { DurableFunctionContext } from "./durable-function-context";

export class SpanInferrer {
private static serviceMapping: Record<string, string> = {};
Expand Down Expand Up @@ -47,7 +48,16 @@ export class SpanInferrer {
context: Context | undefined,
parentSpanContext: SpanContext | undefined,
decodeAuthorizerContext: boolean = true,
durableFunctionContext?: DurableFunctionContext,
): any {
if (durableFunctionContext) {
return this.createInferredSpanForDurableExecutionInit(
event,
context,
parentSpanContext,
durableFunctionContext,
);
}
const eventSource = parseEventSource(event);
if (eventSource === eventTypes.lambdaUrl) {
return this.createInferredSpanForLambdaUrl(event, context, parentSpanContext);
Expand Down Expand Up @@ -75,6 +85,54 @@ export class SpanInferrer {
}
}

createInferredSpanForDurableExecutionInit(
event: any,
context: Context | undefined,
parentSpanContext: SpanContext | undefined,
durableFunctionContext: DurableFunctionContext,
): SpanWrapper | undefined {

if (durableFunctionContext["aws_lambda.durable_function.first_invocation"] !== "true") return;

const durableExecutionArn = event?.DurableExecutionArn;
const parsedStartTime = Number(event?.InitialExecutionState?.Operations?.[0]?.StartTimestamp);
if (!Number.isFinite(parsedStartTime)) {
return;
}

const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution";
const executionName = durableFunctionContext["aws_lambda.durable_function.execution_name"];
const executionId = durableFunctionContext["aws_lambda.durable_function.execution_id"];
const resourceName = executionName;

const options: SpanOptions = {
startTime: parsedStartTime,
tags: {
operation_name: "aws.durable.execution_init",
resource_names: resourceName,
request_id: context?.awsRequestId,
service: serviceName,
"service.name": serviceName,
"span.type": "serverless",
"resource.name": resourceName,
"peer.service": this.service,
"span.kind": "server",
"durable.execution_arn": durableExecutionArn,
"durable.execution_name": executionName,
"durable.execution_id": executionId,
_inferred_span: {
tag_source: "self",
synchronicity: "async",
},
},
};
if (parentSpanContext) {
options.childOf = parentSpanContext;
}

return new SpanWrapper(this.traceWrapper.startSpan("aws.durable.execution_init", options), { isAsync: true });
}

isApiGatewayAsync(event: any): string {
if (event.headers && event.headers["X-Amz-Invocation-Type"] && event.headers["X-Amz-Invocation-Type"] === "Event") {
return "async";
Expand Down
Loading