diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 0436b48d374..a92ef412f22 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -3,6 +3,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -44,6 +45,8 @@ public class TransactionsMsgHandler implements TronMsgHandler { private BlockingQueue queue = new LinkedBlockingQueue(); + // Flag to indicate if the handler is closed to prevent new task submissions + private volatile boolean isClosed = false; private int threadNum = Args.getInstance().getValidateSignThreadNum(); private final String trxEsName = "trx-msg-handler"; private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( @@ -57,9 +60,17 @@ public void init() { handleSmartContract(); } + /** + * Gracefully close the handler by stopping task intake first, + * then shutting down producer and consumer thread pools in order. + */ public void close() { - ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + isClosed = true; + // Shutdown the producer (scheduler) first. + // Pending tasks in smartContractQueue will be discarded. ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); + // Then shutdown the consumer (worker pool) to finish tasks already submitted to trxHandlePool. + ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); } public boolean isBusy() { @@ -68,6 +79,11 @@ public boolean isBusy() { @Override public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { + // Early exit if the handler is already closed + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed, drop message: {}", msg.getMessageId()); + return; + } TransactionsMessage transactionsMessage = (TransactionsMessage) msg; check(peer, transactionsMessage); for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { @@ -78,6 +94,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep int trxHandlePoolQueueSize = 0; int dropSmartContractCount = 0; for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { + // Re-check isClosed status during iteration to handle concurrent shutdown + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed during processing, stop submit."); + break; + } int type = trx.getRawData().getContract(0).getType().getNumber(); if (type == ContractType.TriggerSmartContract_VALUE || type == ContractType.CreateSmartContract_VALUE) { @@ -87,8 +108,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep dropSmartContractCount++; } } else { - ExecutorServiceManager.submit( - trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + try { + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + } catch (RejectedExecutionException e) { + // Safeguard against RejectedExecutionException if the pool was closed concurrently + logger.warn("Submit task failed, errMsg: {}", e.getMessage()); + break; + } } } @@ -115,7 +142,8 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep private void handleSmartContract() { ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> { try { - while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { + while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE + && smartContractQueue.size() > 0) { TrxEvent event = smartContractQueue.take(); ExecutorServiceManager.submit( trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg())); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index db8aac00c60..89e034b7c61 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -132,6 +132,21 @@ public void testProcessMessage() { } } + @Test + public void testProcessMessageAfterClose() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + handler.init(); + handler.close(); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); + + handler.processMessage(peer, msg); + + // Verify that the handler immediately returned and did not process the message + Mockito.verify(msg, Mockito.never()).getTransactions(); + } + class TrxEvent { @Getter