Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,15 @@ def stream_reader():

if self._shutdown.is_set():
self._logger.info(f"Disconnected from {self._host_address}")
else:
self._logger.info("Work item stream ended normally")
# When stream ends (SHUTDOWN_SENTINEL received), always break outer loop
# The stream reader has exited, so we should exit too, not reconnect
# This matches Go SDK behavior where stream ending causes the listener to exit
break
break

# Stream ended without shutdown being requested - reconnect
self._logger.info(
f"Work item stream ended. Will attempt to reconnect to {self._host_address}..."
)
invalidate_connection()
# Fall through to the top of the outer loop, which will
# create a fresh connection (with retry/backoff if needed)
except grpc.RpcError as rpc_error:
# Check shutdown first - if shutting down, exit immediately
if self._shutdown.is_set():
Expand Down Expand Up @@ -847,6 +850,13 @@ def _execute_orchestrator(
stub.CompleteOrchestratorTask(res)
except grpc.RpcError as rpc_error: # type: ignore
self._handle_grpc_execution_error(rpc_error, "orchestrator")
except ValueError:
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
self._logger.debug(
f"Could not deliver orchestrator response for '{req.instanceId}': "
f"channel was closed (likely due to reconnection). "
f"The sidecar will re-dispatch this work item."
)
except Exception as ex:
self._logger.exception(
f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}"
Expand Down Expand Up @@ -897,6 +907,13 @@ def _execute_activity(
stub.CompleteActivityTask(res)
except grpc.RpcError as rpc_error: # type: ignore
self._handle_grpc_execution_error(rpc_error, "activity")
except ValueError:
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
self._logger.debug(
f"Could not deliver activity response for '{req.name}#{req.taskId}' of "
f"orchestration ID '{instance_id}': channel was closed (likely due to "
f"reconnection). The sidecar will re-dispatch this work item."
)
except Exception as ex:
self._logger.exception(
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
Expand Down
Loading