diff --git a/durabletask/worker.py b/durabletask/worker.py index 7d05f01..165b98c 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -636,12 +636,15 @@ def stream_reader(): if self._shutdown.is_set(): self._logger.info(f"Disconnected from {self._host_address}") - else: - self._logger.info("Work item stream ended normally") - # When stream ends (SHUTDOWN_SENTINEL received), always break outer loop - # The stream reader has exited, so we should exit too, not reconnect - # This matches Go SDK behavior where stream ending causes the listener to exit - break + break + + # Stream ended without shutdown being requested - reconnect + self._logger.info( + f"Work item stream ended. Will attempt to reconnect to {self._host_address}..." + ) + invalidate_connection() + # Fall through to the top of the outer loop, which will + # create a fresh connection (with retry/backoff if needed) except grpc.RpcError as rpc_error: # Check shutdown first - if shutting down, exit immediately if self._shutdown.is_set(): @@ -847,6 +850,13 @@ def _execute_orchestrator( stub.CompleteOrchestratorTask(res) except grpc.RpcError as rpc_error: # type: ignore self._handle_grpc_execution_error(rpc_error, "orchestrator") + except ValueError: + # gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection). + self._logger.debug( + f"Could not deliver orchestrator response for '{req.instanceId}': " + f"channel was closed (likely due to reconnection). " + f"The sidecar will re-dispatch this work item." + ) except Exception as ex: self._logger.exception( f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}" @@ -897,6 +907,13 @@ def _execute_activity( stub.CompleteActivityTask(res) except grpc.RpcError as rpc_error: # type: ignore self._handle_grpc_execution_error(rpc_error, "activity") + except ValueError: + # gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection). + self._logger.debug( + f"Could not deliver activity response for '{req.name}#{req.taskId}' of " + f"orchestration ID '{instance_id}': channel was closed (likely due to " + f"reconnection). The sidecar will re-dispatch this work item." + ) except Exception as ex: self._logger.exception( f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"