Skip to content
Merged
10 changes: 10 additions & 0 deletions core/src/main/java/org/bitcoinj/coinjoin/CoinJoinBaseManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.Lists;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.bitcoinj.core.InventoryItem;
import org.bitcoinj.core.Utils;
import org.bitcoinj.utils.Threading;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,4 +82,13 @@ public CoinJoinQueue getQueueItemAndTry() {
}
return null;
}

public boolean alreadyHave(InventoryItem item) {
queueLock.lock();
try {
return coinJoinQueue.stream().anyMatch(queue -> item.hash.equals(queue.getHash()));
} finally {
queueLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.bitcoinj.coinjoin.utils.CoinJoinManager;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.InventoryItem;
import org.bitcoinj.core.MasternodeSync;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.Sha256Hash;
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/bitcoinj/coinjoin/CoinJoinQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public Sha256Hash getSignatureHash() {
}
}

@Override
public Sha256Hash getHash() {
return getSignatureHash();
}

public boolean checkSignature(BLSPublicKey pubKey) {
Sha256Hash hash = getSignatureHash();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.bitcoinj.core.BlockChain;
import org.bitcoinj.core.Context;
import org.bitcoinj.core.ECKey;
import org.bitcoinj.core.GetDataMessage;
import org.bitcoinj.core.InventoryItem;
import org.bitcoinj.core.InventoryMessage;
import org.bitcoinj.core.MasternodeAddress;
import org.bitcoinj.core.MasternodeSync;
import org.bitcoinj.core.Message;
Expand All @@ -53,10 +56,8 @@
import org.bitcoinj.core.listeners.TransactionReceivedInBlockListener;
import org.bitcoinj.evolution.Masternode;
import org.bitcoinj.evolution.MasternodeMetaDataManager;
import org.bitcoinj.evolution.SimplifiedMasternodeListDiff;
import org.bitcoinj.evolution.SimplifiedMasternodeListManager;
import org.bitcoinj.quorums.ChainLocksHandler;
import org.bitcoinj.quorums.QuorumRotationInfo;
import org.bitcoinj.utils.ContextPropagatingThreadFactory;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
Expand All @@ -68,6 +69,9 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -83,7 +87,7 @@ public class CoinJoinManager {

private static final Logger log = LoggerFactory.getLogger(CoinJoinManager.class);
private final ArrayList<WalletEx> wallets = Lists.newArrayList();
private final Context context;
private Context context;
public final HashMap<String, CoinJoinClientManager> coinJoinClientManagers;
private final CoinJoinClientQueueManager coinJoinClientQueueManager;

Expand Down Expand Up @@ -505,7 +509,31 @@ public void processTransaction(Transaction tx) {
}

public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> {
if (m instanceof CoinJoinQueue) {
if (m instanceof InventoryMessage) {
InventoryMessage inv = (InventoryMessage) m;
List<InventoryItem> items = inv.getItems();
List<InventoryItem> dsqList = new LinkedList<>();
for (InventoryItem item : items) {
if (item.type == InventoryItem.Type.CoinJoinQueue) {
dsqList.add(item);
}
}
Iterator<InventoryItem> it = dsqList.iterator();
GetDataMessage getdata = new GetDataMessage(context.getParams());
while (it.hasNext()) {
InventoryItem item = it.next();
if(!alreadyHave(item)) {
getdata.addItem(item);
} else {
log.info("coinjoin: DSQUEUE: already has {}", item.hash);
}
}
if (!getdata.getItems().isEmpty()) {
// This will cause us to receive a bunch of block or tx messages.
log.info("coinjoin: DSQUEUE: requesting {} dsq messages", getdata.getItems().size());
peer.sendMessage(getdata);
}
} else if (m instanceof CoinJoinQueue) {
// Offload DSQueue message processing to thread pool to avoid blocking network I/O thread
messageProcessingExecutor.execute(() -> {
processMessage(peer, m);
Expand All @@ -530,4 +558,8 @@ public void addWallet(WalletEx wallet) {
public void removeWallet(WalletEx wallet) {
wallets.remove(wallet);
}

protected boolean alreadyHave(InventoryItem item) {
return coinJoinClientQueueManager.alreadyHave(item);
}
}
9 changes: 9 additions & 0 deletions core/src/main/java/org/bitcoinj/core/BitcoinSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class BitcoinSerializer extends MessageSerializer {
names.put(InstantSendLock.class, "isdlock");
names.put(ChainLockSignature.class, "clsig");
names.put(SendHeadersMessage.class, "sendheaders");
names.put(SendHeaders2Message.class, "sendheaders2");
names.put(GetHeaders2Message.class, "getheaders2");
names.put(Headers2Message.class, "headers2");
names.put(SendAddressMessageV2.class, "sendaddrv2");
names.put(GetMasternodePaymentRequestSyncMessage.class, "mnget");
names.put(AssetLockTransaction.class, "tx");
Expand Down Expand Up @@ -264,6 +267,12 @@ private Message makeMessage(String command, int length, byte[] payloadBytes, byt
return new VersionAck(params, payloadBytes);
} else if (command.equals("headers")) {
return new HeadersMessage(params, payloadBytes);
} else if (command.equals("headers2")) {
return new Headers2Message(params, payloadBytes);
} else if (command.equals("getheaders2")) {
return new GetHeaders2Message(params, payloadBytes);
} else if (command.equals("sendheaders2")) {
return new SendHeaders2Message(params, payloadBytes);
} else if (command.equals("alert")) {
return makeAlertMessage(payloadBytes);
} else if (command.equals("filterload")) {
Expand Down
Loading
Loading