From ded4f6b4b921d11b70531c0587305a45d50a8447 Mon Sep 17 00:00:00 2001 From: Marco Moser Date: Tue, 31 Mar 2026 14:44:13 +0200 Subject: [PATCH 1/2] fix(aiokafka): keep Faust assignor for changelog tables and update admin request compatibility Ensure worker consumers use the Faust assignor whenever changelog tables are present, even with zero standby replicas, so table recovery mapping remains correct. Also update metadata/create-topics request construction for newer aiokafka APIs. --- faust/__init__.py | 1 + faust/stores/aerospike.py | 10 ++++----- faust/transport/drivers/aiokafka.py | 19 +++++++++------- tests/unit/transport/drivers/test_aiokafka.py | 22 ++++++++++++++++++- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/faust/__init__.py b/faust/__init__.py index c20b05903..f3ef14c75 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Python Stream processing.""" + # :copyright: (c) 2017-2020, Robinhood Markets, Inc. # All rights reserved. # :license: BSD (3 Clause), see LICENSE for more details. diff --git a/faust/stores/aerospike.py b/faust/stores/aerospike.py index 291ccbe9f..198f78dc7 100644 --- a/faust/stores/aerospike.py +++ b/faust/stores/aerospike.py @@ -98,7 +98,7 @@ def _get(self, key: bytes) -> Optional[bytes]: key = (self.namespace, self.table_name, key) fun = self.client.get try: - (key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key) + key, meta, bins = self.aerospike_fun_call_with_retry(fun=fun, key=key) if bins: return bins[self.BIN_KEY] return None @@ -173,7 +173,7 @@ def _itervalues(self) -> Iterator[bytes]: fun=fun, namespace=self.namespace, set=self.table_name ) for result in scan.results(): - (key, meta, bins) = result + key, meta, bins = result if bins: yield bins[self.BIN_KEY] else: @@ -193,8 +193,8 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: fun=fun, namespace=self.namespace, set=self.table_name ) for result in scan.results(): - (key_data, meta, bins) = result - (ns, set, policy, key) = key_data + key_data, meta, bins = result + ns, set, policy, key = key_data if bins: bins = bins[self.BIN_KEY] @@ -214,7 +214,7 @@ def _contains(self, key: bytes) -> bool: try: if self.app.conf.store_check_exists: key = (self.namespace, self.table_name, key) - (key, meta) = self.aerospike_fun_call_with_retry( + key, meta = self.aerospike_fun_call_with_retry( fun=self.client.exists, key=key ) if meta: diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 943e9ebc3..961366af2 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -27,9 +27,6 @@ import aiokafka import aiokafka.abc import opentracing -from packaging.version import Version - -_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0") from aiokafka import TopicPartition from aiokafka.consumer.group_coordinator import OffsetCommitRequest from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor @@ -45,7 +42,7 @@ ) from aiokafka.partitioner import DefaultPartitioner, murmur2 from aiokafka.protocol.admin import CreateTopicsRequest -from aiokafka.protocol.metadata import MetadataRequest_v1 +from aiokafka.protocol.metadata import MetadataRequest from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition from aiokafka.util import parse_kafka_version from mode import Service, get_logger @@ -55,6 +52,7 @@ from mode.utils.objects import cached_property from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds from opentracing.ext import tags +from packaging.version import Version from yarl import URL from faust.auth import ( @@ -96,6 +94,8 @@ logger = get_logger(__name__) +_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0") + DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID TOPIC_LENGTH_MAX = 249 @@ -511,9 +511,13 @@ def _create_worker_consumer( conf = self.app.conf if self.consumer.in_transaction: isolation_level = "read_committed" + # Table recovery depends on app.assignor state to map changelog + # active/standby partitions. Keep Faust assignor enabled whenever + # this app has changelog tables configured. + has_changelog_tables = bool(self.app.tables.changelog_topics) self._assignor = ( self.app.assignor - if self.app.conf.table_standby_replicas > 0 + if self.app.conf.table_standby_replicas > 0 or has_changelog_tables else RoundRobinPartitionAssignor ) auth_settings = credentials_to_aiokafka_auth( @@ -1513,7 +1517,7 @@ async def _get_controller_node( for node_id in nodes: if node_id is None: raise NotReady("Not connected to Kafka Broker") - request = MetadataRequest_v1([]) + request = MetadataRequest([]) wait_result = await owner.wait( client.send(node_id, request), timeout=timeout, @@ -1546,7 +1550,6 @@ async def _really_create_topic( owner.log.debug("Topic %r exists, skipping creation.", topic) return - protocol_version = 1 extra_configs = config or {} config = self._topic_config(retention, compacting, deleting) config.update(extra_configs) @@ -1563,7 +1566,7 @@ async def _really_create_topic( else: raise Exception("Controller node is None") - request = CreateTopicsRequest[protocol_version]( + request = CreateTopicsRequest( [(topic, partitions, replication, [], list(config.items()))], timeout, False, diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py index a0252e1a2..d36665704 100644 --- a/tests/unit/transport/drivers/test_aiokafka.py +++ b/tests/unit/transport/drivers/test_aiokafka.py @@ -17,10 +17,10 @@ import faust from faust import auth from faust.exceptions import ImproperlyConfigured, NotReady -from faust.transport.drivers.aiokafka import _AIOKAFKA_HAS_API_VERSION from faust.sensors.monitor import Monitor from faust.transport.drivers import aiokafka as mod from faust.transport.drivers.aiokafka import ( + _AIOKAFKA_HAS_API_VERSION, SLOW_PROCESSING_CAUSE_AGENT, SLOW_PROCESSING_CAUSE_STREAM, SLOW_PROCESSING_EXPLAINED, @@ -796,6 +796,26 @@ def test__create_worker_consumer__transaction(self, *, cthread, app): isolation_level="read_committed", ) + def test__create_worker_consumer__uses_roundrobin_without_tables( + self, *, cthread, app + ): + app.conf.table_standby_replicas = 0 + app.tables._changelogs.clear() + transport = cthread.transport + with patch("aiokafka.AIOKafkaConsumer"): + cthread._create_worker_consumer(transport) + assert cthread._assignor is mod.RoundRobinPartitionAssignor + + def test__create_worker_consumer__uses_faust_assignor_with_changelog_topics( + self, *, cthread, app + ): + app.conf.table_standby_replicas = 0 + app.tables._changelogs["app-foo-changelog"] = Mock(name="table") + transport = cthread.transport + with patch("aiokafka.AIOKafkaConsumer"): + cthread._create_worker_consumer(transport) + assert cthread._assignor is app.assignor + def assert_create_worker_consumer( self, cthread, From 52754845521711359e4f55edc92515016ef01ada Mon Sep 17 00:00:00 2001 From: Marco Moser Date: Wed, 1 Apr 2026 13:09:27 +0200 Subject: [PATCH 2/2] remove support for python3.9 and below --- .github/workflows/python-package.yml | 6 +----- README.md | 2 +- docs/includes/faq.txt | 2 +- docs/introduction.rst | 2 +- docs/userguide/application.rst | 2 +- examples/django/setup.py | 2 +- pyproject.toml | 4 +--- setup.py | 6 +++--- tox.ini | 4 +--- 9 files changed, 11 insertions(+), 19 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f3b02a839..6db891f73 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -42,13 +42,9 @@ jobs: # for example if a test fails only when Cython is enabled fail-fast: false matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.10', '3.11', '3.12', '3.13'] use-cython: ['true', 'false'] experimental: [false] - include: - - python-version: 'pypy3.9' - use-cython: false - experimental: true env: USE_CYTHON: ${{ matrix.use-cython }} continue-on-error: ${{ matrix.experimental }} diff --git a/README.md b/README.md index c1391f2b6..e4212211a 100644 --- a/README.md +++ b/README.md @@ -416,7 +416,7 @@ Yes! Use the `asyncio` reactor implementation: https://twistedmatrix.com/documen ### Will you support Python 2.7 or Python 3.5 -No. Faust requires Python 3.8 or later, since it heavily uses features that were +No. Faust requires Python 3.10 or later, since it heavily uses features that were introduced in Python 3.6 (`async`, `await`, variable type annotations). ### I get a maximum number of open files exceeded error by RocksDB when running a Faust app locally. How can I fix this diff --git a/docs/includes/faq.txt b/docs/includes/faq.txt index 7641cb64a..1b6babd2b 100644 --- a/docs/includes/faq.txt +++ b/docs/includes/faq.txt @@ -55,7 +55,7 @@ https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.h Will you support Python 2.7 or Python 3.5? ------------------------------------------ -No. Faust requires Python 3.8 or later, since it heavily uses features that were +No. Faust requires Python 3.10 or later, since it heavily uses features that were introduced in Python 3.6 (`async`, `await`, variable type annotations). diff --git a/docs/introduction.rst b/docs/introduction.rst index 5932baa23..ad064a4f3 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -162,7 +162,7 @@ What do I need? - RocksDB 5.0 or later, :pypi:`python-rocksdb` -Faust requires Python 3.8 or later, and a running Kafka broker. +Faust requires Python 3.10 or later, and a running Kafka broker. There's no plan to support earlier Python versions. Please get in touch if this is something you want to work on. diff --git a/docs/userguide/application.rst b/docs/userguide/application.rst index 832739c81..29c21b970 100644 --- a/docs/userguide/application.rst +++ b/docs/userguide/application.rst @@ -1374,7 +1374,7 @@ setuptools to install a command-line program for your project. include_package_data=True, zip_safe=False, install_requires=['faust'], - python_requires='~=3.8', + python_requires='~=3.10', ) For inspiration you can also look to the `setup.py` files in the diff --git a/examples/django/setup.py b/examples/django/setup.py index eda2bfd42..6cef929b8 100644 --- a/examples/django/setup.py +++ b/examples/django/setup.py @@ -95,7 +95,7 @@ def reqs(*f): license='BSD', packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']), include_package_data=True, - python_requires='>=3.8.0', + python_requires='>=3.10.0', keywords=[], zip_safe=False, install_requires=reqs('default.txt'), diff --git a/pyproject.toml b/pyproject.toml index 6ddfebde8..b4b7f923c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "faust-streaming" description = "Python Stream Processing. A Faust fork" -requires-python = ">=3.9" +requires-python = ">=3.10" dynamic = [ "version", "optional-dependencies", @@ -26,8 +26,6 @@ classifiers = [ "License :: OSI Approved :: BSD License", "Programming Language :: Python", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", diff --git a/setup.py b/setup.py index e8047d085..877301a3c 100644 --- a/setup.py +++ b/setup.py @@ -46,8 +46,8 @@ LIBRARIES = [] E_UNSUPPORTED_PYTHON = NAME + " 1.0 requires Python %%s or later!" -if sys.version_info < (3, 8): - raise Exception(E_UNSUPPORTED_PYTHON % ("3.8",)) # NOQA +if sys.version_info < (3, 10): + raise Exception(E_UNSUPPORTED_PYTHON % ("3.10",)) # NOQA from pathlib import Path # noqa @@ -197,7 +197,7 @@ def do_setup(**kwargs): # PEP-561: https://www.python.org/dev/peps/pep-0561/ package_data={"faust": ["py.typed"]}, include_package_data=True, - python_requires=">=3.8.0", + python_requires=">=3.10.0", zip_safe=False, install_requires=reqs("requirements.txt"), tests_require=reqs("test.txt"), diff --git a/tox.ini b/tox.ini index 59aff06e1..e1673e689 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = 3.12,3.11,3.10,3.9,3.8,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell +envlist = 3.12,3.11,3.10,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell [testenv] deps= @@ -20,8 +20,6 @@ basepython = 3.12,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.12 3.11,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.11 3.10,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.10 - 3.9,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.9 - 3.8,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.8 [testenv:apicheck] setenv =