diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index e6254ea2..a5646ea5 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -36,7 +36,7 @@ jobs: name: Audit runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Install cargo-audit run: cargo install cargo-audit - name: Run audit check diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ef456be2..1894e6f5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,6 +16,7 @@ # under the License. --- + name: CI concurrency: @@ -36,7 +37,7 @@ jobs: container: image: amd64/rust steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Setup Clippy run: rustup component add clippy # Run different tests for the library on its own as well as @@ -71,7 +72,7 @@ jobs: env: RUSTDOCFLAGS: "-Dwarnings" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Run cargo doc run: cargo doc --document-private-items --no-deps --all-features @@ -99,15 +100,15 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true - AWS_COPY_IF_NOT_EXISTS: multipart - AWS_CONDITIONAL_PUT: etag + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000 + AWS_CONDITIONAL_PUT: dynamo:test-table:2000 AWS_SERVER_SIDE_ENCRYPTION: aws:kms HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 # We are forced to use docker commands instead of service containers as we need to override the entrypoints # which is currently not supported - https://github.com/actions/runner/discussions/1872 @@ -131,6 +132,7 @@ jobs: aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-checksum aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket + aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key") echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r .KeyMetadata.KeyId)" >> $GITHUB_ENV @@ -139,8 +141,18 @@ jobs: # the magical connection string is from # https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings run: | - echo "AZURITE_CONTAINER=$(docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite)" >> $GITHUB_ENV - az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;' + echo "AZURITE_CONTAINER=$(docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --skipApiVersionCheck)" >> $GITHUB_ENV + AZURE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;' + for i in $(seq 1 30); do + if az storage container create -n test-bucket --connection-string "$AZURE_CONNECTION_STRING"; then + break + fi + if [ "$i" -eq 30 ]; then + echo "Azurite did not become ready in time" + exit 1 + fi + sleep 2 + done - name: Setup Rust toolchain run: | @@ -180,7 +192,7 @@ jobs: container: image: amd64/rust steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: submodules: true - name: Install clang (needed for ring) @@ -195,7 +207,7 @@ jobs: run: cargo build --all-features --target wasm32-wasip1 - name: Install wasm-pack run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh - - uses: actions/setup-node@v4 + - uses: actions/setup-node@v5 with: node-version: 20 - name: Run wasm32-unknown-unknown tests (via Node) @@ -205,7 +217,7 @@ jobs: name: cargo test LocalFileSystem (win64) runs-on: windows-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: submodules: true - name: Run LocalFileSystem tests diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index f71ce4e4..8acbb7a6 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -34,9 +34,9 @@ jobs: name: Release Audit Tool (RAT) runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Setup Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: 3.8 - name: Audit licenses diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a92e756c..2a097264 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -37,7 +37,7 @@ jobs: container: image: amd64/rust steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Setup rustfmt run: rustup component add rustfmt - name: Format object_store @@ -49,18 +49,36 @@ jobs: container: image: amd64/rust steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Install cargo-msrv run: cargo install cargo-msrv - name: Downgrade object_store dependencies # Necessary because tokio 1.30.0 updates MSRV to 1.63 # and url 2.5.1, updates to 1.67 run: | + cargo update -p parking_lot --precise 0.12.4 + cargo update -p parking_lot_core --precise 0.9.11 + cargo update -p lock_api --precise 0.4.13 + cargo update -p quote --precise 1.0.41 + cargo update -p proc-macro2 --precise 1.0.103 + cargo update -p itoa --precise 1.0.15 + cargo update -p syn --precise 2.0.106 + cargo update -p tokio-util --precise 0.7.17 cargo update -p tokio --precise 1.29.1 cargo update -p url --precise 2.5.0 cargo update -p once_cell --precise 1.20.3 + cargo update -p tracing --precise 0.1.41 cargo update -p tracing-core --precise 0.1.33 cargo update -p tracing-attributes --precise 0.1.28 + cargo update -p futures --precise 0.3.31 + cargo update -p futures-channel --precise 0.3.31 + cargo update -p futures-core --precise 0.3.31 + cargo update -p futures-executor --precise 0.3.31 + cargo update -p futures-io --precise 0.3.31 + cargo update -p futures-macro --precise 0.3.31 + cargo update -p futures-sink --precise 0.3.31 + cargo update -p futures-task --precise 0.3.31 + cargo update -p futures-util --precise 0.3.31 - name: Check run: | # run `cargo msrv verify` to see problems diff --git a/.github/workflows/take.yml b/.github/workflows/take.yml index dd21c794..94a95f6e 100644 --- a/.github/workflows/take.yml +++ b/.github/workflows/take.yml @@ -28,7 +28,7 @@ jobs: if: (!github.event.issue.pull_request) && github.event.comment.body == 'take' runs-on: ubuntu-latest steps: - - uses: actions/github-script@v7 + - uses: actions/github-script@v8 with: script: | github.rest.issues.addAssignees({ diff --git a/CHANGELOG-old.md b/CHANGELOG-old.md index d29ef5a2..23bc7cc7 100644 --- a/CHANGELOG-old.md +++ b/CHANGELOG-old.md @@ -20,6 +20,43 @@ # Historical Changelog +## [v0.12.3](https://github.com/apache/arrow-rs-object-store/tree/v0.12.3) (2025-07-11) + +[Full Changelog](https://github.com/apache/arrow-rs-object-store/compare/v0.12.2...v0.12.3) + +**Implemented enhancements:** + +- S3 store fails without retrying [\#425](https://github.com/apache/arrow-rs-object-store/issues/425) +- Deprecate and Remove DynamoCommit [\#373](https://github.com/apache/arrow-rs-object-store/issues/373) +- Move payload helpers from `GetResult` to `GetResultPayload` [\#352](https://github.com/apache/arrow-rs-object-store/issues/352) +- Retry on 429s and equivalents [\#309](https://github.com/apache/arrow-rs-object-store/issues/309) +- object\_store: Support `container@account.dfs.core.windows.net/path` URL style for `az` protocol [\#285](https://github.com/apache/arrow-rs-object-store/issues/285) +- Rename `PutMultiPartOpts` to `PutMultiPartOptions`, the old name is deprecated and will be removed in the next major release [\#406](https://github.com/apache/arrow-rs-object-store/pull/406) + +**Fixed bugs:** + +- Builder panics on malformed GCS private key instead of returning error [\#419](https://github.com/apache/arrow-rs-object-store/issues/419) +- `cargo check --no-default-features --features=aws,azure,gcp,http` fails [\#411](https://github.com/apache/arrow-rs-object-store/issues/411) +- Incorrect prefix in `ObjectStoreScheme::parse` for Azure HTTP urls [\#398](https://github.com/apache/arrow-rs-object-store/issues/398) + +**Closed issues:** + +- `PutMode::Update` support for `LocalFileSystem`? [\#423](https://github.com/apache/arrow-rs-object-store/issues/423) + +**Merged pull requests:** + +- feat: retry on 408 [\#426](https://github.com/apache/arrow-rs-object-store/pull/426) ([criccomini](https://github.com/criccomini)) +- fix: expose source of `RetryError` [\#422](https://github.com/apache/arrow-rs-object-store/pull/422) ([crepererum](https://github.com/crepererum)) +- fix\(gcp\): throw error instead of panicking if read pem fails [\#421](https://github.com/apache/arrow-rs-object-store/pull/421) ([HugoCasa](https://github.com/HugoCasa)) +- chore: fix clippy 1.88 warnings [\#418](https://github.com/apache/arrow-rs-object-store/pull/418) ([mbrobbel](https://github.com/mbrobbel)) +- Bump quick-xml to version 0.38.0 [\#417](https://github.com/apache/arrow-rs-object-store/pull/417) ([raimannma](https://github.com/raimannma)) +- Prevent compilation error with all cloud features but fs turned on [\#412](https://github.com/apache/arrow-rs-object-store/pull/412) ([jder](https://github.com/jder)) +- Retry requests when status code is 429 [\#410](https://github.com/apache/arrow-rs-object-store/pull/410) ([paraseba](https://github.com/paraseba)) +- minor: Pin `tracing-attributes`, `tracing-core` to fix CI [\#404](https://github.com/apache/arrow-rs-object-store/pull/404) ([kylebarron](https://github.com/kylebarron)) +- feat \(azure\): support for account in `az://` URLs [\#403](https://github.com/apache/arrow-rs-object-store/pull/403) ([ByteBaker](https://github.com/ByteBaker)) +- Fix azure path parsing [\#399](https://github.com/apache/arrow-rs-object-store/pull/399) ([kylebarron](https://github.com/kylebarron)) + + ## [v0.12.2](https://github.com/apache/arrow-rs-object-store/tree/v0.12.2) (2025-06-06) [Full Changelog](https://github.com/apache/arrow-rs-object-store/compare/v0.12.1...v0.12.2) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6ef777..1b89baa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,42 +19,16 @@ # Changelog -## [v0.12.3](https://github.com/apache/arrow-rs-object-store/tree/v0.12.3) (2025-07-11) +## [v0.12.5](https://github.com/apache/arrow-rs-object-store/tree/v0.12.5) (2026-01-09) -[Full Changelog](https://github.com/apache/arrow-rs-object-store/compare/v0.12.2...v0.12.3) - -**Implemented enhancements:** - -- S3 store fails without retrying [\#425](https://github.com/apache/arrow-rs-object-store/issues/425) -- Deprecate and Remove DynamoCommit [\#373](https://github.com/apache/arrow-rs-object-store/issues/373) -- Move payload helpers from `GetResult` to `GetResultPayload` [\#352](https://github.com/apache/arrow-rs-object-store/issues/352) -- Retry on 429s and equivalents [\#309](https://github.com/apache/arrow-rs-object-store/issues/309) -- object\_store: Support `container@account.dfs.core.windows.net/path` URL style for `az` protocol [\#285](https://github.com/apache/arrow-rs-object-store/issues/285) -- Rename `PutMultiPartOpts` to `PutMultiPartOptions`, the old name is deprecated and will be removed in the next major release [\#406](https://github.com/apache/arrow-rs-object-store/pull/406) +[Full Changelog](https://github.com/apache/arrow-rs-object-store/compare/v0.12.4...v0.12.5) **Fixed bugs:** -- Builder panics on malformed GCS private key instead of returning error [\#419](https://github.com/apache/arrow-rs-object-store/issues/419) -- `cargo check --no-default-features --features=aws,azure,gcp,http` fails [\#411](https://github.com/apache/arrow-rs-object-store/issues/411) -- Incorrect prefix in `ObjectStoreScheme::parse` for Azure HTTP urls [\#398](https://github.com/apache/arrow-rs-object-store/issues/398) - -**Closed issues:** - -- `PutMode::Update` support for `LocalFileSystem`? [\#423](https://github.com/apache/arrow-rs-object-store/issues/423) +- Only read file metadata once in `LocalFileSystem::get_ranges` [\#596](https://github.com/apache/arrow-rs-object-store/pull/596) ([AdamGS](https://github.com/AdamGS)) +- fix: `docs.rs` build failure [\#600](https://github.com/apache/arrow-rs-object-store/pull/600) ([alamb](https://github.com/alamb)) **Merged pull requests:** -- feat: retry on 408 [\#426](https://github.com/apache/arrow-rs-object-store/pull/426) ([criccomini](https://github.com/criccomini)) -- fix: expose source of `RetryError` [\#422](https://github.com/apache/arrow-rs-object-store/pull/422) ([crepererum](https://github.com/crepererum)) -- fix\(gcp\): throw error instead of panicking if read pem fails [\#421](https://github.com/apache/arrow-rs-object-store/pull/421) ([HugoCasa](https://github.com/HugoCasa)) -- chore: fix clippy 1.88 warnings [\#418](https://github.com/apache/arrow-rs-object-store/pull/418) ([mbrobbel](https://github.com/mbrobbel)) -- Bump quick-xml to version 0.38.0 [\#417](https://github.com/apache/arrow-rs-object-store/pull/417) ([raimannma](https://github.com/raimannma)) -- Prevent compilation error with all cloud features but fs turned on [\#412](https://github.com/apache/arrow-rs-object-store/pull/412) ([jder](https://github.com/jder)) -- Retry requests when status code is 429 [\#410](https://github.com/apache/arrow-rs-object-store/pull/410) ([paraseba](https://github.com/paraseba)) -- minor: Pin `tracing-attributes`, `tracing-core` to fix CI [\#404](https://github.com/apache/arrow-rs-object-store/pull/404) ([kylebarron](https://github.com/kylebarron)) -- feat \(azure\): support for account in `az://` URLs [\#403](https://github.com/apache/arrow-rs-object-store/pull/403) ([ByteBaker](https://github.com/ByteBaker)) -- Fix azure path parsing [\#399](https://github.com/apache/arrow-rs-object-store/pull/399) ([kylebarron](https://github.com/kylebarron)) - - +- Get the 0.12 release branch to green CI [\#597](https://github.com/apache/arrow-rs-object-store/pull/597) ([AdamGS](https://github.com/AdamGS)) -\* *This Changelog was automatically generated by [github_changelog_generator](https://github.com/github-changelog-generator/github-changelog-generator)* diff --git a/Cargo.toml b/Cargo.toml index c03a8d12..2341be54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "object_store" -version = "0.12.3" +version = "0.12.5" edition = "2021" license = "MIT/Apache-2.0" readme = "README.md" diff --git a/README.md b/README.md index b9cc2338..30ebf2e3 100644 --- a/README.md +++ b/README.md @@ -97,5 +97,6 @@ Planned Release Schedule | Approximate Date | Version | Notes | Ticket | |------------------|----------|--------------------------------|:-------------------------------------------------------------------| | July 2025 | `0.12.3` | Minor, NO breaking API changes | [#428](https://github.com/apache/arrow-rs-object-store/issues/428) | +| Sep 2025 | `0.12.4` | Minor, NO breaking API changes | [#498](https://github.com/apache/arrow-rs-object-store/issues/489) | | TBD | `0.13.0` | Major, breaking API changes | [#367](https://github.com/apache/arrow-rs-object-store/issues/367) | | TBD | `0.13.1` | Minor, NO breaking API changes | [#393](https://github.com/apache/arrow-rs-object-store/issues/393) | diff --git a/dev/release/update_change_log.sh b/dev/release/update_change_log.sh index 64610247..f43ef883 100755 --- a/dev/release/update_change_log.sh +++ b/dev/release/update_change_log.sh @@ -29,8 +29,8 @@ set -e -SINCE_TAG="v0.12.2" -FUTURE_RELEASE="v0.12.3" +SINCE_TAG="v0.12.4" +FUTURE_RELEASE="v0.12.5" SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" SOURCE_TOP_DIR="$(cd "${SOURCE_DIR}/../../" && pwd)" diff --git a/src/attributes.rs b/src/attributes.rs index 11cf27c8..cac5b36b 100644 --- a/src/attributes.rs +++ b/src/attributes.rs @@ -45,6 +45,14 @@ pub enum Attribute { /// /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control) CacheControl, + /// Specifies the storage class of the object. + /// + /// See [AWS](https://aws.amazon.com/s3/storage-classes/), + /// [GCP](https://cloud.google.com/storage/docs/storage-classes), and + /// [Azure](https://learn.microsoft.com/en-us/rest/api/storageservices/set-blob-tier). + /// `StorageClass` is used as the name for this attribute because 2 of the 3 storage providers + /// use that name + StorageClass, /// Specifies a user-defined metadata field for the object /// /// The String is a user-defined key diff --git a/src/aws/builder.rs b/src/aws/builder.rs index ab50aa5d..6e6f8e2f 100644 --- a/src/aws/builder.rs +++ b/src/aws/builder.rs @@ -36,7 +36,7 @@ use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use tracing::info; +use tracing::debug; use url::Url; /// Default metadata endpoint @@ -156,6 +156,14 @@ pub struct AmazonS3Builder { container_credentials_full_uri: Option, /// Container authorization token file, see container_authorization_token_file: Option, + /// Web identity token file path for AssumeRoleWithWebIdentity + web_identity_token_file: Option, + /// Role ARN to assume when using web identity token + role_arn: Option, + /// Session name for web identity role assumption + role_session_name: Option, + /// Custom STS endpoint for web identity token exchange + sts_endpoint: Option, /// Client options client_options: ClientOptions, /// Credentials @@ -319,6 +327,34 @@ pub enum AmazonS3ConfigKey { /// ContainerAuthorizationTokenFile, + /// Web identity token file path for AssumeRoleWithWebIdentity + /// + /// Supported keys: + /// - `aws_web_identity_token_file` + /// - `web_identity_token_file` + WebIdentityTokenFile, + + /// Role ARN to assume when using web identity token + /// + /// Supported keys: + /// - `aws_role_arn` + /// - `role_arn` + RoleArn, + + /// Session name for web identity role assumption + /// + /// Supported keys: + /// - `aws_role_session_name` + /// - `role_session_name` + RoleSessionName, + + /// Custom STS endpoint for web identity token exchange + /// + /// Supported keys: + /// - `aws_endpoint_url_sts` + /// - `endpoint_url_sts` + StsEndpoint, + /// Configure how to provide `copy_if_not_exists` /// /// See [`S3CopyIfNotExists`] @@ -381,6 +417,10 @@ impl AsRef for AmazonS3ConfigKey { Self::ContainerCredentialsRelativeUri => "aws_container_credentials_relative_uri", Self::ContainerCredentialsFullUri => "aws_container_credentials_full_uri", Self::ContainerAuthorizationTokenFile => "aws_container_authorization_token_file", + Self::WebIdentityTokenFile => "aws_web_identity_token_file", + Self::RoleArn => "aws_role_arn", + Self::RoleSessionName => "aws_role_session_name", + Self::StsEndpoint => "aws_endpoint_url_sts", Self::SkipSignature => "aws_skip_signature", Self::CopyIfNotExists => "aws_copy_if_not_exists", Self::ConditionalPut => "aws_conditional_put", @@ -415,6 +455,12 @@ impl FromStr for AmazonS3ConfigKey { "aws_container_credentials_relative_uri" => Ok(Self::ContainerCredentialsRelativeUri), "aws_container_credentials_full_uri" => Ok(Self::ContainerCredentialsFullUri), "aws_container_authorization_token_file" => Ok(Self::ContainerAuthorizationTokenFile), + "aws_web_identity_token_file" | "web_identity_token_file" => { + Ok(Self::WebIdentityTokenFile) + } + "aws_role_arn" | "role_arn" => Ok(Self::RoleArn), + "aws_role_session_name" | "role_session_name" => Ok(Self::RoleSessionName), + "aws_endpoint_url_sts" | "endpoint_url_sts" => Ok(Self::StsEndpoint), "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature), "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists), "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut), @@ -458,6 +504,10 @@ impl AmazonS3Builder { /// * `AWS_DEFAULT_REGION` -> region /// * `AWS_ENDPOINT` -> endpoint /// * `AWS_SESSION_TOKEN` -> token + /// * `AWS_WEB_IDENTITY_TOKEN_FILE` -> path to file containing web identity token for AssumeRoleWithWebIdentity + /// * `AWS_ROLE_ARN` -> ARN of the role to assume when using web identity token + /// * `AWS_ROLE_SESSION_NAME` -> optional session name for web identity role assumption (defaults to "WebIdentitySession") + /// * `AWS_ENDPOINT_URL_STS` -> optional custom STS endpoint for web identity token exchange (defaults to "https://sts.{region}.amazonaws.com") /// * `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` -> /// * `AWS_CONTAINER_CREDENTIALS_FULL_URI` -> /// * `AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE` -> @@ -543,6 +593,18 @@ impl AmazonS3Builder { AmazonS3ConfigKey::ContainerAuthorizationTokenFile => { self.container_authorization_token_file = Some(value.into()); } + AmazonS3ConfigKey::WebIdentityTokenFile => { + self.web_identity_token_file = Some(value.into()); + } + AmazonS3ConfigKey::RoleArn => { + self.role_arn = Some(value.into()); + } + AmazonS3ConfigKey::RoleSessionName => { + self.role_session_name = Some(value.into()); + } + AmazonS3ConfigKey::StsEndpoint => { + self.sts_endpoint = Some(value.into()); + } AmazonS3ConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) } @@ -612,6 +674,10 @@ impl AmazonS3Builder { AmazonS3ConfigKey::ContainerAuthorizationTokenFile => { self.container_authorization_token_file.clone() } + AmazonS3ConfigKey::WebIdentityTokenFile => self.web_identity_token_file.clone(), + AmazonS3ConfigKey::RoleArn => self.role_arn.clone(), + AmazonS3ConfigKey::RoleSessionName => self.role_session_name.clone(), + AmazonS3ConfigKey::StsEndpoint => self.sts_endpoint.clone(), AmazonS3ConfigKey::SkipSignature => Some(self.skip_signature.to_string()), AmazonS3ConfigKey::CopyIfNotExists => { self.copy_if_not_exists.as_ref().map(ToString::to_string) @@ -943,7 +1009,7 @@ impl AmazonS3Builder { } else if self.access_key_id.is_some() || self.secret_access_key.is_some() { match (self.access_key_id, self.secret_access_key, self.token) { (Some(key_id), Some(secret_key), token) => { - info!("Using Static credential provider"); + debug!("Using Static credential provider"); let credential = AwsCredential { key_id, secret_key, @@ -959,21 +1025,25 @@ impl AmazonS3Builder { std::env::var("AWS_WEB_IDENTITY_TOKEN_FILE"), std::env::var("AWS_ROLE_ARN"), ) { - // TODO: Replace with `AmazonS3Builder::credentials_from_env` - info!("Using WebIdentity credential provider"); + debug!("Using WebIdentity credential provider"); - let session_name = std::env::var("AWS_ROLE_SESSION_NAME") - .unwrap_or_else(|_| "WebIdentitySession".to_string()); + let session_name = self + .role_session_name + .clone() + .unwrap_or_else(|| "WebIdentitySession".to_string()); - let endpoint = format!("https://sts.{region}.amazonaws.com"); + let endpoint = self + .sts_endpoint + .clone() + .unwrap_or_else(|| format!("https://sts.{region}.amazonaws.com")); // Disallow non-HTTPs requests let options = self.client_options.clone().with_allow_http(false); let token = WebIdentityProvider { - token_path, + token_path: token_path.clone(), session_name, - role_arn, + role_arn: role_arn.clone(), endpoint, }; @@ -983,7 +1053,7 @@ impl AmazonS3Builder { self.retry_config.clone(), )) as _ } else if let Some(uri) = self.container_credentials_relative_uri { - info!("Using Task credential provider"); + debug!("Using Task credential provider"); let options = self.client_options.clone().with_allow_http(true); @@ -998,7 +1068,7 @@ impl AmazonS3Builder { self.container_credentials_full_uri, self.container_authorization_token_file, ) { - info!("Using EKS Pod Identity credential provider"); + debug!("Using EKS Pod Identity credential provider"); let options = self.client_options.clone().with_allow_http(true); @@ -1010,7 +1080,7 @@ impl AmazonS3Builder { cache: Default::default(), }) as _ } else { - info!("Using Instance credential provider"); + debug!("Using Instance credential provider"); let token = InstanceCredentialProvider { imdsv1_fallback: self.imdsv1_fallback.get()?, @@ -1079,6 +1149,7 @@ impl AmazonS3Builder { let config = S3Config { region, + endpoint: self.endpoint, bucket, bucket_endpoint, credentials, @@ -1611,4 +1682,56 @@ mod tests { "expected EKS provider but got: {debug_str}" ); } + + #[test] + fn test_builder_web_identity_with_config() { + let builder = AmazonS3Builder::new() + .with_bucket_name("some-bucket") + .with_config( + AmazonS3ConfigKey::WebIdentityTokenFile, + "/tmp/fake-token-file", + ) + .with_config( + AmazonS3ConfigKey::RoleArn, + "arn:aws:iam::123456789012:role/test-role", + ) + .with_config(AmazonS3ConfigKey::RoleSessionName, "TestSession") + .with_config( + AmazonS3ConfigKey::StsEndpoint, + "https://sts.us-west-2.amazonaws.com", + ); + + assert_eq!( + builder + .get_config_value(&AmazonS3ConfigKey::WebIdentityTokenFile) + .unwrap(), + "/tmp/fake-token-file" + ); + assert_eq!( + builder + .get_config_value(&AmazonS3ConfigKey::RoleArn) + .unwrap(), + "arn:aws:iam::123456789012:role/test-role" + ); + assert_eq!( + builder + .get_config_value(&AmazonS3ConfigKey::RoleSessionName) + .unwrap(), + "TestSession" + ); + assert_eq!( + builder + .get_config_value(&AmazonS3ConfigKey::StsEndpoint) + .unwrap(), + "https://sts.us-west-2.amazonaws.com" + ); + + let s3 = builder.build().expect("should build successfully"); + let creds = &s3.client.config.credentials; + let debug_str = format!("{creds:?}"); + assert!( + debug_str.contains("TokenCredentialProvider"), + "expected TokenCredentialProvider but got: {debug_str}" + ); + } } diff --git a/src/aws/client.rs b/src/aws/client.rs index a99db159..4edb977f 100644 --- a/src/aws/client.rs +++ b/src/aws/client.rs @@ -61,6 +61,7 @@ const VERSION_HEADER: &str = "x-amz-version-id"; const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-"; const ALGORITHM: &str = "x-amz-checksum-algorithm"; +const STORAGE_CLASS: &str = "x-amz-storage-class"; /// A specialized `Error` for object store-related errors #[derive(Debug, thiserror::Error)] @@ -192,6 +193,7 @@ impl From for Error { #[derive(Debug)] pub(crate) struct S3Config { pub region: String, + pub endpoint: Option, pub bucket: String, pub bucket_endpoint: String, pub credentials: AwsCredentialProvider, @@ -373,6 +375,7 @@ impl Request<'_> { has_content_type = true; builder.header(CONTENT_TYPE, v.as_ref()) } + Attribute::StorageClass => builder.header(STORAGE_CLASS, v.as_ref()), Attribute::Metadata(k_suffix) => builder.header( &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs new file mode 100644 index 00000000..a6775efa --- /dev/null +++ b/src/aws/dynamo.rs @@ -0,0 +1,595 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A DynamoDB based lock system + +use std::borrow::Cow; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use chrono::Utc; +use http::{Method, StatusCode}; +use serde::ser::SerializeMap; +use serde::{Deserialize, Serialize, Serializer}; + +use crate::aws::client::S3Client; +use crate::aws::credential::CredentialExt; +use crate::aws::{AwsAuthorizer, AwsCredential}; +use crate::client::get::GetClientExt; +use crate::client::retry::RetryExt; +use crate::client::retry::{RequestError, RetryError}; +use crate::path::Path; +use crate::{Error, GetOptions, Result}; + +/// The exception returned by DynamoDB on conflict +const CONFLICT: &str = "ConditionalCheckFailedException"; + +const STORE: &str = "DynamoDB"; + +/// A DynamoDB-based commit protocol, used to provide conditional write support for S3 +/// +/// ## Limitations +/// +/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can +/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or +/// conditional operations performed by writers not configured to synchronize with DynamoDB. +/// +/// Workloads making use of this mechanism **must** ensure: +/// +/// * Conditional and non-conditional operations are not performed on the same paths +/// * Conditional operations are only performed via similarly configured clients +/// +/// Additionally as the locking mechanism relies on timeouts to detect stale locks, +/// performance will be poor for systems that frequently delete and then create +/// objects at the same path, instead being optimised for systems that primarily create +/// files with paths never used before, or perform conditional updates to existing files +/// +/// ## Commit Protocol +/// +/// The DynamoDB schema is as follows: +/// +/// * A string partition key named `"path"` +/// * A string sort key named `"etag"` +/// * A numeric [TTL] attribute named `"ttl"` +/// * A numeric attribute named `"generation"` +/// * A numeric attribute named `"timeout"` +/// +/// An appropriate DynamoDB table can be created with the CLI as follows: +/// +/// ```bash +/// $ aws dynamodb create-table --table-name --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S +/// $ aws dynamodb update-time-to-live --table-name --time-to-live-specification Enabled=true,AttributeName=ttl +/// ``` +/// +/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating), +/// the commit protocol is as follows: +/// +/// 1. Perform HEAD request on `path` and error on precondition mismatch +/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout +/// 1. On Success: Perform operation with the configured timeout +/// 2. On Conflict: +/// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch +/// 2. If `timeout * max_skew_rate` passed, replace the record incrementing the `"generation"` +/// 1. On Success: GOTO 2.1 +/// 2. On Conflict: GOTO 2.2 +/// +/// Provided no writer modifies an object with a given `path` and `etag` without first adding a +/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit. +/// +/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited +/// requirements of synchronizing object storage. The major changes are: +/// +/// * Uses a monotonic generation count instead of a UUID rvn, as this is: +/// * Cheaper to generate, serialize and compare +/// * Cannot collide +/// * More human readable / interpretable +/// * Relies on [TTL] to eventually clean up old locks +/// +/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but +/// generalised to not make assumptions about the workload and not rely on first writing +/// to a temporary path. +/// +/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html +/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/ +/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct DynamoCommit { + table_name: String, + /// The number of milliseconds a lease is valid for + timeout: u64, + /// The maximum clock skew rate tolerated by the system + max_clock_skew_rate: u32, + /// The length of time a record will be retained in DynamoDB before being cleaned up + /// + /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table + /// and does not impact how long clients may wait to acquire a lock + ttl: Duration, + /// The backoff duration before retesting a condition + test_interval: Duration, +} + +impl DynamoCommit { + /// Create a new [`DynamoCommit`] with a given table name + pub fn new(table_name: String) -> Self { + Self { + table_name, + timeout: 20_000, + max_clock_skew_rate: 3, + ttl: Duration::from_secs(60 * 60), + test_interval: Duration::from_millis(100), + } + } + + /// Overrides the lock timeout. + /// + /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer + /// races, but will increase the time that writers must wait to reclaim a lock lost. The + /// default value of 20 seconds should be appropriate for must use-cases. + pub fn with_timeout(mut self, millis: u64) -> Self { + self.timeout = millis; + self + } + + /// The maximum clock skew rate tolerated by the system. + /// + /// An environment in which the clock on the fastest node ticks twice as fast as the slowest + /// node, would have a clock skew rate of 2. The default value of 3 should be appropriate + /// for most environments. + pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self { + self.max_clock_skew_rate = rate; + self + } + + /// The length of time a record should be retained in DynamoDB before being cleaned up + /// + /// This should be significantly larger than the configured lock timeout, with the default + /// value of 1 hour appropriate for most use-cases. + pub fn with_ttl(mut self, ttl: Duration) -> Self { + self.ttl = ttl; + self + } + + /// Parse [`DynamoCommit`] from a string + pub(crate) fn from_str(value: &str) -> Option { + Some(match value.split_once(':') { + Some((table_name, timeout)) => { + Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?) + } + None => Self::new(value.trim().to_string()), + }) + } + + /// Returns the name of the DynamoDB table. + pub(crate) fn table_name(&self) -> &str { + &self.table_name + } + + pub(crate) async fn copy_if_not_exists( + &self, + client: &Arc, + from: &Path, + to: &Path, + ) -> Result<()> { + self.conditional_op(client, to, None, || async { + client.copy_request(from, to).send().await?; + Ok(()) + }) + .await + } + + #[allow(clippy::future_not_send)] // Generics confound this lint + pub(crate) async fn conditional_op( + &self, + client: &Arc, + to: &Path, + etag: Option<&str>, + op: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + check_precondition(client, to, etag).await?; + + let mut previous_lease = None; + + loop { + let existing = previous_lease.as_ref(); + match self.try_lock(client, to.as_ref(), etag, existing).await? { + TryLockResult::Ok(lease) => { + let expiry = lease.acquire + lease.timeout; + return match tokio::time::timeout_at(expiry.into(), op()).await { + Ok(Ok(v)) => Ok(v), + Ok(Err(e)) => Err(e), + Err(_) => Err(Error::Generic { + store: "DynamoDB", + source: format!( + "Failed to perform conditional operation in {} milliseconds", + self.timeout + ) + .into(), + }), + }; + } + TryLockResult::Conflict(conflict) => { + let mut interval = tokio::time::interval(self.test_interval); + let expiry = conflict.timeout * self.max_clock_skew_rate; + loop { + interval.tick().await; + check_precondition(client, to, etag).await?; + if conflict.acquire.elapsed() > expiry { + previous_lease = Some(conflict); + break; + } + } + } + } + } + } + + /// Attempt to acquire a lock, reclaiming an existing lease if provided + async fn try_lock( + &self, + s3: &S3Client, + path: &str, + etag: Option<&str>, + existing: Option<&Lease>, + ) -> Result { + let attributes; + let (next_gen, condition_expression, expression_attribute_values) = match existing { + None => (0_u64, "attribute_not_exists(#pk)", Map(&[])), + Some(existing) => { + attributes = [(":g", AttributeValue::Number(existing.generation))]; + ( + existing.generation.checked_add(1).unwrap(), + "attribute_exists(#pk) AND generation = :g", + Map(attributes.as_slice()), + ) + } + }; + + let ttl = (Utc::now() + self.ttl).timestamp(); + let items = [ + ("path", AttributeValue::from(path)), + ("etag", AttributeValue::from(etag.unwrap_or("*"))), + ("generation", AttributeValue::Number(next_gen)), + ("timeout", AttributeValue::Number(self.timeout)), + ("ttl", AttributeValue::Number(ttl as _)), + ]; + let names = [("#pk", "path")]; + + let req = PutItem { + table_name: &self.table_name, + condition_expression, + expression_attribute_values, + expression_attribute_names: Map(&names), + item: Map(&items), + return_values: None, + return_values_on_condition_check_failure: Some(ReturnValues::AllOld), + }; + + let credential = s3.config.get_credential().await?; + + let acquire = Instant::now(); + match self + .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req) + .await + { + Ok(_) => Ok(TryLockResult::Ok(Lease { + acquire, + generation: next_gen, + timeout: Duration::from_millis(self.timeout), + })), + Err(e) => match parse_error_response(&e) { + Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) { + Some(lease) => Ok(TryLockResult::Conflict(lease)), + None => Err(Error::Generic { + store: STORE, + source: "Failed to extract lease from conflict ReturnValuesOnConditionCheckFailure response".into() + }), + }, + _ => Err(Error::Generic { + store: STORE, + source: Box::new(e), + }), + }, + } + } + + async fn request( + &self, + s3: &S3Client, + cred: Option<&AwsCredential>, + target: &str, + req: R, + ) -> Result { + let region = &s3.config.region; + let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", region)); + + let builder = match &s3.config.endpoint { + Some(e) => s3.client.request(Method::POST, e), + None => { + let url = format!("https://dynamodb.{region}.amazonaws.com"); + s3.client.request(Method::POST, url) + } + }; + + // TODO: Timeout + builder + .json(&req) + .header("X-Amz-Target", target) + .with_aws_sigv4(authorizer, None) + .send_retry(&s3.config.retry_config) + .await + } +} + +#[derive(Debug)] +enum TryLockResult { + /// Successfully acquired a lease + Ok(Lease), + /// An existing lease was found + Conflict(Lease), +} + +/// Validates that `path` has the given `etag` or doesn't exist if `None` +async fn check_precondition(client: &Arc, path: &Path, etag: Option<&str>) -> Result<()> { + let options = GetOptions { + head: true, + ..Default::default() + }; + + match etag { + Some(expected) => match client.get_opts(path, options).await { + Ok(r) => match r.meta.e_tag { + Some(actual) if expected == actual => Ok(()), + actual => Err(Error::Precondition { + path: path.to_string(), + source: format!("{} does not match {expected}", actual.unwrap_or_default()) + .into(), + }), + }, + Err(Error::NotFound { .. }) => Err(Error::Precondition { + path: path.to_string(), + source: format!("Object at location {path} not found").into(), + }), + Err(e) => Err(e), + }, + None => match client.get_opts(path, options).await { + Ok(_) => Err(Error::AlreadyExists { + path: path.to_string(), + source: "Already Exists".to_string().into(), + }), + Err(Error::NotFound { .. }) => Ok(()), + Err(e) => Err(e), + }, + } +} + +/// Parses the error response if any +fn parse_error_response(e: &RetryError) -> Option> { + match e.inner() { + RequestError::Status { + status: StatusCode::BAD_REQUEST, + body: Some(b), + } => serde_json::from_str(b).ok(), + _ => None, + } +} + +/// Extracts a lease from `item`, returning `None` on error +fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option { + let generation = match item.get("generation") { + Some(AttributeValue::Number(generation)) => generation, + _ => return None, + }; + + let timeout = match item.get("timeout") { + Some(AttributeValue::Number(timeout)) => *timeout, + _ => return None, + }; + + Some(Lease { + acquire: Instant::now(), + generation: *generation, + timeout: Duration::from_millis(timeout), + }) +} + +/// A lock lease +#[derive(Debug, Clone)] +struct Lease { + acquire: Instant, + generation: u64, + timeout: Duration, +} + +/// A DynamoDB [PutItem] payload +/// +/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html +#[derive(Serialize)] +#[serde(rename_all = "PascalCase")] +struct PutItem<'a> { + /// The table name + table_name: &'a str, + + /// A condition that must be satisfied in order for a conditional PutItem operation to succeed. + condition_expression: &'a str, + + /// One or more substitution tokens for attribute names in an expression + expression_attribute_names: Map<'a, &'a str, &'a str>, + + /// One or more values that can be substituted in an expression + expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>, + + /// A map of attribute name/value pairs, one for each attribute + item: Map<'a, &'a str, AttributeValue<'a>>, + + /// Use ReturnValues if you want to get the item attributes as they appeared + /// before they were updated with the PutItem request. + #[serde(skip_serializing_if = "Option::is_none")] + return_values: Option, + + /// An optional parameter that returns the item attributes for a PutItem operation + /// that failed a condition check. + #[serde(skip_serializing_if = "Option::is_none")] + return_values_on_condition_check_failure: Option, +} + +#[derive(Deserialize)] +struct ErrorResponse<'a> { + #[serde(rename = "__type")] + error: &'a str, + + #[serde(borrow, default, rename = "Item")] + item: HashMap<&'a str, AttributeValue<'a>>, +} + +#[derive(Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +enum ReturnValues { + AllOld, +} + +/// A collection of key value pairs +/// +/// This provides cheap, ordered serialization of maps +struct Map<'a, K, V>(&'a [(K, V)]); + +impl Serialize for Map<'_, K, V> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if self.0.is_empty() { + return serializer.serialize_none(); + } + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for (k, v) in self.0 { + map.serialize_entry(k, v)? + } + map.end() + } +} + +/// A DynamoDB [AttributeValue] +/// +/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html +#[derive(Debug, Serialize, Deserialize)] +enum AttributeValue<'a> { + #[serde(rename = "S")] + String(Cow<'a, str>), + #[serde(rename = "N", with = "number")] + Number(u64), +} + +impl<'a> From<&'a str> for AttributeValue<'a> { + fn from(value: &'a str) -> Self { + Self::String(Cow::Borrowed(value)) + } +} + +/// Numbers are serialized as strings +mod number { + use serde::{Deserialize, Deserializer, Serializer}; + + pub(crate) fn serialize(v: &u64, s: S) -> Result { + s.serialize_str(&v.to_string()) + } + + pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result { + let v: &str = Deserialize::deserialize(d)?; + v.parse().map_err(serde::de::Error::custom) + } +} + +use crate::client::HttpResponse; +/// Re-export integration_test to be called by s3_test +#[cfg(test)] +pub(crate) use tests::integration_test; + +#[cfg(test)] +mod tests { + use super::*; + use crate::aws::AmazonS3; + use crate::ObjectStore; + use rand::distr::Alphanumeric; + use rand::{rng, Rng}; + + #[test] + fn test_attribute_serde() { + let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap(); + assert_eq!(serde, "{\"N\":\"23\"}"); + let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap(); + assert!(matches!(back, AttributeValue::Number(23))); + } + + /// An integration test for DynamoDB + /// + /// This is a function called by s3_test to avoid test concurrency issues + pub(crate) async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) { + let client = &integration.client; + + let src = Path::from("dynamo_path_src"); + integration.put(&src, "asd".into()).await.unwrap(); + + let dst = Path::from("dynamo_path"); + let _ = integration.delete(&dst).await; // Delete if present + + // Create a lock if not already exists + let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { + TryLockResult::Conflict(l) => l, + TryLockResult::Ok(l) => l, + }; + + // Should not be able to acquire a lock again + let r = d.try_lock(client, dst.as_ref(), None, None).await; + assert!(matches!(r, Ok(TryLockResult::Conflict(_)))); + + // But should still be able to reclaim lock and perform copy + d.copy_if_not_exists(client, &src, &dst).await.unwrap(); + + match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { + TryLockResult::Conflict(new) => { + // Should have incremented generation to do so + assert_eq!(new.generation, existing.generation + 1); + } + _ => panic!("Should conflict"), + } + + let rng = rng(); + let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + let t = Some(etag.as_str()); + + let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { + TryLockResult::Ok(l) => l, + _ => panic!("should not conflict"), + }; + + match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { + TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation), + _ => panic!("should conflict"), + } + + match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() { + TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1), + _ => panic!("should not conflict"), + } + } +} diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 8dac2bd7..4abf3748 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -56,6 +56,7 @@ mod builder; mod checksum; mod client; mod credential; +mod dynamo; mod precondition; #[cfg(not(target_arch = "wasm32"))] @@ -63,6 +64,7 @@ mod resolve; pub use builder::{AmazonS3Builder, AmazonS3ConfigKey}; pub use checksum::Checksum; +pub use dynamo::DynamoCommit; pub use precondition::{S3ConditionalPut, S3CopyIfNotExists}; #[cfg(not(target_arch = "wasm32"))] @@ -195,6 +197,11 @@ impl ObjectStore for AmazonS3 { r => r, } } + #[allow(deprecated)] + (PutMode::Create, S3ConditionalPut::Dynamo(d)) => { + d.conditional_op(&self.client, location, None, move || request.do_put()) + .await + } (PutMode::Update(v), put) => { let etag = v.e_tag.ok_or_else(|| Error::Generic { store: STORE, @@ -222,6 +229,13 @@ impl ObjectStore for AmazonS3 { r => r, } } + #[allow(deprecated)] + S3ConditionalPut::Dynamo(d) => { + d.conditional_op(&self.client, location, Some(&etag), move || { + request.do_put() + }) + .await + } S3ConditionalPut::Disabled => Err(Error::NotImplemented), } } @@ -355,6 +369,10 @@ impl ObjectStore for AmazonS3 { return res; } + #[allow(deprecated)] + Some(S3CopyIfNotExists::Dynamo(lock)) => { + return lock.copy_if_not_exists(&self.client, from, to).await + } None => { return Err(Error::NotSupported { source: "S3 does not support copy-if-not-exists".to_string().into(), @@ -622,6 +640,12 @@ mod tests { let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256); let integration = builder.build().unwrap(); put_get_delete_list(&integration).await; + + match &integration.client.config.copy_if_not_exists { + #[allow(deprecated)] + Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await, + _ => eprintln!("Skipping dynamo integration test - dynamo not configured"), + }; } #[tokio::test] diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs index 52ecb9f3..2f11e4f9 100644 --- a/src/aws/precondition.rs +++ b/src/aws/precondition.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::aws::dynamo::DynamoCommit; use crate::config::Parse; use itertools::Itertools; @@ -60,6 +61,16 @@ pub enum S3CopyIfNotExists { /// /// Encoded as `multipart` ignoring whitespace. Multipart, + /// The name of a DynamoDB table to use for coordination + /// + /// Encoded as either `dynamo:` or `dynamo::` + /// ignoring whitespace. The default timeout is used if not specified + /// + /// See [`DynamoCommit`] for more information + /// + /// This will use the same region, credentials and endpoint as configured for S3 + #[deprecated(note = "Use S3CopyIfNotExists::Multipart")] + Dynamo(DynamoCommit), } impl std::fmt::Display for S3CopyIfNotExists { @@ -70,6 +81,8 @@ impl std::fmt::Display for S3CopyIfNotExists { write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) } Self::Multipart => f.write_str("multipart"), + #[allow(deprecated)] + Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } } @@ -97,6 +110,8 @@ impl S3CopyIfNotExists { code, )) } + #[allow(deprecated)] + "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)), _ => None, } } @@ -127,6 +142,17 @@ pub enum S3ConditionalPut { #[default] ETagMatch, + /// The name of a DynamoDB table to use for coordination + /// + /// Encoded as either `dynamo:` or `dynamo::` + /// ignoring whitespace. The default timeout is used if not specified + /// + /// See [`DynamoCommit`] for more information + /// + /// This will use the same region, credentials and endpoint as configured for S3 + #[deprecated(note = "Use S3ConditionalPut::ETagMatch")] + Dynamo(DynamoCommit), + /// Disable `conditional put` Disabled, } @@ -135,6 +161,8 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + #[allow(deprecated)] + Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), Self::Disabled => write!(f, "disabled"), } } @@ -145,7 +173,11 @@ impl S3ConditionalPut { match s.trim() { "etag" => Some(Self::ETagMatch), "disabled" => Some(Self::Disabled), - _ => None, + trimmed => match trimmed.split_once(':')? { + #[allow(deprecated)] + ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), + _ => None, + }, } } } @@ -162,6 +194,7 @@ impl Parse for S3ConditionalPut { #[cfg(test)] mod tests { use super::S3CopyIfNotExists; + use crate::aws::{DynamoCommit, S3ConditionalPut}; #[test] fn parse_s3_copy_if_not_exists_header() { @@ -186,6 +219,26 @@ mod tests { assert_eq!(expected, S3CopyIfNotExists::from_str(input)); } + #[test] + #[allow(deprecated)] + fn parse_s3_copy_if_not_exists_dynamo() { + let input = "dynamo: table:100"; + let expected = Some(S3CopyIfNotExists::Dynamo( + DynamoCommit::new("table".into()).with_timeout(100), + )); + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + + #[test] + #[allow(deprecated)] + fn parse_s3_condition_put_dynamo() { + let input = "dynamo: table:1300"; + let expected = Some(S3ConditionalPut::Dynamo( + DynamoCommit::new("table".into()).with_timeout(1300), + )); + assert_eq!(expected, S3ConditionalPut::from_str(input)); + } + #[test] fn parse_s3_copy_if_not_exists_header_whitespace_invariant() { let expected = Some(S3CopyIfNotExists::Header( diff --git a/src/azure/client.rs b/src/azure/client.rs index c7440a07..fa15c653 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -48,6 +48,7 @@ use std::time::Duration; use url::Url; const VERSION_HEADER: &str = "x-ms-version-id"; +const ACCESS_TIER_HEADER: &str = "x-ms-access-tier"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-ms-meta-"; static MS_CACHE_CONTROL: HeaderName = HeaderName::from_static("x-ms-blob-cache-control"); static MS_CONTENT_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-content-type"); @@ -242,6 +243,7 @@ impl PutRequest<'_> { has_content_type = true; builder.header(&MS_CONTENT_TYPE, v.as_ref()) } + Attribute::StorageClass => builder.header(ACCESS_TIER_HEADER, v.as_ref()), Attribute::Metadata(k_suffix) => builder.header( &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), @@ -1203,10 +1205,12 @@ pub(crate) struct UserDelegationKey { #[cfg(test)] mod tests { use super::*; + use crate::client::mock_server::MockServer; use crate::StaticCredentialProvider; use bytes::Bytes; + use http::Response; use regex::bytes::Regex; - use reqwest::Client; + use reqwest::{Client, Method}; #[test] fn deserde_azure() { @@ -1550,4 +1554,53 @@ Time:2018-06-14T16:46:54.6040685Z\r assert_eq!("404", code); assert_eq!("The specified blob does not exist.", reason); } + + async fn assert_create_conflict_status_is_mapped(status: http::StatusCode) { + let server = MockServer::new().await; + server.push_fn(move |req| { + assert_eq!(req.method(), Method::PUT); + assert_eq!(req.headers().get(IF_NONE_MATCH).unwrap(), "*"); + Response::builder() + .status(status) + .body(String::new()) + .unwrap() + }); + + let credential_provider = Arc::new(StaticCredentialProvider::new( + AzureCredential::BearerToken("static-token".to_string()), + )); + + let config = AzureConfig { + account: "testaccount".to_string(), + container: "testcontainer".to_string(), + credentials: credential_provider, + service: server.url().try_into().unwrap(), + retry_config: Default::default(), + is_emulator: false, + skip_signature: true, + disable_tagging: false, + client_options: Default::default(), + }; + + let client = AzureClient::new(config, HttpClient::new(Client::new())); + let path = Path::from("already-exists"); + + let err = client + .put_blob(&path, "test".into(), PutMode::Create.into()) + .await + .unwrap_err(); + + assert!(matches!(err, crate::Error::AlreadyExists { .. }), "{err}"); + server.shutdown().await; + } + + #[tokio::test] + async fn test_put_blob_create_maps_precondition_to_already_exists() { + assert_create_conflict_status_is_mapped(http::StatusCode::PRECONDITION_FAILED).await; + } + + #[tokio::test] + async fn test_put_blob_create_maps_not_modified_to_already_exists() { + assert_create_conflict_status_is_mapped(http::StatusCode::NOT_MODIFIED).await; + } } diff --git a/src/azure/credential.rs b/src/azure/credential.rs index c34a8e3c..37208842 100644 --- a/src/azure/credential.rs +++ b/src/azure/credential.rs @@ -507,7 +507,7 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> Str fn canonicalize_header(headers: &HeaderMap) -> String { let mut names = headers .iter() - .filter(|&(k, _)| (k.as_str().starts_with("x-ms"))) + .filter(|&(k, _)| k.as_str().starts_with("x-ms")) // TODO remove unwraps .map(|(k, _)| (k.as_str(), headers.get(k).unwrap().to_str().unwrap())) .collect::>(); diff --git a/src/buffered.rs b/src/buffered.rs index f189c534..00bea050 100644 --- a/src/buffered.rs +++ b/src/buffered.rs @@ -19,8 +19,8 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayloadMut, TagSet, - WriteMultipart, + Attributes, Extensions, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, + PutPayloadMut, TagSet, WriteMultipart, }; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; @@ -222,7 +222,7 @@ pub struct BufWriter { max_concurrency: usize, attributes: Option, tags: Option, - extensions: Option<::http::Extensions>, + extensions: Option, state: BufWriterState, store: Arc, } @@ -297,7 +297,7 @@ impl BufWriter { /// that need to pass context-specific information (like tracing spans) via trait methods. /// /// These extensions are ignored entirely by backends offered through this crate. - pub fn with_extensions(self, extensions: ::http::Extensions) -> Self { + pub fn with_extensions(self, extensions: Extensions) -> Self { Self { extensions: Some(extensions), ..self diff --git a/src/client/builder.rs b/src/client/builder.rs index f74c5ec1..257cb570 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -165,7 +165,7 @@ impl HttpRequestBuilder { self } - #[cfg(feature = "gcp")] + #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) fn json(mut self, s: S) -> Self { match (serde_json::to_vec(&s), &mut self.request) { (Ok(json), Ok(request)) => { diff --git a/src/client/http/spawn.rs b/src/client/http/spawn.rs index c3f1e717..32c7fc49 100644 --- a/src/client/http/spawn.rs +++ b/src/client/http/spawn.rs @@ -83,7 +83,9 @@ impl HttpService for SpawnService { } while let Some(x) = body.frame().await { - sender.send(x).unwrap(); + if sender.send(x).is_err() { + return; + } } })); diff --git a/src/client/mod.rs b/src/client/mod.rs index a71814b9..5a11b7ad 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -498,12 +498,21 @@ impl ClientOptions { self } - /// Set a request timeout + /// Set timeout for the overall request /// - /// The timeout is applied from when the request starts connecting until the - /// response body has finished + /// The timeout starts from when the request starts connecting until the + /// response body has finished. If the request does not complete within the + /// timeout, the client returns a timeout error. + /// + /// Timeout errors are retried, subject to the [`RetryConfig`] /// /// Default is 30 seconds + /// + /// # See Also + /// * [`Self::with_timeout_disabled`] to disable the timeout + /// * [`Self::with_connect_timeout`] to set a timeout for the connect phase + /// + /// [`RetryConfig`]: crate::RetryConfig pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = Some(ConfigValue::Parsed(timeout)); self @@ -511,7 +520,8 @@ impl ClientOptions { /// Disables the request timeout /// - /// See [`Self::with_timeout`] + /// # See Also + /// * [`Self::with_timeout`] pub fn with_timeout_disabled(mut self) -> Self { self.timeout = None; self @@ -519,7 +529,19 @@ impl ClientOptions { /// Set a timeout for only the connect phase of a Client /// + /// This is the time allowed for the client to establish a connection + /// and if the connection is not established within this time, + /// the client returns a timeout error. + /// + /// Timeout errors are retried, subject to the [`RetryConfig`] + /// /// Default is 5 seconds + /// + /// # See Also + /// * [`Self::with_timeout`] to set a timeout for the overall request + /// * [`Self::with_connect_timeout_disabled`] to disable the connect timeout + /// + /// [`RetryConfig`]: crate::RetryConfig pub fn with_connect_timeout(mut self, timeout: Duration) -> Self { self.connect_timeout = Some(ConfigValue::Parsed(timeout)); self @@ -527,7 +549,8 @@ impl ClientOptions { /// Disables the connection timeout /// - /// See [`Self::with_connect_timeout`] + /// # See Also + /// * [`Self::with_connect_timeout`] pub fn with_connect_timeout_disabled(mut self) -> Self { self.connect_timeout = None; self diff --git a/src/client/retry.rs b/src/client/retry.rs index aaa4dd34..c222202e 100644 --- a/src/client/retry.rs +++ b/src/client/retry.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! A shared HTTP client implementation incorporating retries +//! [`RetryConfig`] connection retry policy use crate::client::backoff::{Backoff, BackoffConfig}; use crate::client::builder::HttpRequestBuilder; @@ -423,14 +423,16 @@ impl RetryableRequest { // Use debug level until retries reach 80% of max_retries if ctx.retries * 100 >= ctx.max_retries * 80 { info!( - "Encountered server error, backing off for {} seconds, retry {} of {}", + "Encountered server error with status {}, backing off for {} seconds, retry {} of {}", + status, sleep.as_secs_f32(), ctx.retries, ctx.max_retries, ); } else { debug!( - "Encountered server error, backing off for {} seconds, retry {} of {}", + "Encountered server error with status {}, backing off for {} seconds, retry {} of {}", + status, sleep.as_secs_f32(), ctx.retries, ctx.max_retries, @@ -458,7 +460,8 @@ impl RetryableRequest { // Use debug level until retries reach 80% of max_retries if ctx.retries * 100 >= ctx.max_retries * 80 { info!( - "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + "Encountered transport error of kind {:?}, backing off for {} seconds, retry {} of {}: {}", + e.kind(), sleep.as_secs_f32(), ctx.retries, ctx.max_retries, @@ -466,7 +469,8 @@ impl RetryableRequest { ); } else { debug!( - "Encountered transport error backing off for {} seconds, retry {} of {}: {}", + "Encountered transport error of kind {:?}, backing off for {} seconds, retry {} of {}: {}", + e.kind(), sleep.as_secs_f32(), ctx.retries, ctx.max_retries, @@ -864,6 +868,8 @@ mod tests { // Reset the connection on the first n-1 attempts for _ in 0..retry.max_retries { let (stream, _) = listener.accept().await.unwrap(); + // TcpStream::set_linger is deprecated but this use case is valid to reset the stream + #[allow(deprecated)] stream.set_linger(Some(Duration::from_secs(0))).unwrap(); } // Succeed on the last attempt diff --git a/src/client/token.rs b/src/client/token.rs index bfd5aeba..45ea5781 100644 --- a/src/client/token.rs +++ b/src/client/token.rs @@ -54,7 +54,7 @@ impl Default for TokenCache { } } -impl TokenCache { +impl TokenCache { /// Override the minimum remaining TTL for a cached token to be used #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) fn with_min_ttl(self, min_ttl: Duration) -> Self { @@ -157,6 +157,7 @@ mod test { async fn test_min_ttl_causes_refresh() { let cache = TokenCache { cache: Default::default(), + refresh_lock: Default::default(), min_ttl: Duration::from_secs(1), fetch_backoff: Duration::from_millis(1), }; diff --git a/src/gcp/builder.rs b/src/gcp/builder.rs index f7607eea..f22d66d2 100644 --- a/src/gcp/builder.rs +++ b/src/gcp/builder.rs @@ -195,7 +195,9 @@ impl FromStr for GoogleConfigKey { | "service_account_path" => Ok(Self::ServiceAccount), "google_service_account_key" | "service_account_key" => Ok(Self::ServiceAccountKey), "google_bucket" | "google_bucket_name" | "bucket" | "bucket_name" => Ok(Self::Bucket), - "google_application_credentials" => Ok(Self::ApplicationCredentials), + "google_application_credentials" | "application_credentials" => { + Ok(Self::ApplicationCredentials) + } "google_skip_signature" | "skip_signature" => Ok(Self::SkipSignature), _ => match s.strip_prefix("google_").unwrap_or(s).parse() { Ok(key) => Ok(Self::Client(key)), diff --git a/src/gcp/client.rs b/src/gcp/client.rs index a988cc45..47af709d 100644 --- a/src/gcp/client.rs +++ b/src/gcp/client.rs @@ -51,6 +51,7 @@ use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-goog-meta-"; +const STORAGE_CLASS: &str = "x-goog-storage-class"; static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match"); @@ -201,6 +202,7 @@ impl Request<'_> { has_content_type = true; builder.header(CONTENT_TYPE, v.as_ref()) } + Attribute::StorageClass => builder.header(STORAGE_CLASS, v.as_ref()), Attribute::Metadata(k_suffix) => builder.header( &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), diff --git a/src/http/client.rs b/src/http/client.rs index 272f7c60..d08e9faf 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -196,6 +196,10 @@ impl Client { has_content_type = true; builder.header(CONTENT_TYPE, v.as_ref()) } + Attribute::StorageClass => { + tracing::warn!("StorageClass attribute not supported on HTTP client as header key is unknown"); + builder + } // Ignore metadata attributes Attribute::Metadata(_) => builder, }; diff --git a/src/integration.rs b/src/integration.rs index 49b7be57..99ee86da 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -34,8 +34,10 @@ use crate::{ use bytes::Bytes; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; +use rand::distr::Alphanumeric; use rand::{rng, Rng}; use std::collections::HashSet; +use std::slice; pub(crate) async fn flatten_list_stream( storage: &DynObjectStore, @@ -67,11 +69,11 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) { // List everything let content_list = flatten_list_stream(storage, None).await.unwrap(); - assert_eq!(content_list, &[location.clone()]); + assert_eq!(content_list, slice::from_ref(&location)); // Should behave the same as no prefix let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap(); - assert_eq!(content_list, &[location.clone()]); + assert_eq!(content_list, slice::from_ref(&location)); // List with delimiter let result = storage.list_with_delimiter(None).await.unwrap(); @@ -96,7 +98,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) { // List everything starting with a prefix that should return results let prefix = Path::from("test_dir"); let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap(); - assert_eq!(content_list, &[location.clone()]); + assert_eq!(content_list, slice::from_ref(&location)); // List everything starting with a prefix that shouldn't return results let prefix = Path::from("something"); @@ -628,8 +630,15 @@ pub async fn get_opts(storage: &dyn ObjectStore) { /// Tests conditional writes pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB + // As a result each conditional operation will need to wait for the lease to timeout before proceeding + // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code + // so we instead just generate a random suffix for the filenames + let rng = rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + delete_fixtures(storage).await; - let path = Path::from("put_opts"); + let path = Path::from(format!("put_opts_{suffix}")); let v1 = storage .put_opts(&path, "a".into(), PutMode::Create.into()) .await @@ -687,7 +696,7 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { const NUM_WORKERS: usize = 5; const NUM_INCREMENTS: usize = 10; - let path = Path::from("RACE"); + let path = Path::from(format!("RACE-{suffix}")); let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) .map(|_| async { for _ in 0..NUM_INCREMENTS { @@ -855,7 +864,7 @@ pub async fn list_uses_directories_correctly(storage: &DynObjectStore) { let prefix = Path::from("foo"); let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap(); - assert_eq!(content_list, &[location1.clone()]); + assert_eq!(content_list, slice::from_ref(&location1)); let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap(); assert_eq!(result.objects.len(), 1); diff --git a/src/lib.rs b/src/lib.rs index fdf0594a..edbe18bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![cfg_attr(docsrs, feature(doc_cfg))] #![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] #![warn( missing_copy_implementations, @@ -128,7 +128,7 @@ //! to support a wide variety of user-defined store configurations, with minimal additional //! application complexity. //! -//! ```no_run +//! ```no_run,ignore-wasm32 //! # #[cfg(feature = "aws")] { //! # use url::Url; //! # use object_store::{parse_url, parse_url_opts}; @@ -163,7 +163,7 @@ //! Use the [`ObjectStore::list`] method to iterate over objects in //! remote storage or files in the local filesystem: //! -//! ``` +//! ```ignore-wasm32 //! # use object_store::local::LocalFileSystem; //! # use std::sync::Arc; //! # use object_store::{path::Path, ObjectStore}; @@ -207,7 +207,7 @@ //! Use the [`ObjectStore::get`] method to fetch the data bytes //! from remote storage or files in the local filesystem as a stream. //! -//! ``` +//! ```ignore-wasm32 //! # use futures::TryStreamExt; //! # use object_store::local::LocalFileSystem; //! # use std::sync::Arc; @@ -254,7 +254,7 @@ //! //! Use the [`ObjectStore::put`] method to atomically write data. //! -//! ``` +//! ```ignore-wasm32 //! # use object_store::local::LocalFileSystem; //! # use object_store::{ObjectStore, PutPayload}; //! # use std::sync::Arc; @@ -275,7 +275,7 @@ //! //! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data //! -//! ``` +//! ```ignore-wasm32 //! # use object_store::local::LocalFileSystem; //! # use object_store::{ObjectStore, WriteMultipart}; //! # use std::sync::Arc; @@ -304,7 +304,7 @@ //! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will //! automatically coalesce adjacent ranges into an appropriate number of parallel requests. //! -//! ``` +//! ```ignore-wasm32 //! # use object_store::local::LocalFileSystem; //! # use object_store::ObjectStore; //! # use std::sync::Arc; @@ -336,7 +336,7 @@ //! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`] //! encapsulates this approach //! -//! ``` +//! ```ignore-wasm32 //! # use object_store::local::LocalFileSystem; //! # use object_store::{ObjectStore, PutPayloadMut}; //! # use std::sync::Arc; @@ -567,6 +567,9 @@ pub use payload::*; pub use upload::*; pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT}; +// Re-export HTTP types used in public API +pub use ::http::{Extensions, HeaderMap, HeaderValue}; + use crate::path::Path; #[cfg(all(feature = "fs", not(target_arch = "wasm32")))] use crate::util::maybe_spawn_blocking; @@ -690,7 +693,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// filesystems, GCP, and Azure return an error, while S3 and in-memory will /// return Ok. If it is an error, it will be [`Error::NotFound`]. /// - /// ``` + /// ```ignore-wasm32 /// # use futures::{StreamExt, TryStreamExt}; /// # use object_store::local::LocalFileSystem; /// # async fn example() -> Result<(), Box> { @@ -987,7 +990,7 @@ pub struct GetOptions { /// that need to pass context-specific information (like tracing spans) via trait methods. /// /// These extensions are ignored entirely by backends offered through this crate. - pub extensions: ::http::Extensions, + pub extensions: Extensions, } impl GetOptions { @@ -1184,7 +1187,7 @@ pub struct PutOptions { /// These extensions are ignored entirely by backends offered through this crate. /// /// They are also eclused from [`PartialEq`] and [`Eq`]. - pub extensions: ::http::Extensions, + pub extensions: Extensions, } impl PartialEq for PutOptions { @@ -1256,7 +1259,7 @@ pub struct PutMultipartOptions { /// These extensions are ignored entirely by backends offered through this crate. /// /// They are also eclused from [`PartialEq`] and [`Eq`]. - pub extensions: ::http::Extensions, + pub extensions: Extensions, } impl PartialEq for PutMultipartOptions { @@ -1649,4 +1652,22 @@ mod tests { options.if_match = Some("*".to_string()); // Passes if file exists options.check_preconditions(&meta).unwrap(); } + + #[test] + #[cfg(feature = "http")] + fn test_reexported_types() { + // Test HeaderMap + let mut headers = HeaderMap::new(); + headers.insert("content-type", HeaderValue::from_static("text/plain")); + assert_eq!(headers.len(), 1); + + // Test HeaderValue + let value = HeaderValue::from_static("test-value"); + assert_eq!(value.as_bytes(), b"test-value"); + + // Test Extensions + let mut extensions = Extensions::new(); + extensions.insert("test-key"); + assert!(extensions.get::<&str>().is_some()); + } } diff --git a/src/local.rs b/src/local.rs index 3404bc89..e67206dd 100644 --- a/src/local.rs +++ b/src/local.rs @@ -428,8 +428,8 @@ impl ObjectStore for LocalFileSystem { async fn get_range(&self, location: &Path, range: Range) -> Result { let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { - let (mut file, _) = open_file(&path)?; - read_range(&mut file, &path, range) + let (mut file, metadata) = open_file(&path)?; + read_range(&mut file, metadata.len(), &path, range) }) .await } @@ -439,10 +439,10 @@ impl ObjectStore for LocalFileSystem { let ranges = ranges.to_vec(); maybe_spawn_blocking(move || { // Vectored IO might be faster - let (mut file, _) = open_file(&path)?; + let (mut file, metadata) = open_file(&path)?; ranges .into_iter() - .map(|r| read_range(&mut file, &path, r)) + .map(|r| read_range(&mut file, metadata.len(), &path, r)) .collect() }) .await @@ -899,15 +899,14 @@ pub(crate) fn chunked_stream( .boxed() } -pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) -> Result { - let file_metadata = file.metadata().map_err(|e| Error::Metadata { - source: e.into(), - path: path.to_string_lossy().to_string(), - })?; - +pub(crate) fn read_range( + file: &mut File, + file_len: u64, + path: &PathBuf, + range: Range, +) -> Result { // If none of the range is satisfiable we should error, e.g. if the start offset is beyond the // extents of the file - let file_len = file_metadata.len(); if range.start >= file_len { return Err(Error::InvalidRange { source: InvalidGetRange::StartTooLarge { diff --git a/src/prefix.rs b/src/prefix.rs index e5a917aa..4720c989 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -224,6 +224,8 @@ impl ObjectStore for PrefixStore { #[cfg(not(target_arch = "wasm32"))] #[cfg(test)] mod tests { + use std::slice; + use super::*; use crate::integration::*; use crate::local::LocalFileSystem; @@ -259,11 +261,11 @@ mod tests { let location_prefix = Path::from("test_file.json"); let content_list = flatten_list_stream(&prefix, None).await.unwrap(); - assert_eq!(content_list, &[location_prefix.clone()]); + assert_eq!(content_list, slice::from_ref(&location_prefix)); let root = Path::from("/"); let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap(); - assert_eq!(content_list, &[location_prefix.clone()]); + assert_eq!(content_list, slice::from_ref(&location_prefix)); let read_data = prefix .get(&location_prefix)