diff --git a/docs/site/opensearch.md b/docs/site/opensearch.md index 4bc2a8c..cc62813 100644 --- a/docs/site/opensearch.md +++ b/docs/site/opensearch.md @@ -503,6 +503,8 @@ Uploads the policy to `_plugins/_ism/policies` (or `_opendistro/_ism/policies` o } ``` +`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher transparently handles this: on 409 it reads the current `_seq_no` and `_primary_term` from the existing policy and retries the `PUT` with `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics -- no behavior change when the policy doesn't exist; safe re-execution when it does. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. A second 409 on the retry indicates a concurrent writer between the GET and the retry PUT and is surfaced as a hard failure (the migration lock should make this rare). + ### APPLY POLICY (ISM) ``` diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs index 0861cec..9030193 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs @@ -803,16 +803,91 @@ private async Task DispatchCreatePolicyAsync( CreatePolicyAst a } var body = context.ResolvedBody.ToJsonString(); + var policyPath = $"{IsmPathPrefix}/policies/{ast.PolicyId}"; // PUT /_plugins/_ism/policies/ — Index State Management policy. + // + // ISM policies are versioned in OpenSearch: a plain PUT to an + // existing policy fails with HTTP 409 `version_conflict_engine_exception`. + // Authors using the [Migration(N, journal: false)] reconciliation + // pattern (see provider README's "Three temporal scopes for ISM + // attachment") need CREATE POLICY to be idempotent so the policy + // body can be upserted from source-of-truth on every startup. + // + // On 409: GET the existing policy, read `_seq_no` + `_primary_term` + // from the response root (ISM surfaces these at the document root, + // not under _source — unusual but documented), retry PUT with the + // CAS query parameters. Mirrors LockHandle.RenewLockAsync's + // optimistic-concurrency pattern. - var response = await ll.DoRequestAsync( + var firstResponse = await ll.DoRequestAsync( + global::OpenSearch.Net.HttpMethod.PUT, + policyPath, + context.CancellationToken, + data: PostData.String( body ) ).ConfigureAwait( false ); + + if ( firstResponse.HttpStatusCode != 409 ) + return BuildResult( verb, firstResponse, $"policy `{ast.PolicyId}` created/updated" ); + + // 409 — read the current version to retry with CAS. + var getResponse = await ll.DoRequestAsync( + global::OpenSearch.Net.HttpMethod.GET, + policyPath, + context.CancellationToken ).ConfigureAwait( false ); + + if ( !getResponse.Success || getResponse.Body is null ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: $"policy `{ast.PolicyId}` returned 409 on PUT but the follow-up GET to read _seq_no/_primary_term for CAS retry failed: HTTP {getResponse.HttpStatusCode}", + OpenSearchResponseStatus: getResponse.HttpStatusCode, + Exception: getResponse.OriginalException ); + } + + long seqNo = 0; + long primaryTerm = 0; + try + { + using var doc = JsonDocument.Parse( getResponse.Body ); + if ( !doc.RootElement.TryGetProperty( "_seq_no", out var seqEl ) + || !doc.RootElement.TryGetProperty( "_primary_term", out var termEl ) + || !seqEl.TryGetInt64( out seqNo ) + || !termEl.TryGetInt64( out primaryTerm ) ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not extract _seq_no/_primary_term from GET response (response did not contain the expected fields).", + OpenSearchResponseStatus: getResponse.HttpStatusCode ); + } + } + catch ( JsonException ex ) + { + return new StatementResult( StatementOutcome.Failed, verb, + Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not parse GET response body as JSON: {ex.Message}", + OpenSearchResponseStatus: getResponse.HttpStatusCode, + Exception: ex ); + } + + // Retry the PUT with CAS query params. Inline rather than via a typed + // IRequestParameters because there's no ISM-specific request-parameters + // type in OpenSearch.Net (the endpoint is plugin-served). + var retryPath = $"{policyPath}?if_seq_no={seqNo}&if_primary_term={primaryTerm}"; + var retryResponse = await ll.DoRequestAsync( global::OpenSearch.Net.HttpMethod.PUT, - $"{IsmPathPrefix}/policies/{ast.PolicyId}", + retryPath, context.CancellationToken, data: PostData.String( body ) ).ConfigureAwait( false ); - return BuildResult( verb, response, $"policy `{ast.PolicyId}` created/updated" ); + if ( retryResponse.HttpStatusCode == 409 ) + { + // Second 409 means another writer beat us between the GET and + // the retry PUT. The lock should make this rare; treat as hard + // failure rather than recursing. + return new StatementResult( StatementOutcome.Failed, verb, + Detail: $"policy `{ast.PolicyId}` CAS retry hit a second 409 — concurrent writer between GET (_seq_no={seqNo}, _primary_term={primaryTerm}) and retry PUT.", + OpenSearchResponseStatus: retryResponse.HttpStatusCode, + Exception: retryResponse.OriginalException ?? new InvalidOperationException( $"concurrent writer on policy {ast.PolicyId}" ) ); + } + + return BuildResult( verb, retryResponse, $"policy `{ast.PolicyId}` updated via CAS retry (seq={seqNo}, term={primaryTerm})" ); } // --- APPLY POLICY TO --- diff --git a/src/Hyperbee.Migrations.Providers.OpenSearch/README.md b/src/Hyperbee.Migrations.Providers.OpenSearch/README.md index 930e88a..163f52c 100644 --- a/src/Hyperbee.Migrations.Providers.OpenSearch/README.md +++ b/src/Hyperbee.Migrations.Providers.OpenSearch/README.md @@ -290,6 +290,8 @@ APPLY POLICY TO `CREATE POLICY` uploads the policy to `_plugins/_ism/policies`. `APPLY POLICY` attaches it to existing indices matching the pattern via `_plugins/_ism/add` — the dispatcher inspects the response body and surfaces logical failures explicitly: HTTP 200 with `updated_indices: 0` is mapped to `Failed`, not silent OK. For future-only attachment, declare `ism_template.index_patterns` in the policy body (handled at index-creation time by the cluster). +`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher transparently handles this: on 409 it reads the current `_seq_no` and `_primary_term` from the existing policy and retries the `PUT` with `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics — no behavior change when the policy doesn't exist; safe re-execution when it does. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. A second 409 on the retry indicates a concurrent writer between the GET and the retry PUT and is surfaced as a hard failure (the migration lock should make this rare). + ### Cluster waits ``` diff --git a/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchTemplatePolicyIntegrationTests.cs b/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchTemplatePolicyIntegrationTests.cs index 9e92892..b11df16 100644 --- a/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchTemplatePolicyIntegrationTests.cs +++ b/tests/Hyperbee.Migrations.Integration.Tests/OpenSearchTemplatePolicyIntegrationTests.cs @@ -256,6 +256,60 @@ public async Task ApplyPolicy_ReportsAtLeastOneIndexUpdated() $"expected updated_indices >= 1; got: {result.Detail}" ); } + [TestMethod] + [TestCategory( "OpenSearch" )] + [TestCategory( "Phase2" )] + public async Task CreatePolicy_TwiceWithMutatedBody_SecondCallSucceedsViaCasRetry() + { + // CREATE POLICY is idempotent — re-running with a mutated body + // upserts via a CAS retry on _seq_no/_primary_term. Required for + // the [Migration(N, journal: false)] reconciliation pattern (see + // provider README "Three temporal scopes for ISM attachment"); + // a non-idempotent CREATE POLICY would fail with HTTP 409 on + // every re-run after the first. + var firstBody = MinimalIsmPolicyBody(); + var firstResult = await DispatchAsync( + $"CREATE POLICY {_policyId} WITH BODY $body", firstBody ); + Assert.IsTrue( firstResult.IsSuccess, $"first create failed: {firstResult.Detail}" ); + + // Mutate the description so we can verify the retry actually + // overwrote the policy rather than no-op'd. A semantically + // meaningful change (different default_state, additional state) + // would also work; description is enough for the post-condition + // and keeps the policy minimal. + var mutatedBody = JsonNode.Parse( """ + { + "policy": { + "description": "test policy (mutated)", + "default_state": "hot", + "states": [ + { + "name": "hot", + "actions": [], + "transitions": [] + } + ] + } + } + """ ); + var secondResult = await DispatchAsync( + $"CREATE POLICY {_policyId} WITH BODY $body", mutatedBody ); + + Assert.IsTrue( secondResult.IsSuccess, + $"second create (CAS retry path) failed: {secondResult.Detail}" ); + StringAssert.Contains( secondResult.Detail!, "CAS retry", + $"expected detail to indicate CAS retry path; got: {secondResult.Detail}" ); + + // Post-condition: GET shows the mutated description, confirming + // the retry actually overwrote rather than silently succeeded. + var ll = OpenSearchTestContainer.LowLevelClient; + var get = await ll.DoRequestAsync( + OpenSearch.Net.HttpMethod.GET, $"_plugins/_ism/policies/{_policyId}", default ); + Assert.AreEqual( 200, get.HttpStatusCode ); + StringAssert.Contains( get.Body!, "test policy (mutated)", + $"expected mutated description in policy body after CAS retry; got: {get.Body}" ); + } + [TestMethod] [TestCategory( "OpenSearch" )] [TestCategory( "Phase2" )]