Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand Down Expand Up @@ -44,6 +45,8 @@ public class TransactionsMsgHandler implements TronMsgHandler {

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

// Flag to indicate if the handler is closed to prevent new task submissions
private volatile boolean isClosed = false;
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private final String trxEsName = "trx-msg-handler";
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
Expand All @@ -57,9 +60,17 @@ public void init() {
handleSmartContract();
}

/**
* Gracefully close the handler by stopping task intake first,
* then shutting down producer and consumer thread pools in order.
*/
public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
isClosed = true;
// Shutdown the producer (scheduler) first.
// Pending tasks in smartContractQueue will be discarded.
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
// Then shutdown the consumer (worker pool) to finish tasks already submitted to trxHandlePool.
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
Comment on lines +68 to +73
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() comment says pending smartContractQueue tasks will be discarded, but the method only stops the scheduler and shuts down the pools; it never clears/drains smartContractQueue. If this handler remains referenced (it’s a Spring @Component), queued TrxEvents (including PeerConnection references) will be retained until JVM exit. Consider clearing/draining the queue after the scheduler has fully terminated (after shutdownAndAwaitTermination(smartContractExecutor, ...)) to make the discard behavior real and release references.

Copilot uses AI. Check for mistakes.
}

public boolean isBusy() {
Expand All @@ -68,6 +79,11 @@ public boolean isBusy() {

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
// Early exit if the handler is already closed
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed, drop message: {}", msg.getMessageId());
return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
check(peer, transactionsMessage);
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
Expand All @@ -78,6 +94,11 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
// Re-check isClosed status during iteration to handle concurrent shutdown
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed during processing, stop submit.");
break;
}
Comment on lines +97 to +101
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When isClosed flips to true (or trxHandlePool starts rejecting), this loop breaks early, but the earlier loop has already removed all tx items from peer.getAdvInvRequest(). That can leave some transactions neither queued nor requested anymore. To avoid silently dropping txs, consider removing items from advInvRequest only after a tx is successfully enqueued/submitted (or stop removing once shutdown is detected).

Copilot uses AI. Check for mistakes.
int type = trx.getRawData().getContract(0).getType().getNumber();
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
Expand All @@ -87,8 +108,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
dropSmartContractCount++;
}
} else {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
try {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
} catch (RejectedExecutionException e) {
// Safeguard against RejectedExecutionException if the pool was closed concurrently
logger.warn("Submit task failed, errMsg: {}", e.getMessage());
break;
}
}
}

Expand All @@ -115,7 +142,8 @@ private void check(PeerConnection peer, TransactionsMessage msg) throws P2pExcep
private void handleSmartContract() {
ExecutorServiceManager.scheduleWithFixedDelay(smartContractExecutor, () -> {
try {
while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE && smartContractQueue.size() > 0) {
while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE
&& smartContractQueue.size() > 0) {
TrxEvent event = smartContractQueue.take();
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(event.getPeer(), event.getMsg()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ public void testProcessMessage() {
}
}

@Test
public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

// Verify that the handler immediately returned and did not process the message
Mockito.verify(msg, Mockito.never()).getTransactions();
}

class TrxEvent {

@Getter
Expand Down
Loading