fix: add a timeout to RedisStreamBroker xautoclaim lock to prevent infinite locking #107
@s3rius is anything missing to get this one merged? We keep hitting production issues because of this.
Anyone? @danfimov @spikeninja?
Add a test pls to test_broker.py:

@pytest.mark.anyio
async def test_unacknowledged_lock_timeout_in_stream_broker(
    redis_url: str,
    valid_broker_message: BrokerMessage,
) -> None:
    unacknowledged_lock_timeout = 1
    queue_name = uuid.uuid4().hex
    consumer_group_name = uuid.uuid4().hex
    broker = RedisStreamBroker(
        url=redis_url,
        approximate=False,
        queue_name=queue_name,
        consumer_group_name=consumer_group_name,
        unacknowledged_lock_timeout=unacknowledged_lock_timeout,
    )
    await broker.startup()
    await broker.kick(valid_broker_message)
    message = await get_message(broker)
    assert isinstance(message, AckableMessage)
    assert message.data == valid_broker_message.message
    async with Redis(connection_pool=broker.connection_pool) as redis:
        lock_key = f"autoclaim:{consumer_group_name}:{queue_name}"
        await redis.exists(lock_key)
        await asyncio.sleep(unacknowledged_lock_timeout + 0.5)
        lock_exists_after_timeout = await redis.exists(lock_key)
        assert lock_exists_after_timeout == 0, "Lock should be released after timeout"
    await broker.shutdown()
Unfortunately your test doesn't cover the issue. Reproducing the bug is quite complex: concurrent brokers, a crash, etc.
It doesn't need to cover your bug; it should at least cover the existence of the new field and check that the lock is properly acquired and released.
Feel free to add your own tests as I've mentioned here.
Nullable typing + basic test ✅
Thanks a lot @spikeninja 🙏 Do you think you can release soon?
https://github.com/taskiq-python/taskiq-redis/releases/tag/1.2.2
If for any reason the worker crashes or gets killed while holding the lock, the lock is never released. This happened to us when a message made the container run out of memory: the worker was killed immediately and never released the lock.
This PR adds a timeout so that the lock is at least released eventually.
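
To illustrate the failure mode, here is a minimal in-memory sketch of a lock store whose entries can carry an expiry. It is not taskiq-redis code and does not talk to Redis; `ExpiringLockTable`, `FakeClock`, and `simulate_crash` are hypothetical names made up for this example, and the lock name only mimics the `autoclaim:{consumer_group_name}:{queue_name}` pattern used in the test above.

```python
import time
from typing import Callable, Dict, Optional


class ExpiringLockTable:
    """In-memory stand-in for a lock store whose keys may carry a TTL."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # lock name -> expiry timestamp, or None meaning "never expires"
        self._locks: Dict[str, Optional[float]] = {}

    def acquire(self, name: str, timeout: Optional[float] = None) -> bool:
        now = self._clock()
        if name in self._locks:
            expires_at = self._locks[name]
            if expires_at is None or expires_at > now:
                return False  # still held by someone else
        self._locks[name] = None if timeout is None else now + timeout
        return True

    def release(self, name: str) -> None:
        self._locks.pop(name, None)


class FakeClock:
    """Manually advanced clock so the example does not need to sleep."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def simulate_crash(timeout: Optional[float], wait: float) -> bool:
    """Return True if a second worker can take the lock after the first died holding it."""
    clock = FakeClock()
    table = ExpiringLockTable(clock)
    name = "autoclaim:group:queue"  # hypothetical lock name
    assert table.acquire(name, timeout=timeout)
    # The first worker is killed here (e.g. OOM) and never calls release().
    clock.advance(wait)
    return table.acquire(name, timeout=timeout)
```

Without a timeout, `simulate_crash(None, 3600)` reports that the second worker is still locked out an hour later. With a one-second timeout, `simulate_crash(1, 1.5)` reports that the lock can be taken again once the timeout has passed.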