diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java index 9a87752e2888a..0e65f06799a29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.state.internals.SessionKeySchema; import org.slf4j.Logger; @@ -31,7 +30,7 @@ import java.util.Map; -public class SessionWindowedSerializer implements WindowedSerializer { +public class SessionWindowedSerializer implements Serializer> { /** * Default serializer for the inner serializer class of a windowed record. Must implement the {@link Serde} interface. @@ -110,12 +109,10 @@ public void close() { } } - @Override public byte[] serializeBaseKey(final String topic, final Windowed data) { return serializeBaseKey(topic, new RecordHeaders(), data); } - @Override public byte[] serializeBaseKey(final String topic, final Headers headers, final Windowed data) { WindowedSerdes.verifyInnerSerializerNotNull(inner, this); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java index 5eef64da78000..382384597cbd9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.state.internals.WindowKeySchema; import org.slf4j.Logger; @@ -31,7 +30,7 @@ import java.util.Map; -public class TimeWindowedSerializer implements WindowedSerializer { +public class TimeWindowedSerializer implements Serializer> { /** * Default serializer for the inner serializer class of a windowed record. Must implement the {@link Serde} interface. @@ -111,12 +110,10 @@ public void close() { } } - @Override public byte[] serializeBaseKey(final String topic, final Windowed data) { return serializeBaseKey(topic, new RecordHeaders(), data); } - @Override public byte[] serializeBaseKey(final String topic, final Headers headers, final Windowed data) { WindowedSerdes.verifyInnerSerializerNotNull(inner, this); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index 54735b98bb135..bee2f11f6ef77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -18,6 +18,9 @@ import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.SessionWindowedSerializer; +import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -27,9 +30,9 @@ public class WindowedStreamPartitioner implements StreamPartitioner, V> { - private final WindowedSerializer serializer; + private final Serializer> serializer; - public WindowedStreamPartitioner(final WindowedSerializer serializer) { + public WindowedStreamPartitioner(final Serializer> serializer) { this.serializer = serializer; } @@ -46,8 +49,17 @@ public WindowedStreamPartitioner(final WindowedSerializer serializer) { */ @Override public Optional> partitions(final String topic, final Windowed windowedKey, final V value, final int numPartitions) { + + final byte[] keyBytes; // for windowed key, the key bytes should never be null - final byte[] keyBytes = serializer.serializeBaseKey(topic, new RecordHeaders(), windowedKey); + if (serializer instanceof SessionWindowedSerializer) { + keyBytes = ((SessionWindowedSerializer) serializer).serializeBaseKey(topic, new RecordHeaders(), windowedKey); + } else if (serializer instanceof TimeWindowedSerializer) { + keyBytes = ((TimeWindowedSerializer) serializer).serializeBaseKey(topic, new RecordHeaders(), windowedKey); + } else { + throw new IllegalStateException("WindowedStreamPartitioner requires SessionWindowedSerializer or TimeWindowedSerializer, " + + "but got: " + serializer.getClass().getName()); + } // stick with the same built-in partitioner util functions that producer used // to make sure its behavior is consistent with the producer diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java index f12a9e5b9bf2c..5687df8e32ae9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java @@ -18,8 +18,10 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.SessionWindowedSerializer; +import org.apache.kafka.streams.kstream.TimeWindowedSerializer; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.ProducedInternal; -import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; @@ -58,8 +60,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { final String[] parentNames = parentNodeNames(); final StreamPartitioner partitioner; - if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) { - partitioner = (StreamPartitioner) new WindowedStreamPartitioner((WindowedSerializer) keySerializer); + if (producedInternal.streamPartitioner() == null && (keySerializer instanceof SessionWindowedSerializer || keySerializer instanceof TimeWindowedSerializer)) { + partitioner = (StreamPartitioner) new WindowedStreamPartitioner((Serializer>) keySerializer); } else { partitioner = producedInternal.streamPartitioner(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 25a15efdd4c9d..e1ec0cf004d3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; @@ -59,7 +60,7 @@ public class WindowedStreamPartitionerTest { @Test public void testCopartitioning() { final Random rand = new Random(); - final WindowedSerializer timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer); + final Serializer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer); final WindowedStreamPartitioner streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer); for (int k = 0; k < 10; k++) {