From 5563b5e81aeb1c6b57eb0b788e1500f3ef5f65da Mon Sep 17 00:00:00 2001 From: Brenton Farmer Date: Mon, 4 May 2026 16:20:54 -0700 Subject: [PATCH 1/2] Fix: OpenSearch dispatcher robustness + R-24c integration test coverage - StatementDispatcher: pass CAS query params (if_seq_no/if_primary_term) via IRequestParameters instead of embedding in path string (OpenSearch.Net rejects paths with query strings); add CasPutParameters class - StatementDispatcher: handle OpenSearchClientException in DispatchUpdateSettingsAsync and DispatchReindexAsync for ThrowExceptions=true clients - SafeDefaultMergeMiddleware: co-inject conflicts:proceed with op_type:create on REINDEX to prevent abort-on-conflict defeating idempotency (ADR-0011) - R-24c gap-fill tests: handle both ThrowExceptions modes in DynamicStrict assertion; make LedgerWrite cleanup defensive against container teardown races - InitializeTestContainers: add HYPERBEE_TESTS_PROVIDERS_ONLY env gate to scope container startup; add HYPERBEE_TESTS_SKIP_SINGLE_NODE support - CouchbaseTestContainer: remove erroneous port 80 mapping --- .../Internal/Dispatch/StatementDispatcher.cs | 163 +++++++++++++----- .../Middleware/SafeDefaultMergeMiddleware.cs | 17 +- .../Couchbase/CouchbaseTestContainer.cs | 8 +- .../Container/InitializeTestContainers.cs | 52 +++++- .../OpenSearchR24cGapFillIntegrationTests.cs | 41 ++++- 5 files changed, 217 insertions(+), 64 deletions(-) diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs index 9030193..d03d6b3 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs @@ -6,6 +6,8 @@ using Hyperbee.Migrations.Providers.OpenSearch.Internal.Middleware; using Microsoft.Extensions.Logging; using OpenSearch.Net; +using HttpMethod = 
OpenSearch.Net.HttpMethod; +using Osc = OpenSearch.Client; namespace Hyperbee.Migrations.Providers.OpenSearch.Internal.Dispatch; @@ -140,7 +142,7 @@ private static async Task FetchClusterVersionAsync( StatementContext co { var ll = context.Client.LowLevel; var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.GET, string.Empty, context.CancellationToken ).ConfigureAwait( false ); + HttpMethod.GET, string.Empty, context.CancellationToken ).ConfigureAwait( false ); if ( !response.Success ) { @@ -433,8 +435,19 @@ private async Task DispatchUpdateSettingsAsync( UpdateSettingsA OpenSearchResponseStatus: 200 ); } - var dynamicResponse = await ll.Indices.UpdateSettingsAsync( - ast.IndexName, PostData.String( body ), ctx: context.CancellationToken ).ConfigureAwait( false ); + StringResponse dynamicResponse; + try + { + dynamicResponse = await ll.Indices.UpdateSettingsAsync( + ast.IndexName, PostData.String( body ), ctx: context.CancellationToken ).ConfigureAwait( false ); + } + catch ( OpenSearchClientException ex ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: ex.Message, + OpenSearchResponseStatus: ex.Response?.HttpStatusCode, + Exception: ex ); + } var result = BuildResult( verb, dynamicResponse, $"settings updated on `{ast.IndexName}`" ); @@ -464,8 +477,8 @@ private static async Task DispatchWaitForHealthAsync( WaitForHe var verb = ast.Verb; var threshold = ast.Threshold == HealthStatus.Green - ? global::OpenSearch.Net.WaitForStatus.Green - : global::OpenSearch.Net.WaitForStatus.Yellow; + ? WaitForStatus.Green + : WaitForStatus.Yellow; var timeout = ast.Timeout ?? 
context.Options.ImplicitWaitTimeout; @@ -474,7 +487,7 @@ private static async Task DispatchWaitForHealthAsync( WaitForHe { var sel = s.WaitForStatus( threshold ).Timeout( timeout ); if ( ast.IndexName is not null ) - sel = sel.Index( global::OpenSearch.Client.Indices.Index( ast.IndexName ) ); + sel = sel.Index( Osc.Indices.Index( ast.IndexName ) ); return sel; }, ct: context.CancellationToken @@ -577,8 +590,19 @@ private async Task DispatchReindexAsync( ReindexAst ast, Statem // is a Phase 2 enhancement (R-11) — authors who need it can compose with // WAIT UNTIL TASK once the runner exposes the task id. - var response = await ll.ReindexOnServerAsync( - PostData.String( body ), ctx: context.CancellationToken ).ConfigureAwait( false ); + StringResponse response; + try + { + response = await ll.ReindexOnServerAsync( + PostData.String( body ), ctx: context.CancellationToken ).ConfigureAwait( false ); + } + catch ( OpenSearchClientException ex ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: ex.Message, + OpenSearchResponseStatus: ex.Response?.HttpStatusCode, + Exception: ex ); + } var result = BuildResult( verb, response, $"reindex {ast.Source} -> {ast.Destination}" ); @@ -613,7 +637,7 @@ private async Task DispatchAliasSwapAsync( AliasSwapAst ast, St """; var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.POST, + HttpMethod.POST, "_aliases", context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -638,7 +662,7 @@ private static async Task DispatchAliasAddAsync( AliasAddAst as """; var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.POST, + HttpMethod.POST, "_aliases", context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -658,7 +682,7 @@ private static async Task DispatchAliasRemoveAsync( AliasRemove """; var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.POST, + HttpMethod.POST, "_aliases", 
context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -686,7 +710,7 @@ private static async Task DispatchCreateTemplateAsync( CreateTe // Idempotent: PUT replaces an existing template definition. var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.PUT, + HttpMethod.PUT, $"_index_template/{ast.TemplateName}", context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -714,7 +738,7 @@ private static async Task DispatchCreateComponentAsync( CreateC // by composable index templates via `composed_of`. var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.PUT, + HttpMethod.PUT, $"_component_template/{ast.ComponentName}", context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -732,7 +756,7 @@ private static async Task DispatchDropTemplateAsync( DropTempla if ( ast.IfExists ) { var existsResponse = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.HEAD, + HttpMethod.HEAD, $"_index_template/{ast.TemplateName}", context.CancellationToken ).ConfigureAwait( false ); @@ -746,7 +770,7 @@ private static async Task DispatchDropTemplateAsync( DropTempla } var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.DELETE, + HttpMethod.DELETE, $"_index_template/{ast.TemplateName}", context.CancellationToken ).ConfigureAwait( false ); @@ -763,7 +787,7 @@ private static async Task DispatchDropComponentAsync( DropCompo if ( ast.IfExists ) { var existsResponse = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.HEAD, + HttpMethod.HEAD, $"_component_template/{ast.ComponentName}", context.CancellationToken ).ConfigureAwait( false ); @@ -781,7 +805,7 @@ private static async Task DispatchDropComponentAsync( DropCompo // dispatcher surfaces that error verbatim via BuildResult. 
var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.DELETE, + HttpMethod.DELETE, $"_component_template/{ast.ComponentName}", context.CancellationToken ).ConfigureAwait( false ); @@ -819,21 +843,35 @@ private async Task DispatchCreatePolicyAsync( CreatePolicyAst a // not under _source — unusual but documented), retry PUT with the // CAS query parameters. Mirrors LockHandle.RenewLockAsync's // optimistic-concurrency pattern. + // + // 409 surfaces either as a non-Success StringResponse (default + // settings) OR as OpenSearchClientException (when the consumer + // configured ConnectionSettings.ThrowExceptions(), which test + // containers and some prod deployments do). Handle both uniformly, + // matching the pattern in OpenSearchRecordStore.AcquireLockAsync. - var firstResponse = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.PUT, - policyPath, - context.CancellationToken, - data: PostData.String( body ) ).ConfigureAwait( false ); + var (firstResponse, firstHit409) = await PutPolicyAsync( + ll, policyPath, body, context.CancellationToken ).ConfigureAwait( false ); - if ( firstResponse.HttpStatusCode != 409 ) - return BuildResult( verb, firstResponse, $"policy `{ast.PolicyId}` created/updated" ); + if ( !firstHit409 ) + return BuildResult( verb, firstResponse!, $"policy `{ast.PolicyId}` created/updated" ); // 409 — read the current version to retry with CAS. - var getResponse = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.GET, - policyPath, - context.CancellationToken ).ConfigureAwait( false ); + StringResponse? 
getResponse; + try + { + getResponse = await ll.DoRequestAsync( + HttpMethod.GET, + policyPath, + context.CancellationToken ).ConfigureAwait( false ); + } + catch ( OpenSearchClientException ex ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: $"policy `{ast.PolicyId}` returned 409 on PUT but the follow-up GET to read _seq_no/_primary_term for CAS retry threw: HTTP {ex.Response?.HttpStatusCode}, {ex.Message}", + OpenSearchResponseStatus: ex.Response?.HttpStatusCode, + Exception: ex ); + } if ( !getResponse.Success || getResponse.Body is null ) { @@ -866,28 +904,63 @@ private async Task DispatchCreatePolicyAsync( CreatePolicyAst a Exception: ex ); } - // Retry the PUT with CAS query params. Inline rather than via a typed - // IRequestParameters because there's no ISM-specific request-parameters - // type in OpenSearch.Net (the endpoint is plugin-served). - var retryPath = $"{policyPath}?if_seq_no={seqNo}&if_primary_term={primaryTerm}"; - var retryResponse = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.PUT, - retryPath, - context.CancellationToken, - data: PostData.String( body ) ).ConfigureAwait( false ); + // Retry the PUT with CAS query params passed via IRequestParameters — + // OpenSearch.Net rejects paths that embed query strings directly. + var (retryResponse, retryHit409) = await PutPolicyAsync( + ll, policyPath, body, context.CancellationToken, + new CasPutParameters( seqNo, primaryTerm ) ).ConfigureAwait( false ); - if ( retryResponse.HttpStatusCode == 409 ) + if ( retryHit409 ) { // Second 409 means another writer beat us between the GET and // the retry PUT. The lock should make this rare; treat as hard // failure rather than recursing. 
return new StatementResult( StatementOutcome.Failed, verb, Detail: $"policy `{ast.PolicyId}` CAS retry hit a second 409 — concurrent writer between GET (_seq_no={seqNo}, _primary_term={primaryTerm}) and retry PUT.", - OpenSearchResponseStatus: retryResponse.HttpStatusCode, - Exception: retryResponse.OriginalException ?? new InvalidOperationException( $"concurrent writer on policy {ast.PolicyId}" ) ); + OpenSearchResponseStatus: 409, + Exception: retryResponse?.OriginalException ?? new InvalidOperationException( $"concurrent writer on policy {ast.PolicyId}" ) ); } - return BuildResult( verb, retryResponse, $"policy `{ast.PolicyId}` updated via CAS retry (seq={seqNo}, term={primaryTerm})" ); + return BuildResult( verb, retryResponse!, $"policy `{ast.PolicyId}` updated via CAS retry (seq={seqNo}, term={primaryTerm})" ); + } + + // PUT a policy body and report whether the result was a 409, transparent + // to whether the consumer's ConnectionSettings has ThrowExceptions enabled. + // Returns (response, hit409). When hit409 is true, response may be null + // (the throwing path). + private static async Task<(StringResponse? Response, bool Hit409)> PutPolicyAsync( + IOpenSearchLowLevelClient ll, + string path, + string body, + CancellationToken cancellationToken, + IRequestParameters? 
requestParameters = null ) + { + try + { + var response = await ll.DoRequestAsync( + HttpMethod.PUT, + path, + cancellationToken, + data: PostData.String( body ), + requestParameters: requestParameters ).ConfigureAwait( false ); + return (response, response.HttpStatusCode == 409); + } + catch ( OpenSearchClientException ex ) when ( ex.Response?.HttpStatusCode == 409 ) + { + return (null, true); + } + } + + private sealed class CasPutParameters : RequestParameters + { + public override HttpMethod DefaultHttpMethod => HttpMethod.PUT; + public override bool SupportsBody => true; + + public CasPutParameters( long seqNo, long primaryTerm ) + { + QueryString["if_seq_no"] = seqNo; + QueryString["if_primary_term"] = primaryTerm; + } } // --- APPLY POLICY TO --- @@ -912,7 +985,7 @@ private async Task DispatchApplyPolicyAsync( ApplyPolicyAst ast """; var response = await ll.DoRequestAsync( - global::OpenSearch.Net.HttpMethod.POST, + HttpMethod.POST, $"{IsmPathPrefix}/add/{ast.IndexPattern}", context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); @@ -1052,8 +1125,8 @@ public async Task FlushImplicitWaitsAsync( StatementContext context ) private static async Task ExecuteHealthWaitAsync( StatementContext context, IReadOnlyCollection indices ) { var threshold = context.Options.ClusterHealthThreshold == ClusterHealthThreshold.Green - ? global::OpenSearch.Net.WaitForStatus.Green - : global::OpenSearch.Net.WaitForStatus.Yellow; + ? 
WaitForStatus.Green + : WaitForStatus.Yellow; var timeout = context.Options.ImplicitWaitTimeout; @@ -1063,7 +1136,7 @@ await context.Client.Cluster.HealthAsync( selector: s => s .WaitForStatus( threshold ) .Timeout( timeout ) - .Index( global::OpenSearch.Client.Indices.Index( string.Join( ",", indices ) ) ), + .Index( Osc.Indices.Index( string.Join( ",", indices ) ) ), ct: context.CancellationToken ).ConfigureAwait( false ); } diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Middleware/SafeDefaultMergeMiddleware.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Middleware/SafeDefaultMergeMiddleware.cs index 665c149..1dfd4c6 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Middleware/SafeDefaultMergeMiddleware.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Middleware/SafeDefaultMergeMiddleware.cs @@ -77,14 +77,19 @@ private static JsonObject MergeCreateIndex( CreateIndexAst ast, JsonNode? body ) // 1. If InjectOpTypeCreate is false (UNSAFE branch): pass body through // unchanged. Author owns idempotency. // 2. If body is null: produce - // { "source": { "index": }, "dest": { "index": , "op_type": "create" } } - // 3. If body has dest object missing op_type: merge `op_type: create` - // 4. If body has dest with `op_type: create` already: pass through + // { "source": { "index": }, "dest": { "index": , "op_type": "create" }, "conflicts": "proceed" } + // 3. If body has dest object missing op_type: merge `op_type: create` and `conflicts: proceed` + // (unless body already sets `conflicts`) + // 4. If body has dest with `op_type: create` already: merge `conflicts: proceed` // (idempotent inject) // 5. 
If body has dest with conflicting op_type (e.g., "index"): throw // SafeDefaultConflictException — author must use REINDEX UNSAFE("...") // to opt out // + // `conflicts: proceed` is co-injected with `op_type: create` because the safe-default + // semantics are "skip pre-existing docs" — aborting on the first conflict (the + // OpenSearch default) would return HTTP 409 and defeat idempotency entirely. + // // The middleware also ensures source.index and dest.index match the AST's // Source/Destination unless the body explicitly overrides them (advanced use). @@ -107,12 +112,18 @@ private static JsonObject MergeReindex( ReindexAst ast, JsonNode? body ) if ( !dest.TryGetPropertyValue( "op_type", out var existing ) || existing is null ) { dest["op_type"] = "create"; + if ( !clone.ContainsKey( "conflicts" ) ) + clone["conflicts"] = "proceed"; return clone; } var existingValue = existing.GetValue(); if ( existingValue == "create" ) + { + if ( !clone.ContainsKey( "conflicts" ) ) + clone["conflicts"] = "proceed"; return clone; // idempotent inject + } throw new SafeDefaultConflictException( $"REINDEX body specifies `op_type: \"{existingValue}\"` which conflicts with the safe-default `op_type: create`. " + diff --git a/tests/Hyperbee.Migrations.Integration.Tests/Container/Couchbase/CouchbaseTestContainer.cs b/tests/Hyperbee.Migrations.Integration.Tests/Container/Couchbase/CouchbaseTestContainer.cs index 66defa8..9f5add8 100644 --- a/tests/Hyperbee.Migrations.Integration.Tests/Container/Couchbase/CouchbaseTestContainer.cs +++ b/tests/Hyperbee.Migrations.Integration.Tests/Container/Couchbase/CouchbaseTestContainer.cs @@ -29,7 +29,13 @@ await network.CreateAsync( cancellationToken ) .WithNetwork( network ) .WithNetworkAliases( "db" ) - .WithPortBinding( 80, 80 ) + // Couchbase Server's documented ports are 8091-8096 (REST/UI/services) + // and 11210/11211 (data) — port 80 is not in the cluster-map and the + // SDK never connects to it, so binding host:80 was always vestigial. 
+ // The previous binding ran into HTTP.sys URL-ACL conflicts on Windows + // hosts where Windows services (SSDP, WinRM, etc.) hold reservations + // on port 80; removing it eliminates the conflict everywhere without + // changing test behavior. .WithPortBinding( 11210, 11210 ) .WithPortBinding( 8091, 8091 ) .WithPortBinding( 8092, 8092 ) diff --git a/tests/Hyperbee.Migrations.Integration.Tests/Container/InitializeTestContainers.cs b/tests/Hyperbee.Migrations.Integration.Tests/Container/InitializeTestContainers.cs index c495c33..b345c8d 100644 --- a/tests/Hyperbee.Migrations.Integration.Tests/Container/InitializeTestContainers.cs +++ b/tests/Hyperbee.Migrations.Integration.Tests/Container/InitializeTestContainers.cs @@ -1,4 +1,4 @@ -using Hyperbee.Migrations.Integration.Tests.Container.Aerospike; +using Hyperbee.Migrations.Integration.Tests.Container.Aerospike; using Hyperbee.Migrations.Integration.Tests.Container.Couchbase; using Hyperbee.Migrations.Integration.Tests.Container.MongoDb; using Hyperbee.Migrations.Integration.Tests.Container.OpenSearch; @@ -9,6 +9,13 @@ namespace Hyperbee.Migrations.Integration.Tests.Container; [TestClass] public class InitializeTestContainers { + // Provider names recognized by HYPERBEE_TESTS_PROVIDERS_ONLY (case-insensitive). 
+ private const string MongoDb = "MongoDb"; + private const string Postgres = "Postgres"; + private const string Couchbase = "Couchbase"; + private const string Aerospike = "Aerospike"; + private const string OpenSearch = "OpenSearch"; + [AssemblyInitialize] public static async Task Initialize( TestContext context ) { @@ -20,10 +27,43 @@ public static async Task Initialize( TestContext context ) if ( Environment.GetEnvironmentVariable( "HYPERBEE_TESTS_SKIP_SINGLE_NODE" ) == "true" ) return; - await MongoDbTestContainer.Initialize( context ); - await PostgresTestContainer.Initialize( context ); - await CouchbaseTestContainer.Initialize( context ); - await AerospikeTestContainer.Initialize( context ); - await OpenSearchTestContainer.Initialize( context ); + // HYPERBEE_TESTS_PROVIDERS_ONLY scopes the assembly initializer to a + // subset of providers. Without it, all five providers' containers + // start up — the historical default. With it, only the named + // providers (comma-separated, case-insensitive) initialize. Examples: + // + // HYPERBEE_TESTS_PROVIDERS_ONLY=OpenSearch + // HYPERBEE_TESTS_PROVIDERS_ONLY=Couchbase,MongoDb + // + // The motivation is that the assembly initializer is otherwise a + // single point of failure for the whole integration suite — any one + // provider's container failing to start (an environmental issue on + // a contributor's machine, a test-fixture timing problem, a flaky + // image pull) prevents EVERY test from running, including ones for + // unrelated providers. Provider scoping lets contributors verify + // changes affecting only their provider without fighting unrelated + // test-fixture problems. 
+ var providers = ParseProviderFilter( Environment.GetEnvironmentVariable( "HYPERBEE_TESTS_PROVIDERS_ONLY" ) ); + + if ( providers is null || providers.Contains( MongoDb ) ) + await MongoDbTestContainer.Initialize( context ); + if ( providers is null || providers.Contains( Postgres ) ) + await PostgresTestContainer.Initialize( context ); + if ( providers is null || providers.Contains( Couchbase ) ) + await CouchbaseTestContainer.Initialize( context ); + if ( providers is null || providers.Contains( Aerospike ) ) + await AerospikeTestContainer.Initialize( context ); + if ( providers is null || providers.Contains( OpenSearch ) ) + await OpenSearchTestContainer.Initialize( context ); + } + + private static HashSet? ParseProviderFilter( string? raw ) + { + if ( string.IsNullOrWhiteSpace( raw ) ) + return null; + + return raw + .Split( ',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries ) + .ToHashSet( StringComparer.OrdinalIgnoreCase ); } } diff --git a/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchR24cGapFillIntegrationTests.cs b/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchR24cGapFillIntegrationTests.cs index adbb01c..c35029f 100644 --- a/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchR24cGapFillIntegrationTests.cs +++ b/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchR24cGapFillIntegrationTests.cs @@ -248,13 +248,34 @@ public async Task DynamicStrict_AutoInjected_RejectsUnmappedFields() index, "1", PostData.String( """{ "id": "u1" }""" ) ); Assert.IsTrue( ok.Success, $"mapped-only doc should index; got: {ok.Body}" ); - // Indexing with an UNMAPPED field must be rejected by - // strict_dynamic_mapping. 
- var rejected = await ll.IndexAsync( - index, "2", PostData.String( """{ "id": "u2", "unmapped_field": "x" }""" ) ); - Assert.IsFalse( rejected.Success, - $"unmapped field should be rejected by dynamic:strict; got HTTP {rejected.HttpStatusCode}: {rejected.Body}" ); - StringAssert.Contains( rejected.Body!, "strict_dynamic_mapping" ); + // Indexing with an UNMAPPED field must be rejected by strict_dynamic_mapping. + // With ThrowExceptions=false the response captures the rejection; with + // ThrowExceptions=true (test-container default) the client throws instead. + StringResponse? rejected = null; + OpenSearchClientException? strictEx = null; + try + { + rejected = await ll.IndexAsync( + index, "2", PostData.String( """{ "id": "u2", "unmapped_field": "x" }""" ) ); + } + catch ( OpenSearchClientException ex ) when ( ex.Response?.HttpStatusCode == 400 ) + { + strictEx = ex; + } + + if ( rejected != null ) + { + Assert.IsFalse( rejected.Success, + $"unmapped field should be rejected by dynamic:strict; got HTTP {rejected.HttpStatusCode}: {rejected.Body}" ); + StringAssert.Contains( rejected.Body!, "strict_dynamic_mapping" ); + } + else + { + Assert.IsNotNull( strictEx, + "expected strict_dynamic_mapping rejection; got neither a failed response nor an exception" ); + StringAssert.Contains( strictEx!.Message, "strict_dynamic_mapping", + $"expected strict_dynamic_mapping in exception; got: {strictEx.Message}" ); + } } finally { @@ -538,8 +559,10 @@ public async Task LedgerWrite_HundredMigrations_CompletesWithinBudget() finally { var ll = MultiNodeOpenSearchTestContainer.LowLevelClient; - await ll.Indices.DeleteAsync( options.LedgerIndex ); - await ll.Indices.DeleteAsync( options.LockIndex ); + // Best-effort cleanup — connection may fail if containers are torn down by a + // parallel framework's ClassCleanup before this finally block completes. 
+ try { await ll.Indices.DeleteAsync( options.LedgerIndex ); } catch ( Exception ) { } + try { await ll.Indices.DeleteAsync( options.LockIndex ); } catch ( Exception ) { } } } } From 2528250521ed29751c8b6c9d5be88cb07590a121 Mon Sep 17 00:00:00 2001 From: Brenton Farmer Date: Mon, 4 May 2026 17:04:06 -0700 Subject: [PATCH 2/2] Harden bootstrap and document public OpenSearch provider surface - LedgerIndexInitStep / LockIndexInitStep: tolerate the TOCTOU race between Exists() and Create() by detecting `resource_already_exists_exception` in both client modes (response-based and exception-based). On race-loss the ledger step still verifies the existing mapping; the lock step succeeds. - OpenSearchMigrationOptions: add XML docs to every public type, enum value, property, const, and ctor; lift R-19 / R-29 / PA-2 rationale from line comments into <summary>/<remarks>; add <see cref> links between related options (LockRenewInterval <-> LockStaleAfter, etc.). - OpenSearchExceptions: add XML docs to every exception type and ctor; document RecordId/StatementIndex/FailedStatementIndex properties.
--- .../Bootstrap/Steps/LedgerIndexInitStep.cs | 56 ++++++++- .../Bootstrap/Steps/LockIndexInitStep.cs | 54 ++++++++- .../OpenSearchExceptions.cs | 81 ++++++++++--- .../OpenSearchMigrationOptions.cs | 108 +++++++++++++++--- 4 files changed, 252 insertions(+), 47 deletions(-) diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LedgerIndexInitStep.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LedgerIndexInitStep.cs index a29368d..4fc510a 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LedgerIndexInitStep.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LedgerIndexInitStep.cs @@ -84,14 +84,35 @@ public async Task ExecuteAsync( BootstrapContext context ) logger.LogInformation( "{step} creating ledger index `{idx}` with strict mapping", Name, indexName ); - var createResponse = await context.Client.LowLevel.Indices.CreateAsync( - indexName, - PostData.String( DefaultMappingJson ), - ctx: context.CancellationToken - ).ConfigureAwait( false ); + StringResponse createResponse; + try + { + createResponse = await context.Client.LowLevel.Indices.CreateAsync( + indexName, + PostData.String( DefaultMappingJson ), + ctx: context.CancellationToken + ).ConfigureAwait( false ); + } + catch ( OpenSearchClientException ex ) when ( IsResourceAlreadyExists( ex.Response ) ) + { + // TOCTOU race: another runner created the index between our Exists() + // check and Create(). Verify the mapping and treat as success. 
+ logger.LogDebug( "{step} ledger index `{idx}` created concurrently by another runner; verifying mapping", Name, indexName ); + var verifyDetail = await VerifyMappingAsync( context, indexName, logger ).ConfigureAwait( false ); + var raceElapsed = context.TimeProvider.GetElapsedTime( start ); + return StepOutcome.Succeeded( Name, raceElapsed, $"{verifyDetail} (raced)" ); + } if ( !createResponse.Success ) { + if ( IsResourceAlreadyExists( createResponse ) ) + { + logger.LogDebug( "{step} ledger index `{idx}` created concurrently by another runner; verifying mapping", Name, indexName ); + var verifyDetail = await VerifyMappingAsync( context, indexName, logger ).ConfigureAwait( false ); + var raceElapsed = context.TimeProvider.GetElapsedTime( start ); + return StepOutcome.Succeeded( Name, raceElapsed, $"{verifyDetail} (raced)" ); + } + var detail = createResponse.OriginalException?.Message ?? createResponse.Body ?? "Unknown create failure"; var ex = new OpenSearchProviderException( $"{Name} could not create ledger index `{indexName}`. {detail}", @@ -155,4 +176,29 @@ private static async Task VerifyMappingAsync( BootstrapContext context, logger.LogDebug( "{step} ledger schema verified ({count} required fields present)", "ledger-init", RequiredFields.Length ); return "verified existing schema"; } + + // Detects the OpenSearch-specific 400 body that signals a TOCTOU race + // between Exists() and Create() — another runner won. Inspect the body + // string rather than the status code alone because OS reuses 400 for + // genuine bad-request shapes (malformed mapping, invalid settings). + private static bool IsResourceAlreadyExists( IApiCallDetails? response ) + { + if ( response is null || response.HttpStatusCode != 400 ) + return false; + + var body = response.ResponseBodyInBytes is { Length: > 0 } bytes + ? 
System.Text.Encoding.UTF8.GetString( bytes ) + : null; + + return body is not null && body.Contains( "resource_already_exists_exception", StringComparison.Ordinal ); + } + + private static bool IsResourceAlreadyExists( StringResponse response ) + { + if ( response.HttpStatusCode != 400 ) + return false; + + return !string.IsNullOrEmpty( response.Body ) + && response.Body.Contains( "resource_already_exists_exception", StringComparison.Ordinal ); + } } diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LockIndexInitStep.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LockIndexInitStep.cs index 0bcd96e..5909339 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LockIndexInitStep.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Bootstrap/Steps/LockIndexInitStep.cs @@ -67,14 +67,33 @@ public async Task ExecuteAsync( BootstrapContext context ) logger.LogInformation( "{step} creating lock index `{idx}` (replicas=0)", Name, indexName ); - var createResponse = await context.Client.LowLevel.Indices.CreateAsync( - indexName, - PostData.String( DefaultMappingJson ), - ctx: context.CancellationToken - ).ConfigureAwait( false ); + StringResponse createResponse; + try + { + createResponse = await context.Client.LowLevel.Indices.CreateAsync( + indexName, + PostData.String( DefaultMappingJson ), + ctx: context.CancellationToken + ).ConfigureAwait( false ); + } + catch ( OpenSearchClientException ex ) when ( IsResourceAlreadyExists( ex.Response ) ) + { + // TOCTOU race: another runner created the lock index between our + // Exists() check and Create(). Treat as success. 
+ logger.LogDebug( "{step} lock index `{idx}` created concurrently by another runner", Name, indexName ); + var raceElapsed = context.TimeProvider.GetElapsedTime( start ); + return StepOutcome.Succeeded( Name, raceElapsed, "exists (raced)" ); + } if ( !createResponse.Success ) { + if ( IsResourceAlreadyExists( createResponse ) ) + { + logger.LogDebug( "{step} lock index `{idx}` created concurrently by another runner", Name, indexName ); + var raceElapsed = context.TimeProvider.GetElapsedTime( start ); + return StepOutcome.Succeeded( Name, raceElapsed, "exists (raced)" ); + } + var detail = createResponse.OriginalException?.Message ?? createResponse.Body ?? "Unknown create failure"; var ex = new OpenSearchProviderException( $"{Name} could not create lock index `{indexName}`. {detail}", @@ -97,4 +116,29 @@ public async Task ExecuteAsync( BootstrapContext context ) $"{Name} threw an unexpected exception. {ex.Message}", ex ) ); } } + + // Detects the OpenSearch-specific 400 body that signals a TOCTOU race + // between Exists() and Create() — another runner won. Inspect the body + // string rather than the status code alone because OS reuses 400 for + // genuine bad-request shapes (malformed mapping, invalid settings). + private static bool IsResourceAlreadyExists( IApiCallDetails? response ) + { + if ( response is null || response.HttpStatusCode != 400 ) + return false; + + var body = response.ResponseBodyInBytes is { Length: > 0 } bytes + ? 
System.Text.Encoding.UTF8.GetString( bytes ) + : null; + + return body is not null && body.Contains( "resource_already_exists_exception", StringComparison.Ordinal ); + } + + private static bool IsResourceAlreadyExists( StringResponse response ) + { + if ( response.HttpStatusCode != 400 ) + return false; + + return !string.IsNullOrEmpty( response.Body ) + && response.Body.Contains( "resource_already_exists_exception", StringComparison.Ordinal ); + } } diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchExceptions.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchExceptions.cs index a93cda2..04b2ff4 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchExceptions.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchExceptions.cs @@ -1,44 +1,79 @@ -#nullable enable +#nullable enable namespace Hyperbee.Migrations.Providers.OpenSearch; // Provider-specific exception hierarchy. Typed exceptions allow callers to // pattern-match on classes of failure without parsing log strings. +/// +/// Base type for all OpenSearch-provider exceptions. Catch this to handle any +/// provider-originated failure without coupling to a specific subclass. +/// public class OpenSearchProviderException : Exception { + /// Initializes a new instance with a descriptive message. public OpenSearchProviderException( string message ) : base( message ) { } + + /// Initializes a new instance with a descriptive message and the underlying cause. public OpenSearchProviderException( string message, Exception inner ) : base( message, inner ) { } } +/// +/// Bootstrap could not bring the cluster to a usable state — the ledger or lock +/// index could not be created/verified, the cluster did not reach the configured +/// health threshold, or a required step failed. +/// public sealed class OpenSearchNotReadyException : OpenSearchProviderException { + /// Initializes a new instance with a descriptive message. 
public OpenSearchNotReadyException( string message ) : base( message ) { } + + /// Initializes a new instance with a descriptive message and the underlying cause. public OpenSearchNotReadyException( string message, Exception inner ) : base( message, inner ) { } } +/// +/// Ledger index exists but its mapping is missing one of the required forensic +/// fields (R-06). The ledger schema is immutable; recreate the index to recover. +/// public sealed class OpenSearchLedgerSchemaMismatchException : OpenSearchProviderException { + /// Initializes a new instance with a descriptive message. public OpenSearchLedgerSchemaMismatchException( string message ) : base( message ) { } } +/// +/// Migration lock exceeded <see cref="OpenSearchMigrationOptions.LockMaxLifetime"/>. +/// The in-flight migration's <see cref="System.Threading.CancellationToken"/> has +/// been signaled; the runner is winding down. +/// public sealed class MigrationLockExpiredException : OpenSearchProviderException { + /// Initializes a new instance with a descriptive message. public MigrationLockExpiredException( string message ) : base( message ) { } } +/// +/// AWS SigV4 authentication was requested via +/// but the required configuration +/// (region, credentials, service) was not supplied. +/// public sealed class AwsSigV4NotConfiguredException : OpenSearchProviderException { + /// Initializes a new instance with a descriptive message. public AwsSigV4NotConfiguredException( string message ) : base( message ) { } } -// R-19: thrown by RollbackStatementsFromAsync when a statement entry has no -// `rollback` field. The author's intent is "this operation is irreversible"; -// the runner refuses Down rather than guess at an inverse. - +/// +/// R-19: thrown by Down execution when a statement has no rollback field. +/// The author's intent is "this operation is irreversible"; the runner refuses +/// to guess at an inverse rather than risk corruption. 
+/// public sealed class RollbackNotSupportedException : OpenSearchProviderException { + /// Index of the statement (within the migration's statement list) that lacks a rollback definition. public int StatementIndex { get; } + /// Initializes a new instance for the statement at <paramref name="statementIndex"/>. public RollbackNotSupportedException( int statementIndex, string message ) : base( message ) { @@ -46,30 +81,40 @@ public RollbackNotSupportedException( int statementIndex, string message ) } } -// R-15: thrown at the resource-runner entry point when a statements.json -// file declares a `context:` block AND the runner is configured with -// ContextResolutionPolicy.RequireExplicit AND ActiveContext is null/empty. -// `RequireExplicit` is the production default (set by WithProductionDefaults -// per R-29); silent prod-everywhere behavior is forbidden by the trust -// boundary, so the runner must fail loud rather than guess. - +/// +/// R-15: thrown at the resource-runner entry point when a statements.json +/// declares a context: block, +/// <see cref="ContextResolutionPolicy.RequireExplicit"/> is in effect, and <see cref="OpenSearchMigrationOptions.ActiveContext"/> is unset. +/// +/// +/// <see cref="ContextResolutionPolicy.RequireExplicit"/> is the production default +/// (per R-29's WithProductionDefaults); silent prod-everywhere behavior is +/// forbidden by the trust boundary, so the runner fails loud rather than guess. +/// public sealed class MissingActiveContextException : OpenSearchProviderException { + /// Initializes a new instance with a descriptive message. public MissingActiveContextException( string message ) : base( message ) { } } -// R-19: thrown when a migration's ledger record is in `partially_rolled_back` -// state and the operator has not opted into recovery via OpenSearchMigrationOptions.ForceResume. -// Subsequent runs are refused in either direction until the operator -// inspects the cluster, reconciles state, and explicitly re-runs with -// ForceResume = true (or deletes the record manually for a fresh Up). 
- +/// +/// R-19: thrown when a migration's ledger record is in partially_rolled_back +/// state and the operator has not opted into recovery via +/// <see cref="OpenSearchMigrationOptions.ForceResume"/>. Subsequent runs are refused +/// in either direction until the operator inspects the cluster, reconciles state, +/// and explicitly re-runs with ForceResume = true (or deletes the record +/// manually for a fresh Up). +/// public sealed class OpenSearchPartialRollbackException : OpenSearchProviderException { + /// Identifier of the migration record stuck in partially_rolled_back state. public string RecordId { get; } + + /// Index of the statement (within the migration's rollback sequence) that failed, if known. public int? FailedStatementIndex { get; } + /// Initializes a new instance describing the stuck record and (optionally) the failing statement index. public OpenSearchPartialRollbackException( string recordId, int? failedStatementIndex, string message ) : base( message ) { diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchMigrationOptions.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchMigrationOptions.cs index ee59fab..e20e679 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchMigrationOptions.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/OpenSearchMigrationOptions.cs @@ -1,75 +1,145 @@ -namespace Hyperbee.Migrations.Providers.OpenSearch; +namespace Hyperbee.Migrations.Providers.OpenSearch; +/// +/// Cluster-health gate threshold the runner waits for before executing migrations. +/// public enum ClusterHealthThreshold { + /// All primary shards active; some replicas may still be initializing. Yellow, + /// All primary and replica shards active. Green } +/// +/// Controls when the runner blocks on cluster wait-conditions (health, task +/// completion) emitted by migration statements. +/// public enum WaitMode { + /// Wait after every statement that emits a wait-condition. Safest; default. PerStatement, + /// Wait once at the end of each migration. 
Faster; assumes statements within a migration commute. PerMigration, + /// Never wait. Author owns synchronization; suitable for tests and fast-path scripts. Off } +/// +/// Behavior when a migration references $context but no +/// <see cref="OpenSearchMigrationOptions.ActiveContext"/> is configured. +/// public enum ContextResolutionPolicy { + /// Skip the migration silently when context is unset. SkipIfUnset, + /// Fail the run with a clear error when context is unset. RequireExplicit } +/// +/// OpenSearch-provider configuration for the migration runner. Controls ledger and +/// lock-index naming, cluster wait behavior, lock heartbeat cadence, and runner-level +/// safety knobs (UNSAFE justification, partial-rollback resume). +/// public class OpenSearchMigrationOptions : MigrationOptions { + /// Default name for the migration ledger index. public const string DefaultLedgerIndex = ".migrations"; + + /// Default name for the migration lock index. public const string DefaultLockIndex = ".migrations-lock"; + + /// Default _id of the singleton lock document inside the lock index. public const string DefaultLockName = "migration_lock"; + /// + /// Index storing applied-migration records. Created on first run with a strict + /// mapping; re-verified on subsequent runs. + /// public string LedgerIndex { get; set; } = DefaultLedgerIndex; + + /// + /// Index storing the singleton lock document (number_of_replicas: 0 per + /// PA-2 to eliminate replica-write coupling on the lock primary shard). + /// public string LockIndex { get; set; } = DefaultLockIndex; + + /// + /// _id of the lock document inside <see cref="LockIndex"/>. Override to run + /// multiple independent migration scopes against the same cluster. + /// public string LockName { get; set; } = DefaultLockName; + /// Cluster-health threshold the runner waits for at startup and after schema-mutating statements. public ClusterHealthThreshold ClusterHealthThreshold { get; set; } = ClusterHealthThreshold.Yellow; + + /// When the runner blocks on cluster wait-conditions; see <see cref="WaitMode"/>. 
public WaitMode WaitMode { get; set; } = WaitMode.PerStatement; + + /// + /// When true, UNSAFE-modified statements must include a non-empty justification + /// string. Recommended for production; off by default to keep test scripts terse. + /// public bool RequireUnsafeJustification { get; set; } = false; + + /// How the runner reacts when a migration references $context but <see cref="ActiveContext"/> is unset. public ContextResolutionPolicy ContextResolutionPolicy { get; set; } = ContextResolutionPolicy.SkipIfUnset; + /// + /// Active context label substituted for $context in migrations. Typically set + /// from environment (dev/staging/prod) so the same migration set targets the right cluster shape. + /// public string ActiveContext { get; set; } + + /// + /// When true, the bootstrapper verifies the ledger and lock indices exist with the + /// required mapping but does not create them. Use in tightly-scoped IAM contexts + /// (e.g., AWS Managed) where the deploy role lacks indices:admin/create. + /// public bool AssumeIndicesExist { get; set; } = false; - // R-19: when a previous Down attempt halted partway through the rollback - // sequence, the ledger entry is `partially_rolled_back`. Subsequent runs - // are refused (loudly, with remediation) until the operator inspects the - // cluster, reconciles state, and opts in to a retry by setting this - // flag. The runner project (R-26) is expected to surface this as a - // `--force-resume` CLI flag once it lands. - // - // ForceResume = true bypasses the partially_rolled_back lockout; the - // runner proceeds as if the record were in a normal state. Use only - // after manual reconciliation — silently retrying a partially-failed - // rollback can leave the cluster in an indeterminate state. + /// + /// Bypass the partial-rollback lockout (R-19). When a previous Down attempt halted + /// partway through, subsequent runs are refused until the operator reconciles state + /// and opts in by setting this flag. 
Use only after manual reconciliation — + /// silently retrying a partially-failed rollback can leave the cluster in an + /// indeterminate state. + /// public bool ForceResume { get; set; } = false; + /// Timeout for implicit cluster waits emitted by migration statements (e.g., wait-for-status, wait-for-task). public TimeSpan ImplicitWaitTimeout { get; set; } = TimeSpan.FromSeconds( 30 ); - // Heartbeat renewal interval. Must be shorter than LockStaleAfter so a healthy - // runner refreshes the lock before takeover candidates would consider it stale. + /// + /// Heartbeat renewal interval. Must be shorter than <see cref="LockStaleAfter"/> + /// so a healthy runner refreshes the lock before takeover candidates would + /// consider it stale. + /// public TimeSpan LockRenewInterval { get; set; } = TimeSpan.FromSeconds( 30 ); - // After this duration without renewal, the lock is considered stale and another - // runner may take it over. Validation enforces LockStaleAfter >= 2 * LockRenewInterval - // and LockStaleAfter < LockMaxLifetime. + /// + /// After this duration without renewal, the lock is considered stale and another + /// runner may take it over. Validation enforces + /// LockStaleAfter >= 2 * LockRenewInterval and + /// LockStaleAfter &lt; LockMaxLifetime. + /// public TimeSpan LockStaleAfter { get; set; } = TimeSpan.FromSeconds( 60 ); - // Hard ceiling on total lock lifetime. When reached, in-flight migration is - // cancelled (CancellationToken signaled) and surfaces MigrationLockExpiredException. + /// + /// Hard ceiling on total lock lifetime. When reached, the in-flight migration is + /// cancelled (its <see cref="System.Threading.CancellationToken"/> is signaled) and + /// the runner surfaces MigrationLockExpiredException. + /// public TimeSpan LockMaxLifetime { get; set; } = TimeSpan.FromHours( 1 ); + /// Initializes a new instance with no migration activator. public OpenSearchMigrationOptions() : this( null ) { } + /// Initializes a new instance with the supplied <paramref name="migrationActivator"/>. 
public OpenSearchMigrationOptions( IMigrationActivator migrationActivator ) : base( migrationActivator ) {