46 changes: 46 additions & 0 deletions packages/backfill/README.md
@@ -253,6 +253,52 @@ BACKFILL_CACHE_PROVIDER="azure-blob"
BACKFILL_CACHE_PROVIDER_OPTIONS='{"connectionString":"...","container":"..."}'
```

### Amazon Simple Storage Service (S3)

To cache to AWS S3, you need to supply the required AWS SDK libraries:

yarn add -D @aws-sdk/client-s3 @aws-sdk/lib-storage

You will have to configure backfill and provide a bucket name. If you are configuring
via `backfill.config.js`, you can use the following syntax:

```js
module.exports = {
cacheStorageConfig: {
provider: "s3",
options: {
bucket: "...",
maxSize: 12345,
},
},
};
```

Via environment variables:

```
BACKFILL_CACHE_PROVIDER="s3"
BACKFILL_CACHE_PROVIDER_OPTIONS='{"bucket":"...","prefix":"...","maxSize":50000000}'
AWS_PROFILE=...
AWS_REGION=...
```

#### Options

<dl>
<dt>bucket</dt>
<dd>The name of the S3 bucket in which cached files are stored.</dd>

<dt>prefix (<em>optional</em>)</dt>
<dd>Prefix added to cache keys, for example <code>"build-cache/"</code>.</dd>

<dt>maxSize (<em>optional</em>)</dt>
<dd>
Maximum size of a single package cache, in bytes; larger caches are neither uploaded nor fetched.
</dd>
</dl>
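In addition to the options above, the implementation forwards an optional `clientConfig` object directly to the AWS SDK `S3Client` constructor (see `S3CacheStorage.ts` further down in this diff), so region, credentials, or a custom endpoint can be set in the backfill config instead of through environment variables. A minimal sketch, using a hypothetical bucket name and prefix:

```js
// Sketch only: clientConfig is passed as-is to `new S3Client(...)`,
// so any S3ClientConfig field (region, credentials, endpoint, ...) is accepted.
module.exports = {
  cacheStorageConfig: {
    provider: "s3",
    options: {
      bucket: "my-build-cache", // hypothetical bucket name
      prefix: "backfill/", // hypothetical key prefix
      clientConfig: {
        region: "eu-west-1",
      },
    },
  },
};
```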


### NPM package

To cache to an NPM package you need to provide a package name and the registry
12 changes: 12 additions & 0 deletions packages/backfill/package.json
@@ -44,6 +44,18 @@
"ts-mockito": "^2.6.1",
"typescript": "~4.7.0"
},
"peerDependencies": {
"@aws-sdk/client-s3": "^3.804.0",
"@aws-sdk/lib-storage": "^3.804.0"
},
"peerDependenciesMeta": {
"@aws-sdk/client-s3": {
"optional": true
},
"@aws-sdk/lib-storage": {
"optional": true
}
},
"engines": {
"node": ">=14"
}
14 changes: 14 additions & 0 deletions packages/cache/package.json
@@ -26,6 +26,8 @@
"tar-fs": "^2.1.0"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.804.0",
"@aws-sdk/lib-storage": "^3.804.0",
"@types/fs-extra": "^9.0.13",
"@types/jest": "^30.0.0",
"@types/node": "^14.18.36",
@@ -36,6 +38,18 @@
"ts-jest": "^29.0.0",
"typescript": "~4.7.0"
},
"peerDependencies": {
"@aws-sdk/client-s3": "^3.804.0",
"@aws-sdk/lib-storage": "^3.804.0"
},
"peerDependenciesMeta": {
"@aws-sdk/client-s3": {
"optional": true
},
"@aws-sdk/lib-storage": {
"optional": true
}
},
"engines": {
"node": ">=14"
}
56 changes: 3 additions & 53 deletions packages/cache/src/AzureBlobCacheStorage.ts
@@ -1,5 +1,5 @@
import * as path from "path";
import { Transform, TransformCallback, pipeline } from "stream";
import { pipeline } from "stream";
import tarFs from "tar-fs";

import { Logger } from "backfill-logger";
@@ -8,62 +8,12 @@ import { AzureBlobCacheStorageOptions } from "backfill-config";
import { stat } from "fs-extra";
import { ContainerClient } from "@azure/storage-blob";
import { CacheStorage } from "./CacheStorage";
import { TimeoutStream } from "./TimeoutStream";
import { SpongeStream } from "./SpongeStream";

const ONE_MEGABYTE = 1024 * 1024;
const FOUR_MEGABYTES = 4 * ONE_MEGABYTE;

/*
* Timeout stream, will emit an error event if the
* input has not started providing data after a given time after
* its creation.
*/
class TimeoutStream extends Transform {
private timeout: NodeJS.Timeout;
constructor(timeout: number, message: string) {
super();
this.timeout = setTimeout(() => {
this.destroy(new Error(message));
}, timeout);
}
_transform(
chunk: any,
_encoding: BufferEncoding,
callback: TransformCallback
): void {
clearTimeout(this.timeout);
this.push(chunk);
callback();
}
}

/*
* Sponge stream, it will accumulate all the data it receives
* and emit it only if and when the input stream sends the "end" event.
*/
class SpongeStream extends Transform {
constructor() {
super({
// This stream should never receive more data than its readableHighWaterMark
// otherwise the stream will get into a deadlock
// 1 TB should give enough room :)
readableHighWaterMark: 1024 * 1024 * 1024 * 1024,
});
}
_transform(
chunk: any,
_encoding: BufferEncoding,
callback: TransformCallback
): void {
this.pause();
this.push(chunk);
callback();
}
_flush(callback: TransformCallback): void {
this.resume();
callback();
}
}

const uploadOptions = {
bufferSize: FOUR_MEGABYTES,
maxBuffers: 5,
122 changes: 122 additions & 0 deletions packages/cache/src/S3CacheStorage.ts
@@ -0,0 +1,122 @@
import path from "path";
import { PassThrough, pipeline } from "stream";
import tarFs from "tar-fs";
import { Logger } from "backfill-logger";
import { stat } from "fs-extra";
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { S3CacheStorageOptions } from "backfill-config";
import { CacheStorage } from "./CacheStorage";
import { TimeoutStream } from "./TimeoutStream";
import { SpongeStream } from "./SpongeStream";

/**
* Implementation of backfill cache storage backed by AWS S3. To use it,
* set `provider: "s3"` in the cache storage configuration and supply a bucket name.
*/
export class S3CacheStorage extends CacheStorage {
private readonly s3Client: S3Client;

constructor(
private options: S3CacheStorageOptions,
logger: Logger,
cwd: string,
incrementalCaching = false
) {
super(logger, cwd, incrementalCaching);
this.s3Client = new S3Client(options.clientConfig || {});
}

protected async _fetch(hash: string): Promise<boolean> {
try {
const command = new GetObjectCommand({
Bucket: this.options.bucket,
Key: (this.options.prefix ?? "") + hash,
});

const response = await this.s3Client.send(command);

if (
this.options.maxSize &&
response.ContentLength &&
response.ContentLength > this.options.maxSize
) {
this.logger.verbose(
`Object is too large to be downloaded: ${hash}, size: ${response.ContentLength} bytes`
);
return false;
}

const objectStream = response.Body;
if (!objectStream) {
throw new Error("Unable to fetch object.");
}

const tarWritableStream = tarFs.extract(this.cwd);

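// Buffer the whole download before handing it to tar extraction, and fail
// fast if S3 stops sending data. Both helper streams were factored out of
// the Azure blob implementation (see SpongeStream.ts and TimeoutStream.ts).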
const spongeStream = new SpongeStream();

const timeoutStream = new TimeoutStream(
10 * 60 * 1000,
`The fetch request to ${hash} seems to be hanging`
);

const extractionPipeline = new Promise<void>((resolve, reject) =>
pipeline(
objectStream as any,
spongeStream,
timeoutStream,
tarWritableStream,
(err) => {
if (err) {
reject(err);
} else {
resolve();
}
}
)
);

await extractionPipeline;
return true;
} catch (error) {
if (error && (error as any).name === "NoSuchKey") {
return false;
} else {
throw error;
}
}
}

protected async _put(hash: string, filesToCache: string[]): Promise<void> {
const tarStream = tarFs.pack(this.cwd, { entries: filesToCache });
// If there's a maxSize limit, first sum up the total size, in bytes, of all the files to cache
if (this.options.maxSize) {
let total = 0;
for (const file of filesToCache) {
total = total + (await stat(path.join(this.cwd, file))).size;
}

if (total > this.options.maxSize) {
this.logger.verbose(
`The output is too large to be uploaded: ${hash}, size: ${total} bytes`
);
return;
}
}

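// Hand the tar output to the SDK's multipart Upload helper via a PassThrough
// stream used as the request body.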
const pass = new PassThrough();
tarStream.pipe(pass);

const upload = new Upload({
client: this.s3Client,
params: {
Bucket: this.options.bucket,
ContentType: "application/x-tar",
Key: (this.options.prefix ?? "") + hash,
Body: pass,
},
});
await upload.done();
}
}
29 changes: 29 additions & 0 deletions packages/cache/src/SpongeStream.ts
@@ -0,0 +1,29 @@
import { Transform, TransformCallback } from "stream";

/*
* Sponge stream: accumulates all the data it receives and emits it
* only if and when the input stream sends the "end" event.
*/
export class SpongeStream extends Transform {
constructor() {
super({
// This stream should never receive more data than its readableHighWaterMark
// otherwise the stream will get into a deadlock
// 1 TB should give enough room :)
readableHighWaterMark: 1024 * 1024 * 1024 * 1024,
});
}
_transform(
chunk: any,
_encoding: BufferEncoding,
callback: TransformCallback
): void {
this.pause();
this.push(chunk);
callback();
}
_flush(callback: TransformCallback): void {
this.resume();
callback();
}
}
25 changes: 25 additions & 0 deletions packages/cache/src/TimeoutStream.ts
@@ -0,0 +1,25 @@
import { Transform, TransformCallback } from "stream";

/*
* Timeout stream: emits an error event if the input has not started
* providing data within a given time after the stream's creation.
*/
export class TimeoutStream extends Transform {
private timeout: NodeJS.Timeout;
constructor(timeout: number, message: string) {
super();
this.timeout = setTimeout(() => {
this.destroy(new Error(message));
}, timeout);
}
_transform(
chunk: any,
_encoding: BufferEncoding,
callback: TransformCallback
): void {
clearTimeout(this.timeout);
this.push(chunk);
callback();
}
}
8 changes: 8 additions & 0 deletions packages/cache/src/index.ts
@@ -6,6 +6,7 @@ import { AzureBlobCacheStorage } from "./AzureBlobCacheStorage";
import { LocalCacheStorage } from "./LocalCacheStorage";
import { NpmCacheStorage } from "./NpmCacheStorage";
import { LocalSkipCacheStorage } from "./LocalSkipCacheStorage";
import { S3CacheStorage } from "./S3CacheStorage";
export { ICacheStorage, CacheStorage } from "./CacheStorage";

export function isCustomProvider(
@@ -54,6 +55,13 @@ export function getCacheStorageProvider(
cwd,
incrementalCaching
);
} else if (cacheStorageConfig.provider === "s3") {
cacheStorage = new S3CacheStorage(
cacheStorageConfig.options,
logger,
cwd,
incrementalCaching
);
} else if (cacheStorageConfig.provider === "local-skip") {
cacheStorage = new LocalSkipCacheStorage(
internalCacheFolder,
2 changes: 2 additions & 0 deletions packages/config/src/cacheConfig.ts
@@ -1,6 +1,7 @@
import type { Logger } from "backfill-logger";
import type { AzureBlobCacheStorageConfig } from "./azureBlobCacheConfig";
import type { NpmCacheStorageConfig } from "./npmCacheConfig";
import type { S3CacheStorageConfig } from "./s3CacheConfig";

export interface ICacheStorage {
fetch: (hash: string) => Promise<boolean>;
@@ -21,6 +22,7 @@ export type CacheStorageConfig =
}
| NpmCacheStorageConfig
| AzureBlobCacheStorageConfig
| S3CacheStorageConfig
| CustomStorageConfig;

/**
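The new `s3CacheConfig.ts` module itself is not shown in this diff. Judging from how `S3CacheStorageOptions` is consumed in `S3CacheStorage.ts` above, it presumably looks roughly like the following sketch (shape inferred from usage, not taken from the source):

```ts
import type { S3ClientConfig } from "@aws-sdk/client-s3";

// Inferred shape: each field corresponds to a property read in S3CacheStorage.ts
// (options.bucket, options.prefix, options.maxSize, options.clientConfig).
export type S3CacheStorageOptions = {
  bucket: string;
  prefix?: string;
  maxSize?: number;
  clientConfig?: S3ClientConfig;
};

export type S3CacheStorageConfig = {
  provider: "s3";
  options: S3CacheStorageOptions;
};
```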