diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 445d477..01246c6 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -17,15 +17,22 @@ # under the License. # +# pylint: disable=no-name-in-module,c-extension-no-member,protected-access + """ The Pulsar Python client APIs that work with the asyncio module. """ import asyncio import functools -from typing import Any +from typing import Any, List, Union import _pulsar +from _pulsar import ( + InitialPosition, + RegexSubscriptionMode, + ConsumerCryptoFailureAction, +) import pulsar class PulsarException(BaseException): @@ -61,7 +68,7 @@ class Producer: The Pulsar message producer, used to publish messages on a topic. """ - def __init__(self, producer: _pulsar.Producer) -> None: + def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> None: """ Create the producer. Users should not call this constructor directly. Instead, create the @@ -71,17 +78,21 @@ def __init__(self, producer: _pulsar.Producer) -> None: ---------- producer: _pulsar.Producer The underlying Producer object from the C extension. + schema: pulsar.schema.Schema + The schema of the data that will be sent by this producer. """ - self._producer: _pulsar.Producer = producer + self._producer = producer + self._schema = schema - async def send(self, content: bytes) -> pulsar.MessageId: + async def send(self, content: Any) -> pulsar.MessageId: """ Send a message asynchronously. parameters ---------- - content: bytes - The message payload + content: Any + The message payload, whose type should respect the schema defined in + `Client.create_producer`. Returns ------- @@ -93,7 +104,7 @@ async def send(self, content: bytes) -> pulsar.MessageId: PulsarException """ builder = _pulsar.MessageBuilder() - builder.content(content) + builder.content(self._schema.encode(content)) future = asyncio.get_running_loop().create_future() self._producer.send_async(builder.build(), functools.partial(_set_future, future)) msg_id = await future @@ -116,6 +127,177 @@ async def close(self) -> None: self._producer.close_async(functools.partial(_set_future, future, value=None)) await future +class Consumer: + """ + The Pulsar message consumer, used to subscribe to messages from a topic. + """ + + def __init__(self, consumer: _pulsar.Consumer, schema: pulsar.schema.Schema) -> None: + """ + Create the consumer. + Users should not call this constructor directly. Instead, create the + consumer via `Client.subscribe`. + + Parameters + ---------- + consumer: _pulsar.Consumer + The underlying Consumer object from the C extension. + schema: pulsar.schema.Schema + The schema of the data that will be received by this consumer. + """ + self._consumer = consumer + self._schema = schema + + async def receive(self) -> pulsar.Message: + """ + Receive a single message asynchronously. + + Returns + ------- + pulsar.Message + The message received. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.receive_async(functools.partial(_set_future, future)) + msg = await future + m = pulsar.Message() + m._message = msg + m._schema = self._schema + return m + + async def acknowledge( + self, + message: Union[pulsar.Message, pulsar.MessageId, + _pulsar.Message, _pulsar.MessageId] + ) -> None: + """ + Acknowledge the reception of a single message asynchronously. + + Parameters + ---------- + message : Message, MessageId, _pulsar.Message, _pulsar.MessageId + The received message or message id. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(message, pulsar.Message): + msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id + else: + msg = message + self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None)) + await future + + async def acknowledge_cumulative( + self, + message: Union[pulsar.Message, pulsar.MessageId, + _pulsar.Message, _pulsar.MessageId] + ) -> None: + """ + Acknowledge the reception of all the messages in the stream up to (and + including) the provided message asynchronously. + + Parameters + ---------- + message : Message, MessageId, _pulsar.Message, _pulsar.MessageId + The received message or message id. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(message, pulsar.Message): + msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id + else: + msg = message + self._consumer.acknowledge_cumulative_async( + msg, functools.partial(_set_future, future, value=None) + ) + await future + + async def unsubscribe(self) -> None: + """ + Unsubscribe the current consumer from the topic asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None)) + await future + + async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None: + """ + Reset the subscription associated with this consumer to a specific + message id or publish timestamp asynchronously. + + The message id can either be a specific message or represent the first + or last messages in the topic. + + Parameters + ---------- + messageid : MessageId or int + The message id for seek, OR an integer event time (timestamp) to + seek to + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(messageid, pulsar.MessageId): + msg_id = messageid._msg_id + elif isinstance(messageid, int): + msg_id = messageid + else: + raise ValueError(f"invalid messageid type {type(messageid)}") + self._consumer.seek_async( + msg_id, functools.partial(_set_future, future, value=None) + ) + await future + + async def close(self) -> None: + """ + Close the consumer asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.close_async(functools.partial(_set_future, future, value=None)) + await future + + def topic(self) -> str: + """ + Return the topic this consumer is subscribed to. + """ + return self._consumer.topic() + + def subscription_name(self) -> str: + """ + Return the subscription name. + """ + return self._consumer.subscription_name() + + def consumer_name(self) -> str: + """ + Return the consumer name. + """ + return self._consumer.consumer_name() + class Client: """ The asynchronous version of `pulsar.Client`. @@ -127,7 +309,10 @@ def __init__(self, service_url, **kwargs) -> None: """ self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client - async def create_producer(self, topic: str) -> Producer: + # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments + async def create_producer(self, topic: str, + schema: pulsar.schema.Schema | None = None, + ) -> Producer: """ Create a new producer on a given topic @@ -135,6 +320,8 @@ async def create_producer(self, topic: str) -> Producer: ---------- topic: str The topic name + schema: pulsar.schema.Schema | None, default=None + Define the schema of the data that will be published by this producer. Returns ------- @@ -145,11 +332,199 @@ async def create_producer(self, topic: str) -> Producer: ------ PulsarException """ + if schema is None: + schema = pulsar.schema.BytesSchema() + schema.attach_client(self._client) + future = asyncio.get_running_loop().create_future() conf = _pulsar.ProducerConfiguration() - # TODO: add more configs - self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) - return Producer(await future) + conf.schema(schema.schema_info()) + + self._client.create_producer_async( + topic, conf, functools.partial(_set_future, future) + ) + return Producer(await future, schema) + + # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments + async def subscribe(self, topic: Union[str, List[str]], + subscription_name: str, + consumer_type: pulsar.ConsumerType = + pulsar.ConsumerType.Exclusive, + schema: pulsar.schema.Schema | None = None, + receiver_queue_size: int = 1000, + max_total_receiver_queue_size_across_partitions: int = + 50000, + consumer_name: str | None = None, + unacked_messages_timeout_ms: int | None = None, + broker_consumer_stats_cache_time_ms: int = 30000, + negative_ack_redelivery_delay_ms: int = 60000, + is_read_compacted: bool = False, + properties: dict | None = None, + initial_position: InitialPosition = InitialPosition.Latest, + crypto_key_reader: pulsar.CryptoKeyReader | None = None, + replicate_subscription_state_enabled: bool = False, + max_pending_chunked_message: int = 10, + auto_ack_oldest_chunked_message_on_queue_full: bool = False, + start_message_id_inclusive: bool = False, + batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = + None, + key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = + None, + batch_index_ack_enabled: bool = False, + regex_subscription_mode: RegexSubscriptionMode = + RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = + None, + crypto_failure_action: ConsumerCryptoFailureAction = + ConsumerCryptoFailureAction.FAIL, + is_pattern_topic: bool = False) -> Consumer: + """ + Subscribe to the given topic and subscription combination. + + Parameters + ---------- + topic: str, List[str], or regex pattern + The name of the topic, list of topics or regex pattern. + When `is_pattern_topic` is True, `topic` is treated as a regex. + subscription_name: str + The name of the subscription. + consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive + Select the subscription type to be used when subscribing to the topic. + schema: pulsar.schema.Schema | None, default=None + Define the schema of the data that will be received by this consumer. + receiver_queue_size: int, default=1000 + Sets the size of the consumer receive queue. + max_total_receiver_queue_size_across_partitions: int, default=50000 + Set the max total receiver queue size across partitions. + consumer_name: str | None, default=None + Sets the consumer name. + unacked_messages_timeout_ms: int | None, default=None + Sets the timeout in milliseconds for unacknowledged messages. + broker_consumer_stats_cache_time_ms: int, default=30000 + Sets the time duration for which the broker-side consumer stats + will be cached in the client. + negative_ack_redelivery_delay_ms: int, default=60000 + The delay after which to redeliver the messages that failed to be + processed. + is_read_compacted: bool, default=False + Selects whether to read the compacted version of the topic. + properties: dict | None, default=None + Sets the properties for the consumer. + initial_position: InitialPosition, default=InitialPosition.Latest + Set the initial position of a consumer when subscribing to the topic. + crypto_key_reader: pulsar.CryptoKeyReader | None, default=None + Symmetric encryption class implementation. + replicate_subscription_state_enabled: bool, default=False + Set whether the subscription status should be replicated. + max_pending_chunked_message: int, default=10 + Consumer buffers chunk messages into memory until it receives all the chunks. + auto_ack_oldest_chunked_message_on_queue_full: bool, default=False + Automatically acknowledge oldest chunked messages on queue + full. + start_message_id_inclusive: bool, default=False + Set the consumer to include the given position of any reset + operation. + batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default=None + Set the batch collection policy for batch receiving. + key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None + Set the key shared policy for use when the ConsumerType is + KeyShared. + batch_index_ack_enabled: bool, default=False + Enable the batch index acknowledgement. + regex_subscription_mode: RegexSubscriptionMode, + default=RegexSubscriptionMode.PersistentOnly + Set the regex subscription mode for use when the topic is a regex + pattern. + dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default=None + Set dead letter policy for consumer. + crypto_failure_action: ConsumerCryptoFailureAction, + default=ConsumerCryptoFailureAction.FAIL + Set the behavior when the decryption fails. + is_pattern_topic: bool, default=False + Whether `topic` is a regex pattern. If it's True when `topic` is a list, a ValueError + will be raised. + + Returns + ------- + Consumer + The consumer created + + Raises + ------ + PulsarException + """ + if schema is None: + schema = pulsar.schema.BytesSchema() + + future = asyncio.get_running_loop().create_future() + conf = _pulsar.ConsumerConfiguration() + conf.consumer_type(consumer_type) + conf.regex_subscription_mode(regex_subscription_mode) + conf.read_compacted(is_read_compacted) + conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions( + max_total_receiver_queue_size_across_partitions + ) + if consumer_name: + conf.consumer_name(consumer_name) + if unacked_messages_timeout_ms: + conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) + + conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) + conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + if properties: + for k, v in properties.items(): + conf.property(k, v) + conf.subscription_initial_position(initial_position) + + conf.schema(schema.schema_info()) + + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + + conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) + conf.max_pending_chunked_message(max_pending_chunked_message) + conf.auto_ack_oldest_chunked_message_on_queue_full( + auto_ack_oldest_chunked_message_on_queue_full + ) + conf.start_message_id_inclusive(start_message_id_inclusive) + if batch_receive_policy: + conf.batch_receive_policy(batch_receive_policy.policy()) + + if key_shared_policy: + conf.key_shared_policy(key_shared_policy.policy()) + conf.batch_index_ack_enabled(batch_index_ack_enabled) + if dead_letter_policy: + conf.dead_letter_policy(dead_letter_policy.policy()) + conf.crypto_failure_action(crypto_failure_action) + + if isinstance(topic, str): + if is_pattern_topic: + self._client.subscribe_async_pattern( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) + else: + self._client.subscribe_async( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) + elif isinstance(topic, list): + if is_pattern_topic: + raise ValueError( + "Argument 'topic' must be a string when " + "'is_pattern_topic' is True; lists of topics do not " + "support pattern subscriptions" + ) + self._client.subscribe_async_topics( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) + else: + raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'") + + schema.attach_client(self._client) + return Consumer(await future, schema) async def close(self) -> None: """ @@ -160,7 +535,9 @@ async def close(self) -> None: PulsarException """ future = asyncio.get_running_loop().create_future() - self._client.close_async(functools.partial(_set_future, future, value=None)) + self._client.close_async( + functools.partial(_set_future, future, value=None) + ) await future def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): diff --git a/src/client.cc b/src/client.cc index 72c824f..64056df 100644 --- a/src/client.cc +++ b/src/client.cc @@ -80,6 +80,26 @@ void Client_closeAsync(Client& client, ResultCallback callback) { client.closeAsync(callback); } +void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName, + ConsumerConfiguration conf, SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeAsync(topic, subscriptionName, conf, callback); +} + +void Client_subscribeAsync_topics(Client& client, const std::vector& topics, + const std::string& subscriptionName, ConsumerConfiguration conf, + SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeAsync(topics, subscriptionName, conf, callback); +} + +void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern, + const std::string& subscriptionName, ConsumerConfiguration conf, + SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback); +} + void export_client(py::module_& m) { py::class_>(m, "Client") .def(py::init()) @@ -99,5 +119,8 @@ void export_client(py::module_& m) { .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) .def("close_async", &Client_closeAsync) + .def("subscribe_async", &Client_subscribeAsync) + .def("subscribe_async_topics", &Client_subscribeAsync_topics) + .def("subscribe_async_pattern", &Client_subscribeAsync_pattern) .def("shutdown", &Client::shutdown); } diff --git a/src/consumer.cc b/src/consumer.cc index e32a865..f1d7367 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -19,6 +19,7 @@ #include "utils.h" #include +#include #include #include @@ -100,12 +101,58 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) { MessageId msgId; Result res; Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); - Py_END_ALLOW_THREADS - - CHECK_RESULT(res); + Py_END_ALLOW_THREADS; + CHECK_RESULT(res); return msgId; } +void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) { + py::gil_scoped_release release; + consumer.receiveAsync(callback); +} + +void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeAsync(msg, callback); +} + +void Consumer_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, + ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeAsync(msgId, callback); +} + +void Consumer_acknowledgeCumulativeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeCumulativeAsync(msg, callback); +} + +void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const MessageId& msgId, + ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeCumulativeAsync(msgId, callback); +} + +void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) { + py::gil_scoped_release release; + consumer.closeAsync(callback); +} + +void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) { + py::gil_scoped_release release; + consumer.unsubscribeAsync(callback); +} + +void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback) { + py::gil_scoped_release release; + consumer.seekAsync(msgId, callback); +} + +void Consumer_seekAsync_timestamp(Consumer& consumer, uint64_t timestamp, ResultCallback callback) { + py::gil_scoped_release release; + consumer.seekAsync(timestamp, callback); +} + void export_consumer(py::module_& m) { py::class_(m, "Consumer") .def(py::init<>()) @@ -130,5 +177,14 @@ void export_consumer(py::module_& m) { .def("seek", &Consumer_seek) .def("seek", &Consumer_seek_timestamp) .def("is_connected", &Consumer_is_connected) - .def("get_last_message_id", &Consumer_get_last_message_id); + .def("get_last_message_id", &Consumer_get_last_message_id) + .def("receive_async", &Consumer_receiveAsync) + .def("acknowledge_async", &Consumer_acknowledgeAsync) + .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id) + .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync) + .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id) + .def("close_async", &Consumer_closeAsync) + .def("unsubscribe_async", &Consumer_unsubscribeAsync) + .def("seek_async", &Consumer_seekAsync) + .def("seek_async", &Consumer_seekAsync_timestamp); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index fe6877f..656ffba 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -18,30 +18,49 @@ # under the License. # +""" +Unit tests for asyncio Pulsar client API. +""" + +# pylint: disable=missing-function-docstring + import asyncio -import pulsar -from pulsar.asyncio import ( - Client, - PulsarException, -) +import time +from typing import List from unittest import ( main, IsolatedAsyncioTestCase, ) -service_url = 'pulsar://localhost:6650' +import pulsar # pylint: disable=import-error +from pulsar.asyncio import ( # pylint: disable=import-error + Client, + Consumer, + Producer, + PulsarException, +) +from pulsar.schema import ( # pylint: disable=import-error + AvroSchema, + Integer, + Record, + String, +) + +SERVICE_URL = 'pulsar://localhost:6650' class AsyncioTest(IsolatedAsyncioTestCase): + """Test cases for asyncio Pulsar client.""" async def asyncSetUp(self) -> None: - self._client = Client(service_url, + self._client = Client(SERVICE_URL, operation_timeout_seconds=5) async def asyncTearDown(self) -> None: await self._client.close() - async def test_batch_send(self): - producer = await self._client.create_producer('awaitio-test-batch-send') + async def test_batch_end_to_end(self): + topic = f'asyncio-test-batch-e2e-{time.time()}' + producer = await self._client.create_producer(topic) tasks = [] for i in range(5): tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode()))) @@ -58,15 +77,28 @@ async def test_batch_send(self): self.assertEqual(msg_ids[i].entry_id(), entry_id) self.assertEqual(msg_ids[i].batch_index(), i) + consumer = await self._client.subscribe(topic, 'sub', + initial_position=pulsar.InitialPosition.Earliest) + for i in range(5): + msg = await consumer.receive() + self.assertEqual(msg.data(), f'msg-{i}'.encode()) + await consumer.close() + + # create a different subscription to verify initial position is latest by default + consumer = await self._client.subscribe(topic, 'sub2') + await producer.send(b'final-message') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'final-message') + async def test_create_producer_failure(self): try: - await self._client.create_producer('tenant/ns/awaitio-test-send-failure') + await self._client.create_producer('tenant/ns/asyncio-test-send-failure') self.fail() except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.Timeout) async def test_send_failure(self): - producer = await self._client.create_producer('awaitio-test-send-failure') + producer = await self._client.create_producer('asyncio-test-send-failure') try: await producer.send(('x' * 1024 * 1024 * 10).encode()) self.fail() @@ -74,7 +106,7 @@ async def test_send_failure(self): self.assertEqual(e.error(), pulsar.Result.MessageTooBig) async def test_close_producer(self): - producer = await self._client.create_producer('awaitio-test-close-producer') + producer = await self._client.create_producer('asyncio-test-close-producer') await producer.close() try: await producer.close() @@ -82,5 +114,169 @@ async def test_close_producer(self): except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) + async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: + msg_ids = [] + for i in range(5): + msg_ids.append(await producer.send(f'msg-{i}'.encode())) + return msg_ids + + async def test_consumer_cumulative_acknowledge(self): + topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub) + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + last_msg = None + for _ in range(5): + last_msg = await consumer.receive() + await consumer.acknowledge_cumulative(last_msg) + await consumer.close() + + consumer = await self._client.subscribe(topic, sub) + await producer.send(b'final-message') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'final-message') + + async def test_consumer_individual_acknowledge(self): + topic = f'asyncio-test-consumer-individual-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared) + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + msgs = [] + for _ in range(5): + msg = await consumer.receive() + msgs.append(msg) + + await consumer.acknowledge(msgs[0]) + await consumer.acknowledge(msgs[2]) + await consumer.acknowledge(msgs[4]) + await consumer.close() + + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared) + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-1') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + + async def test_multi_topic_consumer(self): + topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2'] + producers = [] + + for topic in topics: + producer = await self._client.create_producer(topic) + producers.append(producer) + + consumer = await self._client.subscribe(topics, 'test-multi-subscription') + + await producers[0].send(b'message-from-topic-1') + await producers[1].send(b'message-from-topic-2') + + async def verify_receive(consumer: Consumer): + received_messages = {} + for _ in range(2): + msg = await consumer.receive() + received_messages[msg.data()] = None + await consumer.acknowledge(msg.message_id()) + self.assertEqual(received_messages, { + b'message-from-topic-1': None, + b'message-from-topic-2': None + }) + + await verify_receive(consumer) + await consumer.close() + + consumer = await self._client.subscribe('public/default/asyncio-test-multi-topic-.*', + 'test-multi-subscription-2', + is_pattern_topic=True, + initial_position=pulsar.InitialPosition.Earliest) + await verify_receive(consumer) + await consumer.close() + + async def test_unsubscribe(self): + topic = f'asyncio-test-unsubscribe-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub) + await consumer.unsubscribe() + # Verify the consumer can be created successfully with the same subscription name + consumer = await self._client.subscribe(topic, sub) + await consumer.close() + + async def test_seek_message_id(self): + topic = f'asyncio-test-seek-message-id-{time.time()}' + sub = 'sub' + + producer = await self._client.create_producer(topic) + msg_ids = await self._prepare_messages(producer) + + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest + ) + await consumer.seek(msg_ids[2]) + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + await consumer.close() + + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest, + start_message_id_inclusive=True + ) + await consumer.seek(msg_ids[2]) + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-2') + await consumer.close() + + async def test_seek_timestamp(self): + topic = f'asyncio-test-seek-timestamp-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest + ) + + producer = await self._client.create_producer(topic) + + # Send first 3 messages + for i in range(3): + await producer.send(f'msg-{i}'.encode()) + + seek_time = int(time.time() * 1000) + + # Send 2 more messages + for i in range(3, 5): + await producer.send(f'msg-{i}'.encode()) + + # Consume all messages first + for i in range(5): + msg = await consumer.receive() + self.assertEqual(msg.data(), f'msg-{i}'.encode()) + + # Seek to the timestamp (should start from msg-3) + await consumer.seek(seek_time) + + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + + async def test_schema(self): + class ExampleRecord(Record): # pylint: disable=too-few-public-methods + """Example record schema for testing.""" + str_field = String() + int_field = Integer() + + topic = f'asyncio-test-schema-{time.time()}' + producer = await self._client.create_producer( + topic, schema=AvroSchema(ExampleRecord) + ) + consumer = await self._client.subscribe( + topic, 'sub', schema=AvroSchema(ExampleRecord) + ) + await producer.send(ExampleRecord(str_field='test', int_field=42)) + msg = await consumer.receive() + self.assertIsInstance(msg.value(), ExampleRecord) + self.assertEqual(msg.value().str_field, 'test') + self.assertEqual(msg.value().int_field, 42) + + if __name__ == '__main__': main()