Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/org/openlcb/LoaderClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public void handleFailure(String where, int errorCode) {
private void sendStream() {
// System.out.println("lSend Stream ");
// @todo the destStreamID is probably bogus at this point. Check why it is needed here.
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(src, dest, bufferSize, SRC_STREAM_ID, destStreamID);
// flags are 0 here by default
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(src, dest, 0, bufferSize, SRC_STREAM_ID, destStreamID);
connection.put(m, this);
startTimeout(STREAM_INIT_TIMEOUT_MSEC);
}
Expand Down
6 changes: 4 additions & 2 deletions src/org/openlcb/StreamDataProceedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
public class StreamDataProceedMessage extends AddressedPayloadMessage {

public StreamDataProceedMessage(NodeID source, NodeID dest,
byte sourceStreamID, byte destStreamID) {
super(source, dest, new byte[] {sourceStreamID, destStreamID});
byte sourceStreamID, byte destStreamID, int flags) {
super(source, dest, new byte[] {sourceStreamID, destStreamID, (byte)((flags>>8)&0xFF), (byte)(flags&0xFF)});
this.sourceStreamID = sourceStreamID;
this.destStreamID = destStreamID;
this.flags = flags;
}

byte sourceStreamID;
byte destStreamID;
int flags;

public byte getSourceStreamID() { return sourceStreamID; }
public byte getDestinationStreamID() { return destStreamID; }
Expand Down
9 changes: 6 additions & 3 deletions src/org/openlcb/StreamInitiateReplyMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
public class StreamInitiateReplyMessage extends AddressedPayloadMessage {

public StreamInitiateReplyMessage(NodeID source, NodeID dest,
int bufferSize, byte sourceStreamID, byte destStreamID) {
super(source, dest, toPayload(bufferSize, sourceStreamID, destStreamID));
int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
super(source, dest, toPayload(flags, bufferSize, sourceStreamID, destStreamID));
this.flags = flags;
this.bufferSize = bufferSize;
this.sourceStreamID = sourceStreamID;
this.destStreamID = destStreamID;
}

int flags;
int bufferSize;
byte sourceStreamID;
byte destStreamID;
Expand All @@ -30,8 +32,9 @@ public StreamInitiateReplyMessage(NodeID source, NodeID dest,
public byte getDestinationStreamID() { return destStreamID; }
public byte getSourceStreamID() { return sourceStreamID; } //dph 20151229

static byte[] toPayload(int bufferSize, byte sourceStreamID, byte destStreamID) {
static byte[] toPayload(int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
byte[] b = new byte[]{0, 0, 0, 0, sourceStreamID, destStreamID};
Utilities.HostToNetworkUint16(b, 2, flags);
Utilities.HostToNetworkUint16(b, 0, bufferSize);
return b;
}
Expand Down
11 changes: 7 additions & 4 deletions src/org/openlcb/StreamInitiateRequestMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@
/**
* Stream Initialization Request message implementation
*
* @author Bob Jacobsen Copyright 2009
* @author Bob Jacobsen Copyright 2009, 2026
* @version $Revision$
*/
@Immutable
@ThreadSafe
public class StreamInitiateRequestMessage extends AddressedPayloadMessage {

public StreamInitiateRequestMessage(NodeID source, NodeID dest,
int bufferSize, byte sourceStreamID, byte destinationStreamID) {
super(source, dest, toPayload(bufferSize, sourceStreamID, destinationStreamID));
int flags, int bufferSize, byte sourceStreamID, byte destinationStreamID) {
super(source, dest, toPayload(flags, bufferSize, sourceStreamID, destinationStreamID));
this.flags = flags;
this.bufferSize = bufferSize;
this.sourceStreamID = sourceStreamID;
this.destinationStreamID = destinationStreamID;
}

int flags;
int bufferSize;
byte sourceStreamID;
byte destinationStreamID;
Expand All @@ -30,9 +32,10 @@ public StreamInitiateRequestMessage(NodeID source, NodeID dest,
public byte getSourceStreamID() { return sourceStreamID; }
public byte getDestinationStreamID() { return destinationStreamID; }

static byte[] toPayload(int bufferSize, byte sourceStreamID, byte destStreamID) {
static byte[] toPayload(int flags, int bufferSize, byte sourceStreamID, byte destStreamID) {
byte[] b = new byte[]{0, 0, 0, 0, sourceStreamID, destStreamID};
Utilities.HostToNetworkUint16(b, 0, bufferSize);
Utilities.HostToNetworkUint16(b, 2, flags);
return b;
}

Expand Down
11 changes: 8 additions & 3 deletions src/org/openlcb/can/MessageBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,20 @@ List<Message> processFormat1(CanFrame f) {
return retlist;
// add all stream messages reply and proceed.
case StreamInitiateRequest:
retlist.add(new StreamInitiateRequestMessage(source,dest,Utilities.NetworkToHostUint16(content, 2),content[4],
retlist.add(new StreamInitiateRequestMessage(source,dest,
Utilities.NetworkToHostUint16(content, 2),
Utilities.NetworkToHostUint16(content, 0),content[4],
(content.length > 5 ? content[5] : -1)));
return retlist;
case StreamInitiateReply:
retlist.add(new StreamInitiateReplyMessage(source,dest,Utilities.NetworkToHostUint16(content, 0),content[4], content[5]));
retlist.add(new StreamInitiateReplyMessage(source,dest,
Utilities.NetworkToHostUint16(content, 2),Utilities.NetworkToHostUint16(content, 0),
content[4], content[5]));
return retlist;
// case StreamData is Format 7
case StreamDataProceed:
retlist.add(new StreamDataProceedMessage(source,dest,content[2], content[3]));
int sdpflags = (int)(content[2])<<8+(int)content[3];
retlist.add(new StreamDataProceedMessage(source,dest,content[0], content[1], sdpflags));
return retlist;
case StreamDataComplete:
retlist.add(new StreamDataCompleteMessage(source,dest,content.length > 2 ?
Expand Down
5 changes: 3 additions & 2 deletions src/org/openlcb/implementations/StreamReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public void handleStreamInitiateRequest(StreamInitiateRequestMessage msg, Connec
int len = msg.getBufferSize();
sourceStreamID = msg.getSourceStreamID();

Message m = new StreamInitiateReplyMessage(here, far, len, sourceStreamID, destStreamID);
// flags are 0
Message m = new StreamInitiateReplyMessage(here, far, 0, len, sourceStreamID, destStreamID);
connection.put(m, this);
}

Expand All @@ -38,7 +39,7 @@ public void handleStreamInitiateRequest(StreamInitiateRequestMessage msg, Connec
*/
public void handleStreamDataSend(StreamDataSendMessage msg, Connection sender){
// send proceed reply
Message m = new StreamDataProceedMessage(here, far, sourceStreamID, destStreamID);
Message m = new StreamDataProceedMessage(here, far, sourceStreamID, destStreamID, 0);
connection.put(m, this);
}

Expand Down
4 changes: 2 additions & 2 deletions src/org/openlcb/implementations/StreamTransmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public StreamTransmitter(NodeID here, NodeID far, int bufferSize, int[] bytes, C
// not be right, but we don't have a value passed to this method.
destStreamID = 0;

// start negotiation
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(here, far, bufferSize, sourceStreamID, destStreamID);
// start negotiation - flags are 0 here
StreamInitiateRequestMessage m = new StreamInitiateRequestMessage(here, far, 0, bufferSize, sourceStreamID, destStreamID);
connection.put(m, this);
}

Expand Down
10 changes: 5 additions & 5 deletions test/org/openlcb/LoaderClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,17 @@ public void onDone(int errorCode, String errorString) {
dcs.put(m, null);
// Stream Setup
Assert.assertEquals("StreamSetup", 1, messagesReceived.size());
Assert.assertTrue(messagesReceived.get(0).equals(new StreamInitiateRequestMessage(hereID,farID,16384,(byte)4,(byte)0))); // Stream negn
Assert.assertTrue(messagesReceived.get(0).equals(new StreamInitiateRequestMessage(hereID,farID,0,16384,(byte)4,(byte)0))); // Stream negn
messagesReceived.clear();
// *********** note small buffersize! **********
xmt.put(new StreamInitiateReplyMessage(farID,hereID,6,(byte)4,(byte)6), null);
xmt.put(new StreamInitiateReplyMessage(farID,hereID,0,6,(byte)4,(byte)6), null);
// Stream Data

Assert.assertEquals("stream data", 1, messagesReceived.size());
//System.out.println("Msg0: "+(messagesReceived.get(0) != null ? messagesReceived.get(0).toString() : " == null"));
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'a','b','c','d','e','f'})));
messagesReceived.clear();
xmt.put(new StreamDataProceedMessage(farID,hereID,(byte)4,(byte)6),null);
xmt.put(new StreamDataProceedMessage(farID,hereID,(byte)4,(byte)6,0),null);
Assert.assertEquals("second stream data, stream complete, unfreeze", 3, messagesReceived.size());
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'g','h','i','j'})));
Assert.assertTrue(messagesReceived.get(1).equals(new StreamDataCompleteMessage(hereID,farID,(byte)4,(byte)6)));
Expand Down Expand Up @@ -331,10 +331,10 @@ public void onDone(int errorCode, String errorString) {
Message m = new DatagramAcknowledgedMessage(farID,hereID);
dcs.put(m, null);
// Stream Setup
Assert.assertEquals(new StreamInitiateRequestMessage(hereID, farID, 16384, (byte)4, (byte)0), messagesReceived.get(0));
Assert.assertEquals(new StreamInitiateRequestMessage(hereID, farID, 0, 16384, (byte)4, (byte)0), messagesReceived.get(0));
messagesReceived.clear();
// *********** note larger buffersize! **********
xmt.put(new StreamInitiateReplyMessage(farID,hereID,64,(byte)4,(byte)6), null);
xmt.put(new StreamInitiateReplyMessage(farID,hereID,0,64,(byte)4,(byte)6), null);
// Stream Data
Assert.assertEquals("stream data", 3, messagesReceived.size());
Assert.assertTrue(messagesReceived.get(0).equals(new StreamDataSendMessage(hereID,farID,(byte)6,new int[]{'a','b','c','d','e','f','g','h','i','j'})));
Expand Down
2 changes: 1 addition & 1 deletion test/org/openlcb/StreamDataProceedMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class StreamDataProceedMessageTest {
public void testCTor() {
NodeID id1 = new NodeID(new byte[]{1, 1, 0, 0, 0, 4});
NodeID id2 = new NodeID(new byte[]{1, 1, 0, 0, 4, 4});
StreamDataProceedMessage t = new StreamDataProceedMessage(id1,id2,(byte)0x00,(byte)0x00);
StreamDataProceedMessage t = new StreamDataProceedMessage(id1,id2,(byte)0x00,(byte)0x00,0);
Assert.assertNotNull("exists",t);
}

Expand Down
2 changes: 1 addition & 1 deletion test/org/openlcb/StreamInitiateReplyMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class StreamInitiateReplyMessageTest {
public void testCTor() {
NodeID id1 = new NodeID(new byte[]{1, 1, 0, 0, 0, 4});
NodeID id2 = new NodeID(new byte[]{1, 1, 0, 0, 4, 4});
StreamInitiateReplyMessage t = new StreamInitiateReplyMessage(id1,id2,0,(byte)0x00,(byte)0x00);
StreamInitiateReplyMessage t = new StreamInitiateReplyMessage(id1,id2,0,0,(byte)0x00,(byte)0x00);
Assert.assertNotNull("exists",t);
}

Expand Down
2 changes: 1 addition & 1 deletion test/org/openlcb/StreamInitiateRequestMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class StreamInitiateRequestMessageTest {
public void testCTor() {
NodeID id1 = new NodeID(new byte[]{1, 1, 0, 0, 0, 4});
NodeID id2 = new NodeID(new byte[]{1, 1, 0, 0, 4, 4});
StreamInitiateRequestMessage t = new StreamInitiateRequestMessage(id1,id2,0,(byte)0x00,(byte)0x00);
StreamInitiateRequestMessage t = new StreamInitiateRequestMessage(id1,id2,0,0,(byte)0x00,(byte)0x00);
Assert.assertNotNull("exists",t);
}

Expand Down
38 changes: 38 additions & 0 deletions test/org/openlcb/can/MessageBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,44 @@ public void testBogusMti() {
Assert.assertTrue(msg instanceof UnknownMtiMessage);
}

@Test
public void testStreamInitiateRequest() {
OpenLcbCanFrame frame = new OpenLcbCanFrame(0x123);
frame.setHeader(0x19CC8123);
frame.setData(new byte[]{0x02, 0x02, 1,2,3,4,5,6});

MessageBuilder b = new MessageBuilder(map);

List<Message> list = b.processFrame(frame);

Assert.assertEquals("count", 1, list.size());
Message msg = list.get(0);

Assert.assertTrue(msg instanceof StreamInitiateRequestMessage);
Assert.assertEquals("01.02.03.04.05.06 - 00.00.00.00.00.00 StreamInitiateRequest with payload 01 02 03 04 05 06 SSID 5 DSID 6 bsize 258",
msg.toString());
Assert.assertEquals((1<<8)+2, ((StreamInitiateRequestMessage)msg).getBufferSize());
}

@Test
public void testStreamInitiateReply() {
OpenLcbCanFrame frame = new OpenLcbCanFrame(0x123);
frame.setHeader(0x19868123);
frame.setData(new byte[]{0x02, 0x02, 1,2,3,4,5,6});

MessageBuilder b = new MessageBuilder(map);

List<Message> list = b.processFrame(frame);

Assert.assertEquals("count", 1, list.size());
Message msg = list.get(0);

Assert.assertTrue(msg instanceof StreamInitiateReplyMessage);
Assert.assertEquals("01.02.03.04.05.06 - 00.00.00.00.00.00 StreamInitiateReply with payload 01 02 03 04 05 06 SSID 5 DSID 6 bsize 258",
msg.toString());
Assert.assertEquals((1<<8)+2, ((StreamInitiateReplyMessage)msg).getBufferSize());
}

@Test
public void testAccumulateSnipReply() {
// start frame
Expand Down
10 changes: 5 additions & 5 deletions test/org/openlcb/implementations/StreamReceiverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public void put(Message msg, Connection sender) {
Assert.assertTrue(messagesReceived.size() == 0); // no startup messages

// start operation
Message m = new StreamInitiateRequestMessage(farID, hereID, (byte)64, (byte)11, (byte)0 );
Message m = new StreamInitiateRequestMessage(farID, hereID, 0, (byte)64, (byte)11, (byte)0 );

rcv.put(m, null);

Assert.assertTrue(messagesReceived.size() == 1);
Assert.assertTrue(messagesReceived.get(0)
.equals(new StreamInitiateReplyMessage(hereID, farID, (byte)64, (byte)11, (byte)3)));
.equals(new StreamInitiateReplyMessage(hereID, farID, 0, (byte)64, (byte)11, (byte)3)));
}

@Test
Expand All @@ -55,13 +55,13 @@ public void put(Message msg, Connection sender) {
Assert.assertTrue(messagesReceived.size() == 0); // no startup messages

// start operation
Message m = new StreamInitiateRequestMessage(farID, hereID, 64, (byte)12, (byte)0);
Message m = new StreamInitiateRequestMessage(farID, hereID, 0, 64, (byte)12, (byte)0);

rcv.put(m, null);

Assert.assertTrue(messagesReceived.size() == 1);
Assert.assertTrue(messagesReceived.get(0)
.equals(new StreamInitiateReplyMessage(hereID, farID, 64, (byte)12, (byte)3)));
.equals(new StreamInitiateReplyMessage(hereID, farID, 0, 64, (byte)12, (byte)3)));

// send one data message
messagesReceived = new java.util.ArrayList<Message>();
Expand All @@ -71,6 +71,6 @@ public void put(Message msg, Connection sender) {

Assert.assertTrue(messagesReceived.size() == 1);
Assert.assertTrue(messagesReceived.get(0)
.equals(new StreamDataProceedMessage(hereID, farID, (byte)12, (byte)3)));
.equals(new StreamDataProceedMessage(hereID, farID, (byte)12, (byte)3, 0)));
}
}
12 changes: 6 additions & 6 deletions test/org/openlcb/implementations/StreamTransmitterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void put(Message msg, Connection sender) {

Assert.assertTrue(messagesReceived.size() == 1); // startup message
Assert.assertEquals(new StreamInitiateRequestMessage(
hereID, farID, 64, (byte)4, (byte)0), messagesReceived.get(0));
hereID, farID, 0, 64, (byte)4, (byte)0), messagesReceived.get(0));
}

@Test
Expand All @@ -56,10 +56,10 @@ public void put(Message msg, Connection sender) {

Assert.assertEquals("init messages", 1, messagesReceived.size());
Assert.assertTrue(messagesReceived.get(0)
.equals(new StreamInitiateRequestMessage(hereID, farID, 256, (byte)4, (byte)0)));
.equals(new StreamInitiateRequestMessage(hereID, farID, 0, 256, (byte)4, (byte)0)));

// OK 256 byte buffers
Message m = new StreamInitiateReplyMessage(farID, hereID, 256, (byte)4, destID);
Message m = new StreamInitiateReplyMessage(farID, hereID, 0, 256, (byte)4, destID);
messagesReceived = new java.util.ArrayList<Message>();

xmt.put(m, null);
Expand Down Expand Up @@ -87,10 +87,10 @@ public void put(Message msg, Connection sender) {

Assert.assertEquals("init messages", 1, messagesReceived.size());
Assert.assertTrue(messagesReceived.get(0)
.equals(new StreamInitiateRequestMessage(hereID, farID, 256, (byte)4, (byte)0)));
.equals(new StreamInitiateRequestMessage(hereID, farID, 0, 256, (byte)4, (byte)0)));

// OK 256 byte buffers
Message m = new StreamInitiateReplyMessage(farID, hereID, 256, (byte)4, (byte)13);
Message m = new StreamInitiateReplyMessage(farID, hereID, 0, 256, (byte)4, (byte)13);
messagesReceived = new java.util.ArrayList<Message>();

xmt.put(m, null);
Expand All @@ -101,7 +101,7 @@ public void put(Message msg, Connection sender) {
.equals(new StreamDataSendMessage(hereID, farID, (byte)13, new int[256])));

// reply to proceed
m = new StreamDataProceedMessage(farID, hereID, (byte)4, (byte)0);
m = new StreamDataProceedMessage(farID, hereID, (byte)4, (byte)0, 0);
messagesReceived = new java.util.ArrayList<Message>();

xmt.put(m, null);
Expand Down