From d323c6e1ef18a5479556f8fa642b7fd69db678e3 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Mar 2026 14:59:48 +0300 Subject: [PATCH 1/6] fix: proper SigV4 Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- src/Frontend/Http.ts | 140 +++++++++++++++++++-- src/Frontend/Multipart/Put.ts | 9 +- src/Frontend/Objects/Put.ts | 10 +- src/Services/Auth.ts | 215 +++++++++++++++++++++----------- src/Services/AwsChunked.ts | 169 +++++++++++++++++++++++++ src/Services/Backend.ts | 11 ++ src/Services/S3HeaderService.ts | 19 ++- src/Services/S3Xml.ts | 10 ++ tests/aws-chunked.test.ts | 77 ++++++++++++ tests/sigv4-integration.test.ts | 186 +++++++++++++++++++++++++++ 10 files changed, 758 insertions(+), 88 deletions(-) create mode 100644 src/Services/AwsChunked.ts create mode 100644 tests/aws-chunked.test.ts create mode 100644 tests/sigv4-integration.test.ts diff --git a/src/Frontend/Http.ts b/src/Frontend/Http.ts index 40659cf..4f88858 100644 --- a/src/Frontend/Http.ts +++ b/src/Frontend/Http.ts @@ -4,8 +4,15 @@ import { HttpRouter, HttpServerResponse, } from "@effect/platform"; -import { Effect, Layer } from "effect"; -import { Backend, MethodNotAllowed } from "../Services/Backend.ts"; +import { Effect, Layer, Option } from "effect"; +import { + AccessDenied, + Backend, + InvalidAccessKeyId, + InvalidArgument, + MethodNotAllowed, + RequestTimeTooSkewed, +} from "../Services/Backend.ts"; import { BackendResolver } from "../Services/BackendResolver.ts"; import { S3Xml } from "../Services/S3Xml.ts"; import { RequestContext } from "./Utils.ts"; @@ -21,6 +28,8 @@ import { headBucket } from "./Buckets/Head.ts"; import { HttpHeraldApi } from "../Api.ts"; import { BadGateway } from "./Api.ts"; import * as HttpServerRequest from "@effect/platform/HttpServerRequest"; +import { HeraldConfig } from "../Config/Layer.ts"; +import { verifyIncomingSigV4Detailed } from "../Services/Auth.ts"; /** * Middleware that at debug log level logs every outgoing response's status and @@ -48,6 +57,42 @@ export const responseDebugLoggingMiddleware = HttpMiddleware.make((app) => }) ); +function hasSigV4Credentials( + request: HttpServerRequest.HttpServerRequest, +): boolean { + if (typeof request.headers["authorization"] === "string") { + return true; + } + const hostHeader = request.headers["host"]; + const host = typeof hostHeader === "string" ? hostHeader : "localhost"; + const protocol = request.url.startsWith("https") ? "https:" : "http:"; + const url = new URL(request.url, `${protocol}//${host}`); + return url.searchParams.has("X-Amz-Signature"); +} + +function isPostObjectMultipartRequest( + request: HttpServerRequest.HttpServerRequest, +): boolean { + if (request.method !== "POST") { + return false; + } + const contentType = request.headers["content-type"] ?? + request.headers["Content-Type"]; + const contentTypeValue = Array.isArray(contentType) + ? contentType[0] + : contentType; + return typeof contentTypeValue === "string" && + contentTypeValue.toLowerCase().startsWith("multipart/form-data"); +} + +function isLegacyAwsAuthorizationRequest( + request: HttpServerRequest.HttpServerRequest, +): boolean { + const authorization = request.headers["authorization"]; + return typeof authorization === "string" && + authorization.startsWith("AWS "); +} + /** Build annotations and log 5xx as error, 4xx as warning; return response. */ function logRequestFailureAndReturn( err: unknown, @@ -99,6 +144,7 @@ export const makeS3Router = (prefix = "") => Effect.gen(function* () { const s3Xml = yield* S3Xml; const resolver = yield* BackendResolver; + const config = yield* HeraldConfig; const frontHandler = ( handler: Effect.Effect, @@ -136,19 +182,95 @@ export const makeS3Router = (prefix = "") => const isHead = request.method === "HEAD"; const method = request.method ?? "UNKNOWN"; - const backend = yield* resolver.getLayerForBucket(bucket); - const backendLayer = Layer.succeed(Backend, backend); - const attrs = { bucket, method }; - return yield* handler.pipe( - Effect.provideService(RequestContext, { bucket }), - Effect.provide(backendLayer), + return yield* Effect.gen(function* () { + if (bucket !== "") { + const authCredentials = config.resolveAuth(bucket); + const skipSigV4Auth = isPostObjectMultipartRequest(request) || + isLegacyAwsAuthorizationRequest(request); + if (Option.isSome(authCredentials) && !skipSigV4Auth) { + if (!hasSigV4Credentials(request)) { + return yield* Effect.fail( + new AccessDenied({ message: "Access Denied" }), + ); + } + + const resolvedBucket = config.lookupBucket(bucket); + if (Option.isNone(resolvedBucket)) { + return yield* Effect.fail( + new AccessDenied({ message: "Access Denied" }), + ); + } + const bucketRegion = resolvedBucket.value.region; + if ( + bucketRegion === undefined || bucketRegion.trim() === "" + ) { + return yield* Effect.fail( + new AccessDenied({ message: "Access Denied" }), + ); + } + + const validation = yield* verifyIncomingSigV4Detailed( + request, + authCredentials.value, + bucketRegion, + ); + if (!validation.valid) { + if ( + validation.failure === "MalformedAuthorization" || + validation.failure === "InvalidExpires" + ) { + return yield* Effect.fail( + new InvalidArgument({ + message: "Authorization header is malformed", + }), + ); + } + if (validation.failure === "RequestTimeTooSkewed") { + return yield* Effect.fail( + new RequestTimeTooSkewed({ + message: + "The difference between the request time and the current time is too large.", + }), + ); + } + if ( + validation.failure === "ExpiredPresign" || + validation.failure === "PresignNotYetValid" || + validation.failure === "PresignExpiresTooLong" + ) { + return yield* Effect.fail( + new AccessDenied({ message: "Request has expired" }), + ); + } + if (validation.failure === "UnknownAccessKey") { + return yield* Effect.fail( + new InvalidAccessKeyId({ + message: + "The AWS Access Key Id you provided does not exist in our records.", + }), + ); + } + return yield* Effect.fail( + new AccessDenied({ message: "Access Denied" }), + ); + } + } + } + + const backend = yield* resolver.getLayerForBucket(bucket); + const backendLayer = Layer.succeed(Backend, backend); + + return yield* handler.pipe( + Effect.provideService(RequestContext, { bucket }), + Effect.provide(backendLayer), + ); + }).pipe( // convert the frontend errors to xml and log failure details Effect.catchAll((err: unknown) => { const response = s3Xml.formatError(err, isHead); return logRequestFailureAndReturn(err, response, bucket, method); }), - ).pipe( Effect.annotateLogs(attrs), Effect.withSpan("herald.s3.request", { attributes: attrs }), ); diff --git a/src/Frontend/Multipart/Put.ts b/src/Frontend/Multipart/Put.ts index b1aa625..d1b9354 100644 --- a/src/Frontend/Multipart/Put.ts +++ b/src/Frontend/Multipart/Put.ts @@ -1,6 +1,10 @@ import { Effect } from "effect"; import { HttpServerRequest, HttpServerResponse } from "@effect/platform"; import { S3RequestParser } from "../Utils.ts"; +import { + decodeAwsChunkedBodyStream, + hasAwsChunkedContentEncoding, +} from "../../Services/AwsChunked.ts"; import { Backend, InvalidRequest } from "../../Services/Backend.ts"; import { S3HeaderService } from "../../Services/S3HeaderService.ts"; import { S3Xml } from "../../Services/S3Xml.ts"; @@ -37,12 +41,15 @@ export const uploadPart = Effect.gen(function* () { // S3 allows 0-byte for the last part; no Frontend rejection here. // Swift backend rejects 0-byte segments at CompleteMultipartUpload (SLO manifest requirement). + const bodyStream = hasAwsChunkedContentEncoding(request.headers) + ? decodeAwsChunkedBodyStream(request.stream) + : request.stream; const result = yield* backend.uploadPart( key, s3Params.uploadId, s3Params.partNumber, - request.stream, + bodyStream, request.headers, ).pipe( Effect.catchAll((e) => { diff --git a/src/Frontend/Objects/Put.ts b/src/Frontend/Objects/Put.ts index 8319dd1..b9cf4cf 100644 --- a/src/Frontend/Objects/Put.ts +++ b/src/Frontend/Objects/Put.ts @@ -6,6 +6,10 @@ import { InvalidRequest, } from "../../Services/Backend.ts"; import { BackendResolver } from "../../Services/BackendResolver.ts"; +import { + decodeAwsChunkedBodyStream, + hasAwsChunkedContentEncoding, +} from "../../Services/AwsChunked.ts"; import { ensureClientReadableKey, ensureClientWritableKey, @@ -241,9 +245,13 @@ export const putObject = Effect.gen(function* () { return yield* copyObject; } + const bodyStream = hasAwsChunkedContentEncoding(request.headers) + ? decodeAwsChunkedBodyStream(request.stream) + : request.stream; + const result = yield* backend.putObject( key, - request.stream, + bodyStream, request.headers, ); diff --git a/src/Services/Auth.ts b/src/Services/Auth.ts index 6c1d9a1..5ad250a 100644 --- a/src/Services/Auth.ts +++ b/src/Services/Auth.ts @@ -17,6 +17,24 @@ export class AuthError extends Schema.TaggedError()("AuthError", { message: Schema.String, }) {} +export type SigV4ValidationFailure = + | "MissingCredentials" + | "MissingRegion" + | "MalformedAuthorization" + | "UnknownAccessKey" + | "MissingDate" + | "InvalidDate" + | "InvalidExpires" + | "ExpiredPresign" + | "PresignNotYetValid" + | "PresignExpiresTooLong" + | "RequestTimeTooSkewed" + | "InvalidSignature"; + +export type SigV4ValidationResult = + | { readonly valid: true } + | { readonly valid: false; readonly failure: SigV4ValidationFailure }; + /** * Resolves authentication credentials from environment variables based on refs. */ @@ -43,9 +61,41 @@ export function verifyIncomingSigV4( credentials: AuthCredentials[], region: string, ): Effect.Effect { + return verifyIncomingSigV4Detailed(request, credentials, region).pipe( + Effect.map((result) => result.valid), + ); +} + +function parseSigV4Date(rawDate: string): Date | undefined { + // SigV4 format: YYYYMMDDTHHMMSSZ + if (/^\d{8}T\d{6}Z$/.test(rawDate)) { + const year = rawDate.substring(0, 4); + const month = rawDate.substring(4, 6); + const day = rawDate.substring(6, 8); + const hour = rawDate.substring(9, 11); + const min = rawDate.substring(11, 13); + const sec = rawDate.substring(13, 15); + const parsed = new Date(`${year}-${month}-${day}T${hour}:${min}:${sec}Z`); + return isNaN(parsed.getTime()) ? undefined : parsed; + } + + // AWS2 and some clients send RFC 1123 dates in x-amz-date/date. + const parsed = new Date(rawDate); + return isNaN(parsed.getTime()) ? undefined : parsed; +} + +/** + * Verifies a SigV4 signature for an incoming request and returns a failure + * reason that can be mapped to S3-compatible XML error codes. + */ +export function verifyIncomingSigV4Detailed( + request: HttpServerRequest.HttpServerRequest, + credentials: AuthCredentials[], + region: string, +): Effect.Effect { return Effect.gen(function* () { if (credentials.length === 0) { - return false; + return { valid: false, failure: "UnknownAccessKey" } as const; } const headers: Record = {}; @@ -61,12 +111,18 @@ export function verifyIncomingSigV4( const queryParams = url.searchParams; const hasSigInQuery = queryParams.has("X-Amz-Signature"); - const authHeader = headers["authorization"]; - if (!authHeader && !hasSigInQuery) { - return false; + const rawAuthorization = headers["authorization"]; + const authHeader = rawAuthorization !== undefined && + rawAuthorization.trim() !== "" + ? rawAuthorization + : undefined; + if (authHeader === undefined && !hasSigInQuery) { + return { valid: false, failure: "MissingCredentials" } as const; } let requestAccessKeyId: string | undefined; + let credentialDate: string | undefined; + let credentialService: string | undefined; let signedHeadersList: string[] = []; let headerRegion: string | undefined; @@ -75,8 +131,10 @@ export function verifyIncomingSigV4( if (match && match[1]) { const parts = match[1].split("/"); requestAccessKeyId = parts[0]; - if (parts.length >= 4) { + if (parts.length >= 5) { + credentialDate = parts[1]; headerRegion = parts[2]; + credentialService = parts[3]; } } @@ -84,13 +142,22 @@ export function verifyIncomingSigV4( if (headersMatch && headersMatch[1]) { signedHeadersList = headersMatch[1].split(";"); } + } else if (authHeader !== undefined && !hasSigInQuery) { + return { valid: false, failure: "MalformedAuthorization" } as const; } else if (hasSigInQuery) { + const algorithm = queryParams.get("X-Amz-Algorithm"); + if (algorithm !== "AWS4-HMAC-SHA256") { + return { valid: false, failure: "MalformedAuthorization" } as const; + } + const credential = queryParams.get("X-Amz-Credential"); if (credential && typeof credential === "string") { const parts = credential.split("/"); requestAccessKeyId = parts[0]; - if (parts.length >= 4) { + if (parts.length >= 5) { + credentialDate = parts[1]; headerRegion = parts[2]; + credentialService = parts[3]; } } @@ -98,90 +165,62 @@ export function verifyIncomingSigV4( if (signedHeaders && typeof signedHeaders === "string") { signedHeadersList = signedHeaders.split(";"); } + + const dateParam = queryParams.get("X-Amz-Date"); + const expiresParam = queryParams.get("X-Amz-Expires"); + const signatureParam = queryParams.get("X-Amz-Signature"); + if ( + dateParam === null || expiresParam === null || signatureParam === null + ) { + return { valid: false, failure: "MalformedAuthorization" } as const; + } } if (!requestAccessKeyId) { - return false; + return { valid: false, failure: "MalformedAuthorization" } as const; + } + if (!credentialDate || !credentialService) { + return { valid: false, failure: "MalformedAuthorization" } as const; } // Use region from header if available, otherwise use provided region const effectiveRegion = headerRegion ?? region; + if (!effectiveRegion || effectiveRegion.trim() === "") { + return { valid: false, failure: "MissingRegion" } as const; + } const matchingCreds = credentials.filter( (c) => c.accessKeyId === requestAccessKeyId, ); if (matchingCreds.length === 0) { - return false; - } - - // Filter headers to only those that were signed - const filteredHeaders: Record = {}; - for (const h of signedHeadersList) { - const val = headers[h]; - if (val !== undefined) { - filteredHeaders[h] = val; - } + return { valid: false, failure: "UnknownAccessKey" } as const; } const encoder = new TextEncoder(); for (const cred of matchingCreds) { - const signer = new SignatureV4({ - credentials: { - accessKeyId: cred.accessKeyId, - secretAccessKey: cred.secretAccessKey, - }, - region: effectiveRegion, - service: "s3", - sha256: Sha256, - uriEscapePath: false, // Path is already encoded in rawPath - }); - // Extract signing date from request if possible const amzDate = headers["x-amz-date"]; const dateHeader = headers["date"]; let signingDate: Date | undefined; if (amzDate) { - // format: YYYYMMDDTHHMMSSZ (minimum 15 characters needed for extraction) - if (amzDate.length >= 15) { - const year = amzDate.substring(0, 4); - const month = amzDate.substring(4, 6); - const day = amzDate.substring(6, 8); - const hour = amzDate.substring(9, 11); - const min = amzDate.substring(11, 13); - const sec = amzDate.substring(13, 15); - signingDate = new Date( - `${year}-${month}-${day}T${hour}:${min}:${sec}Z`, - ); - } + signingDate = parseSigV4Date(amzDate); } else if (dateHeader) { - signingDate = new Date(dateHeader); + signingDate = parseSigV4Date(dateHeader); } else if (hasSigInQuery) { const amzDateQuery = queryParams.get("X-Amz-Date"); - if ( - amzDateQuery && typeof amzDateQuery === "string" && - amzDateQuery.length >= 15 - ) { - const year = amzDateQuery.substring(0, 4); - const month = amzDateQuery.substring(4, 6); - const day = amzDateQuery.substring(6, 8); - const hour = amzDateQuery.substring(9, 11); - const min = amzDateQuery.substring(11, 13); - const sec = amzDateQuery.substring(13, 15); - signingDate = new Date( - `${year}-${month}-${day}T${hour}:${min}:${sec}Z`, - ); - } - } - - if (signingDate && isNaN(signingDate.getTime())) { - signingDate = undefined; + signingDate = amzDateQuery ? parseSigV4Date(amzDateQuery) : undefined; } // Validate signingDate: reject if missing or outside allowed windows if (!signingDate) { - return false; + return { + valid: false, + failure: amzDate || dateHeader || queryParams.get("X-Amz-Date") + ? "InvalidDate" + : "MissingDate", + } as const; } const now = new Date(); @@ -192,24 +231,44 @@ export function verifyIncomingSigV4( // For query-presigned requests: validate X-Amz-Expires const expiresParam = queryParams.get("X-Amz-Expires"); if (!expiresParam) { - return false; + return { valid: false, failure: "InvalidExpires" } as const; } // Type-check X-Amz-Expires: must be a valid integer const expires = parseInt(expiresParam, 10); if (isNaN(expires) || expiresParam !== String(expires) || expires < 0) { - return false; + return { valid: false, failure: "InvalidExpires" } as const; + } + + // AWS SigV4 presigned URLs support at most 7 days. + if (expires > 604800) { + return { valid: false, failure: "PresignExpiresTooLong" } as const; } // Reject if expired: now > signingDate + expires const expirationTime = new Date(signingDate.getTime() + expires * 1000); if (now > expirationTime) { - return false; + return { valid: false, failure: "ExpiredPresign" } as const; + } + + // Presigned requests from the future are also invalid. + const nowWithSkew = new Date(now.getTime() + 15 * 60 * 1000); + if (nowWithSkew < signingDate) { + return { valid: false, failure: "PresignNotYetValid" } as const; } } else { // For header-signed requests: enforce ±15 minutes clock skew if (timeDiffMinutes > 15) { - return false; + return { valid: false, failure: "RequestTimeTooSkewed" } as const; + } + } + + // Filter headers to only those that were signed + const filteredHeaders: Record = {}; + for (const h of signedHeadersList) { + const val = headers[h]; + if (val !== undefined) { + filteredHeaders[h] = val; } } @@ -229,14 +288,11 @@ export function verifyIncomingSigV4( }); // Use raw path from request.url to avoid URL constructor decoding - // We want the part between the host and the query string, as-is. const urlString = request.url; const queryIndex = urlString.indexOf("?"); const withoutQuery = queryIndex === -1 ? urlString : urlString.substring(0, queryIndex); - - // Remove protocol and host if present const rawPath = withoutQuery.replace(/^[a-z]+:\/\/[^/]+/, ""); const signableReq: HttpRequest = { @@ -249,12 +305,23 @@ export function verifyIncomingSigV4( headers: filteredHeaders, }; + const signer = new SignatureV4({ + credentials: { + accessKeyId: cred.accessKeyId, + secretAccessKey: cred.secretAccessKey, + }, + region: effectiveRegion, + service: "s3", + sha256: Sha256, + uriEscapePath: false, + }); + const signedResult = yield* Effect.tryPromise({ - try: async () => { - return await signer.sign(signableReq, { + try: async () => + await signer.sign(signableReq, { signingDate, - }); - }, + signableHeaders: new Set(signedHeadersList), + }), catch: (e) => e, }).pipe(Effect.either); @@ -275,7 +342,7 @@ export function verifyIncomingSigV4( encoder.encode(authHeader), encoder.encode(expectedAuth), ); - if (isValid) return true; + if (isValid) return { valid: true } as const; } else { const expectedSig = (signed.query as Record)[ "X-Amz-Signature" @@ -291,10 +358,10 @@ export function verifyIncomingSigV4( encoder.encode(actualSig), encoder.encode(expectedSig), ); - if (isValid) return true; + if (isValid) return { valid: true } as const; } } - return false; + return { valid: false, failure: "InvalidSignature" } as const; }); } diff --git a/src/Services/AwsChunked.ts b/src/Services/AwsChunked.ts new file mode 100644 index 0000000..50b2fc1 --- /dev/null +++ b/src/Services/AwsChunked.ts @@ -0,0 +1,169 @@ +import { Stream } from "effect"; +import { InvalidRequest } from "./Backend.ts"; +import { normalizeHeaders } from "./S3HeaderService.ts"; + +const CR = 13; +const LF = 10; + +const appendBytes = ( + a: Uint8Array, + b: Uint8Array, +): Uint8Array => { + const out = new Uint8Array(a.length + b.length); + out.set(a, 0); + out.set(b, a.length); + return out; +}; + +const findCrlf = (buffer: Uint8Array): number => { + for (let i = 0; i < buffer.length - 1; i++) { + if (buffer[i] === CR && buffer[i + 1] === LF) { + return i; + } + } + return -1; +}; + +const parseChunkSizeLine = (line: string): number => { + const semi = line.indexOf(";"); + const token = (semi === -1 ? line : line.slice(0, semi)).trim(); + if (!/^[0-9a-fA-F]+$/.test(token)) { + throw new InvalidRequest({ + message: "Invalid aws-chunked chunk-size line", + }); + } + return Number.parseInt(token, 16); +}; + +class AwsChunkedParser { + private buffer: Uint8Array = new Uint8Array(0); + private expectedSize = 0; + private phase: "size" | "data" | "data-crlf" | "trailers" | "done" = "size"; + + feed(chunk: Uint8Array): Uint8Array[] { + this.buffer = appendBytes(this.buffer, chunk); + const out: Uint8Array[] = []; + + while (true) { + if (this.phase === "size") { + const idx = findCrlf(this.buffer); + if (idx === -1) break; + const lineBytes = this.buffer.slice(0, idx); + const line = new TextDecoder().decode(lineBytes); + this.buffer = this.buffer.slice(idx + 2); + this.expectedSize = parseChunkSizeLine(line); + if (this.expectedSize === 0) { + this.phase = "trailers"; + } else { + this.phase = "data"; + } + continue; + } + + if (this.phase === "data") { + if (this.buffer.length < this.expectedSize) break; + out.push(this.buffer.slice(0, this.expectedSize)); + this.buffer = this.buffer.slice(this.expectedSize); + this.phase = "data-crlf"; + continue; + } + + if (this.phase === "data-crlf") { + if (this.buffer.length < 2) break; + if (this.buffer[0] !== CR || this.buffer[1] !== LF) { + throw new InvalidRequest({ + message: "Invalid aws-chunked framing after chunk data", + }); + } + this.buffer = this.buffer.slice(2); + this.phase = "size"; + continue; + } + + if (this.phase === "trailers") { + const idx = findCrlf(this.buffer); + if (idx === -1) break; + const lineBytes = this.buffer.slice(0, idx); + const line = new TextDecoder().decode(lineBytes); + this.buffer = this.buffer.slice(idx + 2); + if (line.length === 0) { + this.phase = "done"; + } + continue; + } + + if (this.phase === "done") { + if (this.buffer.length > 0) { + throw new InvalidRequest({ + message: "Unexpected trailing data after aws-chunked payload", + }); + } + break; + } + } + + return out; + } + + finish(): void { + if (this.phase !== "done") { + throw new InvalidRequest({ + message: "Incomplete aws-chunked payload", + }); + } + if (this.buffer.length !== 0) { + throw new InvalidRequest({ + message: "Unexpected buffered bytes after aws-chunked payload", + }); + } + } +} + +export const hasAwsChunkedContentEncoding = ( + headers: Record, +): boolean => { + const normalized = normalizeHeaders(headers); + const encoding = normalized["content-encoding"]; + if (encoding === undefined || encoding.trim() === "") { + return false; + } + return encoding.toLowerCase().split(",").map((s) => s.trim()).some((token) => + token === "aws-chunked" + ); +}; + +export const decodeAwsChunkedBodyStream = ( + stream: Stream.Stream, +): Stream.Stream => { + const source = Stream.toReadableStream(stream); + const parser = new AwsChunkedParser(); + const decoded = source.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + try { + const parts = parser.feed(chunk); + for (const part of parts) { + controller.enqueue(part); + } + } catch (error) { + controller.error(error); + } + }, + flush(controller) { + try { + parser.finish(); + } catch (error) { + controller.error(error); + } + }, + }), + ); + + return Stream.fromReadableStream( + () => decoded, + (error) => + error instanceof InvalidRequest + ? error + : new InvalidRequest({ message: String(error) }), + ); +}; diff --git a/src/Services/Backend.ts b/src/Services/Backend.ts index a999ad4..6cd62c6 100644 --- a/src/Services/Backend.ts +++ b/src/Services/Backend.ts @@ -39,6 +39,10 @@ export class AccessDenied extends Data.TaggedError("AccessDenied")<{ readonly message: string; }> {} +export class InvalidAccessKeyId extends Data.TaggedError("InvalidAccessKeyId")<{ + readonly message: string; +}> {} + export class BadGateway extends Data.TaggedError("BadGateway")<{ readonly message: string; }> {} @@ -76,6 +80,11 @@ export class InvalidArgument extends Data.TaggedError("InvalidArgument")<{ readonly message: string; }> {} +export class RequestTimeTooSkewed + extends Data.TaggedError("RequestTimeTooSkewed")<{ + readonly message: string; + }> {} + export class MalformedXML extends Data.TaggedError("MalformedXML")<{ readonly message: string; }> {} @@ -100,6 +109,7 @@ export type BackendError = | BucketNotEmpty | InternalError | AccessDenied + | InvalidAccessKeyId | BadGateway | NoSuchUpload | InvalidPart @@ -109,6 +119,7 @@ export type BackendError = | BadDigest | InvalidBucketName | InvalidArgument + | RequestTimeTooSkewed | MalformedXML | MethodNotAllowed | HttpClientError.HttpClientError diff --git a/src/Services/S3HeaderService.ts b/src/Services/S3HeaderService.ts index 1b75610..f820303 100644 --- a/src/Services/S3HeaderService.ts +++ b/src/Services/S3HeaderService.ts @@ -190,9 +190,22 @@ export class S3HeaderService (normalized["x-amz-version-id"] || normalized["versionid"]) || undefined, checksumMode: normalized["x-amz-checksum-mode"], - contentLength: normalized["content-length"] - ? parseInt(normalized["content-length"]) - : undefined, + contentLength: (() => { + const contentEncoding = normalized["content-encoding"]; + const hasAwsChunked = contentEncoding !== undefined && + contentEncoding.toLowerCase().split(",").map((s) => s.trim()) + .includes("aws-chunked"); + if ( + hasAwsChunked && + normalized["x-amz-decoded-content-length"] !== undefined + ) { + return parseInt(normalized["x-amz-decoded-content-length"]); + } + if (normalized["content-length"] !== undefined) { + return parseInt(normalized["content-length"]); + } + return undefined; + })(), }; return { checksums, metadata, objectAttributes, s3Params }; diff --git a/src/Services/S3Xml.ts b/src/Services/S3Xml.ts index f6e5878..eaf584a 100644 --- a/src/Services/S3Xml.ts +++ b/src/Services/S3Xml.ts @@ -13,6 +13,7 @@ import { type DeleteObjectsResult, EntityTooSmall, InternalError, + InvalidAccessKeyId, InvalidArgument, InvalidBucketName, InvalidPart, @@ -29,6 +30,7 @@ import { NoSuchUpload, type ObjectAttributes, type OwnerInfo, + RequestTimeTooSkewed, } from "./Backend.ts"; export class S3Xml extends Context.Tag("S3Xml")< @@ -120,6 +122,10 @@ export const makeS3Xml = Effect.sync(() => { code = "AccessDenied"; message = err.message; status = 403; + } else if (err instanceof InvalidAccessKeyId) { + code = "InvalidAccessKeyId"; + message = err.message; + status = 403; } else if (err instanceof BadGateway) { code = "BadGateway"; message = err.message; @@ -160,6 +166,10 @@ export const makeS3Xml = Effect.sync(() => { code = "InvalidArgument"; message = err.message; status = 400; + } else if (err instanceof RequestTimeTooSkewed) { + code = "RequestTimeTooSkewed"; + message = err.message; + status = 403; } else if (err instanceof MalformedXML) { code = "MalformedXML"; message = err.message; diff --git a/tests/aws-chunked.test.ts b/tests/aws-chunked.test.ts new file mode 100644 index 0000000..438bec2 --- /dev/null +++ b/tests/aws-chunked.test.ts @@ -0,0 +1,77 @@ +import { Chunk, Effect, Stream } from "effect"; +import { assertEquals } from "@std/assert"; +import { decodeAwsChunkedBodyStream } from "../src/Services/AwsChunked.ts"; +import { S3HeaderService } from "../src/Services/S3HeaderService.ts"; +import { testEffect } from "./utils.ts"; + +const bytes = (s: string) => new TextEncoder().encode(s); +const text = (u: Uint8Array) => new TextDecoder().decode(u); + +const collectBytes = (chunks: Chunk.Chunk): Uint8Array => { + const total = Chunk.reduce(chunks, 0, (acc, c) => acc + c.length); + const out = new Uint8Array(total); + let off = 0; + for (const c of chunks) { + out.set(c, off); + off += c.length; + } + return out; +}; + +testEffect("aws-chunked/decode/basic", () => + Effect.gen(function* () { + const framed = bytes( + "b;chunk-signature=abc\r\nhello world\r\n0;chunk-signature=def\r\n\r\n", + ); + const decoded = decodeAwsChunkedBodyStream(Stream.succeed(framed)); + const chunks = yield* Stream.runCollect(decoded); + assertEquals(text(collectBytes(chunks)), "hello world"); + })); + +testEffect( + "aws-chunked/decode/split-boundaries", + () => + Effect.gen(function* () { + const c1 = bytes("5;chunk-signature=111\r\nhello\r\n"); + const c2 = bytes("6;chunk-signature=222\r\n world\r\n"); + const c3 = bytes("0;chunk-signature=333\r\n"); + const c4 = bytes("\r\n"); + const decoded = decodeAwsChunkedBodyStream( + Stream.fromIterable([c1, c2, c3, c4]), + ); + const chunks = yield* Stream.runCollect(decoded); + assertEquals(text(collectBytes(chunks)), "hello world"); + }), +); + +testEffect("aws-chunked/decode/invalid-framing", () => + Effect.gen(function* () { + const invalid = bytes( + "5;chunk-signature=abc\r\nhelloX0;chunk-signature=def\r\n\r\n", + ); + const exit = yield* Stream.runCollect( + decodeAwsChunkedBodyStream(Stream.succeed(invalid)), + ).pipe(Effect.exit); + if (exit._tag !== "Failure") { + throw new Error("Expected aws-chunked decoding to fail"); + } + const failure = exit.cause; + const pretty = String(failure); + if (!pretty.includes("InvalidRequest")) { + throw new Error(`Expected InvalidRequest failure, got: ${pretty}`); + } + })); + +testEffect( + "aws-chunked/decoded-content-length/header-parse", + () => + Effect.gen(function* () { + const headerService = yield* S3HeaderService; + const parsed = headerService.fromRequestHeaders({ + "content-encoding": "aws-chunked", + "content-length": "999", + "x-amz-decoded-content-length": "11", + }); + assertEquals(parsed.s3Params.contentLength, 11); + }).pipe(Effect.provide(S3HeaderService.Default)), +); diff --git a/tests/sigv4-integration.test.ts b/tests/sigv4-integration.test.ts new file mode 100644 index 0000000..e3ea6cf --- /dev/null +++ b/tests/sigv4-integration.test.ts @@ -0,0 +1,186 @@ +import { Effect, Layer, Option } from "effect"; +import { FetchHttpClient, HttpApiBuilder, HttpServer } from "@effect/platform"; +import { HttpHeraldLive } from "../src/Http.ts"; +import { HeraldConfig } from "../src/Config/Layer.ts"; +import { S3ClientFactory } from "../src/Backends/S3/Client.ts"; +import { SwiftClient } from "../src/Backends/Swift/Client.ts"; +import { S3XmlLive } from "../src/Services/S3Xml.ts"; +import { Checksum } from "../src/Services/Checksum.ts"; +import { S3HeaderService } from "../src/Services/S3HeaderService.ts"; +import { BackendResolver } from "../src/Services/BackendResolver.ts"; +import type { GlobalConfig } from "../src/Domain/Config.ts"; +import { lookupBucket } from "../src/Domain/Config.ts"; +import { EffectAssert, testEffect } from "./utils.ts"; + +const formatAmzDate = (date: Date): string => { + const year = date.getUTCFullYear(); + const month = String(date.getUTCMonth() + 1).padStart(2, "0"); + const day = String(date.getUTCDate()).padStart(2, "0"); + const hour = String(date.getUTCHours()).padStart(2, "0"); + const min = String(date.getUTCMinutes()).padStart(2, "0"); + const sec = String(date.getUTCSeconds()).padStart(2, "0"); + return `${year}${month}${day}T${hour}${min}${sec}Z`; +}; + +const testCredentials = { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", +}; + +const testConfig: GlobalConfig = { + backends: { + default: { + protocol: "s3", + endpoint: "http://localhost:9000", + region: "us-east-1", + credentials: { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", + }, + buckets: "*", + }, + }, + auth: { + accessKeysRefs: ["test"], + }, +}; + +const runRequest = ( + request: Request, +) => + Effect.gen(function* () { + const HeraldConfigLive = Layer.succeed(HeraldConfig, { + raw: testConfig, + lookupBucket: (name: string) => lookupBucket(testConfig, name), + resolveAuth: () => Option.some([testCredentials]), + resolveAuthForBackendId: () => Option.some([testCredentials]), + }); + + const app = HttpHeraldLive.pipe( + Layer.provide(BackendResolver.Default), + Layer.provide(S3ClientFactory.Default), + Layer.provide(SwiftClient.Default), + Layer.provide(S3XmlLive), + Layer.provide(Checksum.Default), + Layer.provide(S3HeaderService.Default), + Layer.provide(HeraldConfigLive), + Layer.provide(FetchHttpClient.layer), + Layer.provideMerge(HttpServer.layerContext), + ); + + return yield* Effect.tryPromise({ + try: async () => { + const webHandler = HttpApiBuilder.toWebHandler(app); + try { + return await webHandler.handler(request); + } finally { + await webHandler.dispose(); + } + }, + catch: (e) => new Error(String(e)), + }).pipe(Effect.orDie); + }); + +testEffect( + "sigv4/integration/missing_authorization_rejected", + () => + Effect.gen(function* () { + const response = yield* runRequest( + new Request("http://localhost/test-bucket", { method: "GET" }), + ); + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: (e) => new Error(String(e)), + }).pipe(Effect.orDie); + + yield* EffectAssert.strictEqual(response.status, 403); + yield* EffectAssert.strictEqual( + body.includes("AccessDenied"), + true, + ); + }), +); + +testEffect( + "sigv4/integration/malformed_authorization_rejected", + () => + Effect.gen(function* () { + const response = yield* runRequest( + new Request("http://localhost/test-bucket", { + method: "GET", + headers: { + authorization: "Bearer xyz", + }, + }), + ); + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: (e) => new Error(String(e)), + }).pipe(Effect.orDie); + + yield* EffectAssert.strictEqual(response.status, 400); + yield* EffectAssert.strictEqual( + body.includes("InvalidArgument"), + true, + ); + }), +); + +testEffect( + "sigv4/integration/invalid_signature_rejected", + () => + Effect.gen(function* () { + const now = new Date(); + const amzDate = formatAmzDate(now); + const scopeDate = amzDate.substring(0, 8); + const response = yield* runRequest( + new Request("http://localhost/test-bucket", { + method: "GET", + headers: { + host: "localhost", + "x-amz-date": amzDate, + authorization: + `AWS4-HMAC-SHA256 Credential=minioadmin/${scopeDate}/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=invalid`, + }, + }), + ); + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: (e) => new Error(String(e)), + }).pipe(Effect.orDie); + + yield* EffectAssert.strictEqual(response.status, 403); + yield* EffectAssert.strictEqual( + body.includes("AccessDenied"), + true, + ); + }), +); + +testEffect( + "sigv4/integration/request_time_too_skewed_rejected", + () => + Effect.gen(function* () { + const response = yield* runRequest( + new Request("http://localhost/test-bucket", { + method: "GET", + headers: { + host: "localhost", + "x-amz-date": "20000101T000000Z", + authorization: + "AWS4-HMAC-SHA256 Credential=minioadmin/20000101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=invalid", + }, + }), + ); + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: (e) => new Error(String(e)), + }).pipe(Effect.orDie); + + yield* EffectAssert.strictEqual(response.status, 403); + yield* EffectAssert.strictEqual( + body.includes("RequestTimeTooSkewed"), + true, + ); + }), +); From a060791ee6feefbf0c44d9bb508911d8365cd68d Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:10:10 +0300 Subject: [PATCH 2/6] feat: cover more write paths Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- .coderabbit.yaml | 6 + src/Frontend/Http.ts | 11 +- src/Services/Auth.ts | 11 +- src/Services/AwsChunked.ts | 46 +++- src/Services/S3HeaderService.ts | 14 +- tests/integration/aws-chunked-put.test.ts | 292 ++++++++++++++++++++++ tests/integration/postobject.test.ts | 70 ++++++ 7 files changed, 437 insertions(+), 13 deletions(-) create mode 100644 .coderabbit.yaml create mode 100644 tests/integration/aws-chunked-put.test.ts diff --git a/.coderabbit.yaml b/.coderabbit.yaml new file mode 100644 index 0000000..c20f109 --- /dev/null +++ b/.coderabbit.yaml @@ -0,0 +1,6 @@ +# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json +reviews: + auto_review: + # enable reviews for stacked PRs + base_branches: + - .* diff --git a/src/Frontend/Http.ts b/src/Frontend/Http.ts index 4f88858..b0495bb 100644 --- a/src/Frontend/Http.ts +++ b/src/Frontend/Http.ts @@ -186,9 +186,16 @@ export const makeS3Router = (prefix = "") => return yield* Effect.gen(function* () { if (bucket !== "") { const authCredentials = config.resolveAuth(bucket); - const skipSigV4Auth = isPostObjectMultipartRequest(request) || - isLegacyAwsAuthorizationRequest(request); + const skipSigV4Auth = isPostObjectMultipartRequest(request); if (Option.isSome(authCredentials) && !skipSigV4Auth) { + if (isLegacyAwsAuthorizationRequest(request)) { + return yield* Effect.fail( + new InvalidArgument({ + message: + "The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.", + }), + ); + } if (!hasSigV4Credentials(request)) { return yield* Effect.fail( new AccessDenied({ message: "Access Denied" }), diff --git a/src/Services/Auth.ts b/src/Services/Auth.ts index 5ad250a..217ea27 100644 --- a/src/Services/Auth.ts +++ b/src/Services/Auth.ts @@ -204,13 +204,13 @@ export function verifyIncomingSigV4Detailed( const dateHeader = headers["date"]; let signingDate: Date | undefined; - if (amzDate) { + if (hasSigInQuery) { + const amzDateQuery = queryParams.get("X-Amz-Date"); + signingDate = amzDateQuery ? parseSigV4Date(amzDateQuery) : undefined; + } else if (amzDate) { signingDate = parseSigV4Date(amzDate); } else if (dateHeader) { signingDate = parseSigV4Date(dateHeader); - } else if (hasSigInQuery) { - const amzDateQuery = queryParams.get("X-Amz-Date"); - signingDate = amzDateQuery ? parseSigV4Date(amzDateQuery) : undefined; } // Validate signingDate: reject if missing or outside allowed windows @@ -286,6 +286,9 @@ export function verifyIncomingSigV4Detailed( queryBag[k] = v; } }); + if (hasSigInQuery) { + delete queryBag["X-Amz-Signature"]; + } // Use raw path from request.url to avoid URL constructor decoding const urlString = request.url; diff --git a/src/Services/AwsChunked.ts b/src/Services/AwsChunked.ts index 50b2fc1..40cc542 100644 --- a/src/Services/AwsChunked.ts +++ b/src/Services/AwsChunked.ts @@ -4,6 +4,11 @@ import { normalizeHeaders } from "./S3HeaderService.ts"; const CR = 13; const LF = 10; +const MAX_CONTROL_LINE_LENGTH = 8 * 1024; +const MAX_CHUNK_SIZE_BYTES = 128 * 1024 * 1024; +const MAX_TOTAL_BUFFERED_BYTES = MAX_CHUNK_SIZE_BYTES + + MAX_CONTROL_LINE_LENGTH + + 4; const appendBytes = ( a: Uint8Array, @@ -27,29 +32,53 @@ const findCrlf = (buffer: Uint8Array): number => { const parseChunkSizeLine = (line: string): number => { const semi = line.indexOf(";"); const token = (semi === -1 ? line : line.slice(0, semi)).trim(); + if (token.length === 0 || token.length > MAX_CONTROL_LINE_LENGTH) { + throw new InvalidRequest({ + message: "Invalid aws-chunked chunk-size line", + }); + } if (!/^[0-9a-fA-F]+$/.test(token)) { throw new InvalidRequest({ message: "Invalid aws-chunked chunk-size line", }); } - return Number.parseInt(token, 16); + const size = Number.parseInt(token, 16); + if (!Number.isFinite(size) || size < 0 || size > MAX_CHUNK_SIZE_BYTES) { + throw new InvalidRequest({ + message: "Invalid aws-chunked chunk-size line", + }); + } + return size; }; class AwsChunkedParser { private buffer: Uint8Array = new Uint8Array(0); private expectedSize = 0; private phase: "size" | "data" | "data-crlf" | "trailers" | "done" = "size"; + private readonly decoder = new TextDecoder(); feed(chunk: Uint8Array): Uint8Array[] { this.buffer = appendBytes(this.buffer, chunk); + if (this.buffer.length > MAX_TOTAL_BUFFERED_BYTES) { + throw new InvalidRequest({ + message: "Invalid aws-chunked framing", + }); + } const out: Uint8Array[] = []; while (true) { if (this.phase === "size") { const idx = findCrlf(this.buffer); - if (idx === -1) break; + if (idx === -1) { + if (this.buffer.length > MAX_CONTROL_LINE_LENGTH) { + throw new InvalidRequest({ + message: "Invalid aws-chunked chunk-size line", + }); + } + break; + } const lineBytes = this.buffer.slice(0, idx); - const line = new TextDecoder().decode(lineBytes); + const line = this.decoder.decode(lineBytes); this.buffer = this.buffer.slice(idx + 2); this.expectedSize = parseChunkSizeLine(line); if (this.expectedSize === 0) { @@ -82,9 +111,16 @@ class AwsChunkedParser { if (this.phase === "trailers") { const idx = findCrlf(this.buffer); - if (idx === -1) break; + if (idx === -1) { + if (this.buffer.length > MAX_CONTROL_LINE_LENGTH) { + throw new InvalidRequest({ + message: "Invalid aws-chunked framing", + }); + } + break; + } const lineBytes = this.buffer.slice(0, idx); - const line = new TextDecoder().decode(lineBytes); + const line = this.decoder.decode(lineBytes); this.buffer = this.buffer.slice(idx + 2); if (line.length === 0) { this.phase = "done"; diff --git a/src/Services/S3HeaderService.ts b/src/Services/S3HeaderService.ts index f820303..92d2e42 100644 --- a/src/Services/S3HeaderService.ts +++ b/src/Services/S3HeaderService.ts @@ -46,6 +46,14 @@ export const normalizeHeaders = ( return normalized; }; +const parseNonNegativeInteger = (value: string): number | undefined => { + const parsed = parseInt(value, 10); + if (!Number.isInteger(parsed) || parsed < 0) { + return undefined; + } + return parsed; +}; + export class S3HeaderService extends Effect.Service()("S3HeaderService", { succeed: { @@ -199,10 +207,12 @@ export class S3HeaderService hasAwsChunked && normalized["x-amz-decoded-content-length"] !== undefined ) { - return parseInt(normalized["x-amz-decoded-content-length"]); + return parseNonNegativeInteger( + normalized["x-amz-decoded-content-length"], + ); } if (normalized["content-length"] !== undefined) { - return parseInt(normalized["content-length"]); + return parseNonNegativeInteger(normalized["content-length"]); } return undefined; })(), diff --git a/tests/integration/aws-chunked-put.test.ts b/tests/integration/aws-chunked-put.test.ts new file mode 100644 index 0000000..9a66815 --- /dev/null +++ b/tests/integration/aws-chunked-put.test.ts @@ -0,0 +1,292 @@ +import { + CompleteMultipartUploadCommand, + CreateBucketCommand, + CreateMultipartUploadCommand, + DeleteBucketCommand, + DeleteObjectCommand, + GetObjectCommand, + type S3Client, +} from "@aws-sdk/client-s3"; +import { SignatureV4 } from "@smithy/signature-v4"; +import { Sha256 } from "@aws-crypto/sha256"; +import { harness, type ProxyTestCase } from "../utils.ts"; +import type { GlobalConfig } from "../../src/Domain/Config.ts"; + +const testConfig: GlobalConfig = { + backends: { + minio: { + protocol: "s3", + endpoint: "http://localhost:9000", + region: "us-east-1", + credentials: { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", + }, + buckets: "*", + }, + }, +}; + +const BUCKET = "test-aws-chunked-put-bucket"; +const KEY = "aws-chunked-object.txt"; +const MULTIPART_KEY = "aws-chunked-multipart-object.txt"; +const PLAINTEXT = "hello world"; +const CHUNKED_PAYLOAD = + "b;chunk-signature=abc\r\nhello world\r\n0;chunk-signature=def\r\n\r\n"; + +const credentials = { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", +}; + +async function sendAwsChunkedPut(baseUrl: string): Promise { + const url = new URL(`${baseUrl}/${BUCKET}/${KEY}`); + const body = new TextEncoder().encode(CHUNKED_PAYLOAD); + const signer = new SignatureV4({ + credentials, + region: "us-east-1", + service: "s3", + sha256: Sha256, + }); + + const signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: { + "content-encoding": "aws-chunked", + "content-length": String(body.length), + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + "x-amz-decoded-content-length": String(PLAINTEXT.length), + }, + body, + }); + + const requestHeaders = new Headers(); + for (const [key, value] of Object.entries(signed.headers)) { + if (key.toLowerCase() === "host") { + continue; + } + requestHeaders.set(key, value); + } + + return await fetch(url, { + method: "PUT", + headers: requestHeaders, + body, + // @ts-ignore duplex is required for non-GET body in Deno fetch with streams/body bytes + duplex: "half", + }); +} + +async function sendAwsChunkedUploadPart( + baseUrl: string, + uploadId: string, +): Promise { + const url = new URL(`${baseUrl}/${BUCKET}/${MULTIPART_KEY}`); + url.searchParams.set("partNumber", "1"); + url.searchParams.set("uploadId", uploadId); + + const body = new TextEncoder().encode(CHUNKED_PAYLOAD); + const signer = new SignatureV4({ + credentials, + region: "us-east-1", + service: "s3", + sha256: Sha256, + }); + + const signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: { + partNumber: "1", + uploadId, + }, + headers: { + "content-encoding": "aws-chunked", + "content-length": String(body.length), + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + "x-amz-decoded-content-length": String(PLAINTEXT.length), + }, + body, + }); + + const requestHeaders = new Headers(); + for (const [key, value] of Object.entries(signed.headers)) { + if (key.toLowerCase() === "host") { + continue; + } + requestHeaders.set(key, value); + } + + return await fetch(url, { + method: "PUT", + headers: requestHeaders, + body, + // @ts-ignore duplex is required for non-GET body in Deno fetch with streams/body bytes + duplex: "half", + }); +} + +async function verifyStoredBody(client: S3Client): Promise { + const out = await client.send( + new GetObjectCommand({ + Bucket: BUCKET, + Key: KEY, + }), + ); + + const bytes = await out.Body?.transformToByteArray(); + if (!bytes) { + throw new Error("Expected object body"); + } + const text = new TextDecoder().decode(bytes); + if (text !== PLAINTEXT) { + throw new Error( + `Decoded payload mismatch; expected "${PLAINTEXT}", got "${ + text.slice(0, 120) + }"`, + ); + } +} + +const cases: ProxyTestCase[] = [{ + name: "objects/put/aws-chunked-decoding", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const putResponse = await sendAwsChunkedPut(context.baseUrl); + if (putResponse.status !== 200) { + const body = await putResponse.text(); + throw new Error( + `aws-chunked PUT failed: status=${putResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + await verifyStoredBody(client); + }, + afterAll: async (client) => { + try { + await client.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: KEY })); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, +}, { + name: "objects/multipart/aws-chunked-uploadpart-decoding", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + + const { UploadId } = await client.send( + new CreateMultipartUploadCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + }), + ); + if (!UploadId) { + throw new Error("Expected UploadId"); + } + + const uploadPartResponse = await sendAwsChunkedUploadPart( + context.baseUrl, + UploadId, + ); + if (uploadPartResponse.status !== 200) { + const body = await uploadPartResponse.text(); + throw new Error( + `aws-chunked UploadPart failed: status=${uploadPartResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + + const etag = uploadPartResponse.headers.get("etag"); + if (!etag) { + throw new Error("UploadPart response did not include ETag header"); + } + + await client.send( + new CompleteMultipartUploadCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + UploadId, + MultipartUpload: { + Parts: [{ + PartNumber: 1, + ETag: etag, + }], + }, + }), + ); + + const out = await client.send( + new GetObjectCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + }), + ); + const bytes = await out.Body?.transformToByteArray(); + if (!bytes) { + throw new Error("Expected multipart object body"); + } + const text = new TextDecoder().decode(bytes); + if (text !== PLAINTEXT) { + throw new Error( + `Multipart decoded payload mismatch; expected "${PLAINTEXT}", got "${ + text.slice(0, 120) + }"`, + ); + } + }, + afterAll: async (client) => { + try { + await client.send( + new DeleteObjectCommand({ Bucket: BUCKET, Key: MULTIPART_KEY }), + ); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, +}]; + +harness(cases); diff --git a/tests/integration/postobject.test.ts b/tests/integration/postobject.test.ts index a64cd4b..bf4a5bc 100644 --- a/tests/integration/postobject.test.ts +++ b/tests/integration/postobject.test.ts @@ -9,6 +9,7 @@ import { createHash, createHmac } from "node-crypto"; import { CreateBucketCommand, DeleteBucketCommand, + DeleteObjectCommand, GetObjectCommand, type S3Client, } from "@aws-sdk/client-s3"; @@ -763,6 +764,56 @@ async function postObjectExtraFormFieldNotInPolicy( ); } +async function postObjectBodyIntegrityChunkLikePayload( + client: S3Client, + context: ProxyTestContext, +): Promise { + const key = "chunk-like-body.txt"; + const body = + "46f;chunk-signature=abc\r\nhello\r\n0;chunk-signature=def\r\n\r\n"; + const { policy, signature } = buildPolicyAndSignature( + BUCKET, + "chunk-like", + 4096, + "minioadmin", + "minioadmin", + ); + + const form = new FormData(); + form.append("key", key); + form.append("AWSAccessKeyId", "minioadmin"); + form.append("acl", "private"); + form.append("signature", signature); + form.append("policy", policy); + form.append("Content-Type", "text/plain"); + form.append("file", new Blob([body]), "file.txt"); + + const res = await fetch(`${context.baseUrl}/${BUCKET}`, { + method: "POST", + body: form, + }); + const resText = await res.text(); + assertEquals( + res.status, + 204, + `Expected 204 for PostObject body integrity test, got ${res.status}. Body: ${ + resText.slice(0, 400) + }`, + ); + + const getRes = await client.send( + new GetObjectCommand({ Bucket: BUCKET, Key: key }), + ); + const gotBody = await getRes.Body?.transformToByteArray() ?? + new Uint8Array(0); + const storedText = new TextDecoder().decode(gotBody); + assertEquals(storedText.includes("46f;chunk-signature=abc"), true); + assertEquals(storedText.includes("hello"), true); + assertEquals(storedText.includes("0;chunk-signature=def"), true); + + await client.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: key })); +} + const cases: ProxyTestCase[] = [ { name: "postobject/authenticated", @@ -1068,6 +1119,25 @@ const cases: ProxyTestCase[] = [ return postObjectExtraFormFieldNotInPolicy(client, context); }, }, + { + name: "postobject/body_integrity_chunk_like_payload", + config: testConfig, + skipSnapshot: true, + beforeAll: async (c) => { + try { + await c.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { /* ignore */ } + }, + afterAll: async (c) => { + try { + await c.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { /* ignore */ } + }, + fn: (client, context) => { + if (!context) throw new Error("PostObject tests require baseUrl"); + return postObjectBodyIntegrityChunkLikePayload(client, context); + }, + }, ]; harness(cases); From 70001561c5f4a6c8afd394605bd13847595d993f Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:39:29 +0300 Subject: [PATCH 3/6] wip: debug logs Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- src/Frontend/Http.ts | 45 +++++++++++++++++++++++++++++++++++ src/Frontend/Multipart/Put.ts | 31 +++++++++++++++++++++++- src/Frontend/Objects/Put.ts | 17 ++++++++++++- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/src/Frontend/Http.ts b/src/Frontend/Http.ts index b0495bb..31e748b 100644 --- a/src/Frontend/Http.ts +++ b/src/Frontend/Http.ts @@ -93,6 +93,20 @@ function isLegacyAwsAuthorizationRequest( authorization.startsWith("AWS "); } +function getHeaderValue( + headers: Record, + name: string, +): string | undefined { + const entry = Object.entries(headers).find(([key]) => + key.toLowerCase() === name.toLowerCase() + ); + if (!entry) { + return undefined; + } + const value = entry[1]; + return Array.isArray(value) ? value[0] : value; +} + /** Build annotations and log 5xx as error, 4xx as warning; return response. */ function logRequestFailureAndReturn( err: unknown, @@ -181,9 +195,40 @@ export const makeS3Router = (prefix = "") => const bucket = pathWithoutPrefix.split("/").filter(Boolean)[0] || ""; const isHead = request.method === "HEAD"; const method = request.method ?? "UNKNOWN"; + const query = request.url.includes("?") + ? request.url.slice(request.url.indexOf("?") + 1) + : ""; const attrs = { bucket, method }; return yield* Effect.gen(function* () { + yield* Effect.logDebug("Incoming request", { + method, + path: pathname, + query, + bucket, + contentEncoding: getHeaderValue( + request.headers, + "content-encoding", + ), + transferEncoding: getHeaderValue( + request.headers, + "transfer-encoding", + ), + amzContentSha256: getHeaderValue( + request.headers, + "x-amz-content-sha256", + ), + amzDecodedContentLength: getHeaderValue( + request.headers, + "x-amz-decoded-content-length", + ), + contentLength: getHeaderValue(request.headers, "content-length"), + contentType: getHeaderValue(request.headers, "content-type"), + hasAuthorization: + getHeaderValue(request.headers, "authorization") !== + undefined, + }); + if (bucket !== "") { const authCredentials = config.resolveAuth(bucket); const skipSigV4Auth = isPostObjectMultipartRequest(request); diff --git a/src/Frontend/Multipart/Put.ts b/src/Frontend/Multipart/Put.ts index d1b9354..ec6a835 100644 --- a/src/Frontend/Multipart/Put.ts +++ b/src/Frontend/Multipart/Put.ts @@ -9,6 +9,19 @@ import { Backend, InvalidRequest } from "../../Services/Backend.ts"; import { S3HeaderService } from "../../Services/S3HeaderService.ts"; import { S3Xml } from "../../Services/S3Xml.ts"; +function getHeader( + headers: Record, + name: string, +): string | undefined { + const lower = name.toLowerCase(); + const entry = Object.entries(headers).find( + ([k]) => k.toLowerCase() === lower, + ); + if (!entry) return undefined; + const v = entry[1]; + return Array.isArray(v) ? v[0] : v; +} + export const uploadPart = Effect.gen(function* () { const backend = yield* Backend; const request = yield* HttpServerRequest.HttpServerRequest; @@ -41,7 +54,23 @@ export const uploadPart = Effect.gen(function* () { // S3 allows 0-byte for the last part; no Frontend rejection here. // Swift backend rejects 0-byte segments at CompleteMultipartUpload (SLO manifest requirement). - const bodyStream = hasAwsChunkedContentEncoding(request.headers) + const hasAwsChunked = hasAwsChunkedContentEncoding(request.headers); + yield* Effect.logDebug("UploadPart aws-chunked decision", { + key, + uploadId: s3Params.uploadId, + partNumber: s3Params.partNumber, + hasAwsChunked, + contentEncoding: getHeader(request.headers, "content-encoding"), + transferEncoding: getHeader(request.headers, "transfer-encoding"), + amzContentSha256: getHeader(request.headers, "x-amz-content-sha256"), + amzDecodedContentLength: getHeader( + request.headers, + "x-amz-decoded-content-length", + ), + contentLength: getHeader(request.headers, "content-length"), + contentType: getHeader(request.headers, "content-type"), + }); + const bodyStream = hasAwsChunked ? decodeAwsChunkedBodyStream(request.stream) : request.stream; diff --git a/src/Frontend/Objects/Put.ts b/src/Frontend/Objects/Put.ts index b9cf4cf..c0550dd 100644 --- a/src/Frontend/Objects/Put.ts +++ b/src/Frontend/Objects/Put.ts @@ -245,7 +245,22 @@ export const putObject = Effect.gen(function* () { return yield* copyObject; } - const bodyStream = hasAwsChunkedContentEncoding(request.headers) + const hasAwsChunked = hasAwsChunkedContentEncoding(request.headers); + yield* Effect.logDebug("PutObject aws-chunked decision", { + key, + hasAwsChunked, + contentEncoding: getHeader(request.headers, "content-encoding"), + transferEncoding: getHeader(request.headers, "transfer-encoding"), + amzContentSha256: getHeader(request.headers, "x-amz-content-sha256"), + amzDecodedContentLength: getHeader( + request.headers, + "x-amz-decoded-content-length", + ), + contentLength: getHeader(request.headers, "content-length"), + contentType: getHeader(request.headers, "content-type"), + }); + + const bodyStream = hasAwsChunked ? decodeAwsChunkedBodyStream(request.stream) : request.stream; From 6b015d43854b491037cff20b7053e4cb44e155cd Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Mar 2026 17:41:03 +0300 Subject: [PATCH 4/6] feat: streamed hashing and content-encoding Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- src/Backends/S3/Objects.ts | 23 +- src/Backends/Swift/Objects.ts | 52 ++- src/Frontend/Http.ts | 6 +- src/Frontend/Multipart/Put.ts | 7 +- src/Frontend/Objects/Put.ts | 6 +- src/Frontend/Utils.ts | 2 + src/Services/Auth.ts | 67 +++- src/Services/AwsChunked.ts | 259 ++++++++++++++- src/Services/Backend.ts | 2 + src/Services/S3HeaderService.ts | 10 +- tests/aws-chunked.test.ts | 182 ++++++++++- tests/integration/aws-chunked-put.test.ts | 361 ++++++++++++++++++++- tests/integration/streaming-compat.test.ts | 217 +++++++++++++ 13 files changed, 1146 insertions(+), 48 deletions(-) create mode 100644 tests/integration/streaming-compat.test.ts diff --git a/src/Backends/S3/Objects.ts b/src/Backends/S3/Objects.ts index b45c4a0..6ba873b 100644 --- a/src/Backends/S3/Objects.ts +++ b/src/Backends/S3/Objects.ts @@ -28,6 +28,7 @@ import { type ObjectResponse, } from "../../Services/Backend.ts"; import { normalizeHeaders } from "../../Services/S3HeaderService.ts"; +import { stripAwsChunkedFromContentEncoding } from "../../Services/AwsChunked.ts"; import type { ChecksumAlgorithm, ChecksumType, @@ -313,6 +314,7 @@ export const makeObjectOps = ( stream, nativeStream: webStream, contentType: result.ContentType, + contentEncoding: result.ContentEncoding, contentLength: result.ContentLength, etag: result.ETag, lastModified: result.LastModified, @@ -325,6 +327,7 @@ export const makeObjectOps = ( partsCount: result.PartsCount, contentLength: result.ContentLength, contentType: result.ContentType, + contentEncoding: result.ContentEncoding, etag: result.ETag, lastModified: result.LastModified, }), @@ -365,6 +368,7 @@ export const makeObjectOps = ( return { contentType: result.ContentType, + contentEncoding: result.ContentEncoding, contentLength: result.ContentLength, etag: result.ETag, lastModified: result.LastModified, @@ -377,6 +381,7 @@ export const makeObjectOps = ( partsCount: result.PartsCount, contentLength: result.ContentLength, contentType: result.ContentType, + contentEncoding: result.ContentEncoding, etag: result.ETag, lastModified: result.LastModified, }), @@ -395,7 +400,10 @@ export const makeObjectOps = ( const normalized = normalizeHeaders(headers); const contentType = normalized["content-type"]!; - const contentLength = s3Params.contentLength; + const contentEncoding = stripAwsChunkedFromContentEncoding( + normalized["content-encoding"], + ); + let contentLength = s3Params.contentLength; const validatedStream = (yield* checksumService.validate( bodyStream, @@ -414,10 +422,10 @@ export const makeObjectOps = ( }), ); - const isSmall = contentLength !== undefined && + const shouldBuffer = contentLength === undefined || contentLength < 1024 * 1024; - const body = isSmall + const body = shouldBuffer ? yield* Stream.runCollect(validatedStream).pipe( Effect.map((chunks) => { const total = Chunk.reduce(chunks, 0, (acc, c) => acc + c.length); @@ -427,6 +435,10 @@ export const makeObjectOps = ( res.set(c, off); off += c.length; } + // For chunked transfer uploads without Content-Length, infer exact size. + if (contentLength === undefined) { + contentLength = total; + } return res; }), Effect.mapError((e) => { @@ -448,6 +460,7 @@ export const makeObjectOps = ( Key: key, Body: body, ContentType: contentType, + ContentEncoding: contentEncoding, ContentLength: contentLength, Metadata: metadata, }); @@ -477,7 +490,7 @@ export const makeObjectOps = ( // Manually inject validated checksums if ( checksums.sha256 || checksums.sha1 || checksums.crc32 || - checksums.crc32c || checksums.crc64nvme || !isSmall + checksums.crc32c || checksums.crc64nvme || !shouldBuffer ) { command.middlewareStack.add( (next) => (args) => { @@ -485,7 +498,7 @@ export const makeObjectOps = ( headers: Record; duplex?: string; }; - if (!isSmall) { + if (!shouldBuffer) { request.duplex = "half"; request.headers["x-amz-content-sha256"] = "UNSIGNED-PAYLOAD"; if (contentLength !== undefined) { diff --git a/src/Backends/Swift/Objects.ts b/src/Backends/Swift/Objects.ts index dd71284..22e7e81 100644 --- a/src/Backends/Swift/Objects.ts +++ b/src/Backends/Swift/Objects.ts @@ -17,6 +17,7 @@ import { InvalidRequest, } from "../../Services/Backend.ts"; import { normalizeHeaders } from "../../Services/S3HeaderService.ts"; +import { stripAwsChunkedFromContentEncoding } from "../../Services/AwsChunked.ts"; import { encodeObjectKeyForSwift, formatSwiftTransportError, @@ -59,6 +60,39 @@ function resolveContentType( return contentType; } +function resolveContentEncoding( + response: HttpClientResponse.HttpClientResponse, + normalizedResp: Record, + s3Headers: Record, +): string | undefined { + let contentEncoding = normalizedResp["content-encoding"]; + + if ( + contentEncoding === undefined && + (response as unknown as { source?: unknown }).source instanceof Response + ) { + const src = (response as unknown as { source: Response }).source; + contentEncoding = src.headers.get("content-encoding") ?? undefined; + } + + if (contentEncoding === undefined) { + const h = response.headers as unknown as { + get?: (n: string) => string | null; + }; + if (typeof h.get === "function") { + contentEncoding = h.get("content-encoding") ?? + h.get("Content-Encoding") ?? undefined; + } + } + + if (contentEncoding === undefined) { + contentEncoding = s3Headers["Content-Encoding"] ?? + s3Headers["content-encoding"]; + } + + return contentEncoding; +} + export interface SwiftObject { readonly name?: string; readonly hash?: string; @@ -245,6 +279,9 @@ export const makeObjectOps = ( contentType: (Array.isArray(response.headers["content-type"]) ? response.headers["content-type"][0] : response.headers["content-type"]) || undefined, + contentEncoding: (Array.isArray(response.headers["content-encoding"]) + ? response.headers["content-encoding"][0] + : response.headers["content-encoding"]) || undefined, contentLength, etag: etag || undefined, lastModified: lastModified ? new Date(lastModified) : undefined, @@ -417,6 +454,11 @@ export const makeObjectOps = ( stream: response.stream, nativeStream: nativeStream || undefined, contentType, + contentEncoding: resolveContentEncoding( + response, + normalizedResp, + s3Headers, + ), contentLength, etag: etag || undefined, lastModified: lastModified ? new Date(lastModified) : undefined, @@ -447,6 +489,9 @@ export const makeObjectOps = ( headers, ); const normalized = normalizeHeaders(headers); + const contentEncoding = stripAwsChunkedFromContentEncoding( + normalized["content-encoding"], + ); const swiftHeaders: Record = { "X-Auth-Token": token, @@ -498,7 +543,7 @@ export const makeObjectOps = ( }) : validatedStream; - const request = HttpClientRequest.put(`${url}/${encodedKey}`).pipe( + let request = HttpClientRequest.put(`${url}/${encodedKey}`).pipe( HttpClientRequest.bodyStream(bodyStream), HttpClientRequest.setHeaders(swiftHeaders), HttpClientRequest.setHeader( @@ -507,6 +552,11 @@ export const makeObjectOps = ( "application/octet-stream") as string, ), ); + if (contentEncoding !== undefined) { + request = request.pipe( + HttpClientRequest.setHeader("Content-Encoding", contentEncoding), + ); + } const response: HttpClientResponse.HttpClientResponse = yield* client .execute(request).pipe( diff --git a/src/Frontend/Http.ts b/src/Frontend/Http.ts index 31e748b..5abe9f2 100644 --- a/src/Frontend/Http.ts +++ b/src/Frontend/Http.ts @@ -30,6 +30,7 @@ import { BadGateway } from "./Api.ts"; import * as HttpServerRequest from "@effect/platform/HttpServerRequest"; import { HeraldConfig } from "../Config/Layer.ts"; import { verifyIncomingSigV4Detailed } from "../Services/Auth.ts"; +import type { SigV4VerifiedContext } from "../Services/Auth.ts"; /** * Middleware that at debug log level logs every outgoing response's status and @@ -201,6 +202,8 @@ export const makeS3Router = (prefix = "") => const attrs = { bucket, method }; return yield* Effect.gen(function* () { + let sigV4Context: SigV4VerifiedContext | undefined; + yield* Effect.logDebug("Incoming request", { method, path: pathname, @@ -307,6 +310,7 @@ export const makeS3Router = (prefix = "") => new AccessDenied({ message: "Access Denied" }), ); } + sigV4Context = validation.context; } } @@ -314,7 +318,7 @@ export const makeS3Router = (prefix = "") => const backendLayer = Layer.succeed(Backend, backend); return yield* handler.pipe( - Effect.provideService(RequestContext, { bucket }), + Effect.provideService(RequestContext, { bucket, sigV4Context }), Effect.provide(backendLayer), ); }).pipe( diff --git a/src/Frontend/Multipart/Put.ts b/src/Frontend/Multipart/Put.ts index ec6a835..a44bc33 100644 --- a/src/Frontend/Multipart/Put.ts +++ b/src/Frontend/Multipart/Put.ts @@ -8,6 +8,7 @@ import { import { Backend, InvalidRequest } from "../../Services/Backend.ts"; import { S3HeaderService } from "../../Services/S3HeaderService.ts"; import { S3Xml } from "../../Services/S3Xml.ts"; +import { RequestContext } from "../Utils.ts"; function getHeader( headers: Record, @@ -25,6 +26,7 @@ function getHeader( export const uploadPart = Effect.gen(function* () { const backend = yield* Backend; const request = yield* HttpServerRequest.HttpServerRequest; + const { sigV4Context } = yield* RequestContext; const { key, s3Params } = yield* S3RequestParser; const headerService = yield* S3HeaderService; const s3Xml = yield* S3Xml; @@ -71,7 +73,10 @@ export const uploadPart = Effect.gen(function* () { contentType: getHeader(request.headers, "content-type"), }); const bodyStream = hasAwsChunked - ? decodeAwsChunkedBodyStream(request.stream) + ? decodeAwsChunkedBodyStream(request.stream, { + headers: request.headers, + sigV4Context, + }) : request.stream; const result = yield* backend.uploadPart( diff --git a/src/Frontend/Objects/Put.ts b/src/Frontend/Objects/Put.ts index c0550dd..599cb1e 100644 --- a/src/Frontend/Objects/Put.ts +++ b/src/Frontend/Objects/Put.ts @@ -232,6 +232,7 @@ const copyObject = Effect.gen(function* () { export const putObject = Effect.gen(function* () { const backend = yield* Backend; const request = yield* HttpServerRequest.HttpServerRequest; + const { sigV4Context } = yield* RequestContext; const { key, s3Params } = yield* S3RequestParser; yield* ensureClientWritableKey(key); const headerService = yield* S3HeaderService; @@ -261,7 +262,10 @@ export const putObject = Effect.gen(function* () { }); const bodyStream = hasAwsChunked - ? decodeAwsChunkedBodyStream(request.stream) + ? decodeAwsChunkedBodyStream(request.stream, { + headers: request.headers, + sigV4Context, + }) : request.stream; const result = yield* backend.putObject( diff --git a/src/Frontend/Utils.ts b/src/Frontend/Utils.ts index 4513242..ae83585 100644 --- a/src/Frontend/Utils.ts +++ b/src/Frontend/Utils.ts @@ -2,6 +2,7 @@ import { HttpServerRequest, Url } from "@effect/platform"; import { Context, Effect, Either, Schema } from "effect"; import { InternalError } from "../Services/Backend.ts"; import { S3HeaderService } from "../Services/S3HeaderService.ts"; +import type { SigV4VerifiedContext } from "../Services/Auth.ts"; /** * Context for S3 operations (bucket or object). @@ -10,6 +11,7 @@ export class RequestContext extends Context.Tag("RequestContext")< RequestContext, { readonly bucket: string; + readonly sigV4Context?: SigV4VerifiedContext; } >() {} diff --git a/src/Services/Auth.ts b/src/Services/Auth.ts index 217ea27..8361311 100644 --- a/src/Services/Auth.ts +++ b/src/Services/Auth.ts @@ -31,8 +31,20 @@ export type SigV4ValidationFailure = | "RequestTimeTooSkewed" | "InvalidSignature"; +export interface SigV4VerifiedContext { + readonly accessKeyId: string; + readonly secretAccessKey: string; + readonly scopeDate: string; + readonly scopeRegion: string; + readonly scopeService: string; + readonly amzDate: string; + readonly initialSignature: string; + readonly signedHeaders: readonly string[]; + readonly isPresigned: boolean; +} + export type SigV4ValidationResult = - | { readonly valid: true } + | { readonly valid: true; readonly context: SigV4VerifiedContext } | { readonly valid: false; readonly failure: SigV4ValidationFailure }; /** @@ -125,6 +137,7 @@ export function verifyIncomingSigV4Detailed( let credentialService: string | undefined; let signedHeadersList: string[] = []; let headerRegion: string | undefined; + let parsedHeaderSignature: string | undefined; if (authHeader?.startsWith("AWS4-HMAC-SHA256")) { const match = authHeader.match(/Credential=([^, ]+)/); @@ -142,6 +155,10 @@ export function verifyIncomingSigV4Detailed( if (headersMatch && headersMatch[1]) { signedHeadersList = headersMatch[1].split(";"); } + const signatureMatch = authHeader.match(/Signature=([0-9a-fA-F]+)/); + if (signatureMatch && signatureMatch[1]) { + parsedHeaderSignature = signatureMatch[1].toLowerCase(); + } } else if (authHeader !== undefined && !hasSigInQuery) { return { valid: false, failure: "MalformedAuthorization" } as const; } else if (hasSigInQuery) { @@ -345,7 +362,32 @@ export function verifyIncomingSigV4Detailed( encoder.encode(authHeader), encoder.encode(expectedAuth), ); - if (isValid) return { valid: true } as const; + if (isValid) { + const initialSignature = parsedHeaderSignature; + const amzDateFromRequest = headers["x-amz-date"] ?? + queryParams.get("X-Amz-Date") ?? + ""; + if ( + initialSignature === undefined || + amzDateFromRequest === "" + ) { + continue; + } + return { + valid: true, + context: { + accessKeyId: cred.accessKeyId, + secretAccessKey: cred.secretAccessKey, + scopeDate: credentialDate, + scopeRegion: effectiveRegion, + scopeService: credentialService, + amzDate: amzDateFromRequest, + initialSignature, + signedHeaders: [...signedHeadersList], + isPresigned: false, + }, + } as const; + } } else { const expectedSig = (signed.query as Record)[ "X-Amz-Signature" @@ -361,7 +403,26 @@ export function verifyIncomingSigV4Detailed( encoder.encode(actualSig), encoder.encode(expectedSig), ); - if (isValid) return { valid: true } as const; + if (isValid) { + const amzDateFromQuery = queryParams.get("X-Amz-Date"); + if (amzDateFromQuery === null) { + continue; + } + return { + valid: true, + context: { + accessKeyId: cred.accessKeyId, + secretAccessKey: cred.secretAccessKey, + scopeDate: credentialDate, + scopeRegion: effectiveRegion, + scopeService: credentialService, + amzDate: amzDateFromQuery, + initialSignature: actualSig.toLowerCase(), + signedHeaders: [...signedHeadersList], + isPresigned: true, + }, + } as const; + } } } diff --git a/src/Services/AwsChunked.ts b/src/Services/AwsChunked.ts index 40cc542..e8e1d27 100644 --- a/src/Services/AwsChunked.ts +++ b/src/Services/AwsChunked.ts @@ -1,6 +1,8 @@ import { Stream } from "effect"; -import { InvalidRequest } from "./Backend.ts"; +import { AccessDenied, InvalidRequest } from "./Backend.ts"; import { normalizeHeaders } from "./S3HeaderService.ts"; +import type { SigV4VerifiedContext } from "./Auth.ts"; +import { createHash, createHmac } from "node-crypto"; const CR = 13; const LF = 10; @@ -9,6 +11,9 @@ const MAX_CHUNK_SIZE_BYTES = 128 * 1024 * 1024; const MAX_TOTAL_BUFFERED_BYTES = MAX_CHUNK_SIZE_BYTES + MAX_CONTROL_LINE_LENGTH + 4; +const EMPTY_SHA256_HEX = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; +const SIGV4_STREAMING_ALGORITHM = "AWS4-HMAC-SHA256-PAYLOAD"; const appendBytes = ( a: Uint8Array, @@ -29,7 +34,9 @@ const findCrlf = (buffer: Uint8Array): number => { return -1; }; -const parseChunkSizeLine = (line: string): number => { +const parseChunkControlLine = ( + line: string, +): { size: number; signature?: string } => { const semi = line.indexOf(";"); const token = (semi === -1 ? line : line.slice(0, semi)).trim(); if (token.length === 0 || token.length > MAX_CONTROL_LINE_LENGTH) { @@ -48,14 +55,120 @@ const parseChunkSizeLine = (line: string): number => { message: "Invalid aws-chunked chunk-size line", }); } - return size; + + if (semi === -1) { + return { size }; + } + + const extensions = line.slice(semi + 1).split(";"); + let signature: string | undefined; + for (const ext of extensions) { + const eq = ext.indexOf("="); + if (eq === -1) continue; + const key = ext.slice(0, eq).trim().toLowerCase(); + const value = ext.slice(eq + 1).trim(); + if (key === "chunk-signature" && value !== "") { + signature = value.toLowerCase(); + break; + } + } + + return { size, signature }; +}; + +export type AwsChunkedMode = + | "none" + | "aws-chunked-encoding" + | "streaming-signed-payload" + | "streaming-unsigned-payload-trailer"; + +const getSigV4StreamingMode = ( + headers: Record, +): AwsChunkedMode => { + const normalized = normalizeHeaders(headers); + const amzContentSha256 = normalized["x-amz-content-sha256"]?.trim() + .toUpperCase(); + if (amzContentSha256 === "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") { + return "streaming-signed-payload"; + } + if (amzContentSha256 === "STREAMING-UNSIGNED-PAYLOAD-TRAILER") { + return "streaming-unsigned-payload-trailer"; + } + const encoding = normalized["content-encoding"]; + const hasDecodedContentLength = normalized["x-amz-decoded-content-length"] !== + undefined && + normalized["x-amz-decoded-content-length"].trim() !== ""; + const hasAwsChunkedEncoding = encoding !== undefined && + encoding.trim() !== "" && + encoding.toLowerCase().split(",").map((s) => s.trim()).some((token) => + token === "aws-chunked" + ); + return hasAwsChunkedEncoding && hasDecodedContentLength + ? "aws-chunked-encoding" + : "none"; +}; + +const deriveSigV4SigningKey = ( + secretAccessKey: string, + scopeDate: string, + scopeRegion: string, + scopeService: string, +): Uint8Array => { + const kDate = createHmac("sha256", `AWS4${secretAccessKey}`) + .update(scopeDate, "utf8") + .digest(); + const kRegion = createHmac("sha256", kDate).update(scopeRegion, "utf8") + .digest(); + const kService = createHmac("sha256", kRegion).update(scopeService, "utf8") + .digest(); + return createHmac("sha256", kService).update("aws4_request", "utf8").digest(); +}; + +const verifyStreamingChunkSignature = ( + signingKey: Uint8Array, + previousSignature: string, + amzDate: string, + scopeDate: string, + scopeRegion: string, + scopeService: string, + chunkBytes: Uint8Array, + chunkSignature: string, +): string => { + const scope = `${scopeDate}/${scopeRegion}/${scopeService}/aws4_request`; + const chunkHash = createHash("sha256").update(chunkBytes).digest("hex"); + const stringToSign = + `${SIGV4_STREAMING_ALGORITHM}\n${amzDate}\n${scope}\n${previousSignature}\n${EMPTY_SHA256_HEX}\n${chunkHash}`; + const expected = createHmac("sha256", signingKey).update(stringToSign, "utf8") + .digest("hex"); + if (expected !== chunkSignature.toLowerCase()) { + throw new AccessDenied({ + message: + "The request signature we calculated does not match the signature you provided.", + }); + } + return expected; }; +interface AwsChunkedParserOptions { + readonly requireChunkSignatures: boolean; + readonly signingKey?: Uint8Array; + readonly sigV4Context?: SigV4VerifiedContext; + readonly expectedDecodedLength?: number; +} + class AwsChunkedParser { private buffer: Uint8Array = new Uint8Array(0); private expectedSize = 0; + private expectedChunkSignature: string | undefined; private phase: "size" | "data" | "data-crlf" | "trailers" | "done" = "size"; private readonly decoder = new TextDecoder(); + private previousSignature: string | undefined; + private decodedBytes = 0; + + constructor(private readonly options: AwsChunkedParserOptions) { + this.previousSignature = options.sigV4Context?.initialSignature + .toLowerCase(); + } feed(chunk: Uint8Array): Uint8Array[] { this.buffer = appendBytes(this.buffer, chunk); @@ -77,11 +190,38 @@ class AwsChunkedParser { } break; } - const lineBytes = this.buffer.slice(0, idx); - const line = this.decoder.decode(lineBytes); + const line = this.decoder.decode(this.buffer.slice(0, idx)); this.buffer = this.buffer.slice(idx + 2); - this.expectedSize = parseChunkSizeLine(line); + const parsed = parseChunkControlLine(line); + this.expectedSize = parsed.size; + this.expectedChunkSignature = parsed.signature; + if ( + this.options.requireChunkSignatures && !this.expectedChunkSignature + ) { + throw new AccessDenied({ + message: + "The request signature we calculated does not match the signature you provided.", + }); + } if (this.expectedSize === 0) { + if ( + this.options.requireChunkSignatures && + this.options.signingKey && + this.options.sigV4Context && + this.expectedChunkSignature && + this.previousSignature + ) { + this.previousSignature = verifyStreamingChunkSignature( + this.options.signingKey, + this.previousSignature, + this.options.sigV4Context.amzDate, + this.options.sigV4Context.scopeDate, + this.options.sigV4Context.scopeRegion, + this.options.sigV4Context.scopeService, + new Uint8Array(0), + this.expectedChunkSignature, + ); + } this.phase = "trailers"; } else { this.phase = "data"; @@ -91,7 +231,27 @@ class AwsChunkedParser { if (this.phase === "data") { if (this.buffer.length < this.expectedSize) break; - out.push(this.buffer.slice(0, this.expectedSize)); + const payload = this.buffer.slice(0, this.expectedSize); + if ( + this.options.requireChunkSignatures && + this.options.signingKey && + this.options.sigV4Context && + this.expectedChunkSignature && + this.previousSignature + ) { + this.previousSignature = verifyStreamingChunkSignature( + this.options.signingKey, + this.previousSignature, + this.options.sigV4Context.amzDate, + this.options.sigV4Context.scopeDate, + this.options.sigV4Context.scopeRegion, + this.options.sigV4Context.scopeService, + payload, + this.expectedChunkSignature, + ); + } + out.push(payload); + this.decodedBytes += payload.length; this.buffer = this.buffer.slice(this.expectedSize); this.phase = "data-crlf"; continue; @@ -147,6 +307,15 @@ class AwsChunkedParser { message: "Incomplete aws-chunked payload", }); } + if ( + this.options.expectedDecodedLength !== undefined && + this.decodedBytes !== this.options.expectedDecodedLength + ) { + throw new InvalidRequest({ + message: + "Decoded payload length does not match x-amz-decoded-content-length", + }); + } if (this.buffer.length !== 0) { throw new InvalidRequest({ message: "Unexpected buffered bytes after aws-chunked payload", @@ -158,21 +327,77 @@ class AwsChunkedParser { export const hasAwsChunkedContentEncoding = ( headers: Record, ): boolean => { - const normalized = normalizeHeaders(headers); - const encoding = normalized["content-encoding"]; - if (encoding === undefined || encoding.trim() === "") { - return false; + return getSigV4StreamingMode(headers) !== "none"; +}; + +export const stripAwsChunkedFromContentEncoding = ( + contentEncoding: string | undefined, +): string | undefined => { + if (contentEncoding === undefined) { + return undefined; } - return encoding.toLowerCase().split(",").map((s) => s.trim()).some((token) => - token === "aws-chunked" - ); + const filtered = contentEncoding + .split(",") + .map((token) => token.trim()) + .filter((token) => token !== "" && token.toLowerCase() !== "aws-chunked"); + if (filtered.length === 0) { + return undefined; + } + return filtered.join(", "); }; +interface AwsChunkedDecodeOptions { + readonly headers?: Record; + readonly sigV4Context?: SigV4VerifiedContext; +} + export const decodeAwsChunkedBodyStream = ( stream: Stream.Stream, -): Stream.Stream => { + options?: AwsChunkedDecodeOptions, +): Stream.Stream => { + const normalized = options?.headers ? normalizeHeaders(options.headers) : {}; + const mode = options?.headers + ? getSigV4StreamingMode(options.headers) + : "aws-chunked-encoding"; + const expectedDecodedLengthRaw = normalized["x-amz-decoded-content-length"]; + let expectedDecodedLength: number | undefined; + if (expectedDecodedLengthRaw !== undefined) { + const parsedDecodedLength = Number.parseInt(expectedDecodedLengthRaw, 10); + if (!Number.isInteger(parsedDecodedLength) || parsedDecodedLength < 0) { + return Stream.fail( + new InvalidRequest({ + message: "Invalid x-amz-decoded-content-length", + }), + ); + } + expectedDecodedLength = parsedDecodedLength; + } + + const requireChunkSignatures = mode === "streaming-signed-payload"; + if (requireChunkSignatures && options?.sigV4Context === undefined) { + return Stream.fail( + new AccessDenied({ + message: + "The request signature we calculated does not match the signature you provided.", + }), + ); + } + const signingKey = requireChunkSignatures && options?.sigV4Context + ? deriveSigV4SigningKey( + options.sigV4Context.secretAccessKey, + options.sigV4Context.scopeDate, + options.sigV4Context.scopeRegion, + options.sigV4Context.scopeService, + ) + : undefined; + const source = Stream.toReadableStream(stream); - const parser = new AwsChunkedParser(); + const parser = new AwsChunkedParser({ + requireChunkSignatures, + signingKey, + sigV4Context: options?.sigV4Context, + expectedDecodedLength, + }); const decoded = source.pipeThrough( new TransformStream({ transform(chunk, controller) { @@ -198,7 +423,7 @@ export const decodeAwsChunkedBodyStream = ( return Stream.fromReadableStream( () => decoded, (error) => - error instanceof InvalidRequest + error instanceof InvalidRequest || error instanceof AccessDenied ? error : new InvalidRequest({ message: String(error) }), ); diff --git a/src/Services/Backend.ts b/src/Services/Backend.ts index 6cd62c6..a017ef6 100644 --- a/src/Services/Backend.ts +++ b/src/Services/Backend.ts @@ -188,6 +188,7 @@ export interface ObjectResponse extends ChecksumInfo { readonly stream: Stream.Stream; readonly nativeStream?: ReadableStream; readonly contentType?: string; + readonly contentEncoding?: string; readonly contentLength?: number; readonly etag?: string; readonly lastModified?: Date; @@ -198,6 +199,7 @@ export interface ObjectResponse extends ChecksumInfo { export interface HeadObjectResult extends ChecksumInfo { readonly contentType?: string; + readonly contentEncoding?: string; readonly contentLength?: number; readonly etag?: string; readonly lastModified?: Date; diff --git a/src/Services/S3HeaderService.ts b/src/Services/S3HeaderService.ts index 92d2e42..ba984bf 100644 --- a/src/Services/S3HeaderService.ts +++ b/src/Services/S3HeaderService.ts @@ -80,6 +80,9 @@ export class S3HeaderService if ("contentType" in result && result.contentType) { headers["Content-Type"] = result.contentType; } + if ("contentEncoding" in result && result.contentEncoding) { + headers["Content-Encoding"] = result.contentEncoding; + } // Metadata if ("metadata" in result && result.metadata) { @@ -203,8 +206,11 @@ export class S3HeaderService const hasAwsChunked = contentEncoding !== undefined && contentEncoding.toLowerCase().split(",").map((s) => s.trim()) .includes("aws-chunked"); + const amzContentSha256 = normalized["x-amz-content-sha256"]; + const hasStreamingSigV4 = amzContentSha256 !== undefined && + amzContentSha256.trim().toUpperCase().startsWith("STREAMING-"); if ( - hasAwsChunked && + (hasAwsChunked || hasStreamingSigV4) && normalized["x-amz-decoded-content-length"] !== undefined ) { return parseNonNegativeInteger( @@ -253,6 +259,8 @@ export class S3HeaderService s3Headers[`x-amz-meta-${metaKey}`] = decodedValue; } else if (k === "content-type") { s3Headers["Content-Type"] = v; + } else if (k === "content-encoding") { + s3Headers["Content-Encoding"] = v; } else if (k === "content-length") { s3Headers["Content-Length"] = v; } else if (k === "etag") { diff --git a/tests/aws-chunked.test.ts b/tests/aws-chunked.test.ts index 438bec2..52730e3 100644 --- a/tests/aws-chunked.test.ts +++ b/tests/aws-chunked.test.ts @@ -1,11 +1,66 @@ import { Chunk, Effect, Stream } from "effect"; import { assertEquals } from "@std/assert"; -import { decodeAwsChunkedBodyStream } from "../src/Services/AwsChunked.ts"; +import { createHash, createHmac } from "node-crypto"; +import { + decodeAwsChunkedBodyStream, + hasAwsChunkedContentEncoding, +} from "../src/Services/AwsChunked.ts"; import { S3HeaderService } from "../src/Services/S3HeaderService.ts"; import { testEffect } from "./utils.ts"; +import type { SigV4VerifiedContext } from "../src/Services/Auth.ts"; const bytes = (s: string) => new TextEncoder().encode(s); const text = (u: Uint8Array) => new TextDecoder().decode(u); +const EMPTY_SHA256_HEX = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +const deriveSigningKey = ( + secretAccessKey: string, + scopeDate: string, + scopeRegion: string, + scopeService: string, +): Uint8Array => { + const kDate = createHmac("sha256", `AWS4${secretAccessKey}`) + .update(scopeDate, "utf8").digest(); + const kRegion = createHmac("sha256", kDate).update(scopeRegion, "utf8") + .digest(); + const kService = createHmac("sha256", kRegion).update(scopeService, "utf8") + .digest(); + return createHmac("sha256", kService).update("aws4_request", "utf8").digest(); +}; + +const buildSignedPayload = ( + plaintext: string, + context: SigV4VerifiedContext, +): string => { + const signingKey = deriveSigningKey( + context.secretAccessKey, + context.scopeDate, + context.scopeRegion, + context.scopeService, + ); + const scope = + `${context.scopeDate}/${context.scopeRegion}/${context.scopeService}/aws4_request`; + const chunkHash = createHash("sha256").update(plaintext, "utf8").digest( + "hex", + ); + const chunkStringToSign = + `AWS4-HMAC-SHA256-PAYLOAD\n${context.amzDate}\n${scope}\n${context.initialSignature}\n${EMPTY_SHA256_HEX}\n${chunkHash}`; + const chunkSignature = createHmac("sha256", signingKey).update( + chunkStringToSign, + "utf8", + ).digest("hex"); + + const finalStringToSign = + `AWS4-HMAC-SHA256-PAYLOAD\n${context.amzDate}\n${scope}\n${chunkSignature}\n${EMPTY_SHA256_HEX}\n${EMPTY_SHA256_HEX}`; + const finalSignature = createHmac("sha256", signingKey).update( + finalStringToSign, + "utf8", + ).digest("hex"); + + const sizeHex = plaintext.length.toString(16); + return `${sizeHex};chunk-signature=${chunkSignature}\r\n${plaintext}\r\n0;chunk-signature=${finalSignature}\r\n\r\n`; +}; const collectBytes = (chunks: Chunk.Chunk): Uint8Array => { const total = Chunk.reduce(chunks, 0, (acc, c) => acc + c.length); @@ -75,3 +130,128 @@ testEffect( assertEquals(parsed.s3Params.contentLength, 11); }).pipe(Effect.provide(S3HeaderService.Default)), ); + +testEffect("aws-chunked/detect/by-content-encoding", () => + Effect.sync(() => { + assertEquals( + hasAwsChunkedContentEncoding({ + "content-encoding": "aws-chunked", + }), + false, + ); + })); + +testEffect( + "aws-chunked/detect/by-content-encoding-with-decoded-length", + () => + Effect.sync(() => { + assertEquals( + hasAwsChunkedContentEncoding({ + "content-encoding": "aws-chunked", + "x-amz-decoded-content-length": "11", + }), + true, + ); + }), +); + +testEffect( + "aws-chunked/detect/by-streaming-sha256-header", + () => + Effect.sync(() => { + assertEquals( + hasAwsChunkedContentEncoding({ + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + }), + true, + ); + }), +); + +testEffect( + "aws-chunked/decoded-content-length/streaming-sha256-header-parse", + () => + Effect.gen(function* () { + const headerService = yield* S3HeaderService; + const parsed = headerService.fromRequestHeaders({ + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "content-length": "999", + "x-amz-decoded-content-length": "11", + }); + assertEquals(parsed.s3Params.contentLength, 11); + }).pipe(Effect.provide(S3HeaderService.Default)), +); + +testEffect( + "aws-chunked/verify/streaming-signed-payload", + () => + Effect.gen(function* () { + const context: SigV4VerifiedContext = { + accessKeyId: "test", + secretAccessKey: "test-secret", + scopeDate: "20260303", + scopeRegion: "us-east-1", + scopeService: "s3", + amzDate: "20260303T000000Z", + initialSignature: + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + signedHeaders: ["host", "x-amz-date"], + isPresigned: false, + }; + const framed = buildSignedPayload("hello world", context); + const decoded = decodeAwsChunkedBodyStream( + Stream.succeed(bytes(framed)), + { + headers: { + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "x-amz-decoded-content-length": "11", + }, + sigV4Context: context, + }, + ); + const chunks = yield* Stream.runCollect(decoded); + assertEquals(text(collectBytes(chunks)), "hello world"); + }), +); + +testEffect( + "aws-chunked/verify/streaming-signed-payload/bad-signature", + () => + Effect.gen(function* () { + const context: SigV4VerifiedContext = { + accessKeyId: "test", + secretAccessKey: "test-secret", + scopeDate: "20260303", + scopeRegion: "us-east-1", + scopeService: "s3", + amzDate: "20260303T000000Z", + initialSignature: + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + signedHeaders: ["host", "x-amz-date"], + isPresigned: false, + }; + const framed = + "b;chunk-signature=abc\r\nhello world\r\n0;chunk-signature=def\r\n\r\n"; + const exit = yield* Stream.runCollect( + decodeAwsChunkedBodyStream( + Stream.succeed(bytes(framed)), + { + headers: { + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "x-amz-decoded-content-length": "11", + }, + sigV4Context: context, + }, + ), + ).pipe(Effect.exit); + if (exit._tag !== "Failure") { + throw new Error("Expected bad signature to fail"); + } + const pretty = String(exit.cause); + if (!pretty.includes("AccessDenied")) { + throw new Error( + `Expected AccessDenied for bad signature, got: ${pretty}`, + ); + } + }), +); diff --git a/tests/integration/aws-chunked-put.test.ts b/tests/integration/aws-chunked-put.test.ts index 9a66815..f6cb166 100644 --- a/tests/integration/aws-chunked-put.test.ts +++ b/tests/integration/aws-chunked-put.test.ts @@ -9,6 +9,7 @@ import { } from "@aws-sdk/client-s3"; import { SignatureV4 } from "@smithy/signature-v4"; import { Sha256 } from "@aws-crypto/sha256"; +import { createHash, createHmac } from "node-crypto"; import { harness, type ProxyTestCase } from "../utils.ts"; import type { GlobalConfig } from "../../src/Domain/Config.ts"; @@ -33,15 +34,124 @@ const MULTIPART_KEY = "aws-chunked-multipart-object.txt"; const PLAINTEXT = "hello world"; const CHUNKED_PAYLOAD = "b;chunk-signature=abc\r\nhello world\r\n0;chunk-signature=def\r\n\r\n"; +const EMPTY_SHA256_HEX = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; const credentials = { accessKeyId: "minioadmin", secretAccessKey: "minioadmin", }; -async function sendAwsChunkedPut(baseUrl: string): Promise { +type SigV4StreamingMode = "with-content-encoding" | "streaming-sha256-only"; + +function awsChunkedHeaders( + mode: SigV4StreamingMode, + contentLength?: number, +): Record { + if (mode === "streaming-sha256-only") { + const headers: Record = { + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "x-amz-decoded-content-length": String(PLAINTEXT.length), + }; + if (contentLength !== undefined) { + headers["content-length"] = String(contentLength); + } + return headers; + } + return { + "content-encoding": "aws-chunked", + ...(contentLength !== undefined + ? { "content-length": String(contentLength) } + : {}), + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + "x-amz-decoded-content-length": String(PLAINTEXT.length), + }; +} + +const sha256Hex = (value: string): string => + createHash("sha256").update(value, "utf8").digest("hex"); + +const hmacHex = ( + key: Uint8Array, + value: string, +): string => createHmac("sha256", key).update(value, "utf8").digest("hex"); + +function deriveSigningKey( + secretAccessKey: string, + scopeDate: string, + scopeRegion: string, + scopeService: string, +): Uint8Array { + const kDate = createHmac("sha256", `AWS4${secretAccessKey}`) + .update(scopeDate, "utf8").digest(); + const kRegion = createHmac("sha256", kDate).update(scopeRegion, "utf8") + .digest(); + const kService = createHmac("sha256", kRegion).update(scopeService, "utf8") + .digest(); + return createHmac("sha256", kService).update("aws4_request", "utf8").digest(); +} + +function parseAuthorizationHeader( + authorization: string, +): { + scopeDate: string; + scopeRegion: string; + scopeService: string; + initialSignature: string; +} { + const credentialMatch = authorization.match(/Credential=([^, ]+)/); + if (!credentialMatch || !credentialMatch[1]) { + throw new Error("Missing Credential in Authorization header"); + } + const parts = credentialMatch[1].split("/"); + if (parts.length < 5) { + throw new Error("Malformed Credential scope in Authorization header"); + } + const signatureMatch = authorization.match(/Signature=([0-9a-fA-F]+)/); + if (!signatureMatch || !signatureMatch[1]) { + throw new Error("Missing Signature in Authorization header"); + } + return { + scopeDate: parts[1], + scopeRegion: parts[2], + scopeService: parts[3], + initialSignature: signatureMatch[1].toLowerCase(), + }; +} + +function buildStreamingSigV4Payload( + plaintext: string, + amzDate: string, + scopeDate: string, + scopeRegion: string, + scopeService: string, + initialSignature: string, +): string { + const signingKey = deriveSigningKey( + credentials.secretAccessKey, + scopeDate, + scopeRegion, + scopeService, + ); + const scope = `${scopeDate}/${scopeRegion}/${scopeService}/aws4_request`; + const chunkHash = sha256Hex(plaintext); + const chunkStringToSign = + `AWS4-HMAC-SHA256-PAYLOAD\n${amzDate}\n${scope}\n${initialSignature}\n${EMPTY_SHA256_HEX}\n${chunkHash}`; + const chunkSignature = hmacHex(signingKey, chunkStringToSign); + + const finalChunkStringToSign = + `AWS4-HMAC-SHA256-PAYLOAD\n${amzDate}\n${scope}\n${chunkSignature}\n${EMPTY_SHA256_HEX}\n${EMPTY_SHA256_HEX}`; + const finalChunkSignature = hmacHex(signingKey, finalChunkStringToSign); + + const sizeHex = plaintext.length.toString(16); + return `${sizeHex};chunk-signature=${chunkSignature}\r\n${plaintext}\r\n0;chunk-signature=${finalChunkSignature}\r\n\r\n`; +} + +async function sendAwsChunkedPut( + baseUrl: string, + mode: SigV4StreamingMode = "with-content-encoding", +): Promise { const url = new URL(`${baseUrl}/${BUCKET}/${KEY}`); - const body = new TextEncoder().encode(CHUNKED_PAYLOAD); const signer = new SignatureV4({ credentials, region: "us-east-1", @@ -49,22 +159,61 @@ async function sendAwsChunkedPut(baseUrl: string): Promise { sha256: Sha256, }); - const signed = await signer.sign({ + let body = new TextEncoder().encode(CHUNKED_PAYLOAD); + let signed = await signer.sign({ method: "PUT", protocol: url.protocol, hostname: url.hostname, port: url.port === "" ? undefined : parseInt(url.port, 10), path: url.pathname, query: {}, - headers: { - "content-encoding": "aws-chunked", - "content-length": String(body.length), - "x-amz-content-sha256": "UNSIGNED-PAYLOAD", - "x-amz-decoded-content-length": String(PLAINTEXT.length), - }, + headers: awsChunkedHeaders(mode, body.length), body, }); + if (mode === "streaming-sha256-only") { + const authorization = signed.headers["authorization"]; + const amzDate = signed.headers["x-amz-date"]; + if (!authorization || !amzDate) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth = parseAuthorizationHeader(authorization); + const payload = buildStreamingSigV4Payload( + PLAINTEXT, + amzDate, + auth.scopeDate, + auth.scopeRegion, + auth.scopeService, + auth.initialSignature, + ); + body = new TextEncoder().encode(payload); + signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: awsChunkedHeaders(mode, body.length), + body, + }); + const authorization2 = signed.headers["authorization"]; + const amzDate2 = signed.headers["x-amz-date"]; + if (!authorization2 || !amzDate2) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth2 = parseAuthorizationHeader(authorization2); + const payload2 = buildStreamingSigV4Payload( + PLAINTEXT, + amzDate2, + auth2.scopeDate, + auth2.scopeRegion, + auth2.scopeService, + auth2.initialSignature, + ); + body = new TextEncoder().encode(payload2); + } + const requestHeaders = new Headers(); for (const [key, value] of Object.entries(signed.headers)) { if (key.toLowerCase() === "host") { @@ -85,12 +234,12 @@ async function sendAwsChunkedPut(baseUrl: string): Promise { async function sendAwsChunkedUploadPart( baseUrl: string, uploadId: string, + mode: SigV4StreamingMode = "with-content-encoding", ): Promise { const url = new URL(`${baseUrl}/${BUCKET}/${MULTIPART_KEY}`); url.searchParams.set("partNumber", "1"); url.searchParams.set("uploadId", uploadId); - const body = new TextEncoder().encode(CHUNKED_PAYLOAD); const signer = new SignatureV4({ credentials, region: "us-east-1", @@ -98,7 +247,8 @@ async function sendAwsChunkedUploadPart( sha256: Sha256, }); - const signed = await signer.sign({ + let body = new TextEncoder().encode(CHUNKED_PAYLOAD); + let signed = await signer.sign({ method: "PUT", protocol: url.protocol, hostname: url.hostname, @@ -108,15 +258,56 @@ async function sendAwsChunkedUploadPart( partNumber: "1", uploadId, }, - headers: { - "content-encoding": "aws-chunked", - "content-length": String(body.length), - "x-amz-content-sha256": "UNSIGNED-PAYLOAD", - "x-amz-decoded-content-length": String(PLAINTEXT.length), - }, + headers: awsChunkedHeaders(mode, body.length), body, }); + if (mode === "streaming-sha256-only") { + const authorization = signed.headers["authorization"]; + const amzDate = signed.headers["x-amz-date"]; + if (!authorization || !amzDate) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth = parseAuthorizationHeader(authorization); + const payload = buildStreamingSigV4Payload( + PLAINTEXT, + amzDate, + auth.scopeDate, + auth.scopeRegion, + auth.scopeService, + auth.initialSignature, + ); + body = new TextEncoder().encode(payload); + signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: { + partNumber: "1", + uploadId, + }, + headers: awsChunkedHeaders(mode, body.length), + body, + }); + const authorization2 = signed.headers["authorization"]; + const amzDate2 = signed.headers["x-amz-date"]; + if (!authorization2 || !amzDate2) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth2 = parseAuthorizationHeader(authorization2); + const payload2 = buildStreamingSigV4Payload( + PLAINTEXT, + amzDate2, + auth2.scopeDate, + auth2.scopeRegion, + auth2.scopeService, + auth2.initialSignature, + ); + body = new TextEncoder().encode(payload2); + } + const requestHeaders = new Headers(); for (const [key, value] of Object.entries(signed.headers)) { if (key.toLowerCase() === "host") { @@ -195,6 +386,48 @@ const cases: ProxyTestCase[] = [{ }, ignoreBaseline: true, skipSnapshot: true, +}, { + name: "objects/put/aws-chunked-decoding/streaming-sha256-header-only", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const putResponse = await sendAwsChunkedPut( + context.baseUrl, + "streaming-sha256-only", + ); + if (putResponse.status !== 200) { + const body = await putResponse.text(); + throw new Error( + `aws-chunked PUT (streaming-sha256-only) failed: status=${putResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + await verifyStoredBody(client); + }, + afterAll: async (client) => { + try { + await client.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: KEY })); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, }, { name: "objects/multipart/aws-chunked-uploadpart-decoding", config: testConfig, @@ -287,6 +520,100 @@ const cases: ProxyTestCase[] = [{ }, ignoreBaseline: true, skipSnapshot: true, +}, { + name: + "objects/multipart/aws-chunked-uploadpart-decoding/streaming-sha256-header-only", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + + const { UploadId } = await client.send( + new CreateMultipartUploadCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + }), + ); + if (!UploadId) { + throw new Error("Expected UploadId"); + } + + const uploadPartResponse = await sendAwsChunkedUploadPart( + context.baseUrl, + UploadId, + "streaming-sha256-only", + ); + if (uploadPartResponse.status !== 200) { + const body = await uploadPartResponse.text(); + throw new Error( + `aws-chunked UploadPart (streaming-sha256-only) failed: status=${uploadPartResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + + const etag = uploadPartResponse.headers.get("etag"); + if (!etag) { + throw new Error("UploadPart response did not include ETag header"); + } + + await client.send( + new CompleteMultipartUploadCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + UploadId, + MultipartUpload: { + Parts: [{ + PartNumber: 1, + ETag: etag, + }], + }, + }), + ); + + const out = await client.send( + new GetObjectCommand({ + Bucket: BUCKET, + Key: MULTIPART_KEY, + }), + ); + const bytes = await out.Body?.transformToByteArray(); + if (!bytes) { + throw new Error("Expected multipart object body"); + } + const text = new TextDecoder().decode(bytes); + if (text !== PLAINTEXT) { + throw new Error( + `Multipart decoded payload mismatch; expected "${PLAINTEXT}", got "${ + text.slice(0, 120) + }"`, + ); + } + }, + afterAll: async (client) => { + try { + await client.send( + new DeleteObjectCommand({ Bucket: BUCKET, Key: MULTIPART_KEY }), + ); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, }]; harness(cases); diff --git a/tests/integration/streaming-compat.test.ts b/tests/integration/streaming-compat.test.ts new file mode 100644 index 0000000..2ee1813 --- /dev/null +++ b/tests/integration/streaming-compat.test.ts @@ -0,0 +1,217 @@ +import { + CreateBucketCommand, + DeleteBucketCommand, + DeleteObjectCommand, + GetObjectCommand, + type S3Client, +} from "@aws-sdk/client-s3"; +import { SignatureV4 } from "@smithy/signature-v4"; +import { Sha256 } from "@aws-crypto/sha256"; +import { assertEquals, harness, type ProxyTestCase } from "../utils.ts"; +import type { GlobalConfig } from "../../src/Domain/Config.ts"; +import { createHash } from "node-crypto"; + +const testConfig: GlobalConfig = { + backends: { + minio: { + protocol: "s3", + endpoint: "http://localhost:9000", + region: "us-east-1", + credentials: { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", + }, + buckets: "*", + }, + }, +}; + +const BUCKET = "test-streaming-compat-bucket"; +const KEY_ENCODING = "encoding.txt"; +const KEY_CHUNKED = "chunked-transfer.txt"; +const credentials = { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", +}; + +const sha256Hex = (value: string): string => + createHash("sha256").update(value, "utf8").digest("hex"); + +const cleanup = async (client: S3Client) => { + await Promise.all([KEY_ENCODING, KEY_CHUNKED].map(async (key) => { + try { + await client.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: key })); + } catch { + // Ignore cleanup failures. + } + })); + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } +}; + +const signedStreamPutWithoutContentLength = async ( + baseUrl: string, + bucket: string, + key: string, + bodyText: string, +): Promise => { + const url = new URL(`${baseUrl}/${bucket}/${key}`); + const bodyBytes = new TextEncoder().encode(bodyText); + const signer = new SignatureV4({ + credentials, + region: "us-east-1", + service: "s3", + sha256: Sha256, + }); + const signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: { + "x-amz-content-sha256": sha256Hex(bodyText), + }, + body: bodyBytes, + }); + + const requestHeaders = new Headers(); + for (const [headerName, headerValue] of Object.entries(signed.headers)) { + if (headerName.toLowerCase() === "host") { + continue; + } + requestHeaders.set(headerName, headerValue); + } + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(bodyBytes); + controller.close(); + }, + }); + + return await fetch(url, { + method: "PUT", + headers: requestHeaders, + body: stream, + // @ts-ignore required by fetch implementations for streaming request body + duplex: "half", + }); +}; + +const signedPutWithHeaders = async ( + baseUrl: string, + bucket: string, + key: string, + bodyText: string, + headers: Record, +): Promise => { + const url = new URL(`${baseUrl}/${bucket}/${key}`); + const bodyBytes = new TextEncoder().encode(bodyText); + const signer = new SignatureV4({ + credentials, + region: "us-east-1", + service: "s3", + sha256: Sha256, + }); + const signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: { + ...headers, + "x-amz-content-sha256": sha256Hex(bodyText), + "content-length": String(bodyBytes.length), + }, + body: bodyBytes, + }); + + const requestHeaders = new Headers(); + for (const [headerName, headerValue] of Object.entries(signed.headers)) { + if (headerName.toLowerCase() === "host") { + continue; + } + requestHeaders.set(headerName, headerValue); + } + + return await fetch(url, { + method: "PUT", + headers: requestHeaders, + body: bodyBytes, + // @ts-ignore duplex is required for non-GET body in Deno fetch with streams/body bytes + duplex: "half", + }); +}; + +const cases: ProxyTestCase[] = [ + { + name: "streaming/content-encoding/aws-chunked-stripped", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (_client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const putResponse = await signedPutWithHeaders( + context.baseUrl, + BUCKET, + KEY_ENCODING, + "hello", + { + "content-encoding": "aws-chunked", + }, + ); + assertEquals(putResponse.status, 200); + }, + afterAll: cleanup, + ignoreBaseline: true, + skipSnapshot: true, + }, + { + name: "streaming/put/chunked-transfer-without-content-length", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const response = await signedStreamPutWithoutContentLength( + context.baseUrl, + BUCKET, + KEY_CHUNKED, + "bar", + ); + assertEquals(response.status, 200); + + const out = await client.send( + new GetObjectCommand({ Bucket: BUCKET, Key: KEY_CHUNKED }), + ); + const bytes = await out.Body?.transformToByteArray(); + assertEquals(new TextDecoder().decode(bytes ?? new Uint8Array(0)), "bar"); + }, + afterAll: cleanup, + ignoreBaseline: true, + skipSnapshot: true, + }, +]; + +harness(cases); From af06660c1814bf0383ab601a7d7414bd7567b5fc Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Mar 2026 22:35:29 +0300 Subject: [PATCH 5/6] fix: more test cases Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- src/Backends/S3/Objects.ts | 9 +- src/Backends/Swift/Objects.ts | 9 +- src/Frontend/Objects/Put.ts | 1 + tests/integration/aws-chunked-put.test.ts | 159 ++++++++++++++++++++++ 4 files changed, 174 insertions(+), 4 deletions(-) diff --git a/src/Backends/S3/Objects.ts b/src/Backends/S3/Objects.ts index 6ba873b..9add487 100644 --- a/src/Backends/S3/Objects.ts +++ b/src/Backends/S3/Objects.ts @@ -17,6 +17,7 @@ import { Chunk, Effect, Option, Stream } from "effect"; import { Readable } from "node-stream"; import type sweb from "node-stream/web"; import { + AccessDenied, type BackendError, BadDigest, type CommonPrefix, @@ -410,8 +411,12 @@ export const makeObjectOps = ( checksums, )).pipe( Stream.catchAll((e) => { - // Preserve BadDigest and InvalidRequest errors from checksum validation - if (e instanceof BadDigest || e instanceof InvalidRequest) { + // Preserve known S3-compatible errors from checksum/chunk-signature validation. + if ( + e instanceof BadDigest || + e instanceof InvalidRequest || + e instanceof AccessDenied + ) { return Stream.fail(e as BackendError); } return Stream.fail( diff --git a/src/Backends/Swift/Objects.ts b/src/Backends/Swift/Objects.ts index 22e7e81..386eb31 100644 --- a/src/Backends/Swift/Objects.ts +++ b/src/Backends/Swift/Objects.ts @@ -12,6 +12,7 @@ import type { PutObjectResult, } from "../../Services/Backend.ts"; import { + AccessDenied, BadDigest, InternalError, InvalidRequest, @@ -510,8 +511,12 @@ export const makeObjectOps = ( checksums, )).pipe( Stream.catchAll((e) => { - // Preserve BadDigest and InvalidRequest errors from checksum validation - if (e instanceof BadDigest || e instanceof InvalidRequest) { + // Preserve known S3-compatible errors from checksum/chunk-signature validation. + if ( + e instanceof BadDigest || + e instanceof InvalidRequest || + e instanceof AccessDenied + ) { return Stream.fail(e as BackendError); } return Stream.fail( diff --git a/src/Frontend/Objects/Put.ts b/src/Frontend/Objects/Put.ts index 599cb1e..21974df 100644 --- a/src/Frontend/Objects/Put.ts +++ b/src/Frontend/Objects/Put.ts @@ -250,6 +250,7 @@ export const putObject = Effect.gen(function* () { yield* Effect.logDebug("PutObject aws-chunked decision", { key, hasAwsChunked, + hasSigV4Context: sigV4Context !== undefined, contentEncoding: getHeader(request.headers, "content-encoding"), transferEncoding: getHeader(request.headers, "transfer-encoding"), amzContentSha256: getHeader(request.headers, "x-amz-content-sha256"), diff --git a/tests/integration/aws-chunked-put.test.ts b/tests/integration/aws-chunked-put.test.ts index f6cb166..2775f54 100644 --- a/tests/integration/aws-chunked-put.test.ts +++ b/tests/integration/aws-chunked-put.test.ts @@ -30,8 +30,10 @@ const testConfig: GlobalConfig = { const BUCKET = "test-aws-chunked-put-bucket"; const KEY = "aws-chunked-object.txt"; +const KOPIA_KEY = "kopia.blobcfg"; const MULTIPART_KEY = "aws-chunked-multipart-object.txt"; const PLAINTEXT = "hello world"; +const KOPIA_PLAINTEXT = "123456789012345678901234567890"; const CHUNKED_PAYLOAD = "b;chunk-signature=abc\r\nhello world\r\n0;chunk-signature=def\r\n\r\n"; const EMPTY_SHA256_HEX = @@ -325,6 +327,96 @@ async function sendAwsChunkedUploadPart( }); } +async function sendKopiaStyleStreamingPut(baseUrl: string): Promise { + const url = new URL(`${baseUrl}/${BUCKET}/${KOPIA_KEY}`); + const signer = new SignatureV4({ + credentials, + region: "us-east-1", + service: "s3", + sha256: Sha256, + }); + + let body = new TextEncoder().encode(CHUNKED_PAYLOAD); + let signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: { + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "x-amz-decoded-content-length": String(KOPIA_PLAINTEXT.length), + "content-type": "application/x-kopia", + "content-length": String(body.length), + }, + body, + }); + + const authorization = signed.headers["authorization"]; + const amzDate = signed.headers["x-amz-date"]; + if (!authorization || !amzDate) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth = parseAuthorizationHeader(authorization); + const payload = buildStreamingSigV4Payload( + KOPIA_PLAINTEXT, + amzDate, + auth.scopeDate, + auth.scopeRegion, + auth.scopeService, + auth.initialSignature, + ); + body = new TextEncoder().encode(payload); + signed = await signer.sign({ + method: "PUT", + protocol: url.protocol, + hostname: url.hostname, + port: url.port === "" ? undefined : parseInt(url.port, 10), + path: url.pathname, + query: {}, + headers: { + "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", + "x-amz-decoded-content-length": String(KOPIA_PLAINTEXT.length), + "content-type": "application/x-kopia", + "content-length": String(body.length), + }, + body, + }); + + const authorization2 = signed.headers["authorization"]; + const amzDate2 = signed.headers["x-amz-date"]; + if (!authorization2 || !amzDate2) { + throw new Error("Expected Authorization and x-amz-date for SigV4"); + } + const auth2 = parseAuthorizationHeader(authorization2); + const payload2 = buildStreamingSigV4Payload( + KOPIA_PLAINTEXT, + amzDate2, + auth2.scopeDate, + auth2.scopeRegion, + auth2.scopeService, + auth2.initialSignature, + ); + body = new TextEncoder().encode(payload2); + + const requestHeaders = new Headers(); + for (const [key, value] of Object.entries(signed.headers)) { + if (key.toLowerCase() === "host") { + continue; + } + requestHeaders.set(key, value); + } + + return await fetch(url, { + method: "PUT", + headers: requestHeaders, + body, + // @ts-ignore duplex is required for non-GET body in Deno fetch with streams/body bytes + duplex: "half", + }); +} + async function verifyStoredBody(client: S3Client): Promise { const out = await client.send( new GetObjectCommand({ @@ -347,6 +439,32 @@ async function verifyStoredBody(client: S3Client): Promise { } } +async function verifyStoredBodyForKey( + client: S3Client, + key: string, + expected: string, +): Promise { + const out = await client.send( + new GetObjectCommand({ + Bucket: BUCKET, + Key: key, + }), + ); + + const bytes = await out.Body?.transformToByteArray(); + if (!bytes) { + throw new Error("Expected object body"); + } + const text = new TextDecoder().decode(bytes); + if (text !== expected) { + throw new Error( + `Decoded payload mismatch for ${key}; expected "${expected}", got "${ + text.slice(0, 120) + }"`, + ); + } +} + const cases: ProxyTestCase[] = [{ name: "objects/put/aws-chunked-decoding", config: testConfig, @@ -428,6 +546,47 @@ const cases: ProxyTestCase[] = [{ }, ignoreBaseline: true, skipSnapshot: true, +}, { + name: "objects/put/aws-chunked-decoding/streaming-sha256-kopia-shape", + config: testConfig, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const putResponse = await sendKopiaStyleStreamingPut(context.baseUrl); + if (putResponse.status !== 200) { + const body = await putResponse.text(); + throw new Error( + `aws-chunked PUT (kopia-shape) failed: status=${putResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + await verifyStoredBodyForKey(client, KOPIA_KEY, KOPIA_PLAINTEXT); + }, + afterAll: async (client) => { + try { + await client.send( + new DeleteObjectCommand({ Bucket: BUCKET, Key: KOPIA_KEY }), + ); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, }, { name: "objects/multipart/aws-chunked-uploadpart-decoding", config: testConfig, From 601ed372eb3f3b9028dd7e24d4cc1e2a7497445f Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Wed, 4 Mar 2026 00:08:21 +0300 Subject: [PATCH 6/6] fix: debugging Signed-off-by: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> --- src/Frontend/Multipart/Put.ts | 8 +++- src/Frontend/Objects/Put.ts | 9 +++- tests/integration/aws-chunked-put.test.ts | 43 +++++++++++++++++++ tests/utils.ts | 52 ++++++++++++++--------- 4 files changed, 88 insertions(+), 24 deletions(-) diff --git a/src/Frontend/Multipart/Put.ts b/src/Frontend/Multipart/Put.ts index a44bc33..ab43eff 100644 --- a/src/Frontend/Multipart/Put.ts +++ b/src/Frontend/Multipart/Put.ts @@ -74,7 +74,13 @@ export const uploadPart = Effect.gen(function* () { }); const bodyStream = hasAwsChunked ? decodeAwsChunkedBodyStream(request.stream, { - headers: request.headers, + headers: hasAwsChunked && sigV4Context === undefined + ? { + ...request.headers, + // No auth context available: decode framing only and skip chunk-signature verification. + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + } + : request.headers, sigV4Context, }) : request.stream; diff --git a/src/Frontend/Objects/Put.ts b/src/Frontend/Objects/Put.ts index 21974df..62e986a 100644 --- a/src/Frontend/Objects/Put.ts +++ b/src/Frontend/Objects/Put.ts @@ -250,7 +250,6 @@ export const putObject = Effect.gen(function* () { yield* Effect.logDebug("PutObject aws-chunked decision", { key, hasAwsChunked, - hasSigV4Context: sigV4Context !== undefined, contentEncoding: getHeader(request.headers, "content-encoding"), transferEncoding: getHeader(request.headers, "transfer-encoding"), amzContentSha256: getHeader(request.headers, "x-amz-content-sha256"), @@ -264,7 +263,13 @@ export const putObject = Effect.gen(function* () { const bodyStream = hasAwsChunked ? decodeAwsChunkedBodyStream(request.stream, { - headers: request.headers, + headers: hasAwsChunked && sigV4Context === undefined + ? { + ...request.headers, + // No auth context available: decode framing only and skip chunk-signature verification. + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + } + : request.headers, sigV4Context, }) : request.stream; diff --git a/tests/integration/aws-chunked-put.test.ts b/tests/integration/aws-chunked-put.test.ts index 2775f54..5cd96ec 100644 --- a/tests/integration/aws-chunked-put.test.ts +++ b/tests/integration/aws-chunked-put.test.ts @@ -587,6 +587,49 @@ const cases: ProxyTestCase[] = [{ }, ignoreBaseline: true, skipSnapshot: true, +}, { + name: + "objects/put/aws-chunked-decoding/streaming-sha256-kopia-shape/no-auth-config", + config: testConfig, + disableDefaultAuth: true, + beforeAll: async (client) => { + try { + await client.send(new CreateBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore already-exists races. + } + }, + fn: async (client, context) => { + if (!context?.baseUrl) { + throw new Error("Missing baseUrl in test context"); + } + const putResponse = await sendKopiaStyleStreamingPut(context.baseUrl); + if (putResponse.status !== 200) { + const body = await putResponse.text(); + throw new Error( + `aws-chunked PUT (kopia-shape no-auth-config) failed: status=${putResponse.status} body=${ + body.slice(0, 200) + }`, + ); + } + await verifyStoredBodyForKey(client, KOPIA_KEY, KOPIA_PLAINTEXT); + }, + afterAll: async (client) => { + try { + await client.send( + new DeleteObjectCommand({ Bucket: BUCKET, Key: KOPIA_KEY }), + ); + } catch { + // Ignore cleanup failures. + } + try { + await client.send(new DeleteBucketCommand({ Bucket: BUCKET })); + } catch { + // Ignore cleanup failures. + } + }, + ignoreBaseline: true, + skipSnapshot: true, }, { name: "objects/multipart/aws-chunked-uploadpart-decoding", config: testConfig, diff --git a/tests/utils.ts b/tests/utils.ts index c4110fb..b270b15 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -41,6 +41,9 @@ export const makeTestHarness = ( ? LogLevel.Debug : LogLevel.Info, ), + options?: { + disableDefaultAuth?: boolean; + }, ) => Effect.gen(function* () { const testCredentials = { @@ -49,20 +52,22 @@ export const makeTestHarness = ( }; // Ensure auth is configured so tests don't fail due to "Deny by default" policy - const configWithAuth: GlobalConfig = { - ...config, - auth: config.auth ?? { - accessKeysRefs: [ - "test", - "main", - "alt", - "tenant", - "iam", - "iam_root", - "iam_alt_root", - ], - }, - }; + const configWithAuth: GlobalConfig = options?.disableDefaultAuth + ? config + : { + ...config, + auth: config.auth ?? { + accessKeysRefs: [ + "test", + "main", + "alt", + "tenant", + "iam", + "iam_root", + "iam_alt_root", + ], + }, + }; const HeraldConfigLive = Layer.succeed(HeraldConfig, { raw: configWithAuth, @@ -143,14 +148,12 @@ export const makeTestHarness = ( Effect.tryPromise({ try: () => server.shutdown(), - catch: (e) => - new Error(`Server shutdown failed: ${e}`), + catch: (e) => new Error(`Server shutdown failed: ${e}`), }).pipe(Effect.orDie) ); yield* Effect.addFinalizer(() => Effect.tryPromise({ - try: () => - webHandler.dispose(), + try: () => webHandler.dispose(), catch: (e) => new Error(`Web handler disposal failed: ${e}`), }).pipe(Effect.orDie) ); @@ -361,11 +364,14 @@ export type ProxyTestCase = { ignoreBaseline?: boolean; only?: boolean; skipSnapshot?: boolean; + disableDefaultAuth?: boolean; }; function baselineRunner(tc: ProxyTestCase, t: Deno.TestContext) { return Effect.gen(function* () { - const h = yield* makeTestHarness(tc.config); + const h = yield* makeTestHarness(tc.config, undefined, { + disableDefaultAuth: tc.disableDefaultAuth, + }); if (tc.beforeAll) { const beforeResult = tc.beforeAll(h.client); @@ -487,7 +493,9 @@ function baselineRunner(tc: ProxyTestCase, t: Deno.TestContext) { function proxyRunner(tc: ProxyTestCase, t: Deno.TestContext) { return Effect.gen(function* () { - const h = yield* makeTestHarness(tc.config); + const h = yield* makeTestHarness(tc.config, undefined, { + disableDefaultAuth: tc.disableDefaultAuth, + }); if (tc.beforeAll) { const beforeResult = tc.beforeAll(h.proxyClient); @@ -678,7 +686,9 @@ function swiftRunner(tc: ProxyTestCase, t: Deno.TestContext) { ); } - const h = yield* makeTestHarness(swiftConfig.value); + const h = yield* makeTestHarness(swiftConfig.value, undefined, { + disableDefaultAuth: tc.disableDefaultAuth, + }); if (tc.beforeAll) { const beforeResult = tc.beforeAll(h.proxyClient);