Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions tests/instrumentation/kafka_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ def producer(topics):

@pytest.fixture()
def consumer(topics):
consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092", consumer_timeout_ms=500)
consumer = KafkaConsumer(
bootstrap_servers=f"{KAFKA_HOST}:9092",
consumer_timeout_ms=500,
# Unique topics can be produced to before the consumer's initial offset is resolved.
auto_offset_reset="earliest",
)
consumer.subscribe(topics=topics)
deadline = time.time() + 5
while not consumer.assignment() and time.time() < deadline:
Expand Down Expand Up @@ -127,20 +132,28 @@ def delayed_send():

thread = threading.Thread(target=delayed_send)
thread.start()
# Give Kafka enough time to resolve offsets and deliver both delayed records before iteration stops.
consumer.config["consumer_timeout_ms"] = 2000
for item in consumer:
with elasticapm.capture_span("foo"):
pass
thread.join()
transactions = elasticapm_client.events[TRANSACTION]
spans = elasticapm_client.events[SPAN]
producer_transaction = next(transaction for transaction in transactions if transaction["name"] == "foo")
consumer_transactions = [transaction for transaction in transactions if transaction["type"] == "messaging"]
consumer_spans = [span for span in spans if span["name"] == "foo"]
# the consumer transactions should have the same trace id as the transaction that triggered the messages
assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"]
assert transactions[1]["name"] == f"Kafka RECEIVE from {test_topic}"
assert transactions[1]["type"] == "messaging"
assert transactions[1]["context"]["message"]["queue"]["name"] == test_topic
assert len(consumer_transactions) == 2
assert len(consumer_spans) == 2
assert (
producer_transaction["trace_id"] == consumer_transactions[0]["trace_id"] == consumer_transactions[1]["trace_id"]
)
assert consumer_transactions[0]["name"] == f"Kafka RECEIVE from {test_topic}"
assert consumer_transactions[0]["context"]["message"]["queue"]["name"] == test_topic

assert spans[2]["transaction_id"] == transactions[1]["id"]
assert spans[3]["transaction_id"] == transactions[2]["id"]
assert consumer_spans[0]["transaction_id"] == consumer_transactions[0]["id"]
assert consumer_spans[1]["transaction_id"] == consumer_transactions[1]["id"]


def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
Expand All @@ -157,6 +170,8 @@ def delayed_send():
thread = threading.Thread(target=delayed_send)
thread.start()
transaction = elasticapm_client.begin_transaction("foo")
# Give Kafka enough time to resolve offsets and deliver both delayed records before iteration stops.
consumer.config["consumer_timeout_ms"] = 2000
for item in consumer:
pass
thread.join()
Expand Down Expand Up @@ -184,6 +199,8 @@ def delayed_send():

thread = threading.Thread(target=delayed_send)
thread.start()
# Give Kafka enough time to resolve offsets and deliver all delayed records before iteration stops.
consumer.config["consumer_timeout_ms"] = 2000
for item in consumer:
with elasticapm.capture_span("test"):
assert item
Expand Down
Loading