Make lingerMillis and capacity dynamic for BatchingProcessor#268
Make lingerMillis and capacity dynamic for BatchingProcessor#268ocadaruma merged 9 commits intoline:masterfrom
Conversation
ocadaruma
left a comment
There was a problem hiding this comment.
Thank you for the patch! and sorry for the late review.
The change looks very useful.
I left some feedbacks. PTAL
| protected BatchingProcessor(long lingerMillis, int capacity) { | ||
| this.lingerMillis = lingerMillis; | ||
| this.capacity = capacity; | ||
| protected BatchingProcessor(Supplier<Long> lingerMillisSupplier, Supplier<Integer> capacitySupplier) { |
There was a problem hiding this comment.
I suggest keep current constructor to avoid breaking change, by adding overload like BatchingProcessor(lingerMillis, capacity) { this(() -> lingerMillis, () -> capacity) ... }
|
|
||
| private void scheduleFlush() { | ||
| executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS); | ||
| executor.schedule(this::periodicallyFlushTask, this.lingerMillisSupplier.get(), TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Just getting new value and use it might be unsafe when supplier returns invalid value like negative value.
I suggest add a validation, and when it fails, logs it and keep using current value. WDYT? (Yeah I know, even the current code also doesn't have validation though...)
for capacity as well.
There was a problem hiding this comment.
EDIT:
I thought adding validations for extreme large numbers would be useful, but actually it could be intended in some cases. So let me add non-negative validations and error handling when get() throws exceptions
ocadaruma
left a comment
There was a problem hiding this comment.
Just left minor feedbacks. overall LGTM!
| executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| private <V> Supplier<V> validatedAndLKGWrappedSupplier( |
There was a problem hiding this comment.
👍 yes and updated.
| try { | ||
| value = delegate.get(); | ||
| } catch (Exception e) { | ||
| if (lkg != null && lkg.get() != null) { |
There was a problem hiding this comment.
[nits] IMO null check of lkg is redundant, because it's obvious that callers don't pass null
There was a problem hiding this comment.
Yeah, i removed it from argument and create it inside this method. since there is no need for caller to pass it in.
Currently, BatchingProcessor takes
lingerMillisandcapacitythrough its constructor, and they can't be changed during runtime.To put this into production, it would be nice for them to be configurable during runtime, so that user can tune them like changing ProcessorProperty.
I took an attempt to make them into ProcessorProperty, but couldn't make it. So this PR is to use
Supplierfor them, so that user can inject the configuration by themselves.