From 5b6813c4811df00c660286926affc44b19e837ba Mon Sep 17 00:00:00 2001 From: izeek182 Date: Sun, 6 Aug 2023 18:22:51 -0600 Subject: [PATCH] StartTCP --- bin/RDTEchoServer.lua | 1 + bin/RDTTest.lua | 1 + lib/dataQue.lua | 37 +++++ lib/fileUtils.lua | 6 +- lib/logUtils.lua | 20 ++- lib/netDefs.lua | 73 ++++++++- lib/netRDT.lua | 341 ++++++++++++++++++++---------------------- lib/que.lua | 6 +- lib/tcp.lua | 47 ++++++ lib/tcpPacket.lua | 76 ++++++++++ lib/tcpSocket.lua | 59 ++++++++ 11 files changed, 477 insertions(+), 190 deletions(-) create mode 100644 lib/dataQue.lua create mode 100644 lib/tcp.lua create mode 100644 lib/tcpPacket.lua create mode 100644 lib/tcpSocket.lua diff --git a/bin/RDTEchoServer.lua b/bin/RDTEchoServer.lua index a591ad3..220e444 100644 --- a/bin/RDTEchoServer.lua +++ b/bin/RDTEchoServer.lua @@ -2,6 +2,7 @@ local RDT = require("netRDT") require("logUtils") local logID = _LogUtil.newLogger("rdtDebug",_LogLevel.info,_LogLevel.trace,_LogLevel.noLog) +_LogUtil.clearAllFiles() local port = 15; diff --git a/bin/RDTTest.lua b/bin/RDTTest.lua index 1cf9d9b..6e36b1b 100644 --- a/bin/RDTTest.lua +++ b/bin/RDTTest.lua @@ -2,6 +2,7 @@ local RDT = require("netRDT") require("logUtils") local logID = _LogUtil.newLogger("rdtDebug",_LogLevel.debug,_LogLevel.trace,_LogLevel.noLog) +_LogUtil.clearAllFiles() local dest = "3ae67331-1f18-49ea-866b-e8bd3e02cb8f"; local port = 15; diff --git a/lib/dataQue.lua b/lib/dataQue.lua new file mode 100644 index 0000000..5239fd1 --- /dev/null +++ b/lib/dataQue.lua @@ -0,0 +1,37 @@ + +require("logUtils") +local serial = require("serialization") +local que = require("que") +local logID = _LogUtil.newLogger("dataQue",_LogLevel.error,_LogLevel.trace,_LogLevel.noLog) + +local dataQue = {} + +function dataQue.QueueData(que,data) + + return que +end + +function dataQue.NextInWin(que) +end + +function dataQue.StartOfWin(que) +end + +function dataQue.winHasNext(que) +end + +function dataQue.AckPacket(que,ackNum) +end + +function dataQue.DataQue(max) + local dq={} + dq.q = que.Queue(max) + dq.winSize = 0 + dq.winSent = 0 + dq.seqNum = 0 + dq.ackNum = 0 + return dq +end + + +return dataQue \ No newline at end of file diff --git a/lib/fileUtils.lua b/lib/fileUtils.lua index 48f1c5d..9ced857 100644 --- a/lib/fileUtils.lua +++ b/lib/fileUtils.lua @@ -36,5 +36,9 @@ if (_FileUtil == nil) then end end - + function _FileUtil.rm(file) + if(fs.exists(file)) then + fs.remove(file) + end + end end \ No newline at end of file diff --git a/lib/logUtils.lua b/lib/logUtils.lua index ffab8c2..6659de3 100644 --- a/lib/logUtils.lua +++ b/lib/logUtils.lua @@ -4,7 +4,7 @@ if (_LogUtil == nil) then _LogUtil = { loggerCount = 0 } - local _MaxLogSize = 1000000 + local _MaxLogSize = 50000 _LogLevel = { trace = 1, info = 2, @@ -42,7 +42,8 @@ if (_LogUtil == nil) then local _utilLoggers = { broadcastLog = function (...)end, fileLog = function (...)end, - printLog = function (...)end + printLog = function (...)end, + clearAllFiles= function (...)end } function _utilLoggers.printLog(str) @@ -107,6 +108,11 @@ if (_LogUtil == nil) then loggers[logID].cl = level end + function _LogUtil.clearAllFiles() + _utilLoggers.clearAllFiles() + end + + function _LogUtil.logFailures(logID,callback,...) local results local arguments = {...} @@ -130,14 +136,18 @@ if (_LogUtil == nil) then end if not (_FileUtil == nil) then + local logsDir = "/logs" function _utilLoggers.fileLog(str,name) - local fileName = "/logs/"..name..".log" + local fileName = logsDir.."/"..name..".log" if(_FileUtil.size(fileName) > _MaxLogSize) then _FileUtil.clear(fileName) end _FileUtil.append(fileName,str,"\n") end - _FileUtil.ensureDir("/logs/") + function _utilLoggers.clearAllFiles() + _FileUtil.rm(logsDir) + _FileUtil.ensureDir(logsDir.."/") + end end if(_NetUtil == nil) then @@ -152,4 +162,6 @@ if (_LogUtil == nil) then _NetUtil.broadcast(_NetDefs.portEnum.logger,str,_NetDefs.HostName) end end + + end \ No newline at end of file diff --git a/lib/netDefs.lua b/lib/netDefs.lua index 1a637b2..9984f72 100644 --- a/lib/netDefs.lua +++ b/lib/netDefs.lua @@ -40,12 +40,77 @@ if(_NetDefs == nil) then RDT = 1, UDP = 2, } + local maxWinSz = 32 + _NetDefs.rdtConst = { + maxWindowSize = maxWinSz, + bufferSize = (maxWinSz) * 2, + } + + _NetDefs.timming = { + serviceInterval = 0.1, -- Time between service calls (seconds) + sendRate = 0.5, -- Time between sending packets from the que + resendInterval = 2, -- Time since last Ack before resending Que (seconds) + sendHBInterval = 3, -- Time since last transmuit before sending proof of life (seconds) + coldInterval = 10, -- Time since last Ack before resending Que (seconds) + sendRateInc = -0.1, -- Time change when increacing the rate general sending (seconds) + sendRateDec = 0.1, -- Time change when decreasing the rate general sending (seconds) + } + + function _NetDefs.reCalcTimeouts() + --timming measured in units of serviceInterval + _NetDefs.timeOut = { + genSend = _NetDefs.timming.sendRate/_NetDefs.timming.serviceInterval, + resend = _NetDefs.timming.resendInterval/_NetDefs.timming.serviceInterval, + sendHB = _NetDefs.timming.sendHBInterval/_NetDefs.timming.serviceInterval, + cold = _NetDefs.timming.coldInterval /_NetDefs.timming.serviceInterval, + } + end + + function _NetDefs.incSendRate() + local t = _NetDefs.timming + t.sendRate = t.sendRate + t.sendRateInc + if t.sendRate < t.serviceInterval then + t.sendRate = t.serviceInterval + end + _NetDefs.timming = t + _NetDefs.reCalcTimeouts() + end - _NetDefs.timeOut = { - resend = 10, - sendHB = 10, - cold = 50, + function _NetDefs.decSendRate() + local t = _NetDefs.timming + t.sendRate = t.sendRate + t.sendRateDec + if t.sendRate < t.serviceInterval then + t.sendRate = t.serviceInterval + end + _NetDefs.timming = t + _NetDefs.reCalcTimeouts() + end + _NetDefs.reCalcTimeouts() + -- these values may be adjusted over the course of opperation + _NetDefs.congestionControl = { + lossScaleDownFactor = 0.5, + scaleUpRate = 0.25, + maxPkt = 10 } + + function _NetDefs.queTimeout() + local cc = _NetDefs.congestionControl + cc.maxPkt = cc.maxPkt * cc.lossScaleDownFactor + if(cc.maxPkt < 1) then + cc.maxPkt = 1 + end + _NetDefs.congestionControl = cc + _NetDefs.reCalcTimeouts() + end + + function _NetDefs.ackedPacket() + local cc = _NetDefs.congestionControl + cc.maxPkt = cc.maxPkt + cc.scaleUpRate + _NetDefs.congestionControl = cc + _NetDefs.reCalcTimeouts() + end + + _NetDefs.events = { syncResponse = "NetSynResponse" } diff --git a/lib/netRDT.lua b/lib/netRDT.lua index a3464b8..1bd237c 100644 --- a/lib/netRDT.lua +++ b/lib/netRDT.lua @@ -2,10 +2,11 @@ if (RDT == nil) then RDT = { } - if(_LogUtil == nil) then + if (_LogUtil == nil) then require("logUtils") end local que = require("que") + local tcpPkt = require("tcpPacket.lua") require("netDefs") require("netUtils") local component = require("component") @@ -14,7 +15,7 @@ if (RDT == nil) then local event = require("event") local thread = require("thread") - local logID = _LogUtil.newLogger("netRDT",_LogLevel.error,_LogLevel.trace,_LogLevel.noLog) + local logID = _LogUtil.newLogger("netRDT", _LogLevel.error, _LogLevel.trace, _LogLevel.noLog) local _RdtMode = { syn1 = 1, @@ -24,11 +25,6 @@ if (RDT == nil) then listening = 5, close = 9, } - - local __windowSize = 16 - local __serviceInterval = 0.5 - local __bufferSize = (__windowSize) * 2 - local __MaxInFlight = 2 local _NetRDT = { time = 0, @@ -41,11 +37,11 @@ if (RDT == nil) then -- LastTx:, -- LastRx:, -- pktQue:[pktQue], - -- lenQue:, - -- pktsInFlight: - -- pktInFlight: }}}openSocket + -- winOffset: + -- dupAck + -- pktRxBuf:{}}}}openSocket newMessagecb = {}, -- {:} - newClientcb = {}, -- {:} + newClientcb = {}, -- {:} } local function timeSince(someTime) @@ -56,77 +52,38 @@ if (RDT == nil) then end end - local function setCallback(localPort,callback) - _LogUtil.trace(logID,"setting Callback on",localPort) + local function setCallback(localPort, callback) + _LogUtil.trace(logID, "setting Callback on", localPort) _NetRDT.Sockets[localPort].callback = callback end - local function callCallback(localPort,data) - _LogUtil.trace(logID," Calling back on port ",localPort) + local function callCallback(localPort, data) + _LogUtil.trace(logID, " Calling back on port ", localPort) _NetRDT.Sockets[localPort].callback(data) end - - -- local function enqueue(port, packet) - -- _NetRDT.Sockets[port].pktQue = que.enqueue(_NetRDT.Sockets[port].pktQue,packet) - -- end - - -- local function peakAtData(port) - -- local i = _NetRDT.Sockets[port].pktNum+_NetRDT.Sockets[port].pktsInFlight - -- local data = _NetRDT.Sockets[port].pktQue[i] - -- _LogUtil.trace(logID," dequeuing data from:",i," to send on port:",port,", ",serial.serialize(data)) - -- end - - -- local function dequeue(port) - -- local i = _NetRDT.Sockets[port].pktNum - -- local data = _NetRDT.Sockets[port].pktQue[i] - -- _NetRDT.Sockets[port].pktNum = i + 1 - -- _NetRDT.Sockets[port].lenQue = _NetRDT.Sockets[port].lenQue - 1 - -- _LogUtil.trace(logID," dequeuing data from:",i," to send on port:",port,", ",serial.serialize(data)) - -- return data - -- end - local function openSocket(port, remotePort, remoteHost, callback) - _LogUtil.info(logID,"Opening RDT socket on:",port," to ",remoteHost,":",remotePort) + _LogUtil.info(logID, "Opening RDT socket on:", port, " to ", remoteHost, ":", remotePort) _NetRDT.portTypes[port] = _RdtMode.data - _NetUtil.open(port, _NetRDT.safeNetProcessing) - local socket = {}; - socket.remoteHost = remoteHost - socket.remotePort = remotePort - socket.callback = callback - socket.pktNum = 0 - socket.ackNum = 0 - socket.LastTx = _NetRDT.time - socket.LastRx = _NetRDT.time - socket.pktQue = que.Queue(__bufferSize) - socket.lenQue = 0 - socket.pktInFlight = 0 - _NetRDT.Sockets[port] = socket - end - - local function packPkt(srcPort, RDT_mode, data) - local header = { srcPort, 0, 0, RDT_mode } - local pkt = {header,data} - return pkt - end - - local function unpackPkt(pkt) - local header, data = table.unpack(pkt) - local srcPort, pktNum, ackNum, RDT_mode = table.unpack(header) - return srcPort, pktNum, ackNum, RDT_mode, data + _NetUtil.open(port, _NetRDT.safeNetProcessing) + local socket = {}; + socket.remoteHost = remoteHost + socket.remotePort = remotePort + socket.callback = callback + socket.pktNum = 0 + socket.ackNum = 1 + socket.LastTx = _NetRDT.time + socket.LastRx = _NetRDT.time + socket.pktQue = que.Queue(_NetDefs.rdtConst.bufferSize) + socket.winOffset = 0 + socket.dupAck = 0 + socket.pktRxBuf = {} + _NetRDT.Sockets[port] = socket end - local function pktSetAckNum(pkt,ack,num) - -- _LogUtil.trace(logID,"changing pkt's ack/num",serial.serialize(pkt)) - pkt[1][3] = ack - pkt[1][2] = num - -- _LogUtil.trace(logID,"changing new ack/num",serial.serialize(pkt)) - return pkt + local function buildNextPkt(localPort, data) + return tcpPkt.pack(localPort, _RdtMode.data, data) end - local function buildNextPkt(localPort,data) - return packPkt(localPort,_RdtMode.data,data) - end - local function getFreePort() for i = 1, 100, 1 do local rand = math.random(_NetDefs.portEnum.RDT_start, _NetDefs.portEnum.RDT_end) @@ -137,12 +94,11 @@ if (RDT == nil) then return -1 end - local function sendPkt(localPort,pkt) + local function sendPkt(localPort, pkt,num) local skt = _NetRDT.Sockets[localPort] local ack = skt.ackNum - local num = skt.pktNum + skt.pktInFlight - pkt = pktSetAckNum(pkt,ack,num) - _LogUtil.info(logID,"Sending on:",localPort," -> ",serial.serialize(pkt)) + pkt = tcpPkt.setSeqAck(pkt, ack, num) + _LogUtil.info(logID, "Sending on:", localPort, " -> ", serial.serialize(pkt)) _NetRDT.Sockets[localPort].LastTx = _NetRDT.time _NetUtil.send( _NetRDT.Sockets[localPort].remoteHost, @@ -151,150 +107,180 @@ if (RDT == nil) then ) end + local function sendFromQue(localPort,n) + local skt = _NetRDT.Sockets[localPort] + local num = skt.pktNum+n + sendPkt(localPort, que.peak(skt.pktQue, n),num) + end + + local function resendFirstPacket(localPort) + sendFromQue(localPort,1) + end + + local function sendNextPacket(localPort) + local skt = _NetRDT.Sockets[localPort] + sendFromQue(localPort,skt.winOffset + 1) + end + local function sendSignal(localPort, RdtMode, data) -- _LogUtil.info(logID,"Sending on:",localPort," -> ",data) + local num = _NetRDT.Sockets[localPort].pktNum sendPkt(localPort, - packPkt( - localPort, - RdtMode, - data) + tcpPkt.pack( + localPort, + RdtMode, + data), + num ) end - local function closeSocket(port,graceful) - _LogUtil.info(logID,"closing RDT port:",port) + local function closeSocket(port, graceful) + _LogUtil.info(logID, "closing RDT port:", port) if graceful then - sendSignal(port,_RdtMode.close) + sendSignal(port, _RdtMode.close) end _NetUtil.close(port) _NetRDT.portTypes[port] = nil _NetRDT.Sockets[port] = nil end - local function expectedPacket(port, pktNum) - return _NetRDT.Sockets[port].ackNum == pktNum - end - local function ackDistance(recivedAck, currentPktNum) - if(recivedAck > currentPktNum) then + if (recivedAck > currentPktNum) then return recivedAck - currentPktNum end - return ((__bufferSize + recivedAck) - currentPktNum) % __bufferSize + local bufLen = _NetDefs.rdtConst.bufferSize + return ((bufLen + recivedAck) - currentPktNum) % bufLen end - - -- Recived an ack for a packed dont resend that packet local function registerAckPacket(ackNum, port) local ackDist = ackDistance(ackNum, _NetRDT.Sockets[port].pktNum) - _LogUtil.trace(logID,"Recived packet with ack:",ackNum," Last PacketNum:",_NetRDT.Sockets[port].pktNum," on port:",port," distance from last ack:",ackDist) - while ackDist < __windowSize and ackDist > 0 do - _NetRDT.Sockets[port].pktQue = que.dequeue(_NetRDT.Sockets[port].pktQue) - _NetRDT.Sockets[port].pktNum = _NetRDT.Sockets[port].pktNum + 1 - ackDist = ackDistance(ackNum, _NetRDT.Sockets[port].pktNum) - end - end + _LogUtil.trace(logID, "Recived packet with ack:", ackNum, " Last PacketNum:", _NetRDT.Sockets[port].pktNum, + " on port:", port, " distance from last ack:", ackDist) + if(ackDist == 0) then + _NetRDT.Sockets[port].dupAck = _NetRDT.Sockets[port].dupAck + 1 + -- and que.len(_NetRDT.Sockets[port].pktQue) > 0 + if(_NetRDT.Sockets[port].dupAck == 2 and que.len(_NetRDT.Sockets[port].pktQue) > 0) then + resendFirstPacket(port) + end + else + _NetRDT.Sockets[port].dupAck = 0 + if(ackDist < _NetDefs.rdtConst.maxWindowSize and _NetRDT.Sockets[port].winOffset > (0.8*_NetDefs.congestionControl.maxPkt)) then + _NetDefs.ackedPacket() + _LogUtil.trace(logID,"packet Acked, with many packets in flight expanding maxPkts in flight, now:".._NetDefs.congestionControl.maxPkt) + end + _LogUtil.trace(logID,"packet:"..ackNum.." Acked. dequeuing") + if(que.len(_NetRDT.Sockets[port].pktQue) > 0) then + _NetRDT.Sockets[port].pktQue = que.dequeue(_NetRDT.Sockets[port].pktQue) + _NetRDT.Sockets[port].pktNum = _NetRDT.Sockets[port].pktNum + 1 + _NetRDT.Sockets[port].winOffset = _NetRDT.Sockets[port] - 1 + registerAckPacket(ackNum, port) + end - local function ackPacket(port, pktNum) - _NetRDT.Sockets[port].ackNum = pktNum + 1 - -- The the next send packet(HB or data with deliver this ack no need to press the issue) + end end - local function passOnData(port,data) - _LogUtil.trace(logID,"Passing on data:"..serial.serialize({data})) - callCallback(port,data) + local function passOnData(port, data) + -- _LogUtil.trace(logID, "Passing on data:" .. serial.serialize({ data })) + callCallback(port, data) end - local function processRdtPacket(remoteAddress, port, pktNum, ackNum, data) - registerAckPacket(ackNum, port) - if (expectedPacket(port, pktNum)) then - ackPacket(port, pktNum) - passOnData(port, data) + local function ackPacket(port) + local nextData = _NetRDT.Sockets[port].ackNum + local dataInQue = _NetRDT.Sockets[port].pktRxBuf[nextData] + _LogUtil.info(logID,"nextIndex:"..nextData.." ack packet, data Received but not used"..serial.serialize(_NetRDT.Sockets[port].pktRxBuf)) + if not (dataInQue == nil) then + _NetRDT.Sockets[port].ackNum = nextData + 1 + passOnData(port, dataInQue) + _NetRDT.Sockets[port].pktRxBuf[nextData] = nil + ackPacket(port) + -- The the next send packet(HB or data with deliver this ack no need to press the issue) end end - local function newSocket(localPortIn,remoteAddressIn,remotePortIn) - return {localPort=localPortIn,remoteAddress=remoteAddressIn,remotePort=remotePortIn} + local function newSocket(localPortIn, remoteAddressIn, remotePortIn) + return { localPort = localPortIn, remoteAddress = remoteAddressIn, remotePort = remotePortIn } end - local function recievedData(port,remoteAddress,pktNum, ackNum,data) - _LogUtil.info(logID,"Received data on:",port," -> ",serial.serialize(data)) + local function recievedData(port, remoteAddress, pktNum, ackNum, data) + _LogUtil.info(logID, "Received data on:", port, " -> ", serial.serialize(data)) _NetRDT.Sockets[port].LastRx = _NetRDT.time - processRdtPacket(remoteAddress, port, pktNum, ackNum, data) + _NetRDT.Sockets[port].pktRxBuf[pktNum] = data + registerAckPacket(ackNum, port) + ackPacket(port) + -- processRdtPacket(remoteAddress, port, pktNum, ackNum) end - local function connectionRequest(port,remoteAddress,remotePort) - _LogUtil.info(logID,"Received connection request on:",port) + local function connectionRequest(port, remoteAddress, remotePort) + _LogUtil.info(logID, "Received connection request on:", port) if (_NetRDT.portTypes[port] == _RdtMode.listening) then local newLocalPort = getFreePort() -- new client callback returns new message callback - local newCallback = _NetRDT.newClientcb[port](newSocket(newLocalPort,remoteAddress,remotePort)) - if(newCallback == nil) then - _LogUtil.trace(logID," no callback returned, replacing with default ",_NetRDT.newMessagecb[port]) + local newCallback = _NetRDT.newClientcb[port](newSocket(newLocalPort, remoteAddress, remotePort)) + if (newCallback == nil) then + _LogUtil.trace(logID, " no callback returned, replacing with default ", _NetRDT.newMessagecb[port]) newCallback = _NetRDT.newMessagecb[port] end - openSocket(newLocalPort,remotePort, remoteAddress, newCallback) - sendSignal(newLocalPort, _RdtMode.syn2,port) + openSocket(newLocalPort, remotePort, remoteAddress, newCallback) + sendSignal(newLocalPort, _RdtMode.syn2, port) end end local function tempCallback() - _LogUtil.error(logID,"callback never set after connection Accepted") + _LogUtil.error(logID, "callback never set after connection Accepted") end - local function connectionAccepted(port,remoteAddress,remotePort) - _LogUtil.info(logID,"Connection accepted:",port) + local function connectionAccepted(port, remoteAddress, remotePort) + _LogUtil.info(logID, "Connection accepted:", port) if (_NetRDT.portTypes[port] == _RdtMode.syn1) then - openSocket(port, remotePort,remoteAddress,tempCallback) - event.push(_NetDefs.events.syncResponse, port, remoteAddress, remotePort) + openSocket(port, remotePort, remoteAddress, tempCallback) + event.push(_NetDefs.events.syncResponse, port, remoteAddress, remotePort) end end - local function gotHeartbeat(ackNum,port) - _LogUtil.info(logID,"Received Heart Beat on:",port) + local function gotHeartbeat(ackNum, port) + _LogUtil.info(logID, "Received Heart Beat on:", port) registerAckPacket(ackNum, port) _NetRDT.Sockets[port].LastRx = _NetRDT.time end local function closeConnection(port) - _LogUtil.info(logID,"port:",port, "Closed by remote host") + _LogUtil.info(logID, "port:", port, "Closed by remote host") if _NetRDT.portTypes[port] == _RdtMode.data then - closeSocket(port,false) + closeSocket(port, false) end end - local function netProcessing(_, _, remoteAddress, port, _,pkt) - _LogUtil.trace(logID,"Received:",serial.serialize(pkt)) - local srcPort, pktNum, ackNum, RDT_mode,data = unpackPkt(pkt) + local function netProcessing(_, _, remoteAddress, port, _, pkt) + -- _LogUtil.trace(logID, "Received:", serial.serialize(pkt)) + local srcPort, pktNum, ackNum, RDT_mode, data = tcpPkt.unpack(pkt) if (RDT_mode == _RdtMode.data) then - recievedData(port,remoteAddress,pktNum, ackNum,data) + recievedData(port, remoteAddress, pktNum, ackNum, data) elseif (RDT_mode == _RdtMode.syn1) then - connectionRequest(port,remoteAddress,srcPort) + connectionRequest(port, remoteAddress, srcPort) elseif (RDT_mode == _RdtMode.syn2) then - connectionAccepted(port,remoteAddress,srcPort) + connectionAccepted(port, remoteAddress, srcPort) elseif (RDT_mode == _RdtMode.hb) then - gotHeartbeat(ackNum,port) + gotHeartbeat(ackNum, port) elseif (RDT_mode == _RdtMode.close) then closeConnection(port) end -end + end function _NetRDT.safeNetProcessing(...) - return _LogUtil.logFailures(logID,netProcessing,...) + return _LogUtil.logFailures(logID, netProcessing, ...) end - local function sendSyn(remoteHost,remotePort,localPort) + local function sendSyn(remoteHost, remotePort, localPort) _NetUtil.open(localPort, _NetRDT.safeNetProcessing) _NetUtil.send( remoteHost, remotePort, - packPkt( - localPort, - _RdtMode.syn1, - {}) + tcpPkt.pack( + localPort, + _RdtMode.syn1, + {}) ) end - - local function advanceTimer() _NetRDT.time = _NetRDT.time + 1 if (_NetRDT.time > _NetRDT.maxTime) then @@ -303,51 +289,50 @@ end end local function resendQue(localPort) - _NetRDT.Sockets[localPort].pktInFlight = 0 + _NetRDT.Sockets[localPort].winOffset = 0 end local function handleTimouts() for localPort, socket in pairs(_NetRDT.Sockets) do local LastTx = socket.LastTx local LastRx = socket.LastRx - if (socket.pktInFlight > 0 and timeSince(LastTx) > _NetDefs.timeOut.resend) then - _LogUtil.trace(logID,"timeout on que resending") + if (socket.winOffset > 0 and timeSince(LastTx) > _NetDefs.timeOut.resend) then + _NetDefs.queTimeout() + _LogUtil.trace(logID, "timeout on que resending, redusing max pktsInflight to:".._NetDefs.congestionControl.maxPkt) resendQue(localPort) - elseif (socket.pktInFlight == 0 and timeSince(LastTx) > _NetDefs.timeOut.sendHB and _NetRDT.portTypes[localPort] == _RdtMode.data) then - _LogUtil.trace(logID,"timeout on POL sending Heartbeat") + elseif (socket.winOffset == 0 and timeSince(LastTx) > _NetDefs.timeOut.sendHB and _NetRDT.portTypes[localPort] == _RdtMode.data) then + _LogUtil.trace(logID, "timeout on POL sending Heartbeat") sendSignal(localPort, _RdtMode.hb) end if (timeSince(LastRx) > _NetDefs.timeOut.cold) then - _LogUtil.error(logID,"Socket Has timed out on port:",localPort) - closeSocket(localPort,false) + _LogUtil.error(logID, "Socket Has timed out on port:", localPort) + closeSocket(localPort, false) end end end - local function sendNextPacket(localPort) - local skt = _NetRDT.Sockets[localPort] - sendPkt(localPort,que.peak(skt.pktQue,skt.pktInFlight+1)) - _NetRDT.Sockets[localPort].pktInFlight = skt.pktInFlight + 1 - end - local function processQueue() for localPort, socket in pairs(_NetRDT.Sockets) do - local queLen = que.len(socket.pktQue) - if (socket.pktInFlight < queLen )and (socket.pktInFlight <__MaxInFlight) then - _LogUtil.trace(logID,"sending data from que, len:",queLen," pktInFlight:",socket.pktInFlight," windowSize:",__MaxInFlight) - sendNextPacket(localPort) + local LastTx = socket.LastTx + if(timeSince(LastTx) > _NetDefs.timeOut.genSend) then + local queLen = que.len(socket.pktQue) + if (socket.winOffset < queLen) and (socket.winOffset < _NetDefs.congestionControl.maxPkt) then + _LogUtil.trace(logID, "sending data from que, len:", queLen, " winOffset:", socket.winOffset, + " windowSize:", _NetDefs.congestionControl.maxPkt) + sendNextPacket(localPort) + end end end end local function netMaintaince() - local status,err + local status, err while true do - os.sleep(__serviceInterval) - _LogUtil.logFailures(logID,processQueue) - _LogUtil.logFailures(logID,advanceTimer) - _LogUtil.logFailures(logID,handleTimouts) + os.sleep(_NetDefs.timming.serviceInterval) + _LogUtil.logFailures(logID, processQueue) + _LogUtil.logFailures(logID, advanceTimer) + _LogUtil.logFailures(logID, handleTimouts) end end @@ -356,11 +341,11 @@ end end local function init() - _LogUtil.info(logID,"initial startup") + _LogUtil.info(logID, "initial startup") reset() local t = thread.create(netMaintaince) if (t == nil) then - _LogUtil.error(logID,"thread failed to create") + _LogUtil.error(logID, "thread failed to create") return end t:detach() @@ -379,27 +364,27 @@ end _NetRDT.portTypes[port] = nil _NetUtil.close(port) end - + function RDT.openSocket(dest, port, callback) local newLocalPort = getFreePort() _NetRDT.portTypes[newLocalPort] = _RdtMode.syn1 - sendSyn(dest,port,newLocalPort) + sendSyn(dest, port, newLocalPort) local _, localPort, remoteAddress, remotePort = event.pull(30, _NetDefs.events.syncResponse, newLocalPort) if localPort == nil then - error("Port failed to open")-- Timed out :( + error("Port failed to open") -- Timed out :( end - local skt = newSocket(localPort,remoteAddress,remotePort) - setCallback(localPort,callback) + local skt = newSocket(localPort, remoteAddress, remotePort) + setCallback(localPort, callback) return skt end function RDT.closeSocket(socket) - closeSocket(socket.localPort,true) + closeSocket(socket.localPort, true) end - function RDT.send(socket,data) - local queue = _NetRDT.Sockets[socket.localPort].pktQue - _NetRDT.Sockets[socket.localPort].pktQue = que.enqueue(queue,buildNextPkt(socket.localPort,data)) + function RDT.send(socket, data) + local queue = _NetRDT.Sockets[socket.localPort].pktQue + _NetRDT.Sockets[socket.localPort].pktQue = que.enqueue(queue, buildNextPkt(socket.localPort, data)) end init() diff --git a/lib/que.lua b/lib/que.lua index f5b7826..285332e 100644 --- a/lib/que.lua +++ b/lib/que.lua @@ -6,7 +6,7 @@ local logID = _LogUtil.newLogger("que",_LogLevel.error,_LogLevel.trace,_LogLevel function que.enqueue(queue,data) if(queue.len >= queue.max) then - _LogUtil.error(debug.traceback("Que full! Not appending")) + _LogUtil.error(logID,debug.traceback("Que full! Not appending")) return queue end local i = (queue.ini + (queue.len)) % queue.max @@ -19,7 +19,7 @@ end function que.peak(queue,n) if(n > queue.len) then - _LogUtil.error(debug.traceback("peaking outside of que range returning nil")) + _LogUtil.error(logID,debug.traceback("peaking outside of que range:"..n.." max"..queue.len.." returning nil")) return nil end local i = (queue.ini + (n-1)) % queue.max @@ -31,7 +31,7 @@ end function que.dequeue(queue) if(queue.len <=0) then - _LogUtil.error(debug.traceback("Cant dequeue from an empty que.")) + _LogUtil.error(logID,debug.traceback("Cant dequeue from an empty que.")) return nil end local i = queue.ini diff --git a/lib/tcp.lua b/lib/tcp.lua new file mode 100644 index 0000000..8160dc2 --- /dev/null +++ b/lib/tcp.lua @@ -0,0 +1,47 @@ +if (TCP == nil) then + TCP = { + } + + if (_LogUtil == nil) then + require("logUtils") + end + local que = require("que") + local tcpPkt = require("tcpPacket.lua") + require("netDefs") + require("netUtils") + local component = require("component") + local computer = require("computer") + local serial = require("serialization") + local event = require("event") + local thread = require("thread") + + local logID = _LogUtil.newLogger("netRDT", _LogLevel.error, _LogLevel.trace, _LogLevel.noLog) + + + local _NetTCP = { + time = 0, + maxTime = 100, + portTypes = {}, --{,} + Sockets = {}, --{:{ + -- callback:, + -- pktNum:, + -- ackNum:, + -- LastTx:, + -- LastRx:, + -- pktQue:[pktQue], + -- winOffset: + -- dupAck + -- pktRxBuf:{}}}}openSocket + newMessagecb = {}, -- {:} + newClientcb = {}, -- {:} + } + + local function timeSince(someTime) + if (someTime < _NetTCP.time) then + return _NetTCP.time - someTime; + else + return _NetTCP.time + (_NetTCP.maxTime - someTime); + end + end +end +return TCP diff --git a/lib/tcpPacket.lua b/lib/tcpPacket.lua new file mode 100644 index 0000000..5d4d587 --- /dev/null +++ b/lib/tcpPacket.lua @@ -0,0 +1,76 @@ + +require("logUtils") +local serial = require("serialization") + +local logID = _LogUtil.newLogger("TCP_Packet",_LogLevel.error,_LogLevel.trace,_LogLevel.noLog) + +local tcpPacket = { +} + +local function compileFlags(...) + local sum = 0 + for index, value in ipairs({...}) do + if(value) then + sum = sum + (2^index) + end + end +end + +local function getFlags(number) + local maxFlag = 4 + local flags = {} + for i = maxFlag, 1, -1 do + local flagVal = (2^i) + if number > flagVal then + number = number - flagVal + flags[i] = true + else + flags[i] = false + end + end +end + +function tcpPacket.tcpPkt(synF,ackF,finF,srcPort,syn,ack,payload) + local pkt = {} + pkt.synF,pkt.ackF,pkt.finF = synF,ackF,finF + pkt.payload = payload + pkt.srcPort = srcPort + pkt.syn = syn + pkt.ack = ack + return pkt +end + +function tcpPacket.syn(srcPort) + return tcpPacket.tcpPkt(true,false,false,srcPort,0,0,nil) +end + +function tcpPacket.synAck(srcPort,ack) + return tcpPacket.tcpPkt(true,true,false,srcPort,0,ack,nil) +end + +function tcpPacket.Ack(srcPort) + return tcpPacket.tcpPkt(false,true,false,srcPort,1,1,nil) +end + +function tcpPacket.fin(srcPort,seq,ack) + return tcpPacket.tcpPkt(false,true,true,srcPort,seq,ack,nil) +end + +function tcpPacket.std(srcPort,seq,ack,payload) + tcpPacket.tcpPkt(false,true,false,srcPort,seq,ack,payload) +end + +function tcpPacket.toCompact(pkt) + local flag = compileFlags(pkt.synF,pkt.ackF,pkt.finF) + local header = {pkt.srcPort,flag,pkt.seq,pkt.ack} + local pktStr = {header,pkt.payload} + return pktStr +end + +function tcpPacket.fromCompact(pktStr) + local header , payload = table.unpack(pktStr) + local srcPort,flag,seq,ack = table.unpack(header) + return tcpPacket.tcpPkt(getFlags(flag),srcPort,seq,ack,payload) +end + +return tcpPacket \ No newline at end of file diff --git a/lib/tcpSocket.lua b/lib/tcpSocket.lua new file mode 100644 index 0000000..ea74d00 --- /dev/null +++ b/lib/tcpSocket.lua @@ -0,0 +1,59 @@ +require("logUtils") +local dq = require("dataQue") +local serial = require("serialization") +local logID = _LogUtil.newLogger("TCP_Socket",_LogLevel.error,_LogLevel.trace,_LogLevel.noLog) +local SocketTypes = { + listener = 1, + synSent = 2, + synReci = 3, + active = 4, + closing = 5 +} + +local tcpSocket = { +} + +local function PacketoOnListener(skt,pkt) + +end + +local function PacketoOnActive(skt,pkt) + +end + +local function PacketoOnSyn(skt,pkt) + +end + +local function PacketoOnClosing(skt,pkt) + +end + + +local function newSocket(socketType,newclientCB,msgCB,localPort,remoteAddr,remotePort) + local skt = { + socketType = socketType, + localPort = localPort, + remoteAddr = remoteAddr, + remotePort = remotePort, + msgCB = msgCB, + pktNum = 0, + ackNum = 0, + LastTx = 0, + LastRx = 0, + dataQue = dq.DataQue(_NetDefs.rdtConst.bufferSize), + pktRxBuf = {}, + } +end + +function tcpSocket.newListener(newclientcb,msgCB,localPort) + return newSocket(SocketTypes.listener,newclientcb,localPort,nil,nil) +end + +function tcpSocket.newPacket(socket,packet) + if socket.socketType == SocketTypes.listener then + + end + +end +return tcpSocket \ No newline at end of file