Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<VersionPrefix>0.3.1</VersionPrefix>
<VersionPrefix>0.4.0</VersionPrefix>
<VersionSuffix>alpha</VersionSuffix>
<PackageId>CMP.ServiceFabricReceiver.Common</PackageId>
<Authors>CDON</Authors>
Expand Down
10 changes: 5 additions & 5 deletions CMP.ServiceFabricReceiver.Common/EventProcessing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static Func<Func<EventContext, Task>, Func<EventContext, Task>> Handling(
public static Func<Func<EventContext, Task>, Func<EventContext, Task>> PartitionLogging()
=> f => async ctx =>
{
using (ctx.Logger.BeginScope("Event hub partition : {PartitionId}", ctx.PartitionId))
using (ctx.Logger.BeginScope("{FeatureName} - {EventHubPartitionId}", nameof(PartitionLogging), ctx.PartitionId))
{
await f(ctx);
}
Expand All @@ -54,7 +54,7 @@ public static Func<Func<EventContext, Task>, Func<EventContext, Task>> Partition
public static Func<Func<EventContext, Task>, Func<EventContext, Task>> Logging()
=> f => async ctx =>
{
using (ctx.Logger.BeginScope("{FeatureName} - Events ({eventCount}) - Cancelled : {cancelled}", nameof(Logging), ctx.Events.Length, ctx.CancellationToken.IsCancellationRequested))
using (ctx.Logger.BeginScope("{FeatureName} - Events ({EventCount}) - Cancelled : {IsCancellationRequested}", nameof(Logging), ctx.Events.Length, ctx.CancellationToken.IsCancellationRequested))
{
const string name = "EventProcessor";
ctx.Logger.LogDebug($"{name}.ProcessEventsAsync for partition {ctx.PartitionId} got {ctx.Events.Count()} events",
Expand Down Expand Up @@ -86,21 +86,21 @@ public static Func<Func<EventContext, Task>, Func<EventContext, Task>> Retry(int

public static async Task Retry(Func<EventContext, Task> f, EventContext ctx, bool faulted = false, int exceptionDelaySeconds = 1)
{
using (ctx.Logger.BeginScope("{FeatureName} - Retry : {retry}", nameof(Retry), faulted))
using (ctx.Logger.BeginScope("{FeatureName} - Retry : {IsRetry}", nameof(Retry), faulted))
{
try
{
await f(ctx);
}
catch (Exception ex) when (faulted)
{
ctx.Logger.LogError(ex, $"Failed to process events- Faulted : {faulted}. Cancelled : {ctx.CancellationToken.IsCancellationRequested}", new object[] { ctx.CancellationToken.IsCancellationRequested });
ctx.Logger.LogError(ex, "Failed to process events - IsRetry : {IsRetry}. Cancelled : {IsCancellationRequested}", faulted, ctx.CancellationToken.IsCancellationRequested);
ctx.CancellationToken.ThrowIfCancellationRequested();
throw;
}
catch (Exception ex)
{
ctx.Logger.LogError(ex, $"Failed to process events. Cancelled : {ctx.CancellationToken.IsCancellationRequested}", new object[] { ctx.CancellationToken.IsCancellationRequested });
ctx.Logger.LogError(ex, "Failed to process events. Cancelled : {IsCancellationRequested}", ctx.CancellationToken.IsCancellationRequested);
ctx.CancellationToken.ThrowIfCancellationRequested();
if (exceptionDelaySeconds > 0)
await Task.Delay(TimeSpan.FromSeconds(exceptionDelaySeconds), ctx.CancellationToken);
Expand Down
6 changes: 3 additions & 3 deletions CMP.ServiceFabricReceiver.Common/Execution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ public static async Task ExecuteAsync(
{
if (e is OperationCanceledException)
{
logger.LogError(e, serviceName + " RunAsync canceled. RunAsync for {PartitionId}", partitionId);
logger.LogError(e, serviceName + " RunAsync canceled. RunAsync for {ServiceFabricPartitionId}", partitionId);
serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]);
throw;
}

logger.LogError(e, serviceName + " Exception during shutdown. Exception of unexpected type .RunAsync for {PartitionId}", partitionId);
logger.LogError(e, serviceName + " Exception during shutdown. Exception of unexpected type .RunAsync for {ServiceFabricPartitionId}", partitionId);
serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]);
cancellationToken.ThrowIfCancellationRequested();
}
catch (Exception e)
{
logger.LogError(e, serviceName + ". RunAsync for {PartitionId}", partitionId);
logger.LogError(e, serviceName + ". RunAsync for {ServiceFabricPartitionId}", partitionId);
serviceEventSource($"{serviceName}.RunAsync for {partitionId} error {e}", new object[0]);
throw;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<!--<RuntimeIdentifier>win7-x64</RuntimeIdentifier>-->
<VersionPrefix>0.3.1</VersionPrefix>
<VersionPrefix>0.4.0</VersionPrefix>
<VersionSuffix>alpha</VersionSuffix>
<PackageId>CMP.ServiceFabricReceiver.Stateless</PackageId>
<Title>CMP.ServiceFabricReceiver.Stateless</Title>
Expand Down
10 changes: 5 additions & 5 deletions CMP.ServiceFabricReceiver.Stateless/EventProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ public EventProcessor(

public Task CloseAsync(PartitionContext context, CloseReason reason)
{
_logger.LogInformation("EventProcessor.CloseAsync for {PartitionId} reason {1}", context.PartitionId, reason);
_logger.LogInformation("EventProcessor.CloseAsync for {EventHubPartitionId} reason {1}", context.PartitionId, reason);
//_serviceEventSource("EventProcessor.CloseAsync for {0} reason {1}", new object[] { context.PartitionId, reason });
return Task.CompletedTask;
}

public Task OpenAsync(PartitionContext context)
{
_logger.LogInformation("EventProcessor.OpenAsync for {PartitionId}", context.PartitionId);
_logger.LogInformation("EventProcessor.OpenAsync for {EventHubPartitionId}", context.PartitionId);
//_serviceEventSource("EventProcessor.OpenAsync for {0}", new[] { context.PartitionId });
return Task.CompletedTask;
}
Expand All @@ -47,18 +47,18 @@ public Task ProcessErrorAsync(PartitionContext context, Exception error)
if (error is ReceiverDisconnectedException)
{
_logger.LogInformation(
"Receiver disconnected on partition {PartitionId}. Exception: {@Exception}",
"Receiver disconnected on partition {EventHubPartitionId}. Exception: {@Exception}",
context.PartitionId, error);
return Task.CompletedTask;
}
if (error is LeaseLostException)
{
_logger.LogInformation(
"Lease lost on partition {PartitionId}. Exception: {@Exception}",
"Lease lost on partition {EventHubPartitionId}. Exception: {@Exception}",
context.PartitionId, error);
return Task.CompletedTask;
}
_logger.LogError(error, "EventProcessor.ProcessErrorAsync for {PartitionId}", context.PartitionId);
_logger.LogError(error, "EventProcessor.ProcessErrorAsync for {EventHubPartitionId}", context.PartitionId);
//_serviceEventSource("EventProcessor.ProcessErrorAsync for {0} error {1}", new object[] { context.PartitionId, error });
return Task.CompletedTask;
}
Expand Down
8 changes: 4 additions & 4 deletions CMP.ServiceFabricReceiver.Stateless/EventProcessorFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ namespace CMP.ServiceFabricRecevier.Stateless
{
public class EventProcessorFactory : IEventProcessorFactory
{
private readonly ILogger _logger;
private readonly Func<string,ILogger> _loggerFactory;
private readonly CancellationToken _cancellationToken;
private readonly Func<string, Func<EventContext, Task>> f;

public EventProcessorFactory(
ILogger logger,
Func<string, ILogger> loggerFactory,
CancellationToken cancellationToken,
Func<string, Func<EventContext, Task>> f)
{
_logger = logger;
_loggerFactory = loggerFactory;
_cancellationToken = cancellationToken;
this.f = f;
}

public IEventProcessor CreateEventProcessor(PartitionContext context)
=> new EventProcessor(_logger, _cancellationToken, f);
=> new EventProcessor(_loggerFactory($"Processor({context.PartitionId})") , _cancellationToken, f);
}
}
25 changes: 19 additions & 6 deletions CMP.ServiceFabricReceiver.Stateless/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,25 @@ public static Task RunAsync(
EventProcessorOptions options,
CancellationToken cancellationToken,
Action<string, object[]> serviceEventSource,
string partition,
string serviceFabricPartition,
Func<string, Func<EventContext, Task>> f)
=> Composition.Combine(
Features.Execution(logger, serviceEventSource, nameof(ReceiverService), partition),
Features.ReceiverExceptions(logger, partition),
Features.Run(ct => host.RegisterEventProcessorFactoryAsync(new EventProcessorFactory(logger, ct, f), options))
)(cancellationToken);
=> RunAsync(host, _ => logger, options, cancellationToken, serviceEventSource, serviceFabricPartition, f);

public static Task RunAsync(
this EventProcessorHost host,
Func<string, ILogger> loggerFactory,
EventProcessorOptions options,
CancellationToken cancellationToken,
Action<string, object[]> serviceEventSource,
string serviceFabricPartition,
Func<string, Func<EventContext, Task>> f)
{
var logger = loggerFactory($"{host.HostName}.{nameof(RunAsync)}.{serviceFabricPartition}");
return Composition.Combine(
Features.Execution(logger, serviceEventSource, nameof(ReceiverService), serviceFabricPartition),
Features.ReceiverExceptions(logger, serviceFabricPartition),
Features.Run(ct => host.RegisterEventProcessorFactoryAsync(new EventProcessorFactory(loggerFactory, ct, f), options))
)(cancellationToken);
}
}
}
4 changes: 2 additions & 2 deletions CMP.ServiceFabricReceiver.Stateless/ReceiverExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static async Task ExecuteAsync(
{
logger.LogInformation($"Execution.ExecuteAsync error 1 {e.GetType().Name}");
logger.LogInformation(
"Receiver disconnected on partition {PartitionId}. " +
"Receiver disconnected on partition {ServiceFabricPartitionId}. " +
"Exception: {@Exception}, IsCancellationRequested: {IsCancellationRequested}",
partitionId, e, cancellationToken.IsCancellationRequested);
//await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
Expand All @@ -35,7 +35,7 @@ public static async Task ExecuteAsync(
{
logger.LogInformation($"Execution.ExecuteAsync error 2 {e.GetType().Name}");
logger.LogInformation(
"Lease lost on partition {PartitionId}. " +
"Lease lost on partition {ServiceFabricPartitionId}. " +
"Exception: {@Exception}, IsCancellationRequested: {IsCancellationRequested}",
partitionId, e, cancellationToken.IsCancellationRequested);
//await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
Expand Down
11 changes: 7 additions & 4 deletions CMP.ServiceFabricReceiver.Stateless/ReceiverService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace CMP.ServiceFabricRecevier.Stateless
{
public class ReceiverService : StatelessService
{
private readonly Func<string,ILogger> _loggerFactory;
private readonly ILogger _logger;
private readonly ReceiverSettings _settings;
private readonly Action<string, object[]> _serviceEventSource;
Expand All @@ -21,15 +22,15 @@ public class ReceiverService : StatelessService

public ReceiverService(
StatelessServiceContext serviceContext,
ILogger logger,
Func<string,ILogger> loggerFactory,
ReceiverSettings settings,
Action<string, object[]> serviceEventSource,
Func<CancellationToken, Task> @switch,
Func<string, Func<EventContext, Task>> f,
EventProcessorOptions options)
: base(serviceContext)
{
_logger = logger;
_loggerFactory = loggerFactory;
_settings = settings;
_serviceEventSource = serviceEventSource;
_f = f;
Expand All @@ -43,6 +44,8 @@ public ReceiverService(
_settings.EventHubConnectionString,
_settings.StorageConnectionString,
_settings.LeaseContainerName);

_logger = loggerFactory(nameof(ReceiverService));
}

protected override Task OnOpenAsync(CancellationToken cancellationToken)
Expand All @@ -62,11 +65,11 @@ protected override async Task RunAsync(CancellationToken cancellationToken)
try
{
await Execution.ExecuteAsync(cancellationToken, _logger, _serviceEventSource, nameof(ReceiverService), Context.PartitionId.ToString(), _switch);
await _host.RunAsync(_logger, _options, cancellationToken, _serviceEventSource, Context.PartitionId.ToString(), _f);
await _host.RunAsync(_loggerFactory, _options, cancellationToken, _serviceEventSource, Context.PartitionId.ToString(), _f);
}
catch (FabricTransientException e)
{
_logger.LogError(e, nameof(ReceiverService) + "Exception .RunAsync for {PartitionId}", Context.PartitionId);
_logger.LogError(e, nameof(ReceiverService) + "Exception .RunAsync for {ServiceFabricPartitionId}", Context.PartitionId);
}
}

Expand Down
23 changes: 20 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,24 @@ Note that the `Func<string, Func<IReadOnlyCollection<EventData>, CancellationTok
The real event handling function that is returned in the end, e.g. `(events, ct) => EventHandler.Handle(events.ToArray())`, will be created every time a batch of events comes in from EventHubs. This means that **no state** is maintained between the differnet executions of the function.





### Monitoring

With the feature "OperationLogging" that enablas a custom request log to application insights, monitoring could be done using the following AI query.

```sql
requests
| where cloud_RoleName == 'name of receiver'
| where success == true
| summarize sum(todouble(customDimensions.EventCount)) by bin(timestamp, 10m), tostring(customDimensions.EventHubPartitionId)
| render timechart"
```

If the request log is found to "noicy", traces could also be used.

```sql
traces
| where cloud_RoleName == 'name of receiver'
| summarize sum(todouble(customDimensions.EventCount)) by bin(timestamp, 10m), tostring(customDimensions.EventHubPartitionId)
| render timechart
```

16 changes: 9 additions & 7 deletions samples/Stateless1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ private static void Main(string[] args)
.Enrich.FromLogContext()
.CreateLogger();

var logger = new SerilogLoggerProvider(Log.Logger, true)
Microsoft.Extensions.Logging.ILogger loggerFactory(string category) =>
new SerilogLoggerProvider(Log.Logger, true)
.CreateLogger("Stateless-Sample");
var logger = loggerFactory("Program");

var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
var table = storageAccount.CreateCloudTableClient().GetTableReference("receiversample");
Expand All @@ -60,7 +62,7 @@ private static void Main(string[] args)
{
InitialOffsetProvider = partition =>
{
logger.LogWarning("InitialOffsetProvider called for {partition}", partition);
logger.LogWarning("InitialOffsetProvider called for {EventHubPartitionId}", partition);
return EventPosition.FromStart();
}
};
Expand All @@ -79,10 +81,10 @@ private static void Main(string[] args)
if (!isInCluster)
{
logger.LogInformation("Running in Process. Application insights key set : {instrumentationKeySet}", string.IsNullOrWhiteSpace(telemetryClient.InstrumentationKey));
settings.ToHost()
.RunAsync(logger, options, CancellationToken.None, (s, o) => { }, "none", partitionId => ctx => pipeline(ctx))
.GetAwaiter()
.GetResult();
settings.ToHost()
.RunAsync(loggerFactory, options, CancellationToken.None, (s, o) => { }, "none", partitionId => ctx => pipeline(ctx))
.GetAwaiter()
.GetResult();

Thread.Sleep(Timeout.Infinite);
}
Expand All @@ -94,7 +96,7 @@ private static void Main(string[] args)
context =>
new SampleService(
context,
logger,
loggerFactory,
settings,
ServiceEventSource.Current.Message,
ct => Task.CompletedTask,
Expand Down
4 changes: 2 additions & 2 deletions samples/Stateless1/SampleService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ namespace Stateless1
public class SampleService : ReceiverService
{
public SampleService(StatelessServiceContext serviceContext,
ILogger logger, ReceiverSettings settings,
Func<string, ILogger> loggerFactory, ReceiverSettings settings,
Action<string, object[]> serviceEventSource,
Func<CancellationToken, Task> @switch,
Func<string, Func<EventContext, Task>> f,
EventProcessorOptions options)
: base(serviceContext, logger, settings, serviceEventSource, @switch, f, options)
: base(serviceContext, loggerFactory, settings, serviceEventSource, @switch, f, options)
{
}
}
Expand Down