Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/site/opensearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ Uploads the policy to `_plugins/_ism/policies` (or `_opendistro/_ism/policies` o
}
```

`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher handles this transparently: on 409 it issues a `GET` for the existing policy, reads the current `_seq_no` and `_primary_term` from the response, and retries the `PUT` with the `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics -- creating a policy that doesn't exist yet behaves exactly as before, and re-executing against a policy that already exists overwrites it with the supplied body. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. If the follow-up `GET` fails, or its response cannot be parsed or does not contain `_seq_no` / `_primary_term`, the statement is reported as failed. A second 409 on the retry indicates a concurrent writer between the `GET` and the retry `PUT` and is also surfaced as a hard failure (the migration lock should make this rare).

### APPLY POLICY (ISM)

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,16 +803,91 @@ private async Task<StatementResult> DispatchCreatePolicyAsync( CreatePolicyAst a
}

var body = context.ResolvedBody.ToJsonString();
var policyPath = $"{IsmPathPrefix}/policies/{ast.PolicyId}";

// PUT /_plugins/_ism/policies/<id> — Index State Management policy.
//
// ISM policies are versioned in OpenSearch: a plain PUT to an
// existing policy fails with HTTP 409 `version_conflict_engine_exception`.
// Authors using the [Migration(N, journal: false)] reconciliation
// pattern (see provider README's "Three temporal scopes for ISM
// attachment") need CREATE POLICY to be idempotent so the policy
// body can be upserted from source-of-truth on every startup.
//
// On 409: GET the existing policy, read `_seq_no` + `_primary_term`
// from the response root (ISM surfaces these at the document root,
// not under _source — unusual but documented), retry PUT with the
// CAS query parameters. Mirrors LockHandle.RenewLockAsync's
// optimistic-concurrency pattern.

var response = await ll.DoRequestAsync<StringResponse>(
var firstResponse = await ll.DoRequestAsync<StringResponse>(
global::OpenSearch.Net.HttpMethod.PUT,
policyPath,
context.CancellationToken,
data: PostData.String( body ) ).ConfigureAwait( false );

if ( firstResponse.HttpStatusCode != 409 )
return BuildResult( verb, firstResponse, $"policy `{ast.PolicyId}` created/updated" );

// 409 — read the current version to retry with CAS.
var getResponse = await ll.DoRequestAsync<StringResponse>(
global::OpenSearch.Net.HttpMethod.GET,
policyPath,
context.CancellationToken ).ConfigureAwait( false );

if ( !getResponse.Success || getResponse.Body is null )
{
return new StatementResult( StatementOutcome.Failed, verb,
Detail: $"policy `{ast.PolicyId}` returned 409 on PUT but the follow-up GET to read _seq_no/_primary_term for CAS retry failed: HTTP {getResponse.HttpStatusCode}",
OpenSearchResponseStatus: getResponse.HttpStatusCode,
Exception: getResponse.OriginalException );
}

long seqNo = 0;
long primaryTerm = 0;
try
{
using var doc = JsonDocument.Parse( getResponse.Body );
if ( !doc.RootElement.TryGetProperty( "_seq_no", out var seqEl )
|| !doc.RootElement.TryGetProperty( "_primary_term", out var termEl )
|| !seqEl.TryGetInt64( out seqNo )
|| !termEl.TryGetInt64( out primaryTerm ) )
{
return new StatementResult( StatementOutcome.Failed, verb,
Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not extract _seq_no/_primary_term from GET response (response did not contain the expected fields).",
OpenSearchResponseStatus: getResponse.HttpStatusCode );
}
}
catch ( JsonException ex )
{
return new StatementResult( StatementOutcome.Failed, verb,
Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not parse GET response body as JSON: {ex.Message}",
OpenSearchResponseStatus: getResponse.HttpStatusCode,
Exception: ex );
}

// Retry the PUT with CAS query params. Inline rather than via a typed
// IRequestParameters because there's no ISM-specific request-parameters
// type in OpenSearch.Net (the endpoint is plugin-served).
var retryPath = $"{policyPath}?if_seq_no={seqNo}&if_primary_term={primaryTerm}";
var retryResponse = await ll.DoRequestAsync<StringResponse>(
global::OpenSearch.Net.HttpMethod.PUT,
$"{IsmPathPrefix}/policies/{ast.PolicyId}",
retryPath,
context.CancellationToken,
data: PostData.String( body ) ).ConfigureAwait( false );

return BuildResult( verb, response, $"policy `{ast.PolicyId}` created/updated" );
if ( retryResponse.HttpStatusCode == 409 )
{
// Second 409 means another writer beat us between the GET and
// the retry PUT. The lock should make this rare; treat as hard
// failure rather than recursing.
return new StatementResult( StatementOutcome.Failed, verb,
Detail: $"policy `{ast.PolicyId}` CAS retry hit a second 409 — concurrent writer between GET (_seq_no={seqNo}, _primary_term={primaryTerm}) and retry PUT.",
OpenSearchResponseStatus: retryResponse.HttpStatusCode,
Exception: retryResponse.OriginalException ?? new InvalidOperationException( $"concurrent writer on policy {ast.PolicyId}" ) );
}

return BuildResult( verb, retryResponse, $"policy `{ast.PolicyId}` updated via CAS retry (seq={seqNo}, term={primaryTerm})" );
}

// --- APPLY POLICY <id> TO <pattern> ---
Expand Down
2 changes: 2 additions & 0 deletions src/Hyperbee.Migrations.Providers.OpenSearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ APPLY POLICY <id> TO <pattern>

`CREATE POLICY` uploads the policy to `_plugins/_ism/policies`. `APPLY POLICY` attaches it to existing indices matching the pattern via `_plugins/_ism/add` — the dispatcher inspects the response body and surfaces logical failures explicitly: HTTP 200 with `updated_indices: 0` is mapped to `Failed`, not silent OK. For future-only attachment, declare `ism_template.index_patterns` in the policy body (handled at index-creation time by the cluster).

`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher handles this transparently: on 409 it issues a `GET` for the existing policy, reads the current `_seq_no` and `_primary_term` from the response, and retries the `PUT` with the `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics — creating a policy that doesn't exist yet behaves exactly as before, and re-executing against a policy that already exists overwrites it with the supplied body. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. If the follow-up `GET` fails, or its response cannot be parsed or does not contain `_seq_no` / `_primary_term`, the statement is reported as failed. A second 409 on the retry indicates a concurrent writer between the `GET` and the retry `PUT` and is also surfaced as a hard failure (the migration lock should make this rare).

### Cluster waits

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,60 @@ public async Task ApplyPolicy_ReportsAtLeastOneIndexUpdated()
$"expected updated_indices >= 1; got: {result.Detail}" );
}

[TestMethod]
[TestCategory( "OpenSearch" )]
[TestCategory( "Phase2" )]
public async Task CreatePolicy_TwiceWithMutatedBody_SecondCallSucceedsViaCasRetry()
{
    // Scenario: the same CREATE POLICY statement is dispatched twice for
    // one policy id, the second time with a different body. The second
    // dispatch is expected to succeed by way of the CAS retry on
    // _seq_no/_primary_term instead of failing on the HTTP 409 that a
    // plain PUT to an existing policy produces. Reconciliation migrations
    // declared as [Migration(N, journal: false)] depend on this because
    // they re-run the statement on every startup.

    // Content lines of the raw literal are kept flush with the closing
    // delimiter so the resulting string carries no leading whitespace.
    // Only the description differs from a minimal policy; that is enough
    // to tell an overwrite apart from a no-op.
    const string mutatedPolicyJson = """
        {
        "policy": {
        "description": "test policy (mutated)",
        "default_state": "hot",
        "states": [
        {
        "name": "hot",
        "actions": [],
        "transitions": []
        }
        ]
        }
        }
        """;

    // First dispatch: creates the policy.
    var initialBody = MinimalIsmPolicyBody();
    var createResult = await DispatchAsync(
        $"CREATE POLICY {_policyId} WITH BODY $body", initialBody );
    Assert.IsTrue( createResult.IsSuccess, $"first create failed: {createResult.Detail}" );

    // Second dispatch: same id, mutated body.
    var replacementBody = JsonNode.Parse( mutatedPolicyJson );
    var upsertResult = await DispatchAsync(
        $"CREATE POLICY {_policyId} WITH BODY $body", replacementBody );

    // The statement must succeed, and its detail must name the CAS retry
    // path so we know which branch produced the success.
    Assert.IsTrue( upsertResult.IsSuccess,
        $"second create (CAS retry path) failed: {upsertResult.Detail}" );
    StringAssert.Contains( upsertResult.Detail!, "CAS retry",
        $"expected detail to indicate CAS retry path; got: {upsertResult.Detail}" );

    // Read the policy back directly and check that the stored document
    // carries the mutated description, i.e. the retry really overwrote it.
    var stored = await OpenSearchTestContainer.LowLevelClient.DoRequestAsync<StringResponse>(
        OpenSearch.Net.HttpMethod.GET, $"_plugins/_ism/policies/{_policyId}", default );
    Assert.AreEqual( 200, stored.HttpStatusCode );
    StringAssert.Contains( stored.Body!, "test policy (mutated)",
        $"expected mutated description in policy body after CAS retry; got: {stored.Body}" );
}

[TestMethod]
[TestCategory( "OpenSearch" )]
[TestCategory( "Phase2" )]
Expand Down
Loading