Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ public interface EventProducer<K, V> {
*/
void send(K key, V value, long timestamp);

/**
* Sends data to underlying sink asynchronously with a callback to handle completion or errors.
*
* @param key message key
* @param value message value/payload
* @param callback callback to be invoked when the send completes or fails
*/
void send(K key, V value, SendCallback callback);

/**
* Sends data to underlying sink asynchronously with a callback to handle completion or errors.
*
* @param key message key
* @param value message value/payload
* @param timestamp time stamp to be used for the event
* @param callback callback to be invoked when the send completes or fails
*/
void send(K key, V value, long timestamp, SendCallback callback);

/**
* Sends data to underlying sink, async by default unless sync=true in the init configs
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.hypertrace.core.eventstore;

/**
* A callback interface that allows clients to receive notifications when a send operation
* completes. This can be used to handle both successful sends and errors asynchronously.
*/
public interface SendCallback {

/**
* Called when a send operation completes.
*
* @param result The result of the send operation if successful, null if an exception occurred
* @param exception The exception thrown during the send operation, null if the send was
* successful
*/
void onCompletion(SendResult result, Exception exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.hypertrace.core.eventstore;

/**
* Represents the result of a successful send operation. This class abstracts away the underlying
* implementation details (e.g., Kafka's RecordMetadata) and provides a common interface for
* clients.
*/
public class SendResult {

private final String topic;
private final int partition;
private final long offset;
private final long timestamp;

public SendResult(String topic, int partition, long offset, long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
}

/** Returns the topic the record was appended to. */
public String getTopic() {
return topic;
}

/** Returns the partition the record was sent to. */
public int getPartition() {
return partition;
}

/** Returns the offset of the record in the topic/partition. */
public long getOffset() {
return offset;
}

/** Returns the timestamp of the record in the topic/partition. */
public long getTimestamp() {
return timestamp;
}

@Override
public String toString() {
return "SendResult{"
+ "topic='"
+ topic
+ '\''
+ ", partition="
+ partition
+ ", offset="
+ offset
+ ", timestamp="
+ timestamp
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.hypertrace.core.eventstore.EventProducer;
import org.hypertrace.core.eventstore.EventProducerConfig;
import org.hypertrace.core.eventstore.KeyValuePair;
import org.hypertrace.core.eventstore.SendCallback;
import org.hypertrace.core.eventstore.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +50,38 @@ public void send(K key, V value, long timestamp) {
producer.send(new ProducerRecord<>(this.topic, null, timestamp, key, value));
}

@Override
public void send(K key, V value, SendCallback callback) {
producer.send(
new ProducerRecord<>(this.topic, key, value),
(metadata, exception) ->
callback.onCompletion(
exception == null
? new SendResult(
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.timestamp())
: null,
exception));
}

@Override
public void send(K key, V value, long timestamp, SendCallback callback) {
producer.send(
new ProducerRecord<>(this.topic, null, timestamp, key, value),
(metadata, exception) ->
callback.onCompletion(
exception == null
? new SendResult(
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.timestamp())
: null,
exception));
}

@Override
public void batchSend(List<KeyValuePair<K, V>> events) {
events.forEach(entry -> this.send(entry.getKey(), entry.getValue()));
Expand Down
Loading