From 433f546f6fddeb8c33c5bab237c77be30b9d2335 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 18:17:56 +0100 Subject: [PATCH 01/10] ADR: Update Background IndexService via Durable Outbox Queue Key changes: 1. Update the Decision section to reflect that `EnqueueAsync` is called via `IndexQueueEnqueueListener : IServiceListener` (not directly from `ResourceStorageService`) 2. Update to say `ServiceListener.InformAsync()` is NOT removed - instead `IndexQueueEnqueueListener` is a drop-in replacement 3. Mention the DI swap will be opt-in --- .../ADR/0004-BackgroundIndexService.md | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/Documentation/ADR/0004-BackgroundIndexService.md b/Documentation/ADR/0004-BackgroundIndexService.md index e65ba9d13..8d6d4fe31 100644 --- a/Documentation/ADR/0004-BackgroundIndexService.md +++ b/Documentation/ADR/0004-BackgroundIndexService.md @@ -1,7 +1,7 @@ # ADR-0004: Background IndexService via Durable Outbox Queue ## Status -Draft +Accepted ## Context `IndexService.ProcessAsync()` is currently called synchronously inside the HTTP request @@ -71,8 +71,16 @@ outbox queue backed by a MongoDB `indexqueue` collection and a background worker - A new `IIndexQueue` interface provides `EnqueueAsync`, `ClaimNextAsync`, `AcknowledgeAsync`, and `NackAsync` operations. - `MongoIndexQueue` implements `IIndexQueue` using the `indexqueue` collection. -- `EnqueueAsync` is called from `ResourceStorageService` after a successful resource store - (and, once ADR 0001 is implemented, inside the same MongoDB transaction). +- `EnqueueAsync` is called from a new `IndexQueueEnqueueListener : IServiceListener`, + which participates in the existing `ServiceListener` chain. 
This preserves the + `IServiceListener` pattern and allows operators to switch between synchronous indexing + (`SearchService`) and background indexing (`IndexQueueEnqueueListener`) by swapping a + single DI registration, without modifying any storage code. +- The existing `ServiceListener.InformAsync()` call and the `IServiceListener` / + `SearchService` wiring are fully preserved. `IndexQueueEnqueueListener` is a drop-in + replacement for `SearchService` in the listener registration. The DI swap to activate + background indexing will be opt-in (controlled by a configuration flag) so operators can + choose synchronous vs. background indexing per deployment. - `IndexWorker : BackgroundService` runs on every Spark node and polls `ClaimNextAsync()` in a loop. `ClaimNextAsync()` is a single atomic `FindOneAndUpdate` that transitions an entry from `pending` to `processing` and stamps it with the claiming node's `WorkerId`. @@ -81,8 +89,9 @@ outbox queue backed by a MongoDB `indexqueue` collection and a background worker reclaimed by the next available worker (crash recovery). - Failed entries are retried up to a configurable maximum; after that they transition to `status=failed` and are logged for operator attention. -- `ServiceListener.InformAsync()` is removed from the synchronous write path for - indexing. The `IServiceListener` / `SearchService` wiring is preserved for future use. +- Once ADR 0001 is implemented, both `IndexQueueEnqueueListener` and `SearchService` will + participate in the same MongoDB transaction as `resources` and `counters`, completing + the transactional outbox pattern regardless of which indexing mode is active. ## Consequences From c452a4533db574692693c1742b3f01a3c6f201a9 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 08:08:08 +0100 Subject: [PATCH 02/10] Engine: Add IIndexQueue and IndexQueueEntry IIndexQueue is the interface that will be used by an Index Queue implementation. 
IndexQueueEntry is an entry in the index queue. --- src/Spark.Engine/Core/IndexQueueEntry.cs | 18 ++++++++++++++++++ .../Store/Interfaces/IIndexQueue.cs | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 src/Spark.Engine/Core/IndexQueueEntry.cs create mode 100644 src/Spark.Engine/Store/Interfaces/IIndexQueue.cs diff --git a/src/Spark.Engine/Core/IndexQueueEntry.cs b/src/Spark.Engine/Core/IndexQueueEntry.cs new file mode 100644 index 000000000..d52bd1e22 --- /dev/null +++ b/src/Spark.Engine/Core/IndexQueueEntry.cs @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; + +namespace Spark.Engine.Core; + +public class IndexQueueEntry +{ + public string Id { get; set; } + public Entry Entry { get; set; } + public int Attempts { get; set; } + public string LastError { get; set; } + public DateTime EnqueuedAt { get; set; } +} diff --git a/src/Spark.Engine/Store/Interfaces/IIndexQueue.cs b/src/Spark.Engine/Store/Interfaces/IIndexQueue.cs new file mode 100644 index 000000000..e1015525f --- /dev/null +++ b/src/Spark.Engine/Store/Interfaces/IIndexQueue.cs @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System.Threading; +using System.Threading.Tasks; +using Spark.Engine.Core; + +namespace Spark.Engine.Store.Interfaces; + +public interface IIndexQueue +{ + Task EnqueueAsync(Entry entry, CancellationToken cancellationToken = default); + Task ClaimNextAsync(CancellationToken cancellationToken = default); + Task AcknowledgeAsync(string id, CancellationToken cancellationToken = default); + Task NackAsync(string id, string error, CancellationToken cancellationToken = default); +} From c95e7683eecd6576c0917d30fbd13931b831783d Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 08:09:57 +0100 Subject: [PATCH 03/10] Mongo: Add MongoIndexQueue implementation This defines the index queue fields and statuses together 
with our other database-related constants. This defines an IndexQueueSettings class which allows us to configure: - LeaseTimeout - How long a claimed entry remains in processing state until it can be reclaimed by another worker. - MaxAttempts - Number of attempts before an entry is moved to the failed state. - PollInterval - The interval at which our worker polls the index queue. And finally MongoIndexQueue which is our MongoDB implementation of IIndexQueue. --- src/Spark.Engine/Store/IndexQueueSettings.cs | 28 ++++ src/Spark.Mongo/Store/Constants.cs | 20 +++ src/Spark.Mongo/Store/MongoIndexQueue.cs | 164 +++++++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 src/Spark.Engine/Store/IndexQueueSettings.cs create mode 100644 src/Spark.Mongo/Store/MongoIndexQueue.cs diff --git a/src/Spark.Engine/Store/IndexQueueSettings.cs b/src/Spark.Engine/Store/IndexQueueSettings.cs new file mode 100644 index 000000000..9b35ea19f --- /dev/null +++ b/src/Spark.Engine/Store/IndexQueueSettings.cs @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; + +namespace Spark.Engine.Store; + +public class IndexQueueSettings +{ + /// + /// How long a claimed entry may remain in the processing state before being + /// considered stale and reclaimed by another worker. + /// + public TimeSpan LeaseTimeout { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// Maximum number of processing attempts before an entry is moved to failed. + /// + public int MaxAttempts { get; set; } = 5; + + /// + /// How long the IndexWorker sleeps between polling cycles when the queue is empty. 
+ /// + public TimeSpan PollInterval { get; set; } = TimeSpan.FromMilliseconds(20); +} diff --git a/src/Spark.Mongo/Store/Constants.cs b/src/Spark.Mongo/Store/Constants.cs index 684bf7087..e83cbb829 100644 --- a/src/Spark.Mongo/Store/Constants.cs +++ b/src/Spark.Mongo/Store/Constants.cs @@ -12,6 +12,26 @@ public static class Collection public const string RESOURCE = "resources"; public const string COUNTERS = "counters"; public const string SNAPSHOT = "snapshots"; + public const string INDEX_QUEUE = "indexqueue"; +} + +public static class IndexQueueField +{ + public const string STATUS = "status"; + public const string WORKER_ID = "workerId"; + public const string CLAIMED_AT = "claimedAt"; + public const string LEASE_EXPIRES_AT = "leaseExpiresAt"; + public const string ATTEMPTS = "attempts"; + public const string LAST_ERROR = "lastError"; + public const string ENQUEUED_AT = "enqueuedAt"; + public const string ENTRY = "entry"; +} + +public static class IndexQueueStatus +{ + public const string PENDING = "pending"; + public const string PROCESSING = "processing"; + public const string FAILED = "failed"; } public static class Field diff --git a/src/Spark.Mongo/Store/MongoIndexQueue.cs b/src/Spark.Mongo/Store/MongoIndexQueue.cs new file mode 100644 index 000000000..0fb433866 --- /dev/null +++ b/src/Spark.Mongo/Store/MongoIndexQueue.cs @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using Spark.Engine.Core; +using Spark.Engine.Store.Interfaces; +using Spark.Engine.Store; +using Spark.Store.Mongo; + +namespace Spark.Mongo.Store; + +public class MongoIndexQueue : IIndexQueue +{ + private readonly IMongoCollection _collection; + private readonly IndexQueueSettings _settings; + private readonly string _workerId; + + public MongoIndexQueue(string mongoUrl, 
IndexQueueSettings settings) + { + var database = MongoDatabaseFactory.GetMongoDatabase(mongoUrl); + _collection = database.GetCollection(Collection.INDEX_QUEUE); + _settings = settings; + _workerId = $"{Environment.MachineName}:{Environment.ProcessId}"; + EnsureIndexes(); + } + + public async Task EnqueueAsync(Entry entry, CancellationToken cancellationToken = default) + { + // FIXME: This is papering over a problem how ToBsonDocument() is asserting keys - AssertKeyIsValid(). + // ToBsonDocument() is made specifically for the resources collection, but we use it here since it works + // good enough for us. It is also tied in with how ToEntry() works in ClaimNextAsync(), ToEntry() was + // also made specifically for the 'resources' collection. + var newEntry = Entry.Create(entry.Method, + Key.Create(entry.Key.TypeName, entry.Key.ResourceId, entry.Key.VersionId), entry.Resource); + var document = new BsonDocument + { + [Field.PRIMARYKEY] = ObjectId.GenerateNewId().ToString(), + [IndexQueueField.ENTRY] = newEntry.ToBsonDocument(), + [IndexQueueField.STATUS] = IndexQueueStatus.PENDING, + [IndexQueueField.WORKER_ID] = BsonNull.Value, + [IndexQueueField.CLAIMED_AT] = BsonNull.Value, + [IndexQueueField.LEASE_EXPIRES_AT] = BsonNull.Value, + [IndexQueueField.ATTEMPTS] = 0, + [IndexQueueField.LAST_ERROR] = BsonNull.Value, + [IndexQueueField.ENQUEUED_AT] = DateTime.UtcNow, + }; + + await _collection.InsertOneAsync(document, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + public async Task ClaimNextAsync(CancellationToken cancellationToken = default) + { + var now = DateTime.UtcNow; + + // Claim the oldest pending entry, or reclaim a processing entry whose lease has expired. 
+ var filter = Builders.Filter.Or( + Builders.Filter.And( + Builders.Filter.Eq(IndexQueueField.STATUS, IndexQueueStatus.PENDING), + Builders.Filter.Lt(IndexQueueField.ATTEMPTS, _settings.MaxAttempts) + ), + Builders.Filter.And( + Builders.Filter.Eq(IndexQueueField.STATUS, IndexQueueStatus.PROCESSING), + Builders.Filter.Lt(IndexQueueField.LEASE_EXPIRES_AT, now) + ) + ); + + var update = Builders.Update + .Set(IndexQueueField.STATUS, IndexQueueStatus.PROCESSING) + .Set(IndexQueueField.WORKER_ID, _workerId) + .Set(IndexQueueField.CLAIMED_AT, now) + .Set(IndexQueueField.LEASE_EXPIRES_AT, now.Add(_settings.LeaseTimeout)); + + var options = new FindOneAndUpdateOptions + { + ReturnDocument = ReturnDocument.After, + Sort = Builders.Sort.Ascending(IndexQueueField.ENQUEUED_AT), + }; + + var document = await _collection + .FindOneAndUpdateAsync(filter, update, options, cancellationToken) + .ConfigureAwait(false); + + if (document is null) + return null; + + return new IndexQueueEntry + { + Id = document[Field.PRIMARYKEY].AsString, + Entry = document[IndexQueueField.ENTRY].AsBsonDocument.ToEntry(), + Attempts = document[IndexQueueField.ATTEMPTS].AsInt32, + LastError = document[IndexQueueField.LAST_ERROR] == BsonNull.Value + ? null + : document[IndexQueueField.LAST_ERROR].AsString, + EnqueuedAt = document[IndexQueueField.ENQUEUED_AT].ToUniversalTime(), + }; + } + + public async Task AcknowledgeAsync(string id, CancellationToken cancellationToken = default) + { + var filter = Builders.Filter.Eq(Field.PRIMARYKEY, id); + await _collection.DeleteOneAsync(filter, cancellationToken).ConfigureAwait(false); + } + + public async Task NackAsync(string id, string error, CancellationToken cancellationToken = default) + { + var filter = Builders.Filter.Eq(Field.PRIMARYKEY, id); + + // Atomically increment attempts and set status to failed or pending using an + // aggregation pipeline update (requires MongoDB 4.2+). 
+ BsonDocument[] stages = + [ + new BsonDocument("$set", new BsonDocument + { + [IndexQueueField.ATTEMPTS] = new BsonDocument("$add", + new BsonArray { $"${IndexQueueField.ATTEMPTS}", 1 }), + [IndexQueueField.LAST_ERROR] = error, + [IndexQueueField.WORKER_ID] = BsonNull.Value, + [IndexQueueField.CLAIMED_AT] = BsonNull.Value, + [IndexQueueField.LEASE_EXPIRES_AT] = BsonNull.Value, + [IndexQueueField.STATUS] = new BsonDocument("$cond", new BsonDocument + { + ["if"] = new BsonDocument("$gte", new BsonArray + { + new BsonDocument("$add", new BsonArray { $"${IndexQueueField.ATTEMPTS}", 1 }), + _settings.MaxAttempts, + }), + ["then"] = IndexQueueStatus.FAILED, + ["else"] = IndexQueueStatus.PENDING, + }), + }), + ]; + + PipelineDefinition pipeline = stages; + var update = Builders.Update.Pipeline(pipeline); + await _collection.UpdateOneAsync(filter, update, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + private void EnsureIndexes() + { + var indexes = new List> + { + new CreateIndexModel( + Builders.IndexKeys + .Ascending(IndexQueueField.STATUS) + .Ascending(IndexQueueField.LEASE_EXPIRES_AT)), + new CreateIndexModel( + Builders.IndexKeys + .Ascending(IndexQueueField.STATUS) + .Ascending(IndexQueueField.ENQUEUED_AT)), + }; + + _collection.Indexes.CreateMany(indexes); + } +} From 7da0ca8360f55aaffd4b1acea8c3410eb0e2dcb3 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 09:45:56 +0100 Subject: [PATCH 04/10] Engine: Add service listener IndexQueueEnqueueListener When this service listener is informed it calls IIndexQueue.EnqueueAsync() which enqueues the resource in the index queue for our background indexer to pick it up. 
--- .../Service/IndexQueueEnqueueListenerTests.cs | 33 +++++++++++++++++++ .../Service/IndexQueueEnqueueListener.cs | 33 +++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/Spark.Engine.Test/Service/IndexQueueEnqueueListenerTests.cs create mode 100644 src/Spark.Engine/Service/IndexQueueEnqueueListener.cs diff --git a/src/Spark.Engine.Test/Service/IndexQueueEnqueueListenerTests.cs b/src/Spark.Engine.Test/Service/IndexQueueEnqueueListenerTests.cs new file mode 100644 index 000000000..17e652043 --- /dev/null +++ b/src/Spark.Engine.Test/Service/IndexQueueEnqueueListenerTests.cs @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Hl7.Fhir.Model; +using Moq; +using Spark.Engine.Core; +using Spark.Engine.Service; +using Spark.Engine.Store.Interfaces; +using Xunit; +using Task = System.Threading.Tasks.Task; + +namespace Spark.Engine.Test.Service; + +public class IndexQueueEnqueueListenerTests +{ + [Fact] + public async Task InformAsync_ForwardsEntryToIndexQueue() + { + var mockQueue = new Mock(); + var entry = Entry.POST(new Key("http://localhost/", "Patient", "p1", null), new Patient { Id = "p1" }); + var listener = new IndexQueueEnqueueListener(mockQueue.Object); + + await listener.InformAsync(new Uri("http://localhost/Patient/p1"), entry); + + mockQueue.Verify(q => q.EnqueueAsync(entry, It.IsAny()), Times.Once); + } +} diff --git a/src/Spark.Engine/Service/IndexQueueEnqueueListener.cs b/src/Spark.Engine/Service/IndexQueueEnqueueListener.cs new file mode 100644 index 000000000..71f0e4419 --- /dev/null +++ b/src/Spark.Engine/Service/IndexQueueEnqueueListener.cs @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using Spark.Engine.Core; +using Spark.Engine.Store.Interfaces; +using System; +using System.Threading.Tasks; + +namespace Spark.Engine.Service; + +/// 
+/// An that enqueues FHIR resource write events onto the +/// for asynchronous processing by IndexWorker. +/// Register this in place of the default SearchService listener to switch from +/// synchronous to background indexing. +/// +public class IndexQueueEnqueueListener : IServiceListener +{ + private readonly IIndexQueue _indexQueue; + + public IndexQueueEnqueueListener(IIndexQueue indexQueue) + { + _indexQueue = indexQueue; + } + + public Task InformAsync(Uri location, Entry interaction) + { + return _indexQueue.EnqueueAsync(interaction); + } +} From f84dcd84f8252d3b7b8dc62dfccdca8138cb704d Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 15:00:01 +0100 Subject: [PATCH 05/10] Engine: Add IndexWorker This is the BackgroundService which consumes a resource from the index queue, indexes it, and ACKs it if successful, otherwise NACKs it. --- .../Service/IndexWorkerTests.cs | 134 ++++++++++++++++++ src/Spark.Engine/Service/IndexWorker.cs | 80 +++++++++++ 2 files changed, 214 insertions(+) create mode 100644 src/Spark.Engine.Test/Service/IndexWorkerTests.cs create mode 100644 src/Spark.Engine/Service/IndexWorker.cs diff --git a/src/Spark.Engine.Test/Service/IndexWorkerTests.cs b/src/Spark.Engine.Test/Service/IndexWorkerTests.cs new file mode 100644 index 000000000..91eedae8d --- /dev/null +++ b/src/Spark.Engine.Test/Service/IndexWorkerTests.cs @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Hl7.Fhir.Model; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using Spark.Engine.Core; +using Spark.Engine.Service; +using Spark.Engine.Store; +using Spark.Engine.Store.Interfaces; +using Xunit; +using Task = System.Threading.Tasks.Task; + +namespace Spark.Engine.Test.Service; + +public class IndexWorkerTests +{ + private readonly Mock _indexQueueMock = new(); + private readonly Mock _indexServiceMock = new(); 
+ private readonly IndexQueueSettings _settings = new() { PollInterval = TimeSpan.FromMilliseconds(1) }; + + private IndexWorker CreateWorker() => + new(_indexQueueMock.Object, _indexServiceMock.Object, + NullLogger.Instance, _settings); + + /// + /// Configures ClaimNextAsync to return on the first call, + /// then block indefinitely (until canceled) on subsequent calls. + /// + private void SetupClaimSequence(IndexQueueEntry first) + { + int calls = 0; + _indexQueueMock.Setup(indexQueue => indexQueue.ClaimNextAsync(It.IsAny())) + .Returns(async ct => + { + if (Interlocked.Increment(ref calls) == 1) + return first; + await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false); + return null; + }); + } + + [Fact] + public async Task ExecuteAsync_WhenEntryAvailable_ProcessesAndAcknowledges() + { + var entry = new IndexQueueEntry { Id = "e1", Entry = Entry.POST(new Key("http://localhost/", "Patient", "p1", null), new Patient()) }; + SetupClaimSequence(entry); + + // Signal when AcknowledgeAsync is called so the test doesn't rely on timing + var ackSignal = new TaskCompletionSource(); + _indexQueueMock.Setup(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny(), It.IsAny())) + .Callback(() => ackSignal.TrySetResult()); + + var worker = CreateWorker(); + await worker.StartAsync(CancellationToken.None); + await ackSignal.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await worker.StopAsync(CancellationToken.None); + + _indexServiceMock.Verify(indexService => indexService.ProcessAsync(entry.Entry), Times.Once); + _indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync("e1", It.IsAny()), Times.Once); + _indexQueueMock.Verify(indexQueue => indexQueue.NackAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public async Task ExecuteAsync_WhenProcessingFails_CallsNackWithEntryIdAndError() + { + var entry = new IndexQueueEntry { Id = "e2", Entry = Entry.POST(new Key("http://localhost/", "Patient", "p2", null), new Patient()) }; + 
SetupClaimSequence(entry); + + var processingError = new InvalidOperationException("index failure"); + _indexServiceMock.Setup(indexService => indexService.ProcessAsync(It.IsAny())).ThrowsAsync(processingError); + + var nackSignal = new TaskCompletionSource(); + _indexQueueMock.Setup(indexQueue => indexQueue.NackAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Callback(() => nackSignal.TrySetResult()); + + var worker = CreateWorker(); + await worker.StartAsync(CancellationToken.None); + await nackSignal.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await worker.StopAsync(CancellationToken.None); + + _indexQueueMock.Verify(indexQueue => indexQueue.NackAsync("e2", "index failure", It.IsAny()), Times.Once); + _indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public async Task ExecuteAsync_WhenQueueIsEmpty_DoesNotCallIndexService() + { + int calls = 0; + var emptyQueueSignal = new TaskCompletionSource(); + _indexQueueMock.Setup(indexQueue => indexQueue.ClaimNextAsync(It.IsAny())) + .Returns(async ct => + { + if (Interlocked.Increment(ref calls) == 1) + { + emptyQueueSignal.TrySetResult(); + return null; + } + await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false); + return null; + }); + + var worker = CreateWorker(); + await worker.StartAsync(CancellationToken.None); + await emptyQueueSignal.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await worker.StopAsync(CancellationToken.None); + + _indexQueueMock.Verify(indexQueue => indexQueue.ClaimNextAsync(It.IsAny()), Times.AtLeastOnce); + _indexServiceMock.Verify(indexService => indexService.ProcessAsync(It.IsAny()), Times.Never); + _indexQueueMock.Verify(indexQueue => indexQueue.AcknowledgeAsync(It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public async Task ExecuteAsync_OnCancellation_ExitsWithoutThrowing() + { + _indexQueueMock.Setup(q => q.ClaimNextAsync(It.IsAny())) + .Returns(async ct => + { + await Task.Delay(Timeout.Infinite, 
ct).ConfigureAwait(false); + return null; + }); + + var worker = CreateWorker(); + await worker.StartAsync(CancellationToken.None); + var exception = await Record.ExceptionAsync(() => worker.StopAsync(CancellationToken.None)); + Assert.Null(exception); + } +} diff --git a/src/Spark.Engine/Service/IndexWorker.cs b/src/Spark.Engine/Service/IndexWorker.cs new file mode 100644 index 000000000..218aba464 --- /dev/null +++ b/src/Spark.Engine/Service/IndexWorker.cs @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Spark.Engine.Core; +using Spark.Engine.Store; +using Spark.Engine.Store.Interfaces; + +namespace Spark.Engine.Service; + +public class IndexWorker : BackgroundService +{ + private readonly IIndexQueue _indexQueue; + private readonly IIndexService _indexService; + private readonly ILogger _logger; + private readonly IndexQueueSettings _settings; + private readonly string _workerId; + + public IndexWorker( + IIndexQueue indexQueue, + IIndexService indexService, + ILogger logger, + IndexQueueSettings settings) + { + _indexQueue = indexQueue; + _indexService = indexService; + _logger = logger; + _settings = settings; + _workerId = $"{Environment.MachineName}:{Environment.ProcessId}"; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("IndexWorker {WorkerId} starting.", _workerId); + + while (!stoppingToken.IsCancellationRequested) + { + IndexQueueEntry entry = null; + try + { + entry = await _indexQueue.ClaimNextAsync(stoppingToken).ConfigureAwait(false); + + if (entry is null) + { + await Task.Delay(_settings.PollInterval, stoppingToken).ConfigureAwait(false); + continue; + } + + await _indexService.ProcessAsync(entry.Entry).ConfigureAwait(false); + await _indexQueue.AcknowledgeAsync(entry.Id, 
stoppingToken).ConfigureAwait(false); + } + catch (Exception ex) + { + if (entry is null) + continue; + + _logger.LogWarning(ex, "IndexWorker {WorkerId} failed to process entry {EntryId} (attempt {Attempts}).", + _workerId, entry.Id, entry.Attempts); + try + { + await _indexQueue.NackAsync(entry.Id, ex.Message, stoppingToken).ConfigureAwait(false); + } + catch (Exception nackEx) + { + _logger.LogError(nackEx, "IndexWorker {WorkerId} failed to nack entry {EntryId}.", + _workerId, entry.Id); + } + } + } + + _logger.LogInformation("IndexWorker {WorkerId} stopping.", _workerId); + } +} From 66d25bee6f75f4be455ca7cdfd8a676a6e192720 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 18:52:03 +0100 Subject: [PATCH 06/10] Engine: Extract the IServiceListener implementation in SearchService The SearchService implemented the IServiceListener for the IndexService, this has been extracted into its own implementation IndexServiceListener. --- .../IServiceCollectionExtensions.cs | 2 +- .../FhirServiceExtensions/SearchService.cs | 12 ++------ .../Service/IndexServiceListener.cs | 30 +++++++++++++++++++ 3 files changed, 33 insertions(+), 11 deletions(-) create mode 100644 src/Spark.Engine/Service/IndexServiceListener.cs diff --git a/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs b/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs index 37508251f..21417f119 100644 --- a/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs +++ b/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs @@ -148,7 +148,7 @@ public static IMvcBuilder AddFhir(this IServiceCollection services, SparkSetting services.TryAddTransient(); services.TryAddTransient(); services.TryAddTransient(); - services.TryAddTransient(); // searchListener + services.TryAddTransient(); services.TryAddTransient(provider => new IServiceListener[] { provider.GetRequiredService() }); services.TryAddTransient(); // search services.TryAddTransient(); // transaction diff --git 
a/src/Spark.Engine/Service/FhirServiceExtensions/SearchService.cs b/src/Spark.Engine/Service/FhirServiceExtensions/SearchService.cs index 5a83f53a0..bd67bfba1 100644 --- a/src/Spark.Engine/Service/FhirServiceExtensions/SearchService.cs +++ b/src/Spark.Engine/Service/FhirServiceExtensions/SearchService.cs @@ -15,22 +15,19 @@ using Spark.Engine.Core; using Spark.Engine.Extensions; using Spark.Engine.Interfaces; -using Task = System.Threading.Tasks.Task; namespace Spark.Engine.Service.FhirServiceExtensions; -public class SearchService : ISearchService, IServiceListener +public class SearchService : ISearchService { private readonly IFhirModel _fhirModel; private readonly ILocalhost _localhost; - private IIndexService _indexService; private IFhirIndex _fhirIndex; - public SearchService(ILocalhost localhost, IFhirModel fhirModel, IFhirIndex fhirIndex, IIndexService indexService) + public SearchService(ILocalhost localhost, IFhirModel fhirModel, IFhirIndex fhirIndex) { _fhirModel = fhirModel; _localhost = localhost; - _indexService = indexService; _fhirIndex = fhirIndex; } @@ -153,9 +150,4 @@ private static string GetFirstSort(SearchParams searchCommand) } return firstSort; } - - public async Task InformAsync(Uri location, Entry interaction) - { - await _indexService.ProcessAsync(interaction).ConfigureAwait(false); - } } diff --git a/src/Spark.Engine/Service/IndexServiceListener.cs b/src/Spark.Engine/Service/IndexServiceListener.cs new file mode 100644 index 000000000..ae60970d6 --- /dev/null +++ b/src/Spark.Engine/Service/IndexServiceListener.cs @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using System; +using System.Threading.Tasks; +using Spark.Engine.Core; + +namespace Spark.Engine.Service; + +/// +/// An that processes FHIR resource write events +/// synchronously via . 
+/// +public class IndexServiceListener : IServiceListener +{ + private readonly IIndexService _indexService; + + public IndexServiceListener(IIndexService indexService) + { + _indexService = indexService; + } + + public Task InformAsync(Uri location, Entry interaction) + { + return _indexService.ProcessAsync(interaction); + } +} From 7c7de422b38274d8e6f602f0e4bfb534470593b8 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 19:29:59 +0100 Subject: [PATCH 07/10] Engine+Mongo: Introduce background indexing mode This introduces background indexing mode for search index updates. - IndexingMode enum to control synchronous vs. background indexing after FHIR resource writes. - Service registrations in IServiceCollectionExtensions to conditionally use IndexQueueEnqueueListener and IndexWorker for background mode, falling back to IndexServiceListener for synchronous. - IndexingMode property added to SparkSettings with default of Synchronous. - Introduced IndexQueueSettings in StoreSettings and registered MongoIndexQueue in Mongo extensions for durable queuing in background mode. - Enables eventual consistency in search results when using background indexing to offload work from HTTP requests. 
--- .../IServiceCollectionExtensions.cs | 10 ++++++- src/Spark.Engine/IndexingMode.cs | 26 +++++++++++++++++++ src/Spark.Engine/SparkSettings.cs | 6 +++++ src/Spark.Engine/StoreSettings.cs | 3 +++ .../IServiceCollectionExtensions.cs | 5 +++- 5 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 src/Spark.Engine/IndexingMode.cs diff --git a/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs b/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs index 21417f119..7ddf4673f 100644 --- a/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs +++ b/src/Spark.Engine/Extensions/IServiceCollectionExtensions.cs @@ -148,7 +148,15 @@ public static IMvcBuilder AddFhir(this IServiceCollection services, SparkSetting services.TryAddTransient(); services.TryAddTransient(); services.TryAddTransient(); - services.TryAddTransient(); + if (settings.Experimental.IndexingMode == IndexingMode.Background) + { + services.TryAddTransient(); + services.AddHostedService(); + } + else + { + services.TryAddTransient(); + } services.TryAddTransient(provider => new IServiceListener[] { provider.GetRequiredService() }); services.TryAddTransient(); // search services.TryAddTransient(); // transaction diff --git a/src/Spark.Engine/IndexingMode.cs b/src/Spark.Engine/IndexingMode.cs new file mode 100644 index 000000000..b845a10b2 --- /dev/null +++ b/src/Spark.Engine/IndexingMode.cs @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +namespace Spark.Engine; + +/// +/// Controls how the search index is updated after a FHIR resource write. +/// +public enum IndexingMode +{ + /// + /// Index updates are processed synchronously in the HTTP request path via + /// IndexServiceListener. Default behavior. + /// + Synchronous, + + /// + /// Index updates are enqueued to the durable indexqueue MongoDB collection + /// and processed by IndexWorker running as a background service. 
+ /// Search results become eventually consistent. + /// + Background +} diff --git a/src/Spark.Engine/SparkSettings.cs b/src/Spark.Engine/SparkSettings.cs index 961423877..c0274c8a2 100644 --- a/src/Spark.Engine/SparkSettings.cs +++ b/src/Spark.Engine/SparkSettings.cs @@ -13,6 +13,11 @@ namespace Spark.Engine; +public class ExperimentalSettings +{ + public IndexingMode IndexingMode { get; set; } = IndexingMode.Synchronous; +} + public class SparkSettings { public Uri Endpoint { get; set; } @@ -22,6 +27,7 @@ public class SparkSettings public ExportSettings ExportSettings { get; set; } public IndexSettings IndexSettings { get; set; } public SearchSettings Search { get; set; } + public ExperimentalSettings Experimental { get; set; } = new(); public string FhirRelease { diff --git a/src/Spark.Engine/StoreSettings.cs b/src/Spark.Engine/StoreSettings.cs index ea61c68bd..645b8c8e2 100644 --- a/src/Spark.Engine/StoreSettings.cs +++ b/src/Spark.Engine/StoreSettings.cs @@ -4,9 +4,12 @@ * SPDX-License-Identifier: BSD-3-Clause */ +using Spark.Engine.Store; + namespace Spark.Engine; public class StoreSettings { public string ConnectionString { get; set; } + public IndexQueueSettings IndexQueue { get; set; } = new(); } diff --git a/src/Spark.Mongo/Extensions/IServiceCollectionExtensions.cs b/src/Spark.Mongo/Extensions/IServiceCollectionExtensions.cs index e57176611..d776737a2 100644 --- a/src/Spark.Mongo/Extensions/IServiceCollectionExtensions.cs +++ b/src/Spark.Mongo/Extensions/IServiceCollectionExtensions.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions; using Spark.Engine; using Spark.Engine.Interfaces; +using Spark.Engine.Store; using Spark.Engine.Store.Interfaces; using Spark.Mongo.Search.Common; using Spark.Mongo.Search.Indexer; @@ -36,5 +37,7 @@ public static void AddMongoFhirStore(this IServiceCollection services, StoreSett services.TryAddTransient((provider) => DefinitionsFactory.Generate(ModelInfo.SearchParameters)); 
services.TryAddTransient(); services.TryAddTransient(); + services.TryAddSingleton(settings.IndexQueue); + services.TryAddTransient(provider => new MongoIndexQueue(settings.ConnectionString, provider.GetRequiredService())); } -} \ No newline at end of file +} From 0e805a69ae03a8b817d1096ab7c806197004ef53 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sat, 21 Feb 2026 20:04:49 +0100 Subject: [PATCH 08/10] Documentation: Update migration document --- Documentation/MigrateFromv2Tov3.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/Documentation/MigrateFromv2Tov3.md b/Documentation/MigrateFromv2Tov3.md index b5d63350f..794d1ef60 100644 --- a/Documentation/MigrateFromv2Tov3.md +++ b/Documentation/MigrateFromv2Tov3.md @@ -5,6 +5,15 @@ We now target `net8.0`, `net9.0`, and `net10.0`. `netstandard2.0` and `net472` t ### New classes and interfaces - `FhirResponse` wraps a `FhirResponse`, with the generic parameter representing the FHIR resource type. +- `IIndexQueue` (`Spark.Engine.Store.Interfaces`) — interface for durable index queue operations: `EnqueueAsync`, `ClaimNextAsync`, `AcknowledgeAsync`, `NackAsync`. +- `IndexQueueEntry` (`Spark.Engine.Core`) — model returned by `IIndexQueue.ClaimNextAsync`; carries the `Entry`, attempt count, and last error. +- `IndexQueueSettings` (`Spark.Engine.Store`) — configuration for index queue behavior: `LeaseTimeout`, `MaxAttempts`, `PollInterval`. Exposed as `StoreSettings.IndexQueue`. +- `MongoIndexQueue` (`Spark.Mongo.Store`) — MongoDB implementation of `IIndexQueue` backed by the `indexqueue` collection. +- `IndexWorker` (`Spark.Engine.Service`) — `BackgroundService` that polls `IIndexQueue` and drains pending entries via `IIndexService`. Registered automatically when `SparkSettings.Experimental.IndexingMode = Background`. +- `IndexServiceListener` (`Spark.Engine.Service`) — `IServiceListener` that processes search index updates synchronously in the HTTP request path. 
Registered by default (`IndexingMode = Synchronous`). +- `IndexQueueEnqueueListener` (`Spark.Engine.Service`) — `IServiceListener` that enqueues write events onto `IIndexQueue` for asynchronous background processing. Registered when `IndexingMode = Background`. +- `ExperimentalSettings` (`Spark.Engine`) — groups experimental settings under `SparkSettings.Experimental`. Currently exposes `IndexingMode`. +- `IndexingMode` (`Spark.Engine`) — enum that controls the indexing strategy: `Synchronous` (default) or `Background`. Accessed via `SparkSettings.Experimental.IndexingMode`. ### IFhirService, FhirServiceBase and FhirService changes - New generic methods: @@ -18,6 +27,10 @@ We now target `net8.0`, `net9.0`, and `net10.0`. `netstandard2.0` and `net472` t ### Method and property signature changes - `Validate.HasResourceType(IKey, ResourceType)` has been changed to `Validate.HasResourceType(IKey, string)` +- `SearchService` no longer implements `IServiceListener`; it now only implements `ISearchService`. Code that registered or resolved `SearchService` as `IServiceListener` must be updated. +- `SearchService` constructor no longer accepts `IIndexService`; the signature changed from `SearchService(ILocalhost, IFhirModel, IFhirIndex, IIndexService)` to `SearchService(ILocalhost, IFhirModel, IFhirIndex)`. +- `SparkSettings` has a new property `ExperimentalSettings Experimental { get; set; }` (default `new ExperimentalSettings()`). +- `StoreSettings` has a new property `IndexQueueSettings IndexQueue { get; set; }` (defaults to `new IndexQueueSettings()`). - `List IFhirModel.SearchParameters` has been changed to `IReadOnlyListList IFhirModel.SearchParameters` - `IEnumerable IFhirModel.FindSearchParameters(Type)` has been changed to `List IFhirModel.FindSearchParameters(Type)` - `IEnumerable IFhirModel.FindSearchParameters(string)` has been changed to `List IFhirModel.FindSearchParameters(string)` @@ -82,8 +95,9 @@ We now target `net8.0`, `net9.0`, and `net10.0`. 
`netstandard2.0` and `net472` t ### Changes to extension methods - `AddFhirFacade(this IServiceCollection, Action)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`. -- `AddFhir(this IServiceCollection, SparkSettings, Action)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`. +- `AddFhir(this IServiceCollection, SparkSettings, Action)` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`. It also conditionally registers `IndexServiceListener` (default, `IndexingMode.Synchronous`) or `IndexQueueEnqueueListener` + `IndexWorker` (opt-in, `IndexingMode.Background`) based on `SparkSettings.Experimental.IndexingMode`. - `AddFhirFormatters(this IServiceCollection, SparkSettings, Action` now returns `IMvcBuilder` instead of `IMvcCoreBuilder`. +- `AddMongoFhirStore(this IServiceCollection, StoreSettings)` now always registers `IndexQueueSettings` (sourced from `StoreSettings.IndexQueue`) and `IIndexQueue → MongoIndexQueue`. ### Namespace changes - `Spark.Search.ChoiceValue` moved to `Spark.Engine.Search.ChoiceValue` From 633f3e9841228f91019442cf96876152bb161a35 Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Sun, 22 Feb 2026 08:20:31 +0100 Subject: [PATCH 09/10] Mongo: Add FIXME on moving extension methods into appropriate classes --- src/Spark.Mongo/Store/BsonHelper.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Spark.Mongo/Store/BsonHelper.cs b/src/Spark.Mongo/Store/BsonHelper.cs index 9743ffaf2..e4e0c1c91 100644 --- a/src/Spark.Mongo/Store/BsonHelper.cs +++ b/src/Spark.Mongo/Store/BsonHelper.cs @@ -17,6 +17,8 @@ namespace Spark.Store.Mongo; public static class SparkBsonHelper { + // FIXME: Move all extension methods into appropriate classes, i.e. ResourceExtensions, KeyExtensions, etc. 
+ public static BsonDocument CreateDocument(Resource resource) { if (resource != null) @@ -187,4 +189,4 @@ public static void TransferMetadata(BsonDocument from, BsonDocument to) to[Field.METHOD] = from[Field.METHOD]; to[Field.STATE] = from[Field.STATE]; } -} \ No newline at end of file +} From 83c998caaaea4baacd71caeda144fa9165fe45fb Mon Sep 17 00:00:00 2001 From: Kenneth Myhra Date: Thu, 26 Feb 2026 18:58:27 +0100 Subject: [PATCH 10/10] Documentation: Document Background Indexing Adds documentation for Background indexing and points that this feature is experimental and must be used with cautions since it has several trade-offs depending on which other features you use. The section Eventual Consistency Trade-offs document some of what an operator must evaluate before turning it on. --- Documentation/BackgroundIndexing.md | 93 +++++++++++++++++++++++++++++ Spark.sln | 1 + 2 files changed, 94 insertions(+) create mode 100644 Documentation/BackgroundIndexing.md diff --git a/Documentation/BackgroundIndexing.md b/Documentation/BackgroundIndexing.md new file mode 100644 index 000000000..9fa4392f6 --- /dev/null +++ b/Documentation/BackgroundIndexing.md @@ -0,0 +1,93 @@ +# Background Indexing + +> [!IMPORTANT] +> **Experimental feature — use with caution.** +> Background indexing is not yet recommended for production use. It introduces eventual +> consistency between writes and search results, which may cause unexpected behaviour in +> deployments that rely on immediate read-your-writes consistency on the search index. + +## Overview + +By default Spark processes search index updates synchronously in the HTTP request path: +every write waits for indexing to complete before returning a response. Background indexing +decouples this work by enqueuing index updates in a durable MongoDB `indexqueue` collection +and processing them in a `BackgroundService` worker (`IndexWorker`) that runs on each Spark +node. + +The main benefit is reduced write latency. 
The trade-off is that search results are +eventually consistent — a resource may not appear in search results immediately after a +successful write. + +## Configuration + +Background indexing is controlled by the `IndexingMode` property of the `Experimental` +settings group in `SparkSettings`: + +```json +{ + "SparkSettings": { + "Experimental": { + "IndexingMode": "Background" + } + } +} +``` + +Or via environment variable (useful for Docker / Kubernetes deployments): + +``` +SparkSettings__Experimental__IndexingMode=Background +``` + +The default value is `Synchronous`. Set it to `Background` to enable background indexing. + +The index queue behaviour can be tuned via `StoreSettings.IndexQueue`: + +```json +{ + "StoreSettings": { + "IndexQueue": { + "PollInterval": "00:00:00.020", + "LeaseTimeout": "00:05:00", + "MaxAttempts": 5 + } + } +} +``` + +| Property | Default | Description | +|----------|---------|-------------| +| `PollInterval` | 20 ms | How often `IndexWorker` polls for new queue entries. | +| `LeaseTimeout` | 5 min | How long a claimed entry may be held before it is reclaimed by another worker (crash recovery). | +| `MaxAttempts` | 5 | Maximum number of processing attempts before an entry is marked `failed`. | + +## Eventual Consistency Trade-offs + +Operators must evaluate the following before enabling background indexing: + +- **Conditional writes** — `conditional create`, `conditional update`, and `conditional delete` + resolve their match criteria via the search index. If a conflicting resource was written + within the last index-poll cycle, the condition may resolve against a stale index. +- **Search immediately after write** — clients that write a resource and immediately search + for it may transiently receive zero or incomplete results. +- **Automated test suites** — any test that asserts search result counts or presence + immediately after a write must either add retry/polling logic or be run against a + `Synchronous`-mode instance. 
+ +## Multi-node Deployments + +Each Spark node runs its own `IndexWorker`. Claims are atomic (`FindOneAndUpdate`) so only +one node processes any given queue entry. If a node crashes mid-processing, the entry is +automatically reclaimed after `LeaseTimeout` by another node. + +## Monitoring + +- Entries that exhaust `MaxAttempts` are marked `status=failed` in the `indexqueue` + collection and logged by `IndexWorker`. Operator intervention is required to re-process + or discard them. +- Monitor the depth of the `indexqueue` collection. A growing queue indicates that the + worker cannot keep up with the write rate, or that the worker is stopped. + +## See Also + +- [ADR 0004 — Background IndexService via Durable Outbox Queue](ADR/0004-BackgroundIndexService.md) diff --git a/Spark.sln b/Spark.sln index e9598a379..5762ab647 100644 --- a/Spark.sln +++ b/Spark.sln @@ -75,6 +75,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Documentation", "Documentat Documentation\UsingSpark.md = Documentation\UsingSpark.md Documentation\MigrateFromv1Tov2.md = Documentation\MigrateFromv1Tov2.md Documentation\MigrateFromv2Tov3.md = Documentation\MigrateFromv2Tov3.md + Documentation\BackgroundIndexing.md = Documentation\BackgroundIndexing.md EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "scripts", "scripts", "{977F2FE5-C232-4F93-AA0A-73DB85A20879}"