From 7ec75aeba4de2900a34ab3f235224306334c4a03 Mon Sep 17 00:00:00 2001 From: Phu Nguyen Date: Mon, 25 Jun 2018 09:30:09 -0500 Subject: [PATCH 1/3] CRUXX-580 improve channel publisher by always copying the depth --- .../recipes/channel/ChannelLuaReceiver.java | 186 ++++++++++++------ .../rdbi/recipes/channel/ChannelReceiver.java | 1 + .../channel/ChannelLuaReceiverTest.java | 36 +++- 3 files changed, 158 insertions(+), 65 deletions(-) diff --git a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java index 6040820..444d853 100644 --- a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java +++ b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java @@ -19,69 +19,90 @@ public class ChannelLuaReceiver implements ChannelReceiver { public interface DAO { @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "if current_count <= tonumber($lastSeenCount$) then\n" + - " return {tostring(0)}\n" + - "end\n" + - "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + - "results[#results + 1] = tostring(current_count)\n" + - "return results" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "return results" ) @Mapper(GetResultMapper.class) GetResult get( - @BindKey("countKey") String countKey, - @BindKey("listKey") String listKey, - @BindArg("lastSeenCount") Long lastSeenCount + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount ); @Query( "local current_count = redis.call(\"GET\", $countKey$)\n" + "if not current_count then\n" + - " current_count = 0\n" + + " return {tostring(0)}\n" + "else\n" + - " current_count = tonumber(current_count)\n" + + " current_count = tonumber(current_count)\n" + "end\n" + "if current_count <= tonumber($lastSeenCount$) then\n" + - " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - " return {tostring(0)}\n" + + " return {tostring(current_count)}\n" + "end\n" + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + "results[#results + 1] = tostring(current_count)\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + "return results" ) + @Mapper(GetWithDepthResultMapper.class) + GetResult getAndReturnCurrentDepth( + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount + ); + + @Query( + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return results" + ) @Mapper(GetResultMapper.class) GetResult get( - @BindKey("countKey") String countKey, - @BindKey("listKey") String listKey, - @BindArg("lastSeenCount") Long lastSeenCount, - @BindArg("copyDepthToKey") String copyDepthToKey + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount, + @BindArg("copyDepthToKey") String copyDepthToKey ); @Query( - "local number_of_channels = ARGV[1]\n" + - "local bulk_result = {}\n" + - "for i = 1, number_of_channels do\n" + - " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + - " if not current_count then\n" + - " current_count = 0\n" + - " else\n" + - " current_count = tonumber(current_count)\n" + - " end\n" + - " if current_count <= tonumber(ARGV[i+1]) then\n" + - " bulk_result[i] = {}\n" + - " else\n" + - " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + - " results[#results + 1] = tostring(current_count)\n" + - " bulk_result[i] = results\n" + - " end\n" + - "end\n" + - "return bulk_result;" + "local number_of_channels = ARGV[1]\n" + + "local bulk_result = {}\n" + + "for i = 1, number_of_channels do\n" + + " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + + " if not current_count then\n" + + " current_count = 0\n" + + " else\n" + + " current_count = tonumber(current_count)\n" + + " end\n" + + " if current_count <= tonumber(ARGV[i+1]) then\n" + + " bulk_result[i] = {}\n" + + " else\n" + + " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + + " results[#results + 1] = tostring(current_count)\n" + + " bulk_result[i] = results\n" + + " end\n" + + "end\n" + + "return bulk_result;" ) @Mapper(GetBulkResultMapper.class) GetBulkResult getMulti( @@ -90,31 +111,31 @@ GetBulkResult getMulti( ); @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "return current_count" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey); @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - "return current_count" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey, @BindKey("copyDepthToKey") String copyDepthToKey); } - public static class GetResultMapper implements ResultMapper> { + public static class GetResultMapper implements ResultMapper> { @Override public GetResult map(List result) { @@ -127,7 +148,24 @@ public GetResult map(List result) { } } - public static class GetBulkResultMapper implements ResultMapper>> { + public static class GetWithDepthResultMapper implements ResultMapper> { + + @Override + public GetResult map(List result) { + + if (result.size() == 0) { + throw new IllegalStateException("unexpected 0 length return from redis lua script"); + } + + if (result.size() == 1) { + return new GetResult(null, Long.valueOf(result.get(0))); + } + + return new GetResult(Lists.reverse(result.subList(0, result.size() - 1)), Long.valueOf(result.get(result.size() - 1))); + } + } + + public static class GetBulkResultMapper implements ResultMapper>> { @Override public GetBulkResult map(List> result) { @@ -136,17 +174,17 @@ public GetBulkResult map(List> result) { return null; } - List> listsResult = new ArrayList<>(); + List> listsResult = new ArrayList<>(); List listsSizes = new ArrayList<>(); - for (List each: result) { + for (List each : result) { if (each.size() == 0) { listsResult.add(each); listsSizes.add(0L); continue; } listsResult.add(Lists.reverse(each.subList(0, each.size() - 1))); - listsSizes.add(Long.valueOf(each.get(each.size()-1))); + listsSizes.add(Long.valueOf(each.get(each.size() - 1))); } return new GetBulkResult(listsResult, listsSizes); @@ -162,8 +200,32 @@ public GetResult get(String channel, Long lastSeenId) { return get(channel, lastSeenId, null); } + /** + * Gets new data from the channel and returns the current channel depth. + * This is unlike the get method, that returns the input channel depth if no data was found. + * + * This is useful because if the channel was reset, the client will see their id > the channel's id and should reset. + * This happens on redis clearing etc. + * + * @param channel the channel's name + * @param lastSeenId the last seen id by the client + * @return GetResult that contains the channel's latest depth + */ + @Override + public GetResult getAndReturnCurrentCount(String channel, Long lastSeenId) { + + try (Handle handle = rdbi.open()) { + DAO dao = handle.attach(DAO.class); + + return dao.getAndReturnCurrentDepth(ChannelPublisher.getChannelDepthKey(channel), + ChannelPublisher.getChannelQueueKey(channel), + lastSeenId); + } + } + @Override public GetResult get(String channel, Long lastSeenId, String copyDepthToKey) { + try (Handle handle = rdbi.open()) { DAO dao = handle.attach(DAO.class); if (copyDepthToKey == null) { diff --git a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelReceiver.java b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelReceiver.java index 1b67af1..aeba5c2 100644 --- a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelReceiver.java +++ b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelReceiver.java @@ -4,6 +4,7 @@ public interface ChannelReceiver { GetResult get(String channel, Long lastSeenId); + GetResult getAndReturnCurrentCount(String channel, Long lastSeenId); GetResult get(String channel, Long lastSeenId, String copyDepthToKey); GetBulkResult getMulti(List channels, List lastSeenIds); Long getDepth(String channel); diff --git a/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java b/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java index 81925ac..68e8ea1 100644 --- a/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java +++ b/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.lithium.dbi.rdbi.RDBI; import org.junit.Ignore; import org.testng.annotations.Test; @@ -18,9 +19,7 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.stream.Collectors.toList; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertTrue; -import static org.testng.AssertJUnit.fail; +import static org.testng.AssertJUnit.*; public class ChannelLuaReceiverTest { @@ -161,6 +160,37 @@ public void testEmptyChannelPublishAndReceive() throws Exception { } + @Test + public void testGetAndReturnCurrentCount() { + + final String channel = "channel1"; + + RDBI rdbi = new RDBI(new JedisPool("localhost")); + final ChannelPublisher channelPublisher = new ChannelPublisher(rdbi); + channelPublisher.resetChannel(channel); + + try { + final ChannelReceiver channelReceiver = new ChannelLuaReceiver(rdbi); + GetResult result = channelReceiver.getAndReturnCurrentCount(channel, 0L); + + assertEquals((Long) 0L, result.getDepth()); + + channelPublisher.publish(channel, "1"); + GetResult result2 = channelReceiver.getAndReturnCurrentCount(channel, 0L); + + assertEquals((Long) 1L, result2.getDepth()); + assertEquals(1, result2.getMessages().size()); + assertEquals("1", result2.getMessages().get(0)); + + GetResult result3 = channelReceiver.getAndReturnCurrentCount(channel, 1000L); + assertEquals((Long) 1L, result3.getDepth()); + assertNull(result3.getMessages()); + + } finally { + channelPublisher.resetChannel(channel); + } + } + @Test public void testMultiThreadedMultiChannelPublishAndReceive() throws InterruptedException { final Set channelSet = ImmutableSet.of("channel1", "channel2", "channel3", "channel4", "channel5"); From a3f6ac6d55025d799560ee58487e1e14fc6ee384 Mon Sep 17 00:00:00 2001 From: Phu Nguyen Date: Mon, 25 Jun 2018 12:41:19 -0500 Subject: [PATCH 2/3] clean up --- .../recipes/channel/ChannelLuaReceiver.java | 108 +++++++++--------- .../channel/ChannelLuaReceiverTest.java | 58 ++++------ 2 files changed, 77 insertions(+), 89 deletions(-) diff --git a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java index 444d853..9c28df5 100644 --- a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java +++ b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java @@ -20,17 +20,17 @@ public class ChannelLuaReceiver implements ChannelReceiver { public interface DAO { @Query( "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "if current_count <= tonumber($lastSeenCount$) then\n" + - " return {tostring(0)}\n" + - "end\n" + - "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + - "results[#results + 1] = tostring(current_count)\n" + - "return results" + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "return results" ) @Mapper(GetResultMapper.class) GetResult get( @@ -62,19 +62,19 @@ GetResult getAndReturnCurrentDepth( @Query( "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "if current_count <= tonumber($lastSeenCount$) then\n" + - " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - " return {tostring(0)}\n" + - "end\n" + - "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + - "results[#results + 1] = tostring(current_count)\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - "return results" + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return results" ) @Mapper(GetResultMapper.class) GetResult get( @@ -86,23 +86,23 @@ GetResult get( @Query( "local number_of_channels = ARGV[1]\n" + - "local bulk_result = {}\n" + - "for i = 1, number_of_channels do\n" + - " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + - " if not current_count then\n" + - " current_count = 0\n" + - " else\n" + - " current_count = tonumber(current_count)\n" + - " end\n" + - " if current_count <= tonumber(ARGV[i+1]) then\n" + - " bulk_result[i] = {}\n" + - " else\n" + - " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + - " results[#results + 1] = tostring(current_count)\n" + - " bulk_result[i] = results\n" + - " end\n" + - "end\n" + - "return bulk_result;" + "local bulk_result = {}\n" + + "for i = 1, number_of_channels do\n" + + " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + + " if not current_count then\n" + + " current_count = 0\n" + + " else\n" + + " current_count = tonumber(current_count)\n" + + " end\n" + + " if current_count <= tonumber(ARGV[i+1]) then\n" + + " bulk_result[i] = {}\n" + + " else\n" + + " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + + " results[#results + 1] = tostring(current_count)\n" + + " bulk_result[i] = results\n" + + " end\n" + + "end\n" + + "return bulk_result;" ) @Mapper(GetBulkResultMapper.class) GetBulkResult getMulti( @@ -112,24 +112,24 @@ GetBulkResult getMulti( @Query( "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "return current_count" + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey); @Query( "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - "return current_count" + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey, @BindKey("copyDepthToKey") String copyDepthToKey); diff --git a/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java b/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java index 68e8ea1..428656e 100644 --- a/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java +++ b/rdbi-recipes/src/test/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiverTest.java @@ -2,7 +2,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import com.lithium.dbi.rdbi.RDBI; import org.junit.Ignore; import org.testng.annotations.Test; @@ -19,7 +18,11 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.stream.Collectors.toList; -import static org.testng.AssertJUnit.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + public class ChannelLuaReceiverTest { @@ -206,39 +209,8 @@ public void testMultiThreadedMultiChannelPublishAndReceive() throws InterruptedE Map uuidMap = new HashMap<>(); - Thread thread1 = new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < messageAmount; i++) { - String stringVal = "value" + UUID.randomUUID(); - uuidMap.put(stringVal, 0); - final List value = ImmutableList.of(stringVal); - channelPublisher.publish(channelSet, value); - - if (Thread.interrupted()) { - return; - } - } - thread1Finished.set(true); - } - }); - - Thread thread2 = new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < messageAmount; i++) { - String stringVal = "value" + UUID.randomUUID(); - uuidMap.put(stringVal, 0); - final List value = ImmutableList.of(stringVal); - channelPublisher.publish(channelSet, value); - - if (Thread.interrupted()) { - return; - } - } - thread2Finished.set(true); - } - }); + Thread thread1 = randomPublish(channelSet, messageAmount, channelPublisher, thread1Finished, uuidMap); + Thread thread2 = randomPublish(channelSet, messageAmount, channelPublisher, thread2Finished, uuidMap); thread1.start(); thread2.start(); @@ -273,6 +245,22 @@ public void run() { } + private Thread randomPublish(Set channelSet, int messageAmount, ChannelPublisher channelPublisher, AtomicBoolean thread1Finished, Map uuidMap) { + return new Thread(() -> { + for (int i = 0; i < messageAmount; i++) { + String stringVal = "value" + UUID.randomUUID(); + uuidMap.put(stringVal, 0); + final List value = ImmutableList.of(stringVal); + channelPublisher.publish(channelSet, value); + + if (Thread.interrupted()) { + return; + } + } + thread1Finished.set(true); + }); + } + //ignored because this is a test to compare consecutive single channel gets vs. batch channel gets //results on a local redis instance //channels batch(ms) single(ms) From 5bcd820e16e0ed2fe837a83fcfe60435da74a306 Mon Sep 17 00:00:00 2001 From: Phu Nguyen Date: Mon, 25 Jun 2018 12:43:57 -0500 Subject: [PATCH 3/3] more cleanup --- .../recipes/channel/ChannelLuaReceiver.java | 142 +++++++++--------- 1 file changed, 71 insertions(+), 71 deletions(-) diff --git a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java index 9c28df5..9b41b2e 100644 --- a/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java +++ b/rdbi-recipes/src/main/java/com/lithium/dbi/rdbi/recipes/channel/ChannelLuaReceiver.java @@ -19,24 +19,24 @@ public class ChannelLuaReceiver implements ChannelReceiver { public interface DAO { @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "if current_count <= tonumber($lastSeenCount$) then\n" + - " return {tostring(0)}\n" + - "end\n" + - "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + - "results[#results + 1] = tostring(current_count)\n" + - "return results" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "return results" ) @Mapper(GetResultMapper.class) GetResult get( - @BindKey("countKey") String countKey, - @BindKey("listKey") String listKey, - @BindArg("lastSeenCount") Long lastSeenCount + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount ); @Query( @@ -55,81 +55,81 @@ GetResult get( ) @Mapper(GetWithDepthResultMapper.class) GetResult getAndReturnCurrentDepth( - @BindKey("countKey") String countKey, - @BindKey("listKey") String listKey, - @BindArg("lastSeenCount") Long lastSeenCount + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount ); @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "if current_count <= tonumber($lastSeenCount$) then\n" + - " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - " return {tostring(0)}\n" + - "end\n" + - "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + - "results[#results + 1] = tostring(current_count)\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - "return results" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "if current_count <= tonumber($lastSeenCount$) then\n" + + " redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + " return {tostring(0)}\n" + + "end\n" + + "local results = redis.call(\"LRANGE\", $listKey$, 0, current_count - tonumber($lastSeenCount$) - 1)\n" + + "results[#results + 1] = tostring(current_count)\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return results" ) @Mapper(GetResultMapper.class) GetResult get( - @BindKey("countKey") String countKey, - @BindKey("listKey") String listKey, - @BindArg("lastSeenCount") Long lastSeenCount, - @BindArg("copyDepthToKey") String copyDepthToKey + @BindKey("countKey") String countKey, + @BindKey("listKey") String listKey, + @BindArg("lastSeenCount") Long lastSeenCount, + @BindArg("copyDepthToKey") String copyDepthToKey ); @Query( - "local number_of_channels = ARGV[1]\n" + - "local bulk_result = {}\n" + - "for i = 1, number_of_channels do\n" + - " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + - " if not current_count then\n" + - " current_count = 0\n" + - " else\n" + - " current_count = tonumber(current_count)\n" + - " end\n" + - " if current_count <= tonumber(ARGV[i+1]) then\n" + - " bulk_result[i] = {}\n" + - " else\n" + - " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + - " results[#results + 1] = tostring(current_count)\n" + - " bulk_result[i] = results\n" + - " end\n" + - "end\n" + - "return bulk_result;" + "local number_of_channels = ARGV[1]\n" + + "local bulk_result = {}\n" + + "for i = 1, number_of_channels do\n" + + " local current_count = redis.call(\"GET\", tostring(KEYS[i*2]))\n" + + " if not current_count then\n" + + " current_count = 0\n" + + " else\n" + + " current_count = tonumber(current_count)\n" + + " end\n" + + " if current_count <= tonumber(ARGV[i+1]) then\n" + + " bulk_result[i] = {}\n" + + " else\n" + + " local results = redis.call(\"LRANGE\", KEYS[i*2-1], 0, current_count - tonumber(ARGV[i+1]) + 1)\n" + + " results[#results + 1] = tostring(current_count)\n" + + " bulk_result[i] = results\n" + + " end\n" + + "end\n" + + "return bulk_result;" ) @Mapper(GetBulkResultMapper.class) GetBulkResult getMulti( - @BindKey("allKeys") List inputKeys, - @BindArg("allArgs") List inputArgs + @BindKey("allKeys") List inputKeys, + @BindArg("allArgs") List inputArgs ); @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "return current_count" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey); @Query( - "local current_count = redis.call(\"GET\", $countKey$)\n" + - "if not current_count then\n" + - " current_count = 0\n" + - "else\n" + - " current_count = tonumber(current_count)\n" + - "end\n" + - "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + - "return current_count" + "local current_count = redis.call(\"GET\", $countKey$)\n" + + "if not current_count then\n" + + " current_count = 0\n" + + "else\n" + + " current_count = tonumber(current_count)\n" + + "end\n" + + "redis.call(\"SET\", $copyDepthToKey$, current_count)\n" + + "return current_count" ) Long getDepth(@BindKey("countKey") String countKey, @BindKey("copyDepthToKey") String copyDepthToKey);