-
Notifications
You must be signed in to change notification settings - Fork 0
fix(net): fix RejectedExecutionException during shutdown trxHandlePool #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.Getter; | ||
|
|
@@ -44,6 +45,8 @@ public class TransactionsMsgHandler implements TronMsgHandler { | |
|
|
||
| private BlockingQueue<Runnable> queue = new LinkedBlockingQueue(); | ||
|
|
||
| // Flag to indicate if the handler is closed to prevent new task submissions | ||
| private volatile boolean isClosed = false; | ||
| private int threadNum = Args.getInstance().getValidateSignThreadNum(); | ||
| private final String trxEsName = "trx-msg-handler"; | ||
| private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( | ||
|
|
@@ -57,9 +60,17 @@ public void init() { | |
| handleSmartContract(); | ||
| } | ||
|
|
||
| /** | ||
| * Gracefully close the handler by stopping task intake first, | ||
| * then shutting down producer and consumer thread pools in order. | ||
| */ | ||
| public void close() { | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); | ||
| isClosed = true; | ||
| // Shutdown the producer (scheduler) first. | ||
| // Pending tasks in smartContractQueue will be discarded. | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); | ||
| // Then shutdown the consumer (worker pool) to finish tasks already submitted to trxHandlePool. | ||
| ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); | ||
|
Comment on lines
+68
to
+73
|
||
| } | ||
|
|
||
| public boolean isBusy() { | ||
|
|
@@ -68,6 +79,11 @@ public boolean isBusy() { | |
|
|
||
| @Override | ||
| public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { | ||
| // Early exit if the handler is already closed | ||
| if (isClosed) { | ||
| logger.warn("TransactionsMsgHandler is closed, drop message: {}", msg.getMessageId()); | ||
| return; | ||
| } | ||
| TransactionsMessage transactionsMessage = (TransactionsMessage) msg; | ||
| check(peer, transactionsMessage); | ||
| for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { | ||
|
|
@@ -78,6 +94,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep | |
| int trxHandlePoolQueueSize = 0; | ||
| int dropSmartContractCount = 0; | ||
| for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { | ||
| // Re-check isClosed status during iteration to handle concurrent shutdown | ||
| if (isClosed) { | ||
| logger.warn("TransactionsMsgHandler is closed during processing, stop submit."); | ||
| break; | ||
| } | ||
|
Comment on lines
+97
to
+101
|
||
| int type = trx.getRawData().getContract(0).getType().getNumber(); | ||
| if (type == ContractType.TriggerSmartContract_VALUE | ||
| || type == ContractType.CreateSmartContract_VALUE) { | ||
|
|
@@ -87,8 +108,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep | |
| dropSmartContractCount++; | ||
| } | ||
| } else { | ||
| ExecutorServiceManager.submit( | ||
| trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); | ||
| try { | ||
| ExecutorServiceManager.submit( | ||
| trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); | ||
| } catch (RejectedExecutionException e) { | ||
| // Safeguard against RejectedExecutionException if the pool was closed concurrently | ||
| logger.warn("Submit task failed, errMsg: {}", e.getMessage()); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -115,7 +142,8 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep | |
| private void handleSmartContract() { | ||
| ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> { | ||
| try { | ||
| while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) { | ||
| while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE | ||
| && smartContractQueue.size() > 0) { | ||
| TrxEvent event = smartContractQueue.take(); | ||
| ExecutorServiceManager.submit( | ||
| trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg())); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.