Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/aws_durable_execution_sdk_python/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,20 +613,20 @@ def checkpoint_batches_forever(self) -> None:

logger.debug("Checkpoint batch processed successfully")

# Signal completion for any synchronous operations
for queued_op in batch:
if queued_op.completion_event is not None:
queued_op.completion_event.set()

# Update local token for next iteration
current_checkpoint_token = output.checkpoint_token

# Fetch new operations from the API
# Fetch new operations from the API before unblocking sync waiters
self.fetch_paginated_operations(
output.new_execution_state.operations,
output.checkpoint_token,
output.new_execution_state.next_marker,
)

# Signal completion for any synchronous operations
for queued_op in batch:
if queued_op.completion_event is not None:
queued_op.completion_event.set()
except Exception as e:
# Checkpoint failed - wake all blocked threads so they can raise error
# Drain both queues and signal all completion events
Expand Down
Loading