From 6d5b17a1d10f2d5454de002455e12238794d5ddc Mon Sep 17 00:00:00 2001 From: 0xbigapple Date: Mon, 30 Mar 2026 13:08:34 +0800 Subject: [PATCH 1/2] fix(net): fix RejectedExecutionException during shutdown trxHandlePool --- .../TransactionsMsgHandler.java | 37 +++++++++++++++++-- .../TransactionsMsgHandlerTest.java | 16 ++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 0436b48d374..414f21be295 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -3,6 +3,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -44,6 +45,8 @@ public class TransactionsMsgHandler implements TronMsgHandler { private BlockingQueue queue = new LinkedBlockingQueue(); + // Flag to indicate if the handler is closed to prevent new task submissions + private volatile boolean isClosed = false; private int threadNum = Args.getInstance().getValidateSignThreadNum(); private final String trxEsName = "trx-msg-handler"; private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( @@ -57,9 +60,16 @@ public void init() { handleSmartContract(); } + /** + * Gracefully close the handler by stopping task intake first, + * then shutting down producer and consumer thread pools in order. + */ public void close() { - ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + isClosed = true; + // Shutdown the producer (scheduler) first to stop generating new tasks ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); + // Then shutdown the consumer (worker pool) to finish existing tasks + ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); } public boolean isBusy() { @@ -68,6 +78,11 @@ public boolean isBusy() { @Override public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { + // Early exit if the handler is already closed + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed, drop message: {}", msg.getMessageId()); + return; + } TransactionsMessage transactionsMessage = (TransactionsMessage) msg; check(peer, transactionsMessage); for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { @@ -78,6 +93,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep int trxHandlePoolQueueSize = 0; int dropSmartContractCount = 0; for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { + // Re-check isClosed status during iteration to handle concurrent shutdown + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed during processing, stop submit."); + break; + } int type = trx.getRawData().getContract(0).getType().getNumber(); if (type == ContractType.TriggerSmartContract_VALUE || type == ContractType.CreateSmartContract_VALUE) { @@ -87,8 +107,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep dropSmartContractCount++; } } else { - ExecutorServiceManager.submit( - trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + try { + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + } catch (RejectedExecutionException e) { + // Safeguard against RejectedExecutionException if the pool was closed concurrently + logger.warn("Submit task failed, errMsg: {}", e.getMessage()); + break; + } } } @@ -115,7 +141,10 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep private void handleSmartContract() { ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> { try { - while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { + // Stop fetching from queue if the handler is closed + while (!isClosed + && queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE + && smartContractQueue.size() > 0) { TrxEvent event = smartContractQueue.take(); ExecutorServiceManager.submit( trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg())); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index db8aac00c60..e102847f437 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -132,6 +132,22 @@ public void testProcessMessage() { } } + @Test + public void testProcessMessageAfterClose() { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + handler.init(); + handler.close(); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); + + try { + handler.processMessage(peer, msg); + } catch (Exception e) { + Assert.fail("Should not throw any exception when closed"); + } + } + class TrxEvent { @Getter From c3e2c560ecb783b974f9183952a1a1163d2ec194 Mon Sep 17 00:00:00 2001 From: 0xbigapple Date: Mon, 30 Mar 2026 19:31:40 +0800 Subject: [PATCH 2/2] fix(net): refine TransactionsMsgHandler shutdown and test --- .../net/messagehandler/TransactionsMsgHandler.java | 9 ++++----- .../messagehandler/TransactionsMsgHandlerTest.java | 11 +++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 414f21be295..a92ef412f22 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -66,9 +66,10 @@ public void init() { */ public void close() { isClosed = true; - // Shutdown the producer (scheduler) first to stop generating new tasks + // Shutdown the producer (scheduler) first. + // Pending tasks in smartContractQueue will be discarded. ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); - // Then shutdown the consumer (worker pool) to finish existing tasks + // Then shutdown the consumer (worker pool) to finish tasks already submitted to trxHandlePool. ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); } @@ -141,9 +142,7 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep private void handleSmartContract() { ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> { try { - // Stop fetching from queue if the handler is closed - while (!isClosed - && queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE + while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { TrxEvent event = smartContractQueue.take(); ExecutorServiceManager.submit( diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index e102847f437..89e034b7c61 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -133,7 +133,7 @@ public void testProcessMessage() { } @Test - public void testProcessMessageAfterClose() { + public void testProcessMessageAfterClose() throws Exception { TransactionsMsgHandler handler = new TransactionsMsgHandler(); handler.init(); handler.close(); @@ -141,11 +141,10 @@ public void testProcessMessageAfterClose() { PeerConnection peer = Mockito.mock(PeerConnection.class); TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); - try { - handler.processMessage(peer, msg); - } catch (Exception e) { - Assert.fail("Should not throw any exception when closed"); - } + handler.processMessage(peer, msg); + + // Verify that the handler immediately returned and did not process the message + Mockito.verify(msg, Mockito.never()).getTransactions(); } class TrxEvent {