-
Notifications
You must be signed in to change notification settings - Fork 48
Implement async consumer #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement async consumer #277
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements async consumer support and migrates producer/consumer configuration options from the sync API to the async API in the Pulsar Python client. The changes address feature requests for full asyncio support (issue #55) and exposing all producer configuration parameters in the async client (issue #261).
Key Changes:
- Added comprehensive async Consumer class with methods for receive, acknowledge, seek, unsubscribe, and close operations
- Migrated all producer configuration parameters from sync to async API with proper validation and defaults
- Added async subscribe method with full configuration support including multi-topic and pattern subscriptions
- Implemented C++ bindings for async consumer operations with proper GIL management
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| pulsar/asyncio.py | Adds Consumer class with async operations; extends create_producer and adds subscribe method with full configuration parameter support |
| src/consumer.cc | Implements async wrapper functions for consumer operations (receive, acknowledge, seek, close, unsubscribe) with GIL release |
| src/client.cc | Adds async subscribe methods for single topic, multiple topics, and pattern-based subscriptions |
| tests/asyncio_test.py | Adds comprehensive test coverage for async consumer operations including acknowledgement, multi-topic, seek, and unsubscribe functionality; fixes typos in test names |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.
Comments suppressed due to low confidence (2)
pulsar/asyncio.py:117
- The Producer class doesn't store the schema that was used during creation. The synchronous Producer stores
self._schemaand uses it in_build_msgto encode content viaself._schema.encode(content). The async Producer directly passes content to the MessageBuilder without encoding it through the schema. This means that when using non-bytes schemas (e.g., StringSchema, JSON schemas), the async Producer won't properly encode the data. Store the schema in the Producer's__init__and use it to encode content in thesendmethod.
def __init__(self, producer: _pulsar.Producer) -> None:
"""
Create the producer.
Users should not call this constructor directly. Instead, create the
producer via `Client.create_producer`.
Parameters
----------
producer: _pulsar.Producer
The underlying Producer object from the C extension.
"""
self._producer: _pulsar.Producer = producer
async def send(self, content: Any) -> pulsar.MessageId:
"""
Send a message asynchronously.
parameters
----------
content: Any
The message payload, whose type should respect the schema defined in
`Client.create_producer`.
Returns
-------
pulsar.MessageId
The message id that represents the persisted position of the message.
Raises
------
PulsarException
"""
builder = _pulsar.MessageBuilder()
builder.content(content)
future = asyncio.get_running_loop().create_future()
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
msg_id = await future
return pulsar.MessageId(
msg_id.partition(),
msg_id.ledger_id(),
msg_id.entry_id(),
msg_id.batch_index(),
)
pulsar/asyncio.py:117
- The async Producer's
send()method only acceptscontentas a parameter, while the synchronous Producer'ssend()method supports additional parameters likeproperties,partition_key,ordering_key,sequence_id,replication_clusters,disable_replication,event_timestamp,deliver_at, anddeliver_after. This limits the functionality of the async producer. Consider adding these parameters to match the synchronous API, allowing users to set message metadata and delivery options.
async def send(self, content: Any) -> pulsar.MessageId:
"""
Send a message asynchronously.
parameters
----------
content: Any
The message payload, whose type should respect the schema defined in
`Client.create_producer`.
Returns
-------
pulsar.MessageId
The message id that represents the persisted position of the message.
Raises
------
PulsarException
"""
builder = _pulsar.MessageBuilder()
builder.content(content)
future = asyncio.get_running_loop().create_future()
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
msg_id = await future
return pulsar.MessageId(
msg_id.partition(),
msg_id.ledger_id(),
msg_id.entry_id(),
msg_id.batch_index(),
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Main issue: #55
The options are migrated from the original
create_producerorsubscribemethods frompulsar.Clientdirectly so most of them are not covered by tests. The differences are:is_pattern_topicis added to specify whether the topic argument represents a topic regex rather than a topic name.pattern_auto_discovery_periodactually never works, so it is not added to the new APImessage_listeneris not supported because it's hard to use in asyncio based programming, we can support it if there is reasonable request for this feature in future| Noneto indicate it can be None)