From 331c2c40eab9b5681e4ee368151db9423b310f37 Mon Sep 17 00:00:00 2001 From: no-0-name Date: Fri, 27 Feb 2026 00:49:54 +0300 Subject: [PATCH] refactor(core): add timeouts to block() and use Objects where appropriate --- .../io/agentscope/core/pipeline/MsgHub.java | 8 +++++-- .../core/rag/KnowledgeRetrievalTools.java | 21 +++++++++---------- .../java/io/agentscope/core/tool/Toolkit.java | 3 ++- .../core/tool/mcp/McpAsyncClientWrapper.java | 3 ++- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/pipeline/MsgHub.java b/agentscope-core/src/main/java/io/agentscope/core/pipeline/MsgHub.java index 101a17b04..d83af3279 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/pipeline/MsgHub.java +++ b/agentscope-core/src/main/java/io/agentscope/core/pipeline/MsgHub.java @@ -18,9 +18,11 @@ import io.agentscope.core.agent.Agent; import io.agentscope.core.agent.AgentBase; import io.agentscope.core.message.Msg; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -100,6 +102,7 @@ public class MsgHub implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(MsgHub.class); + private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10); private final String name; private final List participants; @@ -113,7 +116,7 @@ public class MsgHub implements AutoCloseable { * @param builder Builder instance */ private MsgHub(Builder builder) { - this.name = builder.name != null ? builder.name : UUID.randomUUID().toString(); + this.name = Objects.requireNonNullElse(builder.name, UUID.randomUUID().toString()); this.participants = new CopyOnWriteArrayList<>(builder.participants); this.announcement = builder.announcement; this.enableAutoBroadcast = builder.enableAutoBroadcast; @@ -195,10 +198,11 @@ public Mono exit() { /** * Close the MsgHub and cleanup resources. * This is the AutoCloseable implementation for try-with-resources support. + * Waits up to 10 seconds for exit to complete. */ @Override public void close() { - exit().block(); + exit().block(CLOSE_TIMEOUT); } /** diff --git a/agentscope-core/src/main/java/io/agentscope/core/rag/KnowledgeRetrievalTools.java b/agentscope-core/src/main/java/io/agentscope/core/rag/KnowledgeRetrievalTools.java index 9b3bdc7ac..d1fc8efd4 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/rag/KnowledgeRetrievalTools.java +++ b/agentscope-core/src/main/java/io/agentscope/core/rag/KnowledgeRetrievalTools.java @@ -22,7 +22,9 @@ import io.agentscope.core.rag.model.RetrieveConfig; import io.agentscope.core.tool.Tool; import io.agentscope.core.tool.ToolParam; +import java.time.Duration; import java.util.List; +import java.util.Objects; /** * Knowledge retrieval tools for Agentic RAG mode. @@ -51,19 +53,19 @@ */ public class KnowledgeRetrievalTools { + private static final int DEFAULT_RETRIEVE_LIMIT = 5; + private static final Duration RETRIEVE_TIMEOUT = Duration.ofSeconds(60); + private final Knowledge knowledge; /** * Creates a new KnowledgeRetrievalTools instance. * * @param knowledge the knowledge base to retrieve from - * @throws IllegalArgumentException if knowledgeBase is null + * @throws NullPointerException if knowledge is null */ public KnowledgeRetrievalTools(Knowledge knowledge) { - if (knowledge == null) { - throw new IllegalArgumentException("Knowledge base cannot be null"); - } - this.knowledge = knowledge; + this.knowledge = Objects.requireNonNull(knowledge, "knowledge"); } /** @@ -108,10 +110,7 @@ public String retrieveKnowledge( Integer limit, Agent agent) { - // Set default value - if (limit == null) { - limit = 5; - } + int effectiveLimit = Objects.requireNonNullElse(limit, DEFAULT_RETRIEVE_LIMIT); // Extract conversation history from agent if available List conversationHistory = null; @@ -122,7 +121,7 @@ public String retrieveKnowledge( // Build retrieval config with conversation history RetrieveConfig config = RetrieveConfig.builder() - .limit(limit) + .limit(effectiveLimit) .scoreThreshold(0.5) .conversationHistory(conversationHistory) .build(); @@ -131,7 +130,7 @@ public String retrieveKnowledge( .retrieve(query, config) .map(this::formatDocumentsForTool) .onErrorReturn("Failed to retrieve knowledge for query: " + query) - .block(); // Convert to synchronous call to match Tool interface + .block(RETRIEVE_TIMEOUT); } /** diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java b/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java index 65db80509..298d6d777 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/Toolkit.java @@ -25,6 +25,7 @@ import io.agentscope.core.tool.subagent.SubAgentProvider; import io.agentscope.core.tool.subagent.SubAgentTool; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -935,7 +936,7 @@ public void apply() { disableTools, groupName, presetParameters) - .block(); + .block(Duration.ofSeconds(30)); } else if (subAgentProvider != null) { SubAgentTool subAgentTool = new SubAgentTool(subAgentProvider, subAgentConfig); toolkit.registerAgentTool(subAgentTool, groupName, extendedModel, null, null); diff --git a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java index f67968a20..5492b9397 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java +++ b/agentscope-core/src/main/java/io/agentscope/core/tool/mcp/McpAsyncClientWrapper.java @@ -17,6 +17,7 @@ import io.modelcontextprotocol.client.McpAsyncClient; import io.modelcontextprotocol.spec.McpSchema; +import java.time.Duration; import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -167,7 +168,7 @@ public void close() { client.closeGracefully() .doOnSuccess(v -> logger.debug("MCP client '{}' closed", name)) .doOnError(e -> logger.error("Error closing MCP client '{}'", name, e)) - .block(); + .block(Duration.ofSeconds(10)); } catch (Exception e) { logger.error("Exception during MCP client close", e); client.close();