diff --git a/CMP.ServiceFabricReceiver.Common/CMP.ServiceFabricReceiver.Common.csproj b/CMP.ServiceFabricReceiver.Common/CMP.ServiceFabricReceiver.Common.csproj index dbc9e58..53e04bd 100644 --- a/CMP.ServiceFabricReceiver.Common/CMP.ServiceFabricReceiver.Common.csproj +++ b/CMP.ServiceFabricReceiver.Common/CMP.ServiceFabricReceiver.Common.csproj @@ -2,7 +2,7 @@ netstandard2.0 - 0.3.1 + 0.4.0 alpha CMP.ServiceFabricReceiver.Common CDON diff --git a/CMP.ServiceFabricReceiver.Common/EventProcessing.cs b/CMP.ServiceFabricReceiver.Common/EventProcessing.cs index 7ce6c0a..8fd4178 100644 --- a/CMP.ServiceFabricReceiver.Common/EventProcessing.cs +++ b/CMP.ServiceFabricReceiver.Common/EventProcessing.cs @@ -45,7 +45,7 @@ public static Func, Func> Handling( public static Func, Func> PartitionLogging() => f => async ctx => { - using (ctx.Logger.BeginScope("Event hub partition : {PartitionId}", ctx.PartitionId)) + using (ctx.Logger.BeginScope("{FeatureName} - {EventHubPartitionId}", nameof(PartitionLogging), ctx.PartitionId)) { await f(ctx); } @@ -54,7 +54,7 @@ public static Func, Func> Partition public static Func, Func> Logging() => f => async ctx => { - using (ctx.Logger.BeginScope("{FeatureName} - Events ({eventCount}) - Cancelled : {cancelled}", nameof(Logging), ctx.Events.Length, ctx.CancellationToken.IsCancellationRequested)) + using (ctx.Logger.BeginScope("{FeatureName} - Events ({EventCount}) - Cancelled : {IsCancellationRequested}", nameof(Logging), ctx.Events.Length, ctx.CancellationToken.IsCancellationRequested)) { const string name = "EventProcessor"; ctx.Logger.LogDebug($"{name}.ProcessEventsAsync for partition {ctx.PartitionId} got {ctx.Events.Count()} events", @@ -86,7 +86,7 @@ public static Func, Func> Retry(int public static async Task Retry(Func f, EventContext ctx, bool faulted = false, int exceptionDelaySeconds = 1) { - using (ctx.Logger.BeginScope("{FeatureName} - Retry : {retry}", nameof(Retry), faulted)) + using (ctx.Logger.BeginScope("{FeatureName} - Retry : {IsRetry}", nameof(Retry), faulted)) { try { @@ -94,13 +94,13 @@ public static async Task Retry(Func f, EventContext ctx, boo } catch (Exception ex) when (faulted) { - ctx.Logger.LogError(ex, $"Failed to process events- Faulted : {faulted}. Cancelled : {ctx.CancellationToken.IsCancellationRequested}", new object[] { ctx.CancellationToken.IsCancellationRequested }); + ctx.Logger.LogError(ex, "Failed to process events - IsRetry : {IsRetry}. Cancelled : {IsCancellationRequested}", faulted, ctx.CancellationToken.IsCancellationRequested); ctx.CancellationToken.ThrowIfCancellationRequested(); throw; } catch (Exception ex) { - ctx.Logger.LogError(ex, $"Failed to process events. Cancelled : {ctx.CancellationToken.IsCancellationRequested}", new object[] { ctx.CancellationToken.IsCancellationRequested }); + ctx.Logger.LogError(ex, "Failed to process events. Cancelled : {IsCancellationRequested}", ctx.CancellationToken.IsCancellationRequested); ctx.CancellationToken.ThrowIfCancellationRequested(); if (exceptionDelaySeconds > 0) await Task.Delay(TimeSpan.FromSeconds(exceptionDelaySeconds), ctx.CancellationToken); diff --git a/CMP.ServiceFabricReceiver.Common/Execution.cs b/CMP.ServiceFabricReceiver.Common/Execution.cs index e39405f..a456f77 100644 --- a/CMP.ServiceFabricReceiver.Common/Execution.cs +++ b/CMP.ServiceFabricReceiver.Common/Execution.cs @@ -25,18 +25,18 @@ public static async Task ExecuteAsync( { if (e is OperationCanceledException) { - logger.LogError(e, serviceName + " RunAsync canceled. RunAsync for {PartitionId}", partitionId); + logger.LogError(e, serviceName + " RunAsync canceled. RunAsync for {ServiceFabricPartitionId}", partitionId); serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]); throw; } - logger.LogError(e, serviceName + " Exception during shutdown. Exception of unexpected type .RunAsync for {PartitionId}", partitionId); + logger.LogError(e, serviceName + " Exception during shutdown. Exception of unexpected type .RunAsync for {ServiceFabricPartitionId}", partitionId); serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]); cancellationToken.ThrowIfCancellationRequested(); } catch (Exception e) { - logger.LogError(e, serviceName + ". RunAsync for {PartitionId}", partitionId); + logger.LogError(e, serviceName + ". RunAsync for {ServiceFabricPartitionId}", partitionId); serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]); throw; } diff --git a/CMP.ServiceFabricReceiver.Stateless/CMP.ServiceFabricRecevier.Stateless.csproj b/CMP.ServiceFabricReceiver.Stateless/CMP.ServiceFabricRecevier.Stateless.csproj index 4972c43..961f902 100644 --- a/CMP.ServiceFabricReceiver.Stateless/CMP.ServiceFabricRecevier.Stateless.csproj +++ b/CMP.ServiceFabricReceiver.Stateless/CMP.ServiceFabricRecevier.Stateless.csproj @@ -3,7 +3,7 @@ netcoreapp3.1 - 0.3.1 + 0.4.0 alpha CMP.ServiceFabricReceiver.Stateless CMP.ServiceFabricReceiver.Stateless diff --git a/CMP.ServiceFabricReceiver.Stateless/EventProcessor.cs b/CMP.ServiceFabricReceiver.Stateless/EventProcessor.cs index 32455f4..2870bfe 100644 --- a/CMP.ServiceFabricReceiver.Stateless/EventProcessor.cs +++ b/CMP.ServiceFabricReceiver.Stateless/EventProcessor.cs @@ -30,14 +30,14 @@ public EventProcessor( public Task CloseAsync(PartitionContext context, CloseReason reason) { - _logger.LogInformation("EventProcessor.CloseAsync for {PartitionId} reason {1}", context.PartitionId, reason); + _logger.LogInformation("EventProcessor.CloseAsync for {EventHubPartitionId} reason {1}", context.PartitionId, reason); //_serviceEventSource("EventProcessor.CloseAsync for {0} reason {1}", new object[] { context.PartitionId, reason }); return Task.CompletedTask; } public Task OpenAsync(PartitionContext context) { - _logger.LogInformation("EventProcessor.OpenAsync for {PartitionId}", context.PartitionId); + _logger.LogInformation("EventProcessor.OpenAsync for {EventHubPartitionId}", context.PartitionId); //_serviceEventSource("EventProcessor.OpenAsync for {0}", new[] { context.PartitionId }); return Task.CompletedTask; } @@ -47,18 +47,18 @@ public Task ProcessErrorAsync(PartitionContext context, Exception error) if (error is ReceiverDisconnectedException) { _logger.LogInformation( - "Receiver disconnected on partition {PartitionId}. Exception: {@Exception}", + "Receiver disconnected on partition {EventHubPartitionId}. Exception: {@Exception}", context.PartitionId, error); return Task.CompletedTask; } if (error is LeaseLostException) { _logger.LogInformation( - "Lease lost on partition {PartitionId}. Exception: {@Exception}", + "Lease lost on partition {EventHubPartitionId}. Exception: {@Exception}", context.PartitionId, error); return Task.CompletedTask; } - _logger.LogError(error, "EventProcessor.ProcessErrorAsync for {PartitionId}", context.PartitionId); + _logger.LogError(error, "EventProcessor.ProcessErrorAsync for {EventHubPartitionId}", context.PartitionId); //_serviceEventSource("EventProcessor.ProcessErrorAsync for {0} error {1}", new object[] { context.PartitionId, error }); return Task.CompletedTask; } diff --git a/CMP.ServiceFabricReceiver.Stateless/EventProcessorFactory.cs b/CMP.ServiceFabricReceiver.Stateless/EventProcessorFactory.cs index a009b86..bd1d195 100644 --- a/CMP.ServiceFabricReceiver.Stateless/EventProcessorFactory.cs +++ b/CMP.ServiceFabricReceiver.Stateless/EventProcessorFactory.cs @@ -9,21 +9,21 @@ namespace CMP.ServiceFabricRecevier.Stateless { public class EventProcessorFactory : IEventProcessorFactory { - private readonly ILogger _logger; + private readonly Func _loggerFactory; private readonly CancellationToken _cancellationToken; private readonly Func> f; public EventProcessorFactory( - ILogger logger, + Func loggerFactory, CancellationToken cancellationToken, Func> f) { - _logger = logger; + _loggerFactory = loggerFactory; _cancellationToken = cancellationToken; this.f = f; } public IEventProcessor CreateEventProcessor(PartitionContext context) - => new EventProcessor(_logger, _cancellationToken, f); + => new EventProcessor(_loggerFactory($"Processor({context.PartitionId})") , _cancellationToken, f); } } diff --git a/CMP.ServiceFabricReceiver.Stateless/Extensions.cs b/CMP.ServiceFabricReceiver.Stateless/Extensions.cs index e84f1a3..4923334 100644 --- a/CMP.ServiceFabricReceiver.Stateless/Extensions.cs +++ b/CMP.ServiceFabricReceiver.Stateless/Extensions.cs @@ -15,12 +15,25 @@ public static Task RunAsync( EventProcessorOptions options, CancellationToken cancellationToken, Action serviceEventSource, - string partition, + string serviceFabricPartition, Func> f) - => Composition.Combine( - Features.Execution(logger, serviceEventSource, nameof(ReceiverService), partition), - Features.ReceiverExceptions(logger, partition), - Features.Run(ct => host.RegisterEventProcessorFactoryAsync(new EventProcessorFactory(logger, ct, f), options)) - )(cancellationToken); + => RunAsync(host, _ => logger, options, cancellationToken, serviceEventSource, serviceFabricPartition, f); + + public static Task RunAsync( + this EventProcessorHost host, + Func loggerFactory, + EventProcessorOptions options, + CancellationToken cancellationToken, + Action serviceEventSource, + string serviceFabricPartition, + Func> f) + { + var logger = loggerFactory($"{host.HostName}.{nameof(RunAsync)}.{serviceFabricPartition}"); + return Composition.Combine( + Features.Execution(logger, serviceEventSource, nameof(ReceiverService), serviceFabricPartition), + Features.ReceiverExceptions(logger, serviceFabricPartition), + Features.Run(ct => host.RegisterEventProcessorFactoryAsync(new EventProcessorFactory(loggerFactory, ct, f), options)) + )(cancellationToken); + } } } diff --git a/CMP.ServiceFabricReceiver.Stateless/ReceiverExceptions.cs b/CMP.ServiceFabricReceiver.Stateless/ReceiverExceptions.cs index e72e444..e1392f9 100644 --- a/CMP.ServiceFabricReceiver.Stateless/ReceiverExceptions.cs +++ b/CMP.ServiceFabricReceiver.Stateless/ReceiverExceptions.cs @@ -25,7 +25,7 @@ public static async Task ExecuteAsync( { logger.LogInformation($"Execution.ExecuteAsync error 1 {e.GetType().Name}"); logger.LogInformation( - "Receiver disconnected on partition {PartitionId}. " + + "Receiver disconnected on partition {ServiceFabricPartitionId}. " + "Exception: {@Exception}, IsCancellationRequested: {IsCancellationRequested}", partitionId, e, cancellationToken.IsCancellationRequested); //await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); @@ -35,7 +35,7 @@ public static async Task ExecuteAsync( { logger.LogInformation($"Execution.ExecuteAsync error 2 {e.GetType().Name}"); logger.LogInformation( - "Lease lost on partition {PartitionId}. " + + "Lease lost on partition {ServiceFabricPartitionId}. " + "Exception: {@Exception}, IsCancellationRequested: {IsCancellationRequested}", partitionId, e, cancellationToken.IsCancellationRequested); //await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); diff --git a/CMP.ServiceFabricReceiver.Stateless/ReceiverService.cs b/CMP.ServiceFabricReceiver.Stateless/ReceiverService.cs index bd9b09a..2bc9b74 100644 --- a/CMP.ServiceFabricReceiver.Stateless/ReceiverService.cs +++ b/CMP.ServiceFabricReceiver.Stateless/ReceiverService.cs @@ -11,6 +11,7 @@ namespace CMP.ServiceFabricRecevier.Stateless { public class ReceiverService : StatelessService { + private readonly Func _loggerFactory; private readonly ILogger _logger; private readonly ReceiverSettings _settings; private readonly Action _serviceEventSource; @@ -21,7 +22,7 @@ public class ReceiverService : StatelessService public ReceiverService( StatelessServiceContext serviceContext, - ILogger logger, + Func loggerFactory, ReceiverSettings settings, Action serviceEventSource, Func @switch, @@ -29,7 +30,7 @@ public ReceiverService( EventProcessorOptions options) : base(serviceContext) { - _logger = logger; + _loggerFactory = loggerFactory; _settings = settings; _serviceEventSource = serviceEventSource; _f = f; @@ -43,6 +44,8 @@ public ReceiverService( _settings.EventHubConnectionString, _settings.StorageConnectionString, _settings.LeaseContainerName); + + _logger = loggerFactory(nameof(ReceiverService)); } protected override Task OnOpenAsync(CancellationToken cancellationToken) @@ -62,11 +65,11 @@ protected override async Task RunAsync(CancellationToken cancellationToken) try { await Execution.ExecuteAsync(cancellationToken, _logger, _serviceEventSource, nameof(ReceiverService), Context.PartitionId.ToString(), _switch); - await _host.RunAsync(_logger, _options, cancellationToken, _serviceEventSource, Context.PartitionId.ToString(), _f); + await _host.RunAsync(_loggerFactory, _options, cancellationToken, _serviceEventSource, Context.PartitionId.ToString(), _f); } catch (FabricTransientException e) { - _logger.LogError(e, nameof(ReceiverService) + "Exception .RunAsync for {PartitionId}", Context.PartitionId); + _logger.LogError(e, nameof(ReceiverService) + "Exception .RunAsync for {ServiceFabricPartitionId}", Context.PartitionId); } } diff --git a/README.md b/README.md index 244a8cd..72c4c1f 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,24 @@ Note that the `Func, CancellationTok The real event handling function that is returned in the end, e.g. `(events, ct) => EventHandler.Handle(events.ToArray())`, will be created every time a batch of events comes in from EventHubs. This means that **no state** is maintained between the differnet executions of the function. - - - +### Monitoring + +With the feature "OperationLogging" that enablas a custom request log to application insights, monitoring could be done using the following AI query. + +```sql +requests +| where cloud_RoleName == 'name of receiver' +| where success == true +| summarize sum(todouble(customDimensions.EventCount)) by bin(timestamp, 10m), tostring(customDimensions.EventHubPartitionId) +| render timechart" +``` + +If the request log is found to "noicy", traces could also be used. + +```sql +traces +| where cloud_RoleName == 'name of receiver' +| summarize sum(todouble(customDimensions.EventCount)) by bin(timestamp, 10m), tostring(customDimensions.EventHubPartitionId) +| render timechart +``` diff --git a/samples/Stateless1/Program.cs b/samples/Stateless1/Program.cs index 8a4fef5..5ec50fe 100644 --- a/samples/Stateless1/Program.cs +++ b/samples/Stateless1/Program.cs @@ -40,8 +40,10 @@ private static void Main(string[] args) .Enrich.FromLogContext() .CreateLogger(); - var logger = new SerilogLoggerProvider(Log.Logger, true) + Microsoft.Extensions.Logging.ILogger loggerFactory(string category) => + new SerilogLoggerProvider(Log.Logger, true) .CreateLogger("Stateless-Sample"); + var logger = loggerFactory("Program"); var storageAccount = CloudStorageAccount.DevelopmentStorageAccount; var table = storageAccount.CreateCloudTableClient().GetTableReference("receiversample"); @@ -60,7 +62,7 @@ private static void Main(string[] args) { InitialOffsetProvider = partition => { - logger.LogWarning("InitialOffsetProvider called for {partition}", partition); + logger.LogWarning("InitialOffsetProvider called for {EventHubPartitionId}", partition); return EventPosition.FromStart(); } }; @@ -79,10 +81,10 @@ private static void Main(string[] args) if (!isInCluster) { logger.LogInformation("Running in Process. Application insights key set : {instrumentationKeySet}", string.IsNullOrWhiteSpace(telemetryClient.InstrumentationKey)); - settings.ToHost() - .RunAsync(logger, options, CancellationToken.None, (s, o) => { }, "none", partitionId => ctx => pipeline(ctx)) - .GetAwaiter() - .GetResult(); + settings.ToHost() + .RunAsync(loggerFactory, options, CancellationToken.None, (s, o) => { }, "none", partitionId => ctx => pipeline(ctx)) + .GetAwaiter() + .GetResult(); Thread.Sleep(Timeout.Infinite); } @@ -94,7 +96,7 @@ private static void Main(string[] args) context => new SampleService( context, - logger, + loggerFactory, settings, ServiceEventSource.Current.Message, ct => Task.CompletedTask, diff --git a/samples/Stateless1/SampleService.cs b/samples/Stateless1/SampleService.cs index bb86627..7c22b52 100644 --- a/samples/Stateless1/SampleService.cs +++ b/samples/Stateless1/SampleService.cs @@ -12,12 +12,12 @@ namespace Stateless1 public class SampleService : ReceiverService { public SampleService(StatelessServiceContext serviceContext, - ILogger logger, ReceiverSettings settings, + Func loggerFactory, ReceiverSettings settings, Action serviceEventSource, Func @switch, Func> f, EventProcessorOptions options) - : base(serviceContext, logger, settings, serviceEventSource, @switch, f, options) + : base(serviceContext, loggerFactory, settings, serviceEventSource, @switch, f, options) { } }