Skip to content

Conversation

@BewareMyPower
Copy link
Contributor

@BewareMyPower BewareMyPower commented Dec 23, 2025

Main issue: #55

The options are migrated from the original create_producer or subscribe methods from pulsar.Client directly so most of them are not covered by tests. The differences are:

  • is_pattern_topic is added to specify whether the topic argument represents a topic regex rather than a topic name.
  • pattern_auto_discovery_period actually never works, so it is not added to the new API
  • message_listener is not supported because it's hard to use in asyncio based programming, we can support it if there is reasonable request for this feature in future
  • fix all incorrect type hints (typically by adding | None to indicate it can be None)

@BewareMyPower BewareMyPower changed the title Support basic functionality of asyncio based consumer Implement async consumer and migrate existing options when creating producer or consumer Dec 23, 2025
@BewareMyPower BewareMyPower self-assigned this Dec 23, 2025
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements async consumer support and migrates producer/consumer configuration options from the sync API to the async API in the Pulsar Python client. The changes address feature requests for full asyncio support (issue #55) and exposing all producer configuration parameters in the async client (issue #261).

Key Changes:

  • Added comprehensive async Consumer class with methods for receive, acknowledge, seek, unsubscribe, and close operations
  • Migrated all producer configuration parameters from sync to async API with proper validation and defaults
  • Added async subscribe method with full configuration support including multi-topic and pattern subscriptions
  • Implemented C++ bindings for async consumer operations with proper GIL management

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

File Description
pulsar/asyncio.py Adds Consumer class with async operations; extends create_producer and adds subscribe method with full configuration parameter support
src/consumer.cc Implements async wrapper functions for consumer operations (receive, acknowledge, seek, close, unsubscribe) with GIL release
src/client.cc Adds async subscribe methods for single topic, multiple topics, and pattern-based subscriptions
tests/asyncio_test.py Adds comprehensive test coverage for async consumer operations including acknowledgement, multi-topic, seek, and unsubscribe functionality; fixes typos in test names

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@BewareMyPower BewareMyPower marked this pull request as draft December 23, 2025 12:50
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.

Comments suppressed due to low confidence (2)

pulsar/asyncio.py:117

  • The Producer class doesn't store the schema that was used during creation. The synchronous Producer stores self._schema and uses it in _build_msg to encode content via self._schema.encode(content). The async Producer directly passes content to the MessageBuilder without encoding it through the schema. This means that when using non-bytes schemas (e.g., StringSchema, JSON schemas), the async Producer won't properly encode the data. Store the schema in the Producer's __init__ and use it to encode content in the send method.
    def __init__(self, producer: _pulsar.Producer) -> None:
        """
        Create the producer.
        Users should not call this constructor directly. Instead, create the
        producer via `Client.create_producer`.

        Parameters
        ----------
        producer: _pulsar.Producer
            The underlying Producer object from the C extension.
        """
        self._producer: _pulsar.Producer = producer

    async def send(self, content: Any) -> pulsar.MessageId:
        """
        Send a message asynchronously.

        parameters
        ----------
        content: Any
            The message payload, whose type should respect the schema defined in
            `Client.create_producer`.

        Returns
        -------
        pulsar.MessageId
            The message id that represents the persisted position of the message.

        Raises
        ------
        PulsarException
        """
        builder = _pulsar.MessageBuilder()
        builder.content(content)
        future = asyncio.get_running_loop().create_future()
        self._producer.send_async(builder.build(), functools.partial(_set_future, future))
        msg_id = await future
        return pulsar.MessageId(
            msg_id.partition(),
            msg_id.ledger_id(),
            msg_id.entry_id(),
            msg_id.batch_index(),
        )

pulsar/asyncio.py:117

  • The async Producer's send() method only accepts content as a parameter, while the synchronous Producer's send() method supports additional parameters like properties, partition_key, ordering_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, and deliver_after. This limits the functionality of the async producer. Consider adding these parameters to match the synchronous API, allowing users to set message metadata and delivery options.
    async def send(self, content: Any) -> pulsar.MessageId:
        """
        Send a message asynchronously.

        parameters
        ----------
        content: Any
            The message payload, whose type should respect the schema defined in
            `Client.create_producer`.

        Returns
        -------
        pulsar.MessageId
            The message id that represents the persisted position of the message.

        Raises
        ------
        PulsarException
        """
        builder = _pulsar.MessageBuilder()
        builder.content(content)
        future = asyncio.get_running_loop().create_future()
        self._producer.send_async(builder.build(), functools.partial(_set_future, future))
        msg_id = await future
        return pulsar.MessageId(
            msg_id.partition(),
            msg_id.ledger_id(),
            msg_id.entry_id(),
            msg_id.batch_index(),
        )

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@BewareMyPower BewareMyPower marked this pull request as ready for review December 23, 2025 14:19
@BewareMyPower BewareMyPower changed the title Implement async consumer and migrate existing options when creating producer or consumer Implement async consumer Dec 24, 2025
@BewareMyPower BewareMyPower merged commit 961411f into apache:main Dec 24, 2025
17 of 22 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/asyncio-consumer branch December 24, 2025 08:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants