19 changes: 14 additions & 5 deletions Documentation/ADR/0004-BackgroundIndexService.md
@@ -1,7 +1,7 @@
# ADR-0004: Background IndexService via Durable Outbox Queue

## Status
Draft
Accepted

## Context
`IndexService.ProcessAsync()` is currently called synchronously inside the HTTP request
@@ -71,8 +71,16 @@ outbox queue backed by a MongoDB `indexqueue` collection and a background worker
- A new `IIndexQueue` interface provides `EnqueueAsync`, `ClaimNextAsync`,
`AcknowledgeAsync`, and `NackAsync` operations.
- `MongoIndexQueue` implements `IIndexQueue` using the `indexqueue` collection.
- `EnqueueAsync` is called from `ResourceStorageService` after a successful resource store
(and, once ADR 0001 is implemented, inside the same MongoDB transaction).
- `EnqueueAsync` is called from a new `IndexQueueEnqueueListener : IServiceListener`,
which participates in the existing `ServiceListener` chain. This preserves the
`IServiceListener` pattern and allows operators to switch between synchronous indexing
(`SearchService`) and background indexing (`IndexQueueEnqueueListener`) by swapping a
single DI registration, without modifying any storage code.
- The existing `ServiceListener.InformAsync()` call and the `IServiceListener` /
`SearchService` wiring are fully preserved. `IndexQueueEnqueueListener` is a drop-in
replacement for `SearchService` in the listener registration. The DI swap to activate
background indexing will be opt-in (controlled by a configuration flag) so operators can
choose synchronous vs. background indexing per deployment.
- `IndexWorker : BackgroundService` runs on every Spark node and polls `ClaimNextAsync()`
in a loop. `ClaimNextAsync()` is a single atomic `FindOneAndUpdate` that transitions an
entry from `pending` to `processing` and stamps it with the claiming node's `WorkerId`.
Expand All @@ -81,8 +89,9 @@ outbox queue backed by a MongoDB `indexqueue` collection and a background worker
reclaimed by the next available worker (crash recovery).
- Failed entries are retried up to a configurable maximum; after that they transition to
`status=failed` and are logged for operator attention.
- `ServiceListener.InformAsync()` is removed from the synchronous write path for
indexing. The `IServiceListener` / `SearchService` wiring is preserved for future use.
- Once ADR 0001 is implemented, both `IndexQueueEnqueueListener` and `SearchService` will
participate in the same MongoDB transaction as `resources` and `counters`, completing
the transactional outbox pattern regardless of which indexing mode is active.

## Consequences

93 changes: 93 additions & 0 deletions Documentation/BackgroundIndexing.md
@@ -0,0 +1,93 @@
# Background Indexing

> [!IMPORTANT]
> **Experimental feature — use with caution.**
> Background indexing is not yet recommended for production use. It introduces eventual
> consistency between writes and search results, which may cause unexpected behaviour in
> deployments that rely on immediate read-your-writes consistency on the search index.

## Overview

By default Spark processes search index updates synchronously in the HTTP request path:
every write waits for indexing to complete before returning a response. Background indexing
decouples this work by enqueuing index updates in a durable MongoDB `indexqueue` collection
and processing them in a `BackgroundService` worker (`IndexWorker`) that runs on each Spark
node.

The main benefit is reduced write latency. The trade-off is that search results are
eventually consistent — a resource may not appear in search results immediately after a
successful write.

## Configuration

Background indexing is controlled by the `IndexingMode` property of the `Experimental`
settings group in `SparkSettings`:

```json
{
"SparkSettings": {
"Experimental": {
"IndexingMode": "Background"
}
}
}
```

Or via environment variable (useful for Docker / Kubernetes deployments):

```shell
SparkSettings__Experimental__IndexingMode=Background
```

The default value is `Synchronous`. Set it to `Background` to enable background indexing.

The index queue behaviour can be tuned via `StoreSettings.IndexQueue`:

```json
{
"StoreSettings": {
"IndexQueue": {
"PollInterval": "00:00:00.020",
"LeaseTimeout": "00:05:00",
"MaxAttempts": 5
}
}
}
```

| Property | Default | Description |
|----------|---------|-------------|
| `PollInterval` | 20 ms | How often `IndexWorker` polls for new queue entries. |
| `LeaseTimeout` | 5 min | How long a claimed entry may be held before it is reclaimed by another worker (crash recovery). |
| `MaxAttempts` | 5 | Maximum number of processing attempts before an entry is marked `failed`. |

## Eventual Consistency Trade-offs

Operators must evaluate the following before enabling background indexing:

- **Conditional writes** — `conditional create`, `conditional update`, and `conditional delete`
resolve their match criteria via the search index. If a conflicting resource was written
within the last index-poll cycle, the condition may resolve against a stale index.
- **Search immediately after write** — clients that write a resource and immediately search
for it may transiently receive zero or incomplete results.
- **Automated test suites** — any test that asserts search result counts or presence
immediately after a write must either add retry/polling logic or be run against a
`Synchronous`-mode instance.
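
One illustrative way to add that retry/polling logic is a small helper that re-evaluates an assertion until it holds or a deadline passes. The `EventualConsistency.PollUntilAsync` helper below is a sketch for test suites, not part of Spark's API; the name and parameters are assumptions.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical test helper for Background-mode instances: repeatedly
// evaluate a condition until it becomes true or the timeout elapses.
public static class EventualConsistency
{
    public static async Task<bool> PollUntilAsync(
        Func<Task<bool>> condition,
        TimeSpan timeout,
        TimeSpan? interval = null)
    {
        var pollInterval = interval ?? TimeSpan.FromMilliseconds(50);
        var deadline = DateTime.UtcNow + timeout;
        while (DateTime.UtcNow < deadline)
        {
            if (await condition().ConfigureAwait(false))
                return true;
            await Task.Delay(pollInterval).ConfigureAwait(false);
        }
        // One final check at the deadline to avoid a race with the last poll.
        return await condition().ConfigureAwait(false);
    }
}
```

A test would then wrap its search assertion, e.g. `Assert.True(await EventualConsistency.PollUntilAsync(async () => (await SearchPatientCountAsync()) == 1, TimeSpan.FromSeconds(5)))`, rather than asserting immediately after the write.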

## Multi-node Deployments

Each Spark node runs its own `IndexWorker`. Claims are atomic (`FindOneAndUpdate`) so only
one node processes any given queue entry. If a node crashes mid-processing, the entry is
automatically reclaimed after `LeaseTimeout` by another node.
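
The claim-and-reclaim behaviour can be sketched with the MongoDB .NET driver roughly as follows. This is an illustration of the technique, not the actual `MongoIndexQueue` code; the field names (`Status`, `ClaimedAt`, `WorkerId`, `Attempts`, `EnqueuedAt`) are assumptions about the queue document shape.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class ClaimSketch
{
    public static Task<BsonDocument> ClaimNextAsync(
        IMongoCollection<BsonDocument> indexQueue,
        string workerId,
        TimeSpan leaseTimeout,
        CancellationToken ct)
    {
        var now = DateTime.UtcNow;
        var f = Builders<BsonDocument>.Filter;

        // Match a pending entry, or a processing entry whose lease has
        // expired (crash recovery: the claiming node died mid-processing).
        var filter = f.Or(
            f.Eq("Status", "pending"),
            f.And(
                f.Eq("Status", "processing"),
                f.Lt("ClaimedAt", now - leaseTimeout)));

        var update = Builders<BsonDocument>.Update
            .Set("Status", "processing")
            .Set("WorkerId", workerId)
            .Set("ClaimedAt", now)
            .Inc("Attempts", 1);

        // FindOneAndUpdate is atomic on a single document, so at most one
        // worker can transition a given entry into "processing".
        return indexQueue.FindOneAndUpdateAsync(filter, update,
            new FindOneAndUpdateOptions<BsonDocument>
            {
                Sort = Builders<BsonDocument>.Sort.Ascending("EnqueuedAt"),
                ReturnDocument = ReturnDocument.After
            },
            ct);
    }
}
```

Because both the claim and the lease takeover go through the same single atomic operation, no separate coordination or locking service is needed between nodes.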

## Monitoring

- Entries that exhaust `MaxAttempts` are marked `status=failed` in the `indexqueue`
collection and logged by `IndexWorker`. Operator intervention is required to re-process
or discard them.
- Monitor the depth of the `indexqueue` collection. A growing queue indicates that the
worker cannot keep up with the write rate, or that the worker is stopped.
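
A queue-depth check can be as simple as counting entries per status. The snippet below is a sketch against an assumed document shape (a `Status` field with values `pending` / `failed`), not a documented Spark API.

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

public static class IndexQueueMetrics
{
    public static async Task<(long Pending, long Failed)> GetDepthAsync(
        IMongoCollection<BsonDocument> indexQueue)
    {
        var f = Builders<BsonDocument>.Filter;
        // Pending depth: backlog the worker still has to drain.
        var pending = await indexQueue.CountDocumentsAsync(f.Eq("Status", "pending"));
        // Failed depth: entries that exhausted MaxAttempts and need an operator.
        var failed = await indexQueue.CountDocumentsAsync(f.Eq("Status", "failed"));
        return (pending, failed);
    }
}
```

Exposing these two numbers to your metrics system (and alerting when `Pending` grows or `Failed` is non-zero) covers both monitoring points above.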

## See Also

- [ADR 0004 — Background IndexService via Durable Outbox Queue](ADR/0004-BackgroundIndexService.md)
16 changes: 15 additions & 1 deletion Documentation/MigrateFromv2Tov3.md
@@ -5,6 +5,15 @@ We now target `net8.0`, `net9.0`, and `net10.0`. `netstandard2.0` and `net472` t

### New classes and interfaces
- `FhirResponse<T>` wraps a `FhirResponse`, with the generic parameter representing the FHIR resource type.
- `IIndexQueue` (`Spark.Engine.Store.Interfaces`) — interface for durable index queue operations: `EnqueueAsync`, `ClaimNextAsync`, `AcknowledgeAsync`, `NackAsync`.
- `IndexQueueEntry` (`Spark.Engine.Core`) — model returned by `IIndexQueue.ClaimNextAsync`; carries the `Entry`, attempt count, and last error.
- `IndexQueueSettings` (`Spark.Engine.Store`) — configuration for index queue behavior: `LeaseTimeout`, `MaxAttempts`, `PollInterval`. Exposed as `StoreSettings.IndexQueue`.
- `MongoIndexQueue` (`Spark.Mongo.Store`) — MongoDB implementation of `IIndexQueue` backed by the `indexqueue` collection.
- `IndexWorker` (`Spark.Engine.Service`) — `BackgroundService` that polls `IIndexQueue` and drains pending entries via `IIndexService`. Registered automatically when `SparkSettings.Experimental.IndexingMode = Background`.
- `IndexServiceListener` (`Spark.Engine.Service`) — `IServiceListener` that processes search index updates synchronously in the HTTP request path. Registered by default (`IndexingMode = Synchronous`).
- `IndexQueueEnqueueListener` (`Spark.Engine.Service`) — `IServiceListener` that enqueues write events onto `IIndexQueue` for asynchronous background processing. Registered when `IndexingMode = Background`.
- `ExperimentalSettings` (`Spark.Engine`) — groups experimental settings under `SparkSettings.Experimental`. Currently exposes `IndexingMode`.
- `IndexingMode` (`Spark.Engine`) — enum that controls the indexing strategy: `Synchronous` (default) or `Background`. Accessed via `SparkSettings.Experimental.IndexingMode`.

### IFhirService, FhirServiceBase and FhirService changes
- New generic methods:
@@ -18,6 +27,10 @@ We now target `net8.0`, `net9.0`, and `net10.0`. `netstandard2.0` and `net472` t

### Method and property signature changes
- `Validate.HasResourceType(IKey, ResourceType)` has been changed to `Validate.HasResourceType(IKey, string)`
- `SearchService` no longer implements `IServiceListener`; it now only implements `ISearchService`. Code that registered or resolved `SearchService` as `IServiceListener` must be updated.
- `SearchService` constructor no longer accepts `IIndexService`; the signature changed from `SearchService(ILocalhost, IFhirModel, IFhirIndex, IIndexService)` to `SearchService(ILocalhost, IFhirModel, IFhirIndex)`.
- `SparkSettings` has a new property `ExperimentalSettings Experimental { get; set; }` (default `new ExperimentalSettings()`).
- `StoreSettings` has a new property `IndexQueueSettings IndexQueue { get; set; }` (defaults to `new IndexQueueSettings()`).
- `List<Hl7.Fhir.Model.SearchParameter> IFhirModel.SearchParameters` has been changed to `IReadOnlyList<Spark.Engine.Model.SearchParameter> IFhirModel.SearchParameters`
- `IEnumerable<Hl7.Fhir.Model.SearchParameter> IFhirModel.FindSearchParameters(Type)` has been changed to `List<Spark.Engine.Model.SearchParameter> IFhirModel.FindSearchParameters(Type)`
- `IEnumerable<Hl7.Fhir.Model.SearchParameter> IFhirModel.FindSearchParameters(string)` has been changed to `List<Spark.Engine.Model.SearchParameter> IFhirModel.FindSearchParameters(string)`
@@ -82,8 +95,9 @@ We now target `net8.0`, `net9.0`, and `net10.0`. `netstandard2.0` and `net472` t

### Changes to extension methods
- `AddFhirFacade(this IServiceCollection, Action<SparkOptions>)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`.
- `AddFhir(this IServiceCollection, SparkSettings, Action<MvcOptions>)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`.
- `AddFhir(this IServiceCollection, SparkSettings, Action<MvcOptions>)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`. It also conditionally registers `IndexServiceListener` (default, `IndexingMode.Synchronous`) or `IndexQueueEnqueueListener` + `IndexWorker` (opt-in, `IndexingMode.Background`) based on `SparkSettings.Experimental.IndexingMode`.
- `AddFhirFormatters(this IServiceCollection, SparkSettings, Action<MvcOptions>)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`.
- `AddMongoFhirStore(this IServiceCollection, StoreSettings)` now always registers `IndexQueueSettings` (sourced from `StoreSettings.IndexQueue`) and `IIndexQueue → MongoIndexQueue`.

### Namespace changes
- `Spark.Search.ChoiceValue` moved to `Spark.Engine.Search.ChoiceValue`
1 change: 1 addition & 0 deletions Spark.sln
@@ -75,6 +75,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Documentation", "Documentat
Documentation\UsingSpark.md = Documentation\UsingSpark.md
Documentation\MigrateFromv1Tov2.md = Documentation\MigrateFromv1Tov2.md
Documentation\MigrateFromv2Tov3.md = Documentation\MigrateFromv2Tov3.md
Documentation\BackgroundIndexing.md = Documentation\BackgroundIndexing.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "scripts", "scripts", "{977F2FE5-C232-4F93-AA0A-73DB85A20879}"
33 changes: 33 additions & 0 deletions src/Spark.Engine.Test/Service/IndexQueueEnqueueListenerTests.cs
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2026, Incendi <info@incendi.no>
*
* SPDX-License-Identifier: BSD-3-Clause
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using Hl7.Fhir.Model;
using Moq;
using Spark.Engine.Core;
using Spark.Engine.Service;
using Spark.Engine.Store.Interfaces;
using Xunit;
using Task = System.Threading.Tasks.Task;

namespace Spark.Engine.Test.Service;

public class IndexQueueEnqueueListenerTests
{
[Fact]
public async Task InformAsync_ForwardsEntryToIndexQueue()
{
var mockQueue = new Mock<IIndexQueue>();
var entry = Entry.POST(new Key("http://localhost/", "Patient", "p1", null), new Patient { Id = "p1" });
var listener = new IndexQueueEnqueueListener(mockQueue.Object);

await listener.InformAsync(new Uri("http://localhost/Patient/p1"), entry);

mockQueue.Verify(q => q.EnqueueAsync(entry, It.IsAny<CancellationToken>()), Times.Once);
}
}
134 changes: 134 additions & 0 deletions src/Spark.Engine.Test/Service/IndexWorkerTests.cs
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2026, Incendi <info@incendi.no>
*
* SPDX-License-Identifier: BSD-3-Clause
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using Hl7.Fhir.Model;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Spark.Engine.Core;
using Spark.Engine.Service;
using Spark.Engine.Store;
using Spark.Engine.Store.Interfaces;
using Xunit;
using Task = System.Threading.Tasks.Task;

namespace Spark.Engine.Test.Service;

public class IndexWorkerTests
{
private readonly Mock<IIndexQueue> _indexQueueMock = new();
private readonly Mock<IIndexService> _indexServiceMock = new();
private readonly IndexQueueSettings _settings = new() { PollInterval = TimeSpan.FromMilliseconds(1) };

private IndexWorker CreateWorker() =>
new(_indexQueueMock.Object, _indexServiceMock.Object,
NullLogger<IndexWorker>.Instance, _settings);

/// <summary>
/// Configures ClaimNextAsync to return <paramref name="first"/> on the first call,
/// then block indefinitely (until canceled) on subsequent calls.
/// </summary>
private void SetupClaimSequence(IndexQueueEntry first)
{
int calls = 0;
_indexQueueMock.Setup(indexQueue => indexQueue.ClaimNextAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(async ct =>
{
if (Interlocked.Increment(ref calls) == 1)
return first;
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
return null;
});
}

[Fact]
public async Task ExecuteAsync_WhenEntryAvailable_ProcessesAndAcknowledges()
{
var entry = new IndexQueueEntry { Id = "e1", Entry = Entry.POST(new Key("http://localhost/", "Patient", "p1", null), new Patient()) };
SetupClaimSequence(entry);

// Signal when AcknowledgeAsync is called so the test doesn't rely on timing
var ackSignal = new TaskCompletionSource();
_indexQueueMock.Setup(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Callback(() => ackSignal.TrySetResult());

var worker = CreateWorker();
await worker.StartAsync(CancellationToken.None);
await ackSignal.Task.WaitAsync(TimeSpan.FromSeconds(5));
await worker.StopAsync(CancellationToken.None);

_indexServiceMock.Verify(indexService => indexService.ProcessAsync(entry.Entry), Times.Once);
_indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync("e1", It.IsAny<CancellationToken>()), Times.Once);
_indexQueueMock.Verify(indexQueue => indexQueue.NackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task ExecuteAsync_WhenProcessingFails_CallsNackWithEntryIdAndError()
{
var entry = new IndexQueueEntry { Id = "e2", Entry = Entry.POST(new Key("http://localhost/", "Patient", "p2", null), new Patient()) };
SetupClaimSequence(entry);

var processingError = new InvalidOperationException("index failure");
_indexServiceMock.Setup(indexService => indexService.ProcessAsync(It.IsAny<Entry>())).ThrowsAsync(processingError);

var nackSignal = new TaskCompletionSource();
_indexQueueMock.Setup(indexQueue => indexQueue.NackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Callback(() => nackSignal.TrySetResult());

var worker = CreateWorker();
await worker.StartAsync(CancellationToken.None);
await nackSignal.Task.WaitAsync(TimeSpan.FromSeconds(5));
await worker.StopAsync(CancellationToken.None);

_indexQueueMock.Verify(indexQueue => indexQueue.NackAsync("e2", "index failure", It.IsAny<CancellationToken>()), Times.Once);
_indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task ExecuteAsync_WhenQueueIsEmpty_DoesNotCallIndexService()
{
int calls = 0;
var emptyQueueSignal = new TaskCompletionSource();
_indexQueueMock.Setup(indexQueue => indexQueue.ClaimNextAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(async ct =>
{
if (Interlocked.Increment(ref calls) == 1)
{
emptyQueueSignal.TrySetResult();
return null;
}
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
return null;
});

var worker = CreateWorker();
await worker.StartAsync(CancellationToken.None);
await emptyQueueSignal.Task.WaitAsync(TimeSpan.FromSeconds(5));
await worker.StopAsync(CancellationToken.None);

_indexQueueMock.Verify(indexQueue => indexQueue.ClaimNextAsync(It.IsAny<CancellationToken>()), Times.AtLeastOnce);
_indexServiceMock.Verify(indexService => indexService.ProcessAsync(It.IsAny<Entry>()), Times.Never);
_indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task ExecuteAsync_OnCancellation_ExitsWithoutThrowing()
{
_indexQueueMock.Setup(q => q.ClaimNextAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(async ct =>
{
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
return null;
});

var worker = CreateWorker();
await worker.StartAsync(CancellationToken.None);
var exception = await Record.ExceptionAsync(() => worker.StopAsync(CancellationToken.None));
Assert.Null(exception);
}
}
18 changes: 18 additions & 0 deletions src/Spark.Engine/Core/IndexQueueEntry.cs
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2026, Incendi <info@incendi.no>
*
* SPDX-License-Identifier: BSD-3-Clause
*/

using System;

namespace Spark.Engine.Core;

public class IndexQueueEntry
{
public string Id { get; set; }
public Entry Entry { get; set; }
public int Attempts { get; set; }
public string LastError { get; set; }
public DateTime EnqueuedAt { get; set; }
}
10 changes: 9 additions & 1 deletion src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs
@@ -148,7 +148,15 @@ public static IMvcBuilder AddFhir(this IServiceCollection services, SparkSetting
services.TryAddTransient<ISearchService, SearchService>();
services.TryAddTransient<ISnapshotPaginationProvider, SnapshotPaginationProvider>();
services.TryAddTransient<ISnapshotPaginationCalculator, SnapshotPaginationCalculator>();
services.TryAddTransient<IServiceListener, SearchService>(); // searchListener
if (settings.Experimental.IndexingMode == IndexingMode.Background)
{
services.TryAddTransient<IServiceListener, IndexQueueEnqueueListener>();
services.AddHostedService<IndexWorker>();
}
else
{
services.TryAddTransient<IServiceListener, IndexServiceListener>();
}
services.TryAddTransient(provider => new IServiceListener[] { provider.GetRequiredService<IServiceListener>() });
services.TryAddTransient<SearchService>(); // search
services.TryAddTransient<ITransactionService, TransactionService>(); // transaction