Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ jobs:
# for example if a test fails only when Cython is enabled
fail-fast: false
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13']
use-cython: ['true', 'false']
experimental: [false]
include:
- python-version: 'pypy3.9'
use-cython: false
experimental: true
env:
USE_CYTHON: ${{ matrix.use-cython }}
continue-on-error: ${{ matrix.experimental }}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ Yes! Use the `asyncio` reactor implementation: https://twistedmatrix.com/documen

### Will you support Python 2.7 or Python 3.5

No. Faust requires Python 3.8 or later, since it heavily uses features that were
No. Faust requires Python 3.10 or later, since it heavily uses features that were
introduced in Python 3.6 (`async`, `await`, variable type annotations).

### I get a maximum number of open files exceeded error by RocksDB when running a Faust app locally. How can I fix this
Expand Down
2 changes: 1 addition & 1 deletion docs/includes/faq.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.h
Will you support Python 2.7 or Python 3.5?
------------------------------------------

No. Faust requires Python 3.8 or later, since it heavily uses features that were
No. Faust requires Python 3.10 or later, since it heavily uses features that were
introduced in Python 3.6 (`async`, `await`, variable type annotations).


Expand Down
2 changes: 1 addition & 1 deletion docs/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ What do I need?

- RocksDB 5.0 or later, :pypi:`python-rocksdb`

Faust requires Python 3.8 or later, and a running Kafka broker.
Faust requires Python 3.10 or later, and a running Kafka broker.

There's no plan to support earlier Python versions.
Please get in touch if this is something you want to work on.
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/application.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ setuptools to install a command-line program for your project.
include_package_data=True,
zip_safe=False,
install_requires=['faust'],
python_requires='~=3.8',
python_requires='~=3.10',
)

For inspiration you can also look to the `setup.py` files in the
Expand Down
2 changes: 1 addition & 1 deletion examples/django/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def reqs(*f):
license='BSD',
packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
include_package_data=True,
python_requires='>=3.8.0',
python_requires='>=3.10.0',
keywords=[],
zip_safe=False,
install_requires=reqs('default.txt'),
Expand Down
1 change: 1 addition & 0 deletions faust/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Python Stream processing."""

# :copyright: (c) 2017-2020, Robinhood Markets, Inc.
# All rights reserved.
# :license: BSD (3 Clause), see LICENSE for more details.
Expand Down
10 changes: 5 additions & 5 deletions faust/stores/aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _get(self, key: bytes) -> Optional[bytes]:
key = (self.namespace, self.table_name, key)
fun = self.client.get
try:
(key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key)
key, meta, bins = self.aerospike_fun_call_with_retry(fun=fun, key=key)
if bins:
return bins[self.BIN_KEY]
return None
Expand Down Expand Up @@ -173,7 +173,7 @@ def _itervalues(self) -> Iterator[bytes]:
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key, meta, bins) = result
key, meta, bins = result
if bins:
yield bins[self.BIN_KEY]
else:
Expand All @@ -193,8 +193,8 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key_data, meta, bins) = result
(ns, set, policy, key) = key_data
key_data, meta, bins = result
ns, set, policy, key = key_data

if bins:
bins = bins[self.BIN_KEY]
Expand All @@ -214,7 +214,7 @@ def _contains(self, key: bytes) -> bool:
try:
if self.app.conf.store_check_exists:
key = (self.namespace, self.table_name, key)
(key, meta) = self.aerospike_fun_call_with_retry(
key, meta = self.aerospike_fun_call_with_retry(
fun=self.client.exists, key=key
)
if meta:
Expand Down
19 changes: 11 additions & 8 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import aiokafka
import aiokafka.abc
import opentracing
from packaging.version import Version

_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")
from aiokafka import TopicPartition
from aiokafka.consumer.group_coordinator import OffsetCommitRequest
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
Expand All @@ -45,7 +42,7 @@
)
from aiokafka.partitioner import DefaultPartitioner, murmur2
from aiokafka.protocol.admin import CreateTopicsRequest
from aiokafka.protocol.metadata import MetadataRequest_v1
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition
from aiokafka.util import parse_kafka_version
from mode import Service, get_logger
Expand All @@ -55,6 +52,7 @@
from mode.utils.objects import cached_property
from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds
from opentracing.ext import tags
from packaging.version import Version
from yarl import URL

from faust.auth import (
Expand Down Expand Up @@ -96,6 +94,8 @@

logger = get_logger(__name__)

_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")

DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID

TOPIC_LENGTH_MAX = 249
Expand Down Expand Up @@ -511,9 +511,13 @@ def _create_worker_consumer(
conf = self.app.conf
if self.consumer.in_transaction:
isolation_level = "read_committed"
# Table recovery depends on app.assignor state to map changelog
# active/standby partitions. Keep Faust assignor enabled whenever
# this app has changelog tables configured.
has_changelog_tables = bool(self.app.tables.changelog_topics)
self._assignor = (
self.app.assignor
if self.app.conf.table_standby_replicas > 0
if self.app.conf.table_standby_replicas > 0 or has_changelog_tables
else RoundRobinPartitionAssignor
)
auth_settings = credentials_to_aiokafka_auth(
Expand Down Expand Up @@ -1513,7 +1517,7 @@ async def _get_controller_node(
for node_id in nodes:
if node_id is None:
raise NotReady("Not connected to Kafka Broker")
request = MetadataRequest_v1([])
request = MetadataRequest([])
wait_result = await owner.wait(
client.send(node_id, request),
timeout=timeout,
Expand Down Expand Up @@ -1546,7 +1550,6 @@ async def _really_create_topic(
owner.log.debug("Topic %r exists, skipping creation.", topic)
return

protocol_version = 1
extra_configs = config or {}
config = self._topic_config(retention, compacting, deleting)
config.update(extra_configs)
Expand All @@ -1563,7 +1566,7 @@ async def _really_create_topic(
else:
raise Exception("Controller node is None")

request = CreateTopicsRequest[protocol_version](
request = CreateTopicsRequest(
[(topic, partitions, replication, [], list(config.items()))],
timeout,
False,
Expand Down
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "faust-streaming"
description = "Python Stream Processing. A Faust fork"
requires-python = ">=3.9"
requires-python = ">=3.10"
dynamic = [
"version",
"optional-dependencies",
Expand All @@ -26,8 +26,6 @@ classifiers = [
"License :: OSI Approved :: BSD License",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
LIBRARIES = []
E_UNSUPPORTED_PYTHON = NAME + " 1.0 requires Python %%s or later!"

if sys.version_info < (3, 8):
raise Exception(E_UNSUPPORTED_PYTHON % ("3.8",)) # NOQA
if sys.version_info < (3, 10):
raise Exception(E_UNSUPPORTED_PYTHON % ("3.10",)) # NOQA

from pathlib import Path # noqa

Expand Down Expand Up @@ -197,7 +197,7 @@ def do_setup(**kwargs):
# PEP-561: https://www.python.org/dev/peps/pep-0561/
package_data={"faust": ["py.typed"]},
include_package_data=True,
python_requires=">=3.8.0",
python_requires=">=3.10.0",
zip_safe=False,
install_requires=reqs("requirements.txt"),
tests_require=reqs("test.txt"),
Expand Down
22 changes: 21 additions & 1 deletion tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import faust
from faust import auth
from faust.exceptions import ImproperlyConfigured, NotReady
from faust.transport.drivers.aiokafka import _AIOKAFKA_HAS_API_VERSION
from faust.sensors.monitor import Monitor
from faust.transport.drivers import aiokafka as mod
from faust.transport.drivers.aiokafka import (
_AIOKAFKA_HAS_API_VERSION,
SLOW_PROCESSING_CAUSE_AGENT,
SLOW_PROCESSING_CAUSE_STREAM,
SLOW_PROCESSING_EXPLAINED,
Expand Down Expand Up @@ -796,6 +796,26 @@ def test__create_worker_consumer__transaction(self, *, cthread, app):
isolation_level="read_committed",
)

def test__create_worker_consumer__uses_roundrobin_without_tables(
self, *, cthread, app
):
app.conf.table_standby_replicas = 0
app.tables._changelogs.clear()
transport = cthread.transport
with patch("aiokafka.AIOKafkaConsumer"):
cthread._create_worker_consumer(transport)
assert cthread._assignor is mod.RoundRobinPartitionAssignor

def test__create_worker_consumer__uses_faust_assignor_with_changelog_topics(
self, *, cthread, app
):
app.conf.table_standby_replicas = 0
app.tables._changelogs["app-foo-changelog"] = Mock(name="table")
transport = cthread.transport
with patch("aiokafka.AIOKafkaConsumer"):
cthread._create_worker_consumer(transport)
assert cthread._assignor is app.assignor

def assert_create_worker_consumer(
self,
cthread,
Expand Down
4 changes: 1 addition & 3 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = 3.12,3.11,3.10,3.9,3.8,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell
envlist = 3.12,3.11,3.10,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell

[testenv]
deps=
Expand All @@ -20,8 +20,6 @@ basepython =
3.12,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.12
3.11,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.11
3.10,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.10
3.9,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.9
3.8,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.8

[testenv:apicheck]
setenv =
Expand Down
Loading