From a61c5e44e30c3acee0d2be372de76bef96dcc3da Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 15:42:24 +0800 Subject: [PATCH 01/13] Support basic functionality of asyncio based consumer --- pulsar/asyncio.py | 178 +++++++++++++++++++++++++++++++++++++++++- src/client.cc | 23 ++++++ src/consumer.cc | 59 ++++++++++++-- tests/asyncio_test.py | 114 +++++++++++++++++++++++++-- 4 files changed, 360 insertions(+), 14 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 445d477..b37eaee 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -23,9 +23,10 @@ import asyncio import functools -from typing import Any +from typing import Any, List, Union import _pulsar +from _pulsar import InitialPosition import pulsar class PulsarException(BaseException): @@ -116,6 +117,134 @@ async def close(self) -> None: self._producer.close_async(functools.partial(_set_future, future, value=None)) await future +class Consumer: + """ + The Pulsar message consumer, used to subscribe to messages from a topic. + """ + + def __init__(self, consumer: _pulsar.Consumer) -> None: + """ + Create the consumer. + Users should not call this constructor directly. Instead, create the + consumer via `Client.subscribe`. + + Parameters + ---------- + consumer: _pulsar.Consumer + The underlying Consumer object from the C extension. + """ + self._consumer: _pulsar.Consumer = consumer + + async def receive(self) -> pulsar.Message: + """ + Receive a single message asynchronously. + + Returns + ------- + pulsar.Message + The message received. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.receive_async(functools.partial(_set_future, future)) + msg = await future + m = pulsar.Message() + m._message = msg + m._schema = pulsar.schema.BytesSchema() + return m + + async def acknowledge(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None: + """ + Acknowledge the reception of a single message asynchronously. + + Parameters + ---------- + message : Message, MessageId, _pulsar.Message, _pulsar.MessageId + The received message or message id. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(message, pulsar.Message): + msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id + else: + msg = message + self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None)) + await future + + async def acknowledge_cumulative(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None: + """ + Acknowledge the reception of all the messages in the stream up to (and + including) the provided message asynchronously. + + Parameters + ---------- + message : Message, MessageId, _pulsar.Message, _pulsar.MessageId + The received message or message id. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(message, pulsar.Message): + msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id + else: + msg = message + self._consumer.acknowledge_cumulative_async(msg, functools.partial(_set_future, future, value=None)) + await future + + async def unsubscribe(self) -> None: + """ + Unsubscribe the current consumer from the topic asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None)) + await future + + async def close(self) -> None: + """ + Close the consumer asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._consumer.close_async(functools.partial(_set_future, future, value=None)) + await future + + def topic(self) -> str: + """ + Return the topic this consumer is subscribed to. + """ + return self._consumer.topic() + + def subscription_name(self) -> str: + """ + Return the subscription name. + """ + return self._consumer.subscription_name() + + def consumer_name(self) -> str: + """ + Return the consumer name. + """ + return self._consumer.consumer_name() + class Client: """ The asynchronous version of `pulsar.Client`. @@ -151,6 +280,53 @@ async def create_producer(self, topic: str) -> Producer: self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) return Producer(await future) + async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, + is_pattern_topic: bool = False, + consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, + initial_position: InitialPosition = InitialPosition.Latest) -> Consumer: + """ + Subscribe to the given topic and subscription combination. + + Parameters + ---------- + topic: str, List[str], or regex pattern + The name of the topic, list of topics or regex pattern. + subscription_name: str + The name of the subscription. + is_pattern_topic: bool, default=False + Whether `topic` is a regex pattern. This option takes no effect when `topic` is a list of topics. + consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive + Select the subscription type to be used when subscribing to the topic. + initial_position: InitialPosition, default=InitialPosition.Latest + Set the initial position of a consumer when subscribing to the topic. + It could be either: ``InitialPosition.Earliest`` or ``InitialPosition.Latest``. + + Returns + ------- + Consumer + The consumer created + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + conf = _pulsar.ConsumerConfiguration() + conf.consumer_type(consumer_type) + conf.subscription_initial_position(initial_position) + + if isinstance(topic, str): + if is_pattern_topic: + self._client.subscribe_async_pattern(topic, subscription_name, conf, functools.partial(_set_future, future)) + else: + self._client.subscribe_async(topic, subscription_name, conf, functools.partial(_set_future, future)) + elif isinstance(topic, list): + self._client.subscribe_async_topics(topic, subscription_name, conf, functools.partial(_set_future, future)) + else: + raise ValueError("Argument 'topic' is expected to be of a type between (str, list)") + + return Consumer(await future) + async def close(self) -> None: """ Close the client and all the associated producers and consumers diff --git a/src/client.cc b/src/client.cc index 72c824f..64056df 100644 --- a/src/client.cc +++ b/src/client.cc @@ -80,6 +80,26 @@ void Client_closeAsync(Client& client, ResultCallback callback) { client.closeAsync(callback); } +void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName, + ConsumerConfiguration conf, SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeAsync(topic, subscriptionName, conf, callback); +} + +void Client_subscribeAsync_topics(Client& client, const std::vector& topics, + const std::string& subscriptionName, ConsumerConfiguration conf, + SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeAsync(topics, subscriptionName, conf, callback); +} + +void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern, + const std::string& subscriptionName, ConsumerConfiguration conf, + SubscribeCallback callback) { + py::gil_scoped_release release; + client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback); +} + void export_client(py::module_& m) { py::class_>(m, "Client") .def(py::init()) @@ -99,5 +119,8 @@ void export_client(py::module_& m) { .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) .def("close_async", &Client_closeAsync) + .def("subscribe_async", &Client_subscribeAsync) + .def("subscribe_async_topics", &Client_subscribeAsync_topics) + .def("subscribe_async_pattern", &Client_subscribeAsync_pattern) .def("shutdown", &Client::shutdown); } diff --git a/src/consumer.cc b/src/consumer.cc index e32a865..b171f03 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -19,6 +19,7 @@ #include "utils.h" #include +#include #include #include @@ -89,6 +90,14 @@ void Consumer_seek(Consumer& consumer, const MessageId& msgId) { waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); }); } +MessageId Consumer_get_last_message_id(Consumer& consumer) { + MessageId msgId; + Result res; + Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); + Py_END_ALLOW_THREADS CHECK_RESULT(res); + return msgId; +} + void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { waitForAsyncResult( [timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); }); @@ -96,14 +105,41 @@ void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); } -MessageId Consumer_get_last_message_id(Consumer& consumer) { - MessageId msgId; - Result res; - Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); - Py_END_ALLOW_THREADS +void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) { + py::gil_scoped_release release; + consumer.receiveAsync(callback); +} - CHECK_RESULT(res); - return msgId; +void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeAsync(msg, callback); +} + +void Consumer_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, + ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeAsync(msgId, callback); +} + +void Consumer_acknowledgeCumulativeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeCumulativeAsync(msg, callback); +} + +void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const MessageId& msgId, + ResultCallback callback) { + py::gil_scoped_release release; + consumer.acknowledgeCumulativeAsync(msgId, callback); +} + +void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) { + py::gil_scoped_release release; + consumer.closeAsync(callback); +} + +void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) { + py::gil_scoped_release release; + consumer.unsubscribeAsync(callback); } void export_consumer(py::module_& m) { @@ -130,5 +166,12 @@ void export_consumer(py::module_& m) { .def("seek", &Consumer_seek) .def("seek", &Consumer_seek_timestamp) .def("is_connected", &Consumer_is_connected) - .def("get_last_message_id", &Consumer_get_last_message_id); + .def("get_last_message_id", &Consumer_get_last_message_id) + .def("receive_async", &Consumer_receiveAsync) + .def("acknowledge_async", &Consumer_acknowledgeAsync) + .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id) + .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync) + .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id) + .def("close_async", &Consumer_closeAsync) + .def("unsubscribe_async", &Consumer_unsubscribeAsync); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index fe6877f..f1f6e1a 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -20,8 +20,11 @@ import asyncio import pulsar +import time from pulsar.asyncio import ( Client, + Consumer, + Producer, PulsarException, ) from unittest import ( @@ -40,8 +43,9 @@ async def asyncSetUp(self) -> None: async def asyncTearDown(self) -> None: await self._client.close() - async def test_batch_send(self): - producer = await self._client.create_producer('awaitio-test-batch-send') + async def test_batch_end_to_end(self): + topic = f'asyncio-test-batch-e2e-{time.time()}' + producer = await self._client.create_producer(topic) tasks = [] for i in range(5): tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode()))) @@ -58,15 +62,28 @@ async def test_batch_send(self): self.assertEqual(msg_ids[i].entry_id(), entry_id) self.assertEqual(msg_ids[i].batch_index(), i) + consumer = await self._client.subscribe(topic, 'sub', + initial_position=pulsar.InitialPosition.Earliest) + for i in range(5): + msg = await consumer.receive() + self.assertEqual(msg.data(), f'msg-{i}'.encode()) + await consumer.close() + + # create a different subscription to verify initial position is latest by default + consumer = await self._client.subscribe(topic, 'sub2') + await producer.send(b'final-message') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'final-message') + async def test_create_producer_failure(self): try: - await self._client.create_producer('tenant/ns/awaitio-test-send-failure') + await self._client.create_producer('tenant/ns/asyncio-test-send-failure') self.fail() except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.Timeout) async def test_send_failure(self): - producer = await self._client.create_producer('awaitio-test-send-failure') + producer = await self._client.create_producer('asyncio-test-send-failure') try: await producer.send(('x' * 1024 * 1024 * 10).encode()) self.fail() @@ -74,7 +91,7 @@ async def test_send_failure(self): self.assertEqual(e.error(), pulsar.Result.MessageTooBig) async def test_close_producer(self): - producer = await self._client.create_producer('awaitio-test-close-producer') + producer = await self._client.create_producer('asyncio-test-close-producer') await producer.close() try: await producer.close() @@ -82,5 +99,92 @@ async def test_close_producer(self): except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) + async def _prepare_messages(self, producer: Producer): + for i in range(5): + await producer.send(f'msg-{i}'.encode()) + + async def test_consumer_cumulative_acknowledge(self): + topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub) + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + last_msg = None + for _ in range(5): + last_msg = await consumer.receive() + await consumer.acknowledge_cumulative(last_msg) + await consumer.close() + + consumer = await self._client.subscribe(topic, sub) + await producer.send(b'final-message') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'final-message') + + async def test_consumer_individual_acknowledge(self): + topic = f'asyncio-test-consumer-individual-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared) + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + msgs = [] + for _ in range(5): + msg = await consumer.receive() + msgs.append(msg) + + await consumer.acknowledge(msgs[0]) + await consumer.acknowledge(msgs[2]) + await consumer.acknowledge(msgs[4]) + await consumer.close() + + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared) + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-1') + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + + async def test_multi_topic_consumer(self): + topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2'] + producers = [] + + for topic in topics: + producer = await self._client.create_producer(topic) + producers.append(producer) + + consumer = await self._client.subscribe(topics, 'test-multi-subscription') + + await producers[0].send(b'message-from-topic-1') + await producers[1].send(b'message-from-topic-2') + + async def verify_receive(consumer: Consumer): + received_messages = {} + for _ in range(2): + msg = await consumer.receive() + received_messages[msg.data()] = None + await consumer.acknowledge(msg.message_id()) + self.assertEqual(received_messages, { + b'message-from-topic-1': None, + b'message-from-topic-2': None + }) + + await verify_receive(consumer) + await consumer.close() + + consumer = await self._client.subscribe('public/default/asyncio-test-multi-topic-.*', + 'test-multi-subscription-2', + is_pattern_topic=True, + initial_position=pulsar.InitialPosition.Earliest) + await verify_receive(consumer) + await consumer.close() + + async def test_unsubscribe(self): + topic = f'asyncio-test-unsubscribe-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub) + await consumer.unsubscribe() + consumer = await self._client.subscribe(topic, sub) + + if __name__ == '__main__': main() From c232dfe8f269ac33879b7d4da5c42806c48bad3e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 20:07:52 +0800 Subject: [PATCH 02/13] add all options --- pulsar/asyncio.py | 328 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 308 insertions(+), 20 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index b37eaee..255ea21 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -17,16 +17,26 @@ # under the License. # +# pylint: disable=no-name-in-module,c-extension-no-member,protected-access + """ The Pulsar Python client APIs that work with the asyncio module. """ import asyncio import functools -from typing import Any, List, Union +from typing import Any, Callable, List, Union import _pulsar -from _pulsar import InitialPosition +from _pulsar import ( + InitialPosition, + CompressionType, + PartitionsRoutingMode, + BatchingType, + ProducerAccessMode, + RegexSubscriptionMode, + ConsumerCryptoFailureAction, +) import pulsar class PulsarException(BaseException): @@ -156,7 +166,11 @@ async def receive(self) -> pulsar.Message: m._schema = pulsar.schema.BytesSchema() return m - async def acknowledge(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None: + async def acknowledge( + self, + message: Union[pulsar.Message, pulsar.MessageId, + _pulsar.Message, _pulsar.MessageId] + ) -> None: """ Acknowledge the reception of a single message asynchronously. @@ -179,7 +193,11 @@ async def acknowledge(self, message: Union[pulsar.Message, pulsar.MessageId, _pu self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None)) await future - async def acknowledge_cumulative(self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]) -> None: + async def acknowledge_cumulative( + self, + message: Union[pulsar.Message, pulsar.MessageId, + _pulsar.Message, _pulsar.MessageId] + ) -> None: """ Acknowledge the reception of all the messages in the stream up to (and including) the provided message asynchronously. @@ -200,7 +218,9 @@ async def acknowledge_cumulative(self, message: Union[pulsar.Message, pulsar.Mes msg = message._msg_id else: msg = message - self._consumer.acknowledge_cumulative_async(msg, functools.partial(_set_future, future, value=None)) + self._consumer.acknowledge_cumulative_async( + msg, functools.partial(_set_future, future, value=None) + ) await future async def unsubscribe(self) -> None: @@ -256,7 +276,31 @@ def __init__(self, service_url, **kwargs) -> None: """ self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client - async def create_producer(self, topic: str) -> Producer: + # pylint: disable=too-many-arguments,too-many-locals + async def create_producer(self, topic: str, + producer_name: str = None, + schema: pulsar.schema.Schema = None, + initial_sequence_id: int = None, + send_timeout_millis: int = 30000, + compression_type: CompressionType = CompressionType.NONE, + max_pending_messages: int = 1000, + max_pending_messages_across_partitions: int = 50000, + block_if_queue_full: bool = False, + batching_enabled: bool = False, + batching_max_messages: int = 1000, + batching_max_allowed_size_in_bytes: int = 128*1024, + batching_max_publish_delay_ms: int = 10, + chunking_enabled: bool = False, + message_routing_mode: PartitionsRoutingMode = + PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers: bool = False, + properties: dict = None, + batching_type: BatchingType = BatchingType.Default, + encryption_key: str = None, + crypto_key_reader: pulsar.CryptoKeyReader = None, + access_mode: ProducerAccessMode = ProducerAccessMode.Shared, + message_router: Callable[[pulsar.Message, int], int] = None, + ) -> Producer: """ Create a new producer on a given topic @@ -264,6 +308,59 @@ async def create_producer(self, topic: str) -> Producer: ---------- topic: str The topic name + producer_name: str, optional + Specify a name for the producer. If not assigned, the system will + generate a globally unique name which can be accessed with + `Producer.producer_name()`. When specifying a name, it is app to + the user to ensure that, for a given topic, the producer name is + unique across all Pulsar's clusters. + schema: pulsar.schema.Schema, optional + Define the schema of the data that will be published by this producer. + initial_sequence_id: int, optional + Set the baseline for the sequence ids for messages published by + the producer. + send_timeout_millis: int, default=30000 + If a message is not acknowledged by the server before the + send_timeout expires, an error will be reported. + compression_type: CompressionType, default=CompressionType.NONE + Set the compression type for the producer. + max_pending_messages: int, default=1000 + Set the max size of the queue holding the messages pending to + receive an acknowledgment from the broker. + max_pending_messages_across_partitions: int, default=50000 + Set the max size of the queue holding the messages pending to + receive an acknowledgment across partitions. + block_if_queue_full: bool, default=False + Set whether send operations should block when the outgoing + message queue is full. + batching_enabled: bool, default=False + Enable automatic message batching. + batching_max_messages: int, default=1000 + Maximum number of messages in a batch. + batching_max_allowed_size_in_bytes: int, default=128*1024 + Maximum size in bytes of a batch. + batching_max_publish_delay_ms: int, default=10 + The batch interval in milliseconds. + chunking_enabled: bool, default=False + Enable chunking of large messages. + message_routing_mode: PartitionsRoutingMode, + default=PartitionsRoutingMode.RoundRobinDistribution + Set the message routing mode for the partitioned producer. + lazy_start_partitioned_producers: bool, default=False + Start partitioned producers lazily on demand. + properties: dict, optional + Sets the properties for the producer. + batching_type: BatchingType, default=BatchingType.Default + Sets the batching type for the producer. + encryption_key: str, optional + The key used for symmetric encryption. + crypto_key_reader: CryptoKeyReader, optional + Symmetric encryption class implementation. + access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared + Set the type of access mode that the producer requires on the topic. + message_router: optional + A custom message router function that takes a Message and the + number of partitions and returns the partition index. Returns ------- @@ -274,16 +371,90 @@ async def create_producer(self, topic: str) -> Producer: ------ PulsarException """ + if schema is None: + schema = pulsar.schema.BytesSchema() + future = asyncio.get_running_loop().create_future() conf = _pulsar.ProducerConfiguration() - # TODO: add more configs - self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) + conf.send_timeout_millis(send_timeout_millis) + conf.compression_type(compression_type) + conf.max_pending_messages(max_pending_messages) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) + conf.block_if_queue_full(block_if_queue_full) + conf.batching_enabled(batching_enabled) + conf.batching_max_messages(batching_max_messages) + conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) + conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) + conf.partitions_routing_mode(message_routing_mode) + conf.batching_type(batching_type) + conf.chunking_enabled(chunking_enabled) + conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) + conf.access_mode(access_mode) + if message_router is not None: + def underlying_router(msg, num_partitions): + return int(message_router(pulsar.Message._wrap(msg), + num_partitions)) + conf.message_router(underlying_router) + + if producer_name: + conf.producer_name(producer_name) + if initial_sequence_id is not None: + conf.initial_sequence_id(initial_sequence_id) + if properties: + for k, v in properties.items(): + conf.property(k, v) + + conf.schema(schema.schema_info()) + if encryption_key: + conf.encryption_key(encryption_key) + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + + if batching_enabled and chunking_enabled: + raise ValueError( + "Batching and chunking of messages can't be enabled together." + ) + + self._client.create_producer_async( + topic, conf, functools.partial(_set_future, future) + ) return Producer(await future) - async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, - is_pattern_topic: bool = False, - consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, - initial_position: InitialPosition = InitialPosition.Latest) -> Consumer: + # pylint: disable=too-many-arguments,too-many-locals,too-many-branches + async def subscribe(self, topic: Union[str, List[str]], + subscription_name: str, + consumer_type: pulsar.ConsumerType = + pulsar.ConsumerType.Exclusive, + schema: pulsar.schema.Schema = None, + message_listener = None, + receiver_queue_size: int = 1000, + max_total_receiver_queue_size_across_partitions: int = + 50000, + consumer_name: str = None, + unacked_messages_timeout_ms: int = None, + broker_consumer_stats_cache_time_ms: int = 30000, + negative_ack_redelivery_delay_ms: int = 60000, + is_read_compacted: bool = False, + properties: dict = None, + pattern_auto_discovery_period: int = 60, # pylint: disable=unused-argument + initial_position: InitialPosition = InitialPosition.Latest, + crypto_key_reader: pulsar.CryptoKeyReader = None, + replicate_subscription_state_enabled: bool = False, + max_pending_chunked_message: int = 10, + auto_ack_oldest_chunked_message_on_queue_full: bool = False, + start_message_id_inclusive: bool = False, + batch_receive_policy: pulsar.ConsumerBatchReceivePolicy = + None, + key_shared_policy: pulsar.ConsumerKeySharedPolicy = + None, + batch_index_ack_enabled: bool = False, + regex_subscription_mode: RegexSubscriptionMode = + RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: pulsar.ConsumerDeadLetterPolicy = + None, + crypto_failure_action: ConsumerCryptoFailureAction = + ConsumerCryptoFailureAction.FAIL, + is_pattern_topic: bool = False) -> Consumer: """ Subscribe to the given topic and subscription combination. @@ -293,13 +464,65 @@ async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, The name of the topic, list of topics or regex pattern. subscription_name: str The name of the subscription. - is_pattern_topic: bool, default=False - Whether `topic` is a regex pattern. This option takes no effect when `topic` is a list of topics. consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive Select the subscription type to be used when subscribing to the topic. + schema: pulsar.schema.Schema, optional + Define the schema of the data that will be received by this consumer. + message_listener: optional + Sets a message listener for the consumer. + receiver_queue_size: int, default=1000 + Sets the size of the consumer receive queue. + max_total_receiver_queue_size_across_partitions: int, default=50000 + Set the max total receiver queue size across partitions. + consumer_name: str, optional + Sets the consumer name. + unacked_messages_timeout_ms: int, optional + Sets the timeout in milliseconds for unacknowledged messages. + broker_consumer_stats_cache_time_ms: int, default=30000 + Sets the time duration for which the broker-side consumer stats + will be cached in the client. + negative_ack_redelivery_delay_ms: int, default=60000 + The delay after which to redeliver the messages that failed to be + processed. + is_read_compacted: bool, default=False + Selects whether to read the compacted version of the topic. + properties: dict, optional + Sets the properties for the consumer. + pattern_auto_discovery_period: int, default=60 + Periods of seconds for consumer to auto discover match topics. initial_position: InitialPosition, default=InitialPosition.Latest Set the initial position of a consumer when subscribing to the topic. - It could be either: ``InitialPosition.Earliest`` or ``InitialPosition.Latest``. + crypto_key_reader: CryptoKeyReader, optional + Symmetric encryption class implementation. + replicate_subscription_state_enabled: bool, default=False + Set whether the subscription status should be replicated. + max_pending_chunked_message: int, default=10 + Consumer buffers chunk messages into memory until it receives all the chunks. + auto_ack_oldest_chunked_message_on_queue_full: bool, default=False + Automatically acknowledge oldest chunked messages on queue + full. + start_message_id_inclusive: bool, default=False + Set the consumer to include the given position of any reset + operation. + batch_receive_policy: ConsumerBatchReceivePolicy, optional + Set the batch collection policy for batch receiving. + key_shared_policy: ConsumerKeySharedPolicy, optional + Set the key shared policy for use when the ConsumerType is + KeyShared. + batch_index_ack_enabled: bool, default=False + Enable the batch index acknowledgement. + regex_subscription_mode: RegexSubscriptionMode, + default=RegexSubscriptionMode.PersistentOnly + Set the regex subscription mode for use when the topic is a regex + pattern. + dead_letter_policy: ConsumerDeadLetterPolicy, optional + Set dead letter policy for consumer. + crypto_failure_action: ConsumerCryptoFailureAction, + default=ConsumerCryptoFailureAction.FAIL + Set the behavior when the decryption fails. + is_pattern_topic: bool, default=False + Whether `topic` is a regex pattern. This option takes no effect + when `topic` is a list of topics. Returns ------- @@ -310,20 +533,74 @@ async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, ------ PulsarException """ + if schema is None: + schema = pulsar.schema.BytesSchema() + future = asyncio.get_running_loop().create_future() conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) + conf.regex_subscription_mode(regex_subscription_mode) + conf.read_compacted(is_read_compacted) + if message_listener: + conf.message_listener(_listener_wrapper(message_listener, schema)) + conf.receiver_queue_size(receiver_queue_size) + conf.max_total_receiver_queue_size_across_partitions( + max_total_receiver_queue_size_across_partitions + ) + if consumer_name: + conf.consumer_name(consumer_name) + if unacked_messages_timeout_ms: + conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) + + conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) + conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + if properties: + for k, v in properties.items(): + conf.property(k, v) conf.subscription_initial_position(initial_position) + conf.schema(schema.schema_info()) + + if crypto_key_reader: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + + conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) + conf.max_pending_chunked_message(max_pending_chunked_message) + conf.auto_ack_oldest_chunked_message_on_queue_full( + auto_ack_oldest_chunked_message_on_queue_full + ) + conf.start_message_id_inclusive(start_message_id_inclusive) + if batch_receive_policy: + conf.batch_receive_policy(batch_receive_policy.policy()) + + if key_shared_policy: + conf.key_shared_policy(key_shared_policy.policy()) + conf.batch_index_ack_enabled(batch_index_ack_enabled) + if dead_letter_policy: + conf.dead_letter_policy(dead_letter_policy.policy()) + conf.crypto_failure_action(crypto_failure_action) + if isinstance(topic, str): if is_pattern_topic: - self._client.subscribe_async_pattern(topic, subscription_name, conf, functools.partial(_set_future, future)) + self._client.subscribe_async_pattern( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) else: - self._client.subscribe_async(topic, subscription_name, conf, functools.partial(_set_future, future)) + self._client.subscribe_async( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) elif isinstance(topic, list): - self._client.subscribe_async_topics(topic, subscription_name, conf, functools.partial(_set_future, future)) + self._client.subscribe_async_topics( + topic, subscription_name, conf, + functools.partial(_set_future, future) + ) else: - raise ValueError("Argument 'topic' is expected to be of a type between (str, list)") + raise ValueError( + "Argument 'topic' is expected to be of a type between " + "(str, list)" + ) return Consumer(await future) @@ -336,7 +613,9 @@ async def close(self) -> None: PulsarException """ future = asyncio.get_running_loop().create_future() - self._client.close_async(functools.partial(_set_future, future, value=None)) + self._client.close_async( + functools.partial(_set_future, future, value=None) + ) await future def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): @@ -346,3 +625,12 @@ def complete(): else: future.set_exception(PulsarException(result)) future.get_loop().call_soon_threadsafe(complete) + +def _listener_wrapper(listener, schema): + def wrapper(consumer, msg): + c = Consumer(consumer) + m = pulsar.Message() + m._message = msg + m._schema = schema + listener(c, m) + return wrapper From 08c6f9ae8a8967b77861dddbe046a3393c34ad70 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 20:31:22 +0800 Subject: [PATCH 03/13] support seek and fix tests --- pulsar/asyncio.py | 32 +++++++++++++++++++++++- src/consumer.cc | 14 ++++++++++- tests/asyncio_test.py | 57 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 99 insertions(+), 4 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 255ea21..4a13050 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -235,6 +235,36 @@ async def unsubscribe(self) -> None: self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None)) await future + async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None: + """ + Reset the subscription associated with this consumer to a specific + message id or publish timestamp asynchronously. + + The message id can either be a specific message or represent the first + or last messages in the topic. + + Parameters + ---------- + messageid : MessageId or int + The message id for seek, OR an integer event time (timestamp) to + seek to + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + if isinstance(messageid, pulsar.MessageId): + msg_id = messageid._msg_id + elif isinstance(messageid, int): + msg_id = messageid + else: + raise ValueError(f"invalid messageid type {type(messageid)}") + self._consumer.seek_async( + msg_id, functools.partial(_set_future, future, value=None) + ) + await future + async def close(self) -> None: """ Close the consumer asynchronously. @@ -286,7 +316,7 @@ async def create_producer(self, topic: str, max_pending_messages: int = 1000, max_pending_messages_across_partitions: int = 50000, block_if_queue_full: bool = False, - batching_enabled: bool = False, + batching_enabled: bool = True, batching_max_messages: int = 1000, batching_max_allowed_size_in_bytes: int = 128*1024, batching_max_publish_delay_ms: int = 10, diff --git a/src/consumer.cc b/src/consumer.cc index b171f03..14b09bb 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -142,6 +142,16 @@ void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) { consumer.unsubscribeAsync(callback); } +void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback) { + py::gil_scoped_release release; + consumer.seekAsync(msgId, callback); +} + +void Consumer_seekAsync_timestamp(Consumer& consumer, uint64_t timestamp, ResultCallback callback) { + py::gil_scoped_release release; + consumer.seekAsync(timestamp, callback); +} + void export_consumer(py::module_& m) { py::class_(m, "Consumer") .def(py::init<>()) @@ -173,5 +183,7 @@ void export_consumer(py::module_& m) { .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync) .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id) .def("close_async", &Consumer_closeAsync) - .def("unsubscribe_async", &Consumer_unsubscribeAsync); + .def("unsubscribe_async", &Consumer_unsubscribeAsync) + .def("seek_async", &Consumer_seekAsync) + .def("seek_async", &Consumer_seekAsync_timestamp); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index f1f6e1a..3c3ff32 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -19,6 +19,7 @@ # import asyncio +from typing import List import pulsar import time from pulsar.asyncio import ( @@ -99,9 +100,12 @@ async def test_close_producer(self): except PulsarException as e: self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) - async def _prepare_messages(self, producer: Producer): + async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: + msg_ids = [] for i in range(5): - await producer.send(f'msg-{i}'.encode()) + msg_id = await producer.send(f'msg-{i}'.encode()) + msg_ids.append(msg_id) + return msg_ids async def test_consumer_cumulative_acknowledge(self): topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}' @@ -185,6 +189,55 @@ async def test_unsubscribe(self): await consumer.unsubscribe() consumer = await self._client.subscribe(topic, sub) + async def test_seek_message_id(self): + topic = f'asyncio-test-seek-message-id-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest + ) + + producer = await self._client.create_producer(topic) + msg_ids = await self._prepare_messages(producer) + + for i in range(5): + msg = await consumer.receive() + self.assertEqual(msg.data(), f'msg-{i}'.encode()) + + await consumer.seek(msg_ids[2]) + + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + + async def test_seek_timestamp(self): + topic = f'asyncio-test-seek-timestamp-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest + ) + + producer = await self._client.create_producer(topic) + + # Send first 3 messages + for i in range(3): + await producer.send(f'msg-{i}'.encode()) + + seek_time = int(time.time() * 1000) + + # Send 2 more messages + for i in range(3, 5): + await producer.send(f'msg-{i}'.encode()) + + # Consume all messages first + for i in range(5): + msg = await consumer.receive() + self.assertEqual(msg.data(), f'msg-{i}'.encode()) + + # Seek to the timestamp (should start from msg-3) + await consumer.seek(seek_time) + + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-3') + if __name__ == '__main__': main() From 84c29a4a6ff54b2f7168be8fa61d537d97bdfd29 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 20:38:43 +0800 Subject: [PATCH 04/13] add comments --- pulsar/asyncio.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 4a13050..146619a 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -492,6 +492,7 @@ async def subscribe(self, topic: Union[str, List[str]], ---------- topic: str, List[str], or regex pattern The name of the topic, list of topics or regex pattern. + When `is_pattern_topic` is True, `topic` is treated as a regex. subscription_name: str The name of the subscription. consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive From 3a6aa86eccfbb02685530389b9bd5e250b828205 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 20:50:31 +0800 Subject: [PATCH 05/13] fix api docs --- pulsar/asyncio.py | 71 +++++++++++++++++++++---------------------- src/consumer.cc | 17 ++++++----- tests/asyncio_test.py | 2 ++ 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 146619a..4bba8d2 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -308,9 +308,9 @@ def __init__(self, service_url, **kwargs) -> None: # pylint: disable=too-many-arguments,too-many-locals async def create_producer(self, topic: str, - producer_name: str = None, - schema: pulsar.schema.Schema = None, - initial_sequence_id: int = None, + producer_name: str | None = None, + schema: pulsar.schema.Schema | None = None, + initial_sequence_id: int | None = None, send_timeout_millis: int = 30000, compression_type: CompressionType = CompressionType.NONE, max_pending_messages: int = 1000, @@ -324,12 +324,12 @@ async def create_producer(self, topic: str, message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers: bool = False, - properties: dict = None, + properties: dict | None = None, batching_type: BatchingType = BatchingType.Default, - encryption_key: str = None, - crypto_key_reader: pulsar.CryptoKeyReader = None, + encryption_key: str | None = None, + crypto_key_reader: pulsar.CryptoKeyReader | None = None, access_mode: ProducerAccessMode = ProducerAccessMode.Shared, - message_router: Callable[[pulsar.Message, int], int] = None, + message_router: Callable[[pulsar.Message, int], int] | None = None, ) -> Producer: """ Create a new producer on a given topic @@ -338,15 +338,15 @@ async def create_producer(self, topic: str, ---------- topic: str The topic name - producer_name: str, optional + producer_name: str | None, default=None Specify a name for the producer. If not assigned, the system will generate a globally unique name which can be accessed with `Producer.producer_name()`. When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. - schema: pulsar.schema.Schema, optional + schema: pulsar.schema.Schema | None, default=None Define the schema of the data that will be published by this producer. - initial_sequence_id: int, optional + initial_sequence_id: int | None, default=None Set the baseline for the sequence ids for messages published by the producer. send_timeout_millis: int, default=30000 @@ -364,7 +364,9 @@ async def create_producer(self, topic: str, Set whether send operations should block when the outgoing message queue is full. batching_enabled: bool, default=False - Enable automatic message batching. + Enable automatic message batching. Note that, unlike the synchronous + producer API in ``pulsar.__init__``, batching is enabled by default + for the asyncio producer. batching_max_messages: int, default=1000 Maximum number of messages in a batch. batching_max_allowed_size_in_bytes: int, default=128*1024 @@ -378,17 +380,17 @@ async def create_producer(self, topic: str, Set the message routing mode for the partitioned producer. lazy_start_partitioned_producers: bool, default=False Start partitioned producers lazily on demand. - properties: dict, optional + properties: dict | None, default=None Sets the properties for the producer. batching_type: BatchingType, default=BatchingType.Default Sets the batching type for the producer. - encryption_key: str, optional + encryption_key: str | None, default=None The key used for symmetric encryption. - crypto_key_reader: CryptoKeyReader, optional + crypto_key_reader: pulsar.CryptoKeyReader | None, default=None Symmetric encryption class implementation. access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared Set the type of access mode that the producer requires on the topic. - message_router: optional + message_router: Callable[[pulsar.Message, int], int] | None, default=None A custom message router function that takes a Message and the number of partitions and returns the partition index. @@ -455,32 +457,31 @@ async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, - schema: pulsar.schema.Schema = None, - message_listener = None, + schema: pulsar.schema.Schema | None = None, + message_listener: Callable[['Consumer', pulsar.Message], None] | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, - consumer_name: str = None, - unacked_messages_timeout_ms: int = None, + consumer_name: str | None = None, + unacked_messages_timeout_ms: int | None = None, broker_consumer_stats_cache_time_ms: int = 30000, negative_ack_redelivery_delay_ms: int = 60000, is_read_compacted: bool = False, - properties: dict = None, - pattern_auto_discovery_period: int = 60, # pylint: disable=unused-argument + properties: dict | None = None, initial_position: InitialPosition = InitialPosition.Latest, - crypto_key_reader: pulsar.CryptoKeyReader = None, + crypto_key_reader: pulsar.CryptoKeyReader | None = None, replicate_subscription_state_enabled: bool = False, max_pending_chunked_message: int = 10, auto_ack_oldest_chunked_message_on_queue_full: bool = False, start_message_id_inclusive: bool = False, - batch_receive_policy: pulsar.ConsumerBatchReceivePolicy = + batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None = None, - key_shared_policy: pulsar.ConsumerKeySharedPolicy = + key_shared_policy: pulsar.ConsumerKeySharedPolicy | None = None, batch_index_ack_enabled: bool = False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, - dead_letter_policy: pulsar.ConsumerDeadLetterPolicy = + dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None = None, crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, @@ -497,17 +498,17 @@ async def subscribe(self, topic: Union[str, List[str]], The name of the subscription. consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive Select the subscription type to be used when subscribing to the topic. - schema: pulsar.schema.Schema, optional + schema: pulsar.schema.Schema | None, default=None Define the schema of the data that will be received by this consumer. - message_listener: optional + message_listener: Callable[[Consumer, pulsar.Message], None] | None, default=None Sets a message listener for the consumer. receiver_queue_size: int, default=1000 Sets the size of the consumer receive queue. max_total_receiver_queue_size_across_partitions: int, default=50000 Set the max total receiver queue size across partitions. - consumer_name: str, optional + consumer_name: str | None, default=None Sets the consumer name. - unacked_messages_timeout_ms: int, optional + unacked_messages_timeout_ms: int | None, default=None Sets the timeout in milliseconds for unacknowledged messages. broker_consumer_stats_cache_time_ms: int, default=30000 Sets the time duration for which the broker-side consumer stats @@ -517,13 +518,11 @@ async def subscribe(self, topic: Union[str, List[str]], processed. is_read_compacted: bool, default=False Selects whether to read the compacted version of the topic. - properties: dict, optional + properties: dict | None, default=None Sets the properties for the consumer. - pattern_auto_discovery_period: int, default=60 - Periods of seconds for consumer to auto discover match topics. initial_position: InitialPosition, default=InitialPosition.Latest Set the initial position of a consumer when subscribing to the topic. - crypto_key_reader: CryptoKeyReader, optional + crypto_key_reader: pulsar.CryptoKeyReader | None, default=None Symmetric encryption class implementation. replicate_subscription_state_enabled: bool, default=False Set whether the subscription status should be replicated. @@ -535,9 +534,9 @@ async def subscribe(self, topic: Union[str, List[str]], start_message_id_inclusive: bool, default=False Set the consumer to include the given position of any reset operation. - batch_receive_policy: ConsumerBatchReceivePolicy, optional + batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default=None Set the batch collection policy for batch receiving. - key_shared_policy: ConsumerKeySharedPolicy, optional + key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None Set the key shared policy for use when the ConsumerType is KeyShared. batch_index_ack_enabled: bool, default=False @@ -546,7 +545,7 @@ async def subscribe(self, topic: Union[str, List[str]], default=RegexSubscriptionMode.PersistentOnly Set the regex subscription mode for use when the topic is a regex pattern. - dead_letter_policy: ConsumerDeadLetterPolicy, optional + dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default=None Set dead letter policy for consumer. crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL diff --git a/src/consumer.cc b/src/consumer.cc index 14b09bb..f1d7367 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -90,14 +90,6 @@ void Consumer_seek(Consumer& consumer, const MessageId& msgId) { waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); }); } -MessageId Consumer_get_last_message_id(Consumer& consumer) { - MessageId msgId; - Result res; - Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); - Py_END_ALLOW_THREADS CHECK_RESULT(res); - return msgId; -} - void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { waitForAsyncResult( [timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); }); @@ -105,6 +97,15 @@ void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); } +MessageId Consumer_get_last_message_id(Consumer& consumer) { + MessageId msgId; + Result res; + Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId); + Py_END_ALLOW_THREADS; + CHECK_RESULT(res); + return msgId; +} + void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) { py::gil_scoped_release release; consumer.receiveAsync(callback); diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 3c3ff32..dc47f39 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -187,7 +187,9 @@ async def test_unsubscribe(self): sub = 'sub' consumer = await self._client.subscribe(topic, sub) await consumer.unsubscribe() + # Verify the consumer can be created successfully with the same subscription name consumer = await self._client.subscribe(topic, sub) + await consumer.close() async def test_seek_message_id(self): topic = f'asyncio-test-seek-message-id-{time.time()}' From 1f98f4403067162362171542141f2c9323775c22 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 21:31:24 +0800 Subject: [PATCH 06/13] test start_message_id_inclusive --- pulsar/asyncio.py | 7 +++--- tests/asyncio_test.py | 54 ++++++++++++++++++++++++++----------------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 4bba8d2..0d6b2ef 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -306,7 +306,7 @@ def __init__(self, service_url, **kwargs) -> None: """ self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client - # pylint: disable=too-many-arguments,too-many-locals + # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments async def create_producer(self, topic: str, producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, @@ -452,13 +452,14 @@ def underlying_router(msg, num_partitions): ) return Producer(await future) - # pylint: disable=too-many-arguments,too-many-locals,too-many-branches + # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments async def subscribe(self, topic: Union[str, List[str]], subscription_name: str, consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, schema: pulsar.schema.Schema | None = None, - message_listener: Callable[['Consumer', pulsar.Message], None] | None = None, + message_listener: Callable[['Consumer', pulsar.Message], + None] | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index dc47f39..d26d4c7 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -18,27 +18,35 @@ # under the License. # +""" +Unit tests for asyncio Pulsar client API. +""" + +# pylint: disable=missing-function-docstring + import asyncio -from typing import List -import pulsar import time -from pulsar.asyncio import ( +from typing import List +from unittest import ( + main, + IsolatedAsyncioTestCase, +) + +import pulsar # pylint: disable=import-error +from pulsar.asyncio import ( # pylint: disable=import-error Client, Consumer, Producer, PulsarException, ) -from unittest import ( - main, - IsolatedAsyncioTestCase, -) -service_url = 'pulsar://localhost:6650' +SERVICE_URL = 'pulsar://localhost:6650' class AsyncioTest(IsolatedAsyncioTestCase): + """Test cases for asyncio Pulsar client.""" async def asyncSetUp(self) -> None: - self._client = Client(service_url, + self._client = Client(SERVICE_URL, operation_timeout_seconds=5) async def asyncTearDown(self) -> None: @@ -103,8 +111,7 @@ async def test_close_producer(self): async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: msg_ids = [] for i in range(5): - msg_id = await producer.send(f'msg-{i}'.encode()) - msg_ids.append(msg_id) + msg_ids.append(await producer.send(f'msg-{i}'.encode())) return msg_ids async def test_consumer_cumulative_acknowledge(self): @@ -127,7 +134,7 @@ async def test_consumer_cumulative_acknowledge(self): async def test_consumer_individual_acknowledge(self): topic = f'asyncio-test-consumer-individual-ack-{time.time()}' sub = 'sub' - consumer = await self._client.subscribe(topic, sub, + consumer = await self._client.subscribe(topic, sub, consumer_type=pulsar.ConsumerType.Shared) producer = await self._client.create_producer(topic) await self._prepare_messages(producer) @@ -141,7 +148,7 @@ async def test_consumer_individual_acknowledge(self): await consumer.acknowledge(msgs[4]) await consumer.close() - consumer = await self._client.subscribe(topic, sub, + consumer = await self._client.subscribe(topic, sub, consumer_type=pulsar.ConsumerType.Shared) msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-1') @@ -194,21 +201,26 @@ async def test_unsubscribe(self): async def test_seek_message_id(self): topic = f'asyncio-test-seek-message-id-{time.time()}' sub = 'sub' - consumer = await self._client.subscribe( - topic, sub, initial_position=pulsar.InitialPosition.Earliest - ) producer = await self._client.create_producer(topic) msg_ids = await self._prepare_messages(producer) - for i in range(5): - msg = await consumer.receive() - self.assertEqual(msg.data(), f'msg-{i}'.encode()) - + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest + ) await consumer.seek(msg_ids[2]) - msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-3') + await consumer.close() + + consumer = await self._client.subscribe( + topic, sub, initial_position=pulsar.InitialPosition.Earliest, + start_message_id_inclusive=True + ) + await consumer.seek(msg_ids[2]) + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg-2') + await consumer.close() async def test_seek_timestamp(self): topic = f'asyncio-test-seek-timestamp-{time.time()}' From 5f37afbbdd59c97cad703d5ae3f947e4a6daf8f8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 21:34:26 +0800 Subject: [PATCH 07/13] fix incorrect api doc --- pulsar/asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 0d6b2ef..3ace5ee 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -363,7 +363,7 @@ async def create_producer(self, topic: str, block_if_queue_full: bool, default=False Set whether send operations should block when the outgoing message queue is full. - batching_enabled: bool, default=False + batching_enabled: bool, default=True Enable automatic message batching. Note that, unlike the synchronous producer API in ``pulsar.__init__``, batching is enabled by default for the asyncio producer. From ed78ac54ba4653f63f1391e3ad4e7d7b0f64674c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 21:40:34 +0800 Subject: [PATCH 08/13] fix doc and remove message_listener --- pulsar/asyncio.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 3ace5ee..97860b4 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -341,7 +341,7 @@ async def create_producer(self, topic: str, producer_name: str | None, default=None Specify a name for the producer. If not assigned, the system will generate a globally unique name which can be accessed with - `Producer.producer_name()`. When specifying a name, it is app to + `Producer.producer_name()`. When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. schema: pulsar.schema.Schema | None, default=None @@ -458,8 +458,6 @@ async def subscribe(self, topic: Union[str, List[str]], consumer_type: pulsar.ConsumerType = pulsar.ConsumerType.Exclusive, schema: pulsar.schema.Schema | None = None, - message_listener: Callable[['Consumer', pulsar.Message], - None] | None = None, receiver_queue_size: int = 1000, max_total_receiver_queue_size_across_partitions: int = 50000, @@ -501,8 +499,6 @@ async def subscribe(self, topic: Union[str, List[str]], Select the subscription type to be used when subscribing to the topic. schema: pulsar.schema.Schema | None, default=None Define the schema of the data that will be received by this consumer. - message_listener: Callable[[Consumer, pulsar.Message], None] | None, default=None - Sets a message listener for the consumer. receiver_queue_size: int, default=1000 Sets the size of the consumer receive queue. max_total_receiver_queue_size_across_partitions: int, default=50000 @@ -572,8 +568,6 @@ async def subscribe(self, topic: Union[str, List[str]], conf.consumer_type(consumer_type) conf.regex_subscription_mode(regex_subscription_mode) conf.read_compacted(is_read_compacted) - if message_listener: - conf.message_listener(_listener_wrapper(message_listener, schema)) conf.receiver_queue_size(receiver_queue_size) conf.max_total_receiver_queue_size_across_partitions( max_total_receiver_queue_size_across_partitions @@ -656,12 +650,3 @@ def complete(): else: future.set_exception(PulsarException(result)) future.get_loop().call_soon_threadsafe(complete) - -def _listener_wrapper(listener, schema): - def wrapper(consumer, msg): - c = Consumer(consumer) - m = pulsar.Message() - m._message = msg - m._schema = schema - listener(c, m) - return wrapper From e0b1de78d5d56975c72a08bbea4958160cae6b25 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 21:42:24 +0800 Subject: [PATCH 09/13] fail when is_pattern_topic is True and topics is a list --- pulsar/asyncio.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 97860b4..7fb92e3 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -548,8 +548,8 @@ async def subscribe(self, topic: Union[str, List[str]], default=ConsumerCryptoFailureAction.FAIL Set the behavior when the decryption fails. is_pattern_topic: bool, default=False - Whether `topic` is a regex pattern. This option takes no effect - when `topic` is a list of topics. + Whether `topic` is a regex pattern. If it's True when `topic` is a list, a ValueError + will be raised. Returns ------- @@ -617,6 +617,12 @@ async def subscribe(self, topic: Union[str, List[str]], functools.partial(_set_future, future) ) elif isinstance(topic, list): + if is_pattern_topic: + raise ValueError( + "Argument 'topic' must be a string when " + "'is_pattern_topic' is True; lists of topics do not " + "support pattern subscriptions" + ) self._client.subscribe_async_topics( topic, subscription_name, conf, functools.partial(_set_future, future) From a4405b8f5db09e153e8e0731113f02912ebbbb76 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 21:52:04 +0800 Subject: [PATCH 10/13] fix incorrect schema --- pulsar/asyncio.py | 18 +++++++++++------- tests/asyncio_test.py | 12 ++++++++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 7fb92e3..ca76a71 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -85,14 +85,15 @@ def __init__(self, producer: _pulsar.Producer) -> None: """ self._producer: _pulsar.Producer = producer - async def send(self, content: bytes) -> pulsar.MessageId: + async def send(self, content: Any) -> pulsar.MessageId: """ Send a message asynchronously. parameters ---------- - content: bytes - The message payload + content: Any + The message payload, whose type should respect the schema defined in + `Client.create_producer`. Returns ------- @@ -132,7 +133,7 @@ class Consumer: The Pulsar message consumer, used to subscribe to messages from a topic. """ - def __init__(self, consumer: _pulsar.Consumer) -> None: + def __init__(self, consumer: _pulsar.Consumer, schema: pulsar.schema.Schema) -> None: """ Create the consumer. Users should not call this constructor directly. Instead, create the @@ -142,8 +143,11 @@ def __init__(self, consumer: _pulsar.Consumer) -> None: ---------- consumer: _pulsar.Consumer The underlying Consumer object from the C extension. + schema: pulsar.schema.Schema + The schema of the data that will be received by this consumer. """ - self._consumer: _pulsar.Consumer = consumer + self._consumer = consumer + self._schema = schema async def receive(self) -> pulsar.Message: """ @@ -163,7 +167,7 @@ async def receive(self) -> pulsar.Message: msg = await future m = pulsar.Message() m._message = msg - m._schema = pulsar.schema.BytesSchema() + m._schema = self._schema return m async def acknowledge( @@ -633,7 +637,7 @@ async def subscribe(self, topic: Union[str, List[str]], "(str, list)" ) - return Consumer(await future) + return Consumer(await future, schema) async def close(self) -> None: """ diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index d26d4c7..50f6836 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -252,6 +252,18 @@ async def test_seek_timestamp(self): msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-3') + async def test_schema(self): + topic = f'asyncio-test-schema-{time.time()}' + producer = await self._client.create_producer( + topic, schema=pulsar.schema.StringSchema() + ) + consumer = await self._client.subscribe( + topic, 'sub', schema=pulsar.schema.StringSchema() + ) + await producer.send('test-message') + msg = await consumer.receive() + self.assertEqual(msg.value(), 'test-message') + if __name__ == '__main__': main() From 2584878c0d648a0080ba92ae89ea48785bb52047 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Dec 2025 22:18:41 +0800 Subject: [PATCH 11/13] fix schema tests --- pulsar/asyncio.py | 18 ++++++++++-------- tests/asyncio_test.py | 21 +++++++++++++++++---- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index ca76a71..77ea8e2 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -72,7 +72,7 @@ class Producer: The Pulsar message producer, used to publish messages on a topic. """ - def __init__(self, producer: _pulsar.Producer) -> None: + def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> None: """ Create the producer. Users should not call this constructor directly. Instead, create the @@ -82,8 +82,11 @@ def __init__(self, producer: _pulsar.Producer) -> None: ---------- producer: _pulsar.Producer The underlying Producer object from the C extension. + schema: pulsar.schema.Schema + The schema of the data that will be sent by this producer. """ - self._producer: _pulsar.Producer = producer + self._producer = producer + self._schema = schema async def send(self, content: Any) -> pulsar.MessageId: """ @@ -105,7 +108,7 @@ async def send(self, content: Any) -> pulsar.MessageId: PulsarException """ builder = _pulsar.MessageBuilder() - builder.content(content) + builder.content(self._schema.encode(content)) future = asyncio.get_running_loop().create_future() self._producer.send_async(builder.build(), functools.partial(_set_future, future)) msg_id = await future @@ -454,7 +457,8 @@ def underlying_router(msg, num_partitions): self._client.create_producer_async( topic, conf, functools.partial(_set_future, future) ) - return Producer(await future) + schema.attach_client(self._client) + return Producer(await future, schema) # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments async def subscribe(self, topic: Union[str, List[str]], @@ -632,11 +636,9 @@ async def subscribe(self, topic: Union[str, List[str]], functools.partial(_set_future, future) ) else: - raise ValueError( - "Argument 'topic' is expected to be of a type between " - "(str, list)" - ) + raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'") + schema.attach_client(self._client) return Consumer(await future, schema) async def close(self) -> None: diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 50f6836..656ffba 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -39,6 +39,12 @@ Producer, PulsarException, ) +from pulsar.schema import ( # pylint: disable=import-error + AvroSchema, + Integer, + Record, + String, +) SERVICE_URL = 'pulsar://localhost:6650' @@ -253,16 +259,23 @@ async def test_seek_timestamp(self): self.assertEqual(msg.data(), b'msg-3') async def test_schema(self): + class ExampleRecord(Record): # pylint: disable=too-few-public-methods + """Example record schema for testing.""" + str_field = String() + int_field = Integer() + topic = f'asyncio-test-schema-{time.time()}' producer = await self._client.create_producer( - topic, schema=pulsar.schema.StringSchema() + topic, schema=AvroSchema(ExampleRecord) ) consumer = await self._client.subscribe( - topic, 'sub', schema=pulsar.schema.StringSchema() + topic, 'sub', schema=AvroSchema(ExampleRecord) ) - await producer.send('test-message') + await producer.send(ExampleRecord(str_field='test', int_field=42)) msg = await consumer.receive() - self.assertEqual(msg.value(), 'test-message') + self.assertIsInstance(msg.value(), ExampleRecord) + self.assertEqual(msg.value().str_field, 'test') + self.assertEqual(msg.value().int_field, 42) if __name__ == '__main__': From 4e1e42c6cb405d7fcd582dc1f59a90773f9aee06 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 Dec 2025 11:16:49 +0800 Subject: [PATCH 12/13] Remove changes not related --- pulsar/asyncio.py | 113 +--------------------------------------------- 1 file changed, 1 insertion(+), 112 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 77ea8e2..83726fc 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -315,28 +315,7 @@ def __init__(self, service_url, **kwargs) -> None: # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments async def create_producer(self, topic: str, - producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, - initial_sequence_id: int | None = None, - send_timeout_millis: int = 30000, - compression_type: CompressionType = CompressionType.NONE, - max_pending_messages: int = 1000, - max_pending_messages_across_partitions: int = 50000, - block_if_queue_full: bool = False, - batching_enabled: bool = True, - batching_max_messages: int = 1000, - batching_max_allowed_size_in_bytes: int = 128*1024, - batching_max_publish_delay_ms: int = 10, - chunking_enabled: bool = False, - message_routing_mode: PartitionsRoutingMode = - PartitionsRoutingMode.RoundRobinDistribution, - lazy_start_partitioned_producers: bool = False, - properties: dict | None = None, - batching_type: BatchingType = BatchingType.Default, - encryption_key: str | None = None, - crypto_key_reader: pulsar.CryptoKeyReader | None = None, - access_mode: ProducerAccessMode = ProducerAccessMode.Shared, - message_router: Callable[[pulsar.Message, int], int] | None = None, ) -> Producer: """ Create a new producer on a given topic @@ -345,61 +324,8 @@ async def create_producer(self, topic: str, ---------- topic: str The topic name - producer_name: str | None, default=None - Specify a name for the producer. If not assigned, the system will - generate a globally unique name which can be accessed with - `Producer.producer_name()`. When specifying a name, it is up to - the user to ensure that, for a given topic, the producer name is - unique across all Pulsar's clusters. schema: pulsar.schema.Schema | None, default=None Define the schema of the data that will be published by this producer. - initial_sequence_id: int | None, default=None - Set the baseline for the sequence ids for messages published by - the producer. - send_timeout_millis: int, default=30000 - If a message is not acknowledged by the server before the - send_timeout expires, an error will be reported. - compression_type: CompressionType, default=CompressionType.NONE - Set the compression type for the producer. - max_pending_messages: int, default=1000 - Set the max size of the queue holding the messages pending to - receive an acknowledgment from the broker. - max_pending_messages_across_partitions: int, default=50000 - Set the max size of the queue holding the messages pending to - receive an acknowledgment across partitions. - block_if_queue_full: bool, default=False - Set whether send operations should block when the outgoing - message queue is full. - batching_enabled: bool, default=True - Enable automatic message batching. Note that, unlike the synchronous - producer API in ``pulsar.__init__``, batching is enabled by default - for the asyncio producer. - batching_max_messages: int, default=1000 - Maximum number of messages in a batch. - batching_max_allowed_size_in_bytes: int, default=128*1024 - Maximum size in bytes of a batch. - batching_max_publish_delay_ms: int, default=10 - The batch interval in milliseconds. - chunking_enabled: bool, default=False - Enable chunking of large messages. - message_routing_mode: PartitionsRoutingMode, - default=PartitionsRoutingMode.RoundRobinDistribution - Set the message routing mode for the partitioned producer. - lazy_start_partitioned_producers: bool, default=False - Start partitioned producers lazily on demand. - properties: dict | None, default=None - Sets the properties for the producer. - batching_type: BatchingType, default=BatchingType.Default - Sets the batching type for the producer. - encryption_key: str | None, default=None - The key used for symmetric encryption. - crypto_key_reader: pulsar.CryptoKeyReader | None, default=None - Symmetric encryption class implementation. - access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared - Set the type of access mode that the producer requires on the topic. - message_router: Callable[[pulsar.Message, int], int] | None, default=None - A custom message router function that takes a Message and the - number of partitions and returns the partition index. Returns ------- @@ -412,52 +338,15 @@ async def create_producer(self, topic: str, """ if schema is None: schema = pulsar.schema.BytesSchema() + schema.attach_client(self._client) future = asyncio.get_running_loop().create_future() conf = _pulsar.ProducerConfiguration() - conf.send_timeout_millis(send_timeout_millis) - conf.compression_type(compression_type) - conf.max_pending_messages(max_pending_messages) - conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) - conf.block_if_queue_full(block_if_queue_full) - conf.batching_enabled(batching_enabled) - conf.batching_max_messages(batching_max_messages) - conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) - conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) - conf.partitions_routing_mode(message_routing_mode) - conf.batching_type(batching_type) - conf.chunking_enabled(chunking_enabled) - conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) - conf.access_mode(access_mode) - if message_router is not None: - def underlying_router(msg, num_partitions): - return int(message_router(pulsar.Message._wrap(msg), - num_partitions)) - conf.message_router(underlying_router) - - if producer_name: - conf.producer_name(producer_name) - if initial_sequence_id is not None: - conf.initial_sequence_id(initial_sequence_id) - if properties: - for k, v in properties.items(): - conf.property(k, v) - conf.schema(schema.schema_info()) - if encryption_key: - conf.encryption_key(encryption_key) - if crypto_key_reader: - conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) - - if batching_enabled and chunking_enabled: - raise ValueError( - "Batching and chunking of messages can't be enabled together." - ) self._client.create_producer_async( topic, conf, functools.partial(_set_future, future) ) - schema.attach_client(self._client) return Producer(await future, schema) # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments From 75495d3ff7262afbe6b9bb8744fac806ff0d60d1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 Dec 2025 11:20:21 +0800 Subject: [PATCH 13/13] remove unused imports --- pulsar/asyncio.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 83726fc..01246c6 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -25,15 +25,11 @@ import asyncio import functools -from typing import Any, Callable, List, Union +from typing import Any, List, Union import _pulsar from _pulsar import ( InitialPosition, - CompressionType, - PartitionsRoutingMode, - BatchingType, - ProducerAccessMode, RegexSubscriptionMode, ConsumerCryptoFailureAction, )