diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f3b02a839..6db891f73 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -42,13 +42,9 @@ jobs: # for example if a test fails only when Cython is enabled fail-fast: false matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.10', '3.11', '3.12', '3.13'] use-cython: ['true', 'false'] experimental: [false] - include: - - python-version: 'pypy3.9' - use-cython: false - experimental: true env: USE_CYTHON: ${{ matrix.use-cython }} continue-on-error: ${{ matrix.experimental }} diff --git a/README.md b/README.md index c1391f2b6..e4212211a 100644 --- a/README.md +++ b/README.md @@ -416,7 +416,7 @@ Yes! Use the `asyncio` reactor implementation: https://twistedmatrix.com/documen ### Will you support Python 2.7 or Python 3.5 -No. Faust requires Python 3.8 or later, since it heavily uses features that were +No. Faust requires Python 3.10 or later, since it heavily uses features that were introduced in Python 3.6 (`async`, `await`, variable type annotations). ### I get a maximum number of open files exceeded error by RocksDB when running a Faust app locally. How can I fix this diff --git a/docs/includes/faq.txt b/docs/includes/faq.txt index 7641cb64a..1b6babd2b 100644 --- a/docs/includes/faq.txt +++ b/docs/includes/faq.txt @@ -55,7 +55,7 @@ https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.h Will you support Python 2.7 or Python 3.5? ------------------------------------------ -No. Faust requires Python 3.8 or later, since it heavily uses features that were +No. Faust requires Python 3.10 or later, since it heavily uses features that were introduced in Python 3.6 (`async`, `await`, variable type annotations). diff --git a/docs/introduction.rst b/docs/introduction.rst index 5932baa23..ad064a4f3 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -162,7 +162,7 @@ What do I need? - RocksDB 5.0 or later, :pypi:`python-rocksdb` -Faust requires Python 3.8 or later, and a running Kafka broker. +Faust requires Python 3.10 or later, and a running Kafka broker. There's no plan to support earlier Python versions. Please get in touch if this is something you want to work on. diff --git a/docs/userguide/application.rst b/docs/userguide/application.rst index 832739c81..29c21b970 100644 --- a/docs/userguide/application.rst +++ b/docs/userguide/application.rst @@ -1374,7 +1374,7 @@ setuptools to install a command-line program for your project. include_package_data=True, zip_safe=False, install_requires=['faust'], - python_requires='~=3.8', + python_requires='~=3.10', ) For inspiration you can also look to the `setup.py` files in the diff --git a/examples/django/setup.py b/examples/django/setup.py index eda2bfd42..6cef929b8 100644 --- a/examples/django/setup.py +++ b/examples/django/setup.py @@ -95,7 +95,7 @@ def reqs(*f): license='BSD', packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']), include_package_data=True, - python_requires='>=3.8.0', + python_requires='>=3.10.0', keywords=[], zip_safe=False, install_requires=reqs('default.txt'), diff --git a/faust/__init__.py b/faust/__init__.py index c20b05903..f3ef14c75 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Python Stream processing.""" + # :copyright: (c) 2017-2020, Robinhood Markets, Inc. # All rights reserved. # :license: BSD (3 Clause), see LICENSE for more details. diff --git a/faust/stores/aerospike.py b/faust/stores/aerospike.py index 291ccbe9f..198f78dc7 100644 --- a/faust/stores/aerospike.py +++ b/faust/stores/aerospike.py @@ -98,7 +98,7 @@ def _get(self, key: bytes) -> Optional[bytes]: key = (self.namespace, self.table_name, key) fun = self.client.get try: - (key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key) + key, meta, bins = self.aerospike_fun_call_with_retry(fun=fun, key=key) if bins: return bins[self.BIN_KEY] return None @@ -173,7 +173,7 @@ def _itervalues(self) -> Iterator[bytes]: fun=fun, namespace=self.namespace, set=self.table_name ) for result in scan.results(): - (key, meta, bins) = result + key, meta, bins = result if bins: yield bins[self.BIN_KEY] else: @@ -193,8 +193,8 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: fun=fun, namespace=self.namespace, set=self.table_name ) for result in scan.results(): - (key_data, meta, bins) = result - (ns, set, policy, key) = key_data + key_data, meta, bins = result + ns, set, policy, key = key_data if bins: bins = bins[self.BIN_KEY] @@ -214,7 +214,7 @@ def _contains(self, key: bytes) -> bool: try: if self.app.conf.store_check_exists: key = (self.namespace, self.table_name, key) - (key, meta) = self.aerospike_fun_call_with_retry( + key, meta = self.aerospike_fun_call_with_retry( fun=self.client.exists, key=key ) if meta: diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 943e9ebc3..961366af2 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -27,9 +27,6 @@ import aiokafka import aiokafka.abc import opentracing -from packaging.version import Version - -_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0") from aiokafka import TopicPartition from aiokafka.consumer.group_coordinator import OffsetCommitRequest from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor @@ -45,7 +42,7 @@ ) from aiokafka.partitioner import DefaultPartitioner, murmur2 from aiokafka.protocol.admin import CreateTopicsRequest -from aiokafka.protocol.metadata import MetadataRequest_v1 +from aiokafka.protocol.metadata import MetadataRequest from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition from aiokafka.util import parse_kafka_version from mode import Service, get_logger @@ -55,6 +52,7 @@ from mode.utils.objects import cached_property from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds from opentracing.ext import tags +from packaging.version import Version from yarl import URL from faust.auth import ( @@ -96,6 +94,8 @@ logger = get_logger(__name__) +_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0") + DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID TOPIC_LENGTH_MAX = 249 @@ -511,9 +511,13 @@ def _create_worker_consumer( conf = self.app.conf if self.consumer.in_transaction: isolation_level = "read_committed" + # Table recovery depends on app.assignor state to map changelog + # active/standby partitions. Keep Faust assignor enabled whenever + # this app has changelog tables configured. + has_changelog_tables = bool(self.app.tables.changelog_topics) self._assignor = ( self.app.assignor - if self.app.conf.table_standby_replicas > 0 + if self.app.conf.table_standby_replicas > 0 or has_changelog_tables else RoundRobinPartitionAssignor ) auth_settings = credentials_to_aiokafka_auth( @@ -1513,7 +1517,7 @@ async def _get_controller_node( for node_id in nodes: if node_id is None: raise NotReady("Not connected to Kafka Broker") - request = MetadataRequest_v1([]) + request = MetadataRequest([]) wait_result = await owner.wait( client.send(node_id, request), timeout=timeout, @@ -1546,7 +1550,6 @@ async def _really_create_topic( owner.log.debug("Topic %r exists, skipping creation.", topic) return - protocol_version = 1 extra_configs = config or {} config = self._topic_config(retention, compacting, deleting) config.update(extra_configs) @@ -1563,7 +1566,7 @@ async def _really_create_topic( else: raise Exception("Controller node is None") - request = CreateTopicsRequest[protocol_version]( + request = CreateTopicsRequest( [(topic, partitions, replication, [], list(config.items()))], timeout, False, diff --git a/pyproject.toml b/pyproject.toml index 6ddfebde8..b4b7f923c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "faust-streaming" description = "Python Stream Processing. A Faust fork" -requires-python = ">=3.9" +requires-python = ">=3.10" dynamic = [ "version", "optional-dependencies", @@ -26,8 +26,6 @@ classifiers = [ "License :: OSI Approved :: BSD License", "Programming Language :: Python", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", diff --git a/setup.py b/setup.py index e8047d085..877301a3c 100644 --- a/setup.py +++ b/setup.py @@ -46,8 +46,8 @@ LIBRARIES = [] E_UNSUPPORTED_PYTHON = NAME + " 1.0 requires Python %%s or later!" -if sys.version_info < (3, 8): - raise Exception(E_UNSUPPORTED_PYTHON % ("3.8",)) # NOQA +if sys.version_info < (3, 10): + raise Exception(E_UNSUPPORTED_PYTHON % ("3.10",)) # NOQA from pathlib import Path # noqa @@ -197,7 +197,7 @@ def do_setup(**kwargs): # PEP-561: https://www.python.org/dev/peps/pep-0561/ package_data={"faust": ["py.typed"]}, include_package_data=True, - python_requires=">=3.8.0", + python_requires=">=3.10.0", zip_safe=False, install_requires=reqs("requirements.txt"), tests_require=reqs("test.txt"), diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py index a0252e1a2..d36665704 100644 --- a/tests/unit/transport/drivers/test_aiokafka.py +++ b/tests/unit/transport/drivers/test_aiokafka.py @@ -17,10 +17,10 @@ import faust from faust import auth from faust.exceptions import ImproperlyConfigured, NotReady -from faust.transport.drivers.aiokafka import _AIOKAFKA_HAS_API_VERSION from faust.sensors.monitor import Monitor from faust.transport.drivers import aiokafka as mod from faust.transport.drivers.aiokafka import ( + _AIOKAFKA_HAS_API_VERSION, SLOW_PROCESSING_CAUSE_AGENT, SLOW_PROCESSING_CAUSE_STREAM, SLOW_PROCESSING_EXPLAINED, @@ -796,6 +796,26 @@ def test__create_worker_consumer__transaction(self, *, cthread, app): isolation_level="read_committed", ) + def test__create_worker_consumer__uses_roundrobin_without_tables( + self, *, cthread, app + ): + app.conf.table_standby_replicas = 0 + app.tables._changelogs.clear() + transport = cthread.transport + with patch("aiokafka.AIOKafkaConsumer"): + cthread._create_worker_consumer(transport) + assert cthread._assignor is mod.RoundRobinPartitionAssignor + + def test__create_worker_consumer__uses_faust_assignor_with_changelog_topics( + self, *, cthread, app + ): + app.conf.table_standby_replicas = 0 + app.tables._changelogs["app-foo-changelog"] = Mock(name="table") + transport = cthread.transport + with patch("aiokafka.AIOKafkaConsumer"): + cthread._create_worker_consumer(transport) + assert cthread._assignor is app.assignor + def assert_create_worker_consumer( self, cthread, diff --git a/tox.ini b/tox.ini index 59aff06e1..e1673e689 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = 3.12,3.11,3.10,3.9,3.8,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell +envlist = 3.12,3.11,3.10,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell [testenv] deps= @@ -20,8 +20,6 @@ basepython = 3.12,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.12 3.11,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.11 3.10,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.10 - 3.9,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.9 - 3.8,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.8 [testenv:apicheck] setenv =