Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.SessionKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
public class SessionWindowedSerializer<T> implements Serializer<Windowed<T>> {

/**
* Default serializer for the inner serializer class of a windowed record. Must implement the {@link Serde} interface.
Expand Down Expand Up @@ -110,12 +109,10 @@ public void close() {
}
}

@Override
public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
return serializeBaseKey(topic, new RecordHeaders(), data);
}

@Override
public byte[] serializeBaseKey(final String topic, final Headers headers, final Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.internals.WindowKeySchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
public class TimeWindowedSerializer<T> implements Serializer<Windowed<T>> {

/**
* Default serializer for the inner serializer class of a windowed record. Must implement the {@link Serde} interface.
Expand Down Expand Up @@ -111,12 +110,10 @@ public void close() {
}
}

@Override
public byte[] serializeBaseKey(final String topic, final Windowed<T> data) {
return serializeBaseKey(topic, new RecordHeaders(), data);
}

@Override
public byte[] serializeBaseKey(final String topic, final Headers headers, final Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner;

Expand All @@ -27,9 +30,9 @@

public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {

private final WindowedSerializer<K> serializer;
private final Serializer<Windowed<K>> serializer;

public WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
public WindowedStreamPartitioner(final Serializer<Windowed<K>> serializer) {
this.serializer = serializer;
}

Expand All @@ -46,8 +49,17 @@ public WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
*/
@Override
public Optional<Set<Integer>> partitions(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {

final byte[] keyBytes;
// for windowed key, the key bytes should never be null
final byte[] keyBytes = serializer.serializeBaseKey(topic, new RecordHeaders(), windowedKey);
if (serializer instanceof SessionWindowedSerializer) {
keyBytes = ((SessionWindowedSerializer<K>) serializer).serializeBaseKey(topic, new RecordHeaders(), windowedKey);
} else if (serializer instanceof TimeWindowedSerializer) {
keyBytes = ((TimeWindowedSerializer<K>) serializer).serializeBaseKey(topic, new RecordHeaders(), windowedKey);
} else {
throw new IllegalStateException("WindowedStreamPartitioner requires SessionWindowedSerializer or TimeWindowedSerializer, " +
"but got: " + serializer.getClass().getName());
}

// stick with the same built-in partitioner util functions that producer used
// to make sure its behavior is consistent with the producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
Expand Down Expand Up @@ -58,8 +60,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final String[] parentNames = parentNodeNames();

final StreamPartitioner<? super K, ? super V> partitioner;
if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) {
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
if (producedInternal.streamPartitioner() == null && (keySerializer instanceof SessionWindowedSerializer || keySerializer instanceof TimeWindowedSerializer)) {
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((Serializer<Windowed<K>>) keySerializer);
} else {
partitioner = producedInternal.streamPartitioner();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class WindowedStreamPartitionerTest {
@Test
public void testCopartitioning() {
final Random rand = new Random();
final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
final Serializer<Windowed<Integer>> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer);

for (int k = 0; k < 10; k++) {
Expand Down