diff --git a/lib/producer.js b/lib/producer.js index ef97481..82e22bf 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -22,7 +22,7 @@ class Producer{ sendWithKey(topic, msg, key, cb) { const msgId = this._addMsgToPending(topic, cb); - this.producer.send(msgId, topic, msg, key, function(err){ + this.producer.sendWithKey(msgId, topic, msg, key, function(err){ if(err){ delete this.pendingResult[msgId]; if(cb){ @@ -34,7 +34,7 @@ class Producer{ sendWithKeyAndPartition(topic, msg, key, partition, cb) { const msgId = this._addMsgToPending(topic, cb); - this.producer.send(msgId, topic, msg, key, partition, function(err){ + this.producer.sendWithKeyAndPartition(msgId, topic, msg, key, partition, function(err){ if(err){ delete this.pendingResult[msgId]; if(cb){ diff --git a/lib/protocol.js b/lib/protocol.js index 7c09100..d22c838 100644 --- a/lib/protocol.js +++ b/lib/protocol.js @@ -5,14 +5,14 @@ function parseData(parsingContext) { if (parsingContext.currentMessage.remainingSize > 0) { newMessage = _parseNextMessage(parsingContext); if (newMessage) { - _onNewMessage(newMessage, parsingContext) + _onNewMessage(newMessage, parsingContext); } } while (parsingContext.offset < parsingContext.data.length) { _readMsgSize(parsingContext); newMessage = _parseNextMessage(parsingContext); if (newMessage) { - _onNewMessage(newMessage, parsingContext) + _onNewMessage(newMessage, parsingContext); } } }