Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .coderabbit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
reviews:
auto_review:
# enable reviews for stacked PRs
base_branches:
- .*
32 changes: 25 additions & 7 deletions src/Backends/S3/Objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Chunk, Effect, Option, Stream } from "effect";
import { Readable } from "node-stream";
import type sweb from "node-stream/web";
import {
AccessDenied,
type BackendError,
BadDigest,
type CommonPrefix,
Expand All @@ -28,6 +29,7 @@ import {
type ObjectResponse,
} from "../../Services/Backend.ts";
import { normalizeHeaders } from "../../Services/S3HeaderService.ts";
import { stripAwsChunkedFromContentEncoding } from "../../Services/AwsChunked.ts";
import type {
ChecksumAlgorithm,
ChecksumType,
Expand Down Expand Up @@ -313,6 +315,7 @@ export const makeObjectOps = (
stream,
nativeStream: webStream,
contentType: result.ContentType,
contentEncoding: result.ContentEncoding,
contentLength: result.ContentLength,
etag: result.ETag,
lastModified: result.LastModified,
Expand All @@ -325,6 +328,7 @@ export const makeObjectOps = (
partsCount: result.PartsCount,
contentLength: result.ContentLength,
contentType: result.ContentType,
contentEncoding: result.ContentEncoding,
etag: result.ETag,
lastModified: result.LastModified,
}),
Expand Down Expand Up @@ -365,6 +369,7 @@ export const makeObjectOps = (

return {
contentType: result.ContentType,
contentEncoding: result.ContentEncoding,
contentLength: result.ContentLength,
etag: result.ETag,
lastModified: result.LastModified,
Expand All @@ -377,6 +382,7 @@ export const makeObjectOps = (
partsCount: result.PartsCount,
contentLength: result.ContentLength,
contentType: result.ContentType,
contentEncoding: result.ContentEncoding,
etag: result.ETag,
lastModified: result.LastModified,
}),
Expand All @@ -395,15 +401,22 @@ export const makeObjectOps = (
const normalized = normalizeHeaders(headers);

const contentType = normalized["content-type"]!;
const contentLength = s3Params.contentLength;
const contentEncoding = stripAwsChunkedFromContentEncoding(
normalized["content-encoding"],
);
let contentLength = s3Params.contentLength;

const validatedStream = (yield* checksumService.validate(
bodyStream,
checksums,
)).pipe(
Stream.catchAll((e) => {
// Preserve BadDigest and InvalidRequest errors from checksum validation
if (e instanceof BadDigest || e instanceof InvalidRequest) {
// Preserve known S3-compatible errors from checksum/chunk-signature validation.
if (
e instanceof BadDigest ||
e instanceof InvalidRequest ||
e instanceof AccessDenied
) {
return Stream.fail(e as BackendError);
}
return Stream.fail(
Expand All @@ -414,10 +427,10 @@ export const makeObjectOps = (
}),
);

const isSmall = contentLength !== undefined &&
const shouldBuffer = contentLength === undefined ||
contentLength < 1024 * 1024;

const body = isSmall
const body = shouldBuffer
? yield* Stream.runCollect(validatedStream).pipe(
Effect.map((chunks) => {
const total = Chunk.reduce(chunks, 0, (acc, c) => acc + c.length);
Expand All @@ -427,6 +440,10 @@ export const makeObjectOps = (
res.set(c, off);
off += c.length;
}
// For chunked transfer uploads without Content-Length, infer exact size.
if (contentLength === undefined) {
contentLength = total;
}
return res;
}),
Effect.mapError((e) => {
Expand All @@ -448,6 +465,7 @@ export const makeObjectOps = (
Key: key,
Body: body,
ContentType: contentType,
ContentEncoding: contentEncoding,
ContentLength: contentLength,
Metadata: metadata,
});
Expand Down Expand Up @@ -477,15 +495,15 @@ export const makeObjectOps = (
// Manually inject validated checksums
if (
checksums.sha256 || checksums.sha1 || checksums.crc32 ||
checksums.crc32c || checksums.crc64nvme || !isSmall
checksums.crc32c || checksums.crc64nvme || !shouldBuffer
) {
command.middlewareStack.add(
(next) => (args) => {
const request = args.request as {
headers: Record<string, string>;
duplex?: string;
};
if (!isSmall) {
if (!shouldBuffer) {
request.duplex = "half";
request.headers["x-amz-content-sha256"] = "UNSIGNED-PAYLOAD";
if (contentLength !== undefined) {
Expand Down
61 changes: 58 additions & 3 deletions src/Backends/Swift/Objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import type {
PutObjectResult,
} from "../../Services/Backend.ts";
import {
AccessDenied,
BadDigest,
InternalError,
InvalidRequest,
} from "../../Services/Backend.ts";
import { normalizeHeaders } from "../../Services/S3HeaderService.ts";
import { stripAwsChunkedFromContentEncoding } from "../../Services/AwsChunked.ts";
import {
encodeObjectKeyForSwift,
formatSwiftTransportError,
Expand Down Expand Up @@ -59,6 +61,39 @@ function resolveContentType(
return contentType;
}

function resolveContentEncoding(
response: HttpClientResponse.HttpClientResponse,
normalizedResp: Record<string, string | undefined>,
s3Headers: Record<string, string>,
): string | undefined {
let contentEncoding = normalizedResp["content-encoding"];

if (
contentEncoding === undefined &&
(response as unknown as { source?: unknown }).source instanceof Response
) {
const src = (response as unknown as { source: Response }).source;
contentEncoding = src.headers.get("content-encoding") ?? undefined;
}

if (contentEncoding === undefined) {
const h = response.headers as unknown as {
get?: (n: string) => string | null;
};
if (typeof h.get === "function") {
contentEncoding = h.get("content-encoding") ??
h.get("Content-Encoding") ?? undefined;
}
}

if (contentEncoding === undefined) {
contentEncoding = s3Headers["Content-Encoding"] ??
s3Headers["content-encoding"];
}

return contentEncoding;
}

export interface SwiftObject {
readonly name?: string;
readonly hash?: string;
Expand Down Expand Up @@ -245,6 +280,9 @@ export const makeObjectOps = (
contentType: (Array.isArray(response.headers["content-type"])
? response.headers["content-type"][0]
: response.headers["content-type"]) || undefined,
contentEncoding: (Array.isArray(response.headers["content-encoding"])
? response.headers["content-encoding"][0]
: response.headers["content-encoding"]) || undefined,
contentLength,
etag: etag || undefined,
lastModified: lastModified ? new Date(lastModified) : undefined,
Expand Down Expand Up @@ -417,6 +455,11 @@ export const makeObjectOps = (
stream: response.stream,
nativeStream: nativeStream || undefined,
contentType,
contentEncoding: resolveContentEncoding(
response,
normalizedResp,
s3Headers,
),
contentLength,
etag: etag || undefined,
lastModified: lastModified ? new Date(lastModified) : undefined,
Expand Down Expand Up @@ -447,6 +490,9 @@ export const makeObjectOps = (
headers,
);
const normalized = normalizeHeaders(headers);
const contentEncoding = stripAwsChunkedFromContentEncoding(
normalized["content-encoding"],
);

const swiftHeaders: Record<string, string> = {
"X-Auth-Token": token,
Expand All @@ -465,8 +511,12 @@ export const makeObjectOps = (
checksums,
)).pipe(
Stream.catchAll((e) => {
// Preserve BadDigest and InvalidRequest errors from checksum validation
if (e instanceof BadDigest || e instanceof InvalidRequest) {
// Preserve known S3-compatible errors from checksum/chunk-signature validation.
if (
e instanceof BadDigest ||
e instanceof InvalidRequest ||
e instanceof AccessDenied
) {
return Stream.fail(e as BackendError);
}
return Stream.fail(
Expand Down Expand Up @@ -498,7 +548,7 @@ export const makeObjectOps = (
})
: validatedStream;

const request = HttpClientRequest.put(`${url}/${encodedKey}`).pipe(
let request = HttpClientRequest.put(`${url}/${encodedKey}`).pipe(
HttpClientRequest.bodyStream(bodyStream),
HttpClientRequest.setHeaders(swiftHeaders),
HttpClientRequest.setHeader(
Expand All @@ -507,6 +557,11 @@ export const makeObjectOps = (
"application/octet-stream") as string,
),
);
if (contentEncoding !== undefined) {
request = request.pipe(
HttpClientRequest.setHeader("Content-Encoding", contentEncoding),
);
}

const response: HttpClientResponse.HttpClientResponse = yield* client
.execute(request).pipe(
Expand Down
Loading
Loading