Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/KafkaFlow.Abstractions/Producers/IDeliveryReportFlow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace KafkaFlow;

/// <summary>
/// Transfer interface for the DeliveryReport in Confluent.Kafka
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public interface IDeliveryReportFlow<out TKey, out TValue> : IDeliveryResultFlow<TKey, TValue>
Comment thread
Zananok marked this conversation as resolved.
{
string Topic { get; }

int Partition { get; }

long Offset { get; }

IError Error { get; }

/// <summary>
/// Unused
/// </summary>
TKey Key { get; }

/// <summary>
/// Unused
/// </summary>
TValue Value { get; }
}
29 changes: 29 additions & 0 deletions src/KafkaFlow.Abstractions/Producers/IDeliveryResultFlow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;

namespace KafkaFlow;

/// <summary>
/// Transfer interface for the DeliveryResult in Confluent.Kafka
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public interface IDeliveryResultFlow<out TKey, out TValue>
{
string Topic { get; }

int Partition { get; }

long Offset { get; }

object Status { get; }

object Message { get; }

TKey Key { get; }

TValue Value { get; }

DateTime Timestamp { get; }

object Headers { get; }
}
15 changes: 15 additions & 0 deletions src/KafkaFlow.Abstractions/Producers/IError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KafkaFlow;

/// <summary>
/// Transfer interface for the Error in Confluent.Kafka
/// </summary>
public interface IError
{
public bool IsError { get; }

public string Reason { get; }

public bool IsFatal { get; }

public object Code { get; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace KafkaFlow;

Expand Down Expand Up @@ -31,7 +30,7 @@ public interface IMessageProducer
/// <param name="headers">The message headers</param>
/// <param name="partition">The partition where the message will be produced, if no partition is provided it will be calculated using the message key</param>
/// <returns></returns>
Task<DeliveryResult<byte[], byte[]>> ProduceAsync(
Task<IDeliveryResultFlow<byte[], byte[]>> ProduceAsync(
string topic,
object messageKey,
object messageValue,
Expand All @@ -46,7 +45,7 @@ Task<DeliveryResult<byte[], byte[]>> ProduceAsync(
/// <param name="headers">The message headers</param>
/// <param name="partition">The partition where the message will be produced, if no partition is provided it will be calculated using the message key</param>
/// <returns></returns>
Task<DeliveryResult<byte[], byte[]>> ProduceAsync(
Task<IDeliveryResultFlow<byte[], byte[]>> ProduceAsync(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Expand All @@ -67,7 +66,7 @@ void Produce(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Action<DeliveryReport<byte[], byte[]>> deliveryHandler = null,
Action<IDeliveryReportFlow<byte[], byte[]>> deliveryHandler = null,
int? partition = null);

/// <summary>
Expand All @@ -77,12 +76,12 @@ void Produce(
/// <param name="messageKey">The message key</param>
/// <param name="messageValue">The message value</param>
/// <param name="headers">The message headers</param>
/// <param name="deliveryHandler">A handler with the operation result</param>
/// <param name="deliveryReportHandler">A handler with the operation result</param>
/// <param name="partition">The partition where the message will be produced, if no partition is provided it will be calculated using the message key</param>
void Produce(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Action<DeliveryReport<byte[], byte[]>> deliveryHandler = null,
Action<IDeliveryReportFlow<byte[], byte[]>> deliveryReportHandler = null,
int? partition = null);
}
6 changes: 3 additions & 3 deletions src/KafkaFlow/Producers/BatchProduceExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public static Task<IReadOnlyCollection<BatchProduceItem>> BatchProduceAsync(
item.MessageKey,
item.MessageValue,
item.Headers,
report =>
deliveryReportFlow =>
{
item.DeliveryReport = report;
item.DeliveryReport = deliveryReportFlow.ToDeliveryReport();

if (report.Error.IsError)
if (deliveryReportFlow.Error.IsError)
{
hasErrors = true;
}
Expand Down
28 changes: 28 additions & 0 deletions src/KafkaFlow/Producers/DeliveryReportExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Confluent.Kafka;
using KafkaFlow.Producers;

namespace KafkaFlow;

/// <summary>
/// No needed
/// </summary>
public static class DeliveryReportExtension
{
/// <summary>
/// Converts a Confluent.Kafka delivery report to a KafkaFlow delivery report.
/// </summary>
/// <param name="report"></param>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
public static IDeliveryReportFlow<TKey, TValue> ToIDeliveryReportFlow<TKey, TValue>(this DeliveryReport<TKey, TValue> report)
{
return new DeliveryReportFlow<TKey, TValue>
{
Topic = report.Topic,
Partition = report.Partition,
Offset = report.Offset,
Error = report.Error.ToIError(),
};
}
}
34 changes: 34 additions & 0 deletions src/KafkaFlow/Producers/DeliveryResultExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using Confluent.Kafka;

namespace KafkaFlow.Producers;

/// <summary>
/// No needed
/// </summary>
public static class DeliveryResultExtension
{
/// <summary>
/// Converts a KafkaFlow delivery result to a Confluent.Kafka delivery result.
/// </summary>
/// <param name="deliveryResult"></param>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
public static IDeliveryResultFlow<TKey, TValue> ToIDeliveryResultFlow<TKey, TValue>(
this DeliveryResult<TKey, TValue> deliveryResult)
{
return new DeliveryResultFlow<TKey, TValue>
{
Topic = deliveryResult.Topic,
Partition = deliveryResult.Partition,
Offset = deliveryResult.Offset,
Status = deliveryResult.Status,
Message = deliveryResult.Message,
Key = deliveryResult.Key,
Value = deliveryResult.Value,
Timestamp = deliveryResult.Timestamp.UtcDateTime,
Headers = deliveryResult.Headers,
};
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow/Producers/ErrorExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Confluent.Kafka;

namespace KafkaFlow.Producers;

/// <summary>
/// No needed
/// </summary>
public static class ErrorExtension
{
internal static Error ToError(this IError report) => new((ErrorCode)report.Code, report.Reason, report.IsFatal);

internal static IError ToIError(this Error report) => new ErrorFlow
{
Code = report.Code,
IsError = (int)report.Code != 0,
Reason = report.Reason,
IsFatal = report.IsFatal,
};
}
28 changes: 28 additions & 0 deletions src/KafkaFlow/Producers/IDeliveryReportFlowExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Confluent.Kafka;
using KafkaFlow.Producers;

namespace KafkaFlow;

/// <summary>
/// No needed
/// </summary>
public static class IDeliveryReportFlowExtension
{
/// <summary>
/// Converts a Confluent.Kafka delivery report to a KafkaFlow delivery report.
/// </summary>
/// <param name="report"></param>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
public static DeliveryReport<TKey, TValue> ToDeliveryReport<TKey, TValue>(this IDeliveryReportFlow<TKey, TValue> report)
{
return new DeliveryReport<TKey, TValue>
{
Topic = report.Topic,
Partition = report.Partition,
Offset = report.Offset,
Error = report.Error.ToError(),
};
}
}
40 changes: 40 additions & 0 deletions src/KafkaFlow/Producers/IDeliveryResultFlowExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using Confluent.Kafka;

namespace KafkaFlow.Producers;

/// <summary>
/// No needed
/// </summary>
public static class IDeliveryResultFlowExtension
{
/// <summary>
/// Converts a Confluent.Kafka delivery result to a KafkaFlow delivery result.
/// </summary>
/// <param name="deliveryResult"></param>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static DeliveryResult<TKey, TValue> ToDeliveryResult<TKey, TValue>(
this IDeliveryResultFlow<TKey, TValue> deliveryResult)
{
if (deliveryResult is null)
{
throw new ArgumentNullException(nameof(deliveryResult));
}

return new DeliveryResult<TKey, TValue>
{
Topic = deliveryResult.Topic,
Partition = deliveryResult.Partition,
Offset = deliveryResult.Offset,
Status = (PersistenceStatus)deliveryResult.Status,
Message = (Message<TKey, TValue>)deliveryResult.Message,
Key = deliveryResult.Key,
Value = deliveryResult.Value,
Timestamp = new Timestamp(deliveryResult.Timestamp),
Headers = (Headers)deliveryResult.Headers,
};
}
}
20 changes: 10 additions & 10 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ public MessageProducer(

public string ProducerName => _configuration.Name;

public async Task<DeliveryResult<byte[], byte[]>> ProduceAsync(
public async Task<IDeliveryResultFlow<byte[], byte[]>> ProduceAsync(
string topic,
object messageKey,
object messageValue,
IMessageHeaders headers = null,
int? partition = null)
{
DeliveryResult<byte[], byte[]> report = null;
DeliveryResult<byte[], byte[]> deliveryResult = null;

using var messageScope = _producerDependencyScope.Resolver.CreateScope();

Expand All @@ -59,7 +59,7 @@ await _middlewareExecutor
messageContext,
async context =>
{
report = await this
deliveryResult = await this
.InternalProduceAsync(context, partition)
.ConfigureAwait(false);
})
Expand All @@ -73,10 +73,10 @@ await _middlewareExecutor
throw;
}

return report;
return deliveryResult.ToIDeliveryResultFlow();
}

public Task<DeliveryResult<byte[], byte[]>> ProduceAsync(
public Task<IDeliveryResultFlow<byte[], byte[]>> ProduceAsync(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Expand All @@ -101,7 +101,7 @@ public void Produce(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Action<DeliveryReport<byte[], byte[]>> deliveryHandler = null,
Action<IDeliveryReportFlow<byte[], byte[]>> deliveryHandler = null,
int? partition = null)
{
var messageScope = _producerDependencyScope.Resolver.CreateScope();
Expand Down Expand Up @@ -136,7 +136,7 @@ public void Produce(
completionSource.SetResult(0);
}

deliveryHandler?.Invoke(report);
deliveryHandler?.Invoke(report.ToIDeliveryReportFlow());
});

return completionSource.Task;
Expand All @@ -152,7 +152,7 @@ public void Produce(
Error = new Error(ErrorCode.Local_Fail, task.Exception?.Message),
Status = PersistenceStatus.NotPersisted,
Topic = topic,
});
}.ToIDeliveryReportFlow());
}

messageScope.Dispose();
Expand All @@ -165,7 +165,7 @@ public void Produce(
object messageKey,
object messageValue,
IMessageHeaders headers = null,
Action<DeliveryReport<byte[], byte[]>> deliveryHandler = null,
Action<IDeliveryReportFlow<byte[], byte[]>> deliveryReportHandler = null,
int? partition = null)
{
if (string.IsNullOrWhiteSpace(_configuration.DefaultTopic))
Expand All @@ -179,7 +179,7 @@ public void Produce(
messageKey,
messageValue,
headers,
deliveryHandler,
deliveryReportHandler,
partition);
}

Expand Down
Loading