@@ -826,7 +826,7 @@ private void onAcks(Event.Acks acks) {
826826 log .atInfo ()
827827 .addKeyValue ("count_acks" , acks .acked ().size ())
828828 .addKeyValue ("wip_tasks" , wip ::size )
829- .log ("Received Results " );
829+ .log ("Received Acks " );
830830
831831 Collection <String > removed = batch .clear ();
832832 if (!acks .acked ().containsAll (removed )) {
@@ -865,6 +865,10 @@ private void onBackoff(Event.Backoff backoff) {
865865 }
866866
867867 private void onOom (Event .Oom oom ) {
868+ log .atInfo ()
869+ .addKeyValue ("wip_tasks" , wip ::size )
870+ .log ("Server is out of memory" );
871+
868872 setState (new Oom (oom .delaySeconds ()));
869873 }
870874
@@ -915,22 +919,26 @@ public String toString() {
915919 * after which it will force stream termination by imitating server shutdown.
916920 */
917921 private final class Oom extends BaseState {
918- private final long delaySeconds ;
922+ private final long shutdownAfterSeconds ;
919923 private ScheduledFuture <?> shutdown ;
920924
921- private Oom (long delaySeconds ) {
925+ private Oom (long shutdownAfterSeconds ) {
922926 super ("OOM" );
923- this .delaySeconds = delaySeconds ;
927+ this .shutdownAfterSeconds = shutdownAfterSeconds ;
928+
929+ log .atDebug ()
930+ .addKeyValue ("grace_period" , shutdownAfterSeconds )
931+ .log ("Server is out of memory" );
924932 }
925933
926934 @ Override
927935 public void onEnter (State prev ) {
928- shutdown = scheduledService .schedule (this ::initiateShutdown , delaySeconds , TimeUnit .SECONDS );
936+ shutdown = scheduledService .schedule (this ::initiateShutdown , shutdownAfterSeconds , TimeUnit .SECONDS );
929937 }
930938
931939 /** Imitate server shutdown sequence. */
932940 private void initiateShutdown () {
933- log .info ("No update from the server after {}s, context will be forcibly restarted" , delaySeconds );
941+ log .info ("No update from the server after {}s, context will be forcibly restarted" , shutdownAfterSeconds );
934942
935943 // We cannot route event handling via normal BatchContext#onEvent, because
936944 // it delegates to the current state, which is Oom. If Oom#onEvent were to
@@ -985,7 +993,7 @@ private ServerShuttingDown(State prev) {
985993 log .atDebug ()
986994 .addKeyValue ("prev_state" , prev )
987995 .addKeyValue ("can_prepare_next" , canPrepareNext )
988- .log ("Server shutting down" );
996+ .log ("Server is shutting down" );
989997 }
990998
991999 @ Override
@@ -1051,9 +1059,7 @@ public void onEvent(Event event) {
10511059 assert retries <= maxRetries : "maxRetries exceeded" ;
10521060
10531061 if (event == Event .STARTED ) {
1054- log .atInfo ()
1055- .addArgument (retries )
1056- .log ("Reconnected after {} retries" );
1062+ log .info ("Reconnected after {} retries" , retries );
10571063
10581064 setState (ACTIVE );
10591065 } else if (event instanceof Event .StreamHangup ) {
0 commit comments