Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,17 @@ public class OkHttpServices implements RESTServices {
* in several places and is slightly different in each place. It's also not possible to implement this logic in an
* OkHttp interceptor as the logic needs access to details that are not available to an interceptor.
*/
private final Random randRetry = new Random();
private int maxDelay = DEFAULT_MAX_DELAY;
private int minRetry = DEFAULT_MIN_RETRY;
private final Set<Integer> retryStatus = new HashSet<>();
private final Random randomForRetryDelay = new Random();

// The maximum amount of time to spend retrying requests.
private int maxDelayForRetries = DEFAULT_MAX_DELAY;

// The minimum number of retry attempts to make regardless of the max delay.
private int minRetryAttempts = DEFAULT_MIN_RETRY;

// The HTTP status codes that are retryable.
private static final Set<Integer> RETRYABLE_STATUS_CODES =
Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT);

private boolean checkFirstRequest = true;

Expand All @@ -129,10 +136,6 @@ public record ConnectionConfig(String host, int port, String basePath, String da
}

public OkHttpServices(ConnectionConfig connectionConfig) {
retryStatus.add(STATUS_BAD_GATEWAY);
retryStatus.add(STATUS_SERVICE_UNAVAILABLE);
retryStatus.add(STATUS_GATEWAY_TIMEOUT);

this.okHttpClient = connect(connectionConfig);
}

Expand Down Expand Up @@ -239,13 +242,13 @@ private void configureDelayAndRetry(Properties props) {
if (props.containsKey(MAX_DELAY_PROP)) {
int max = Utilities.parseInt(props.getProperty(MAX_DELAY_PROP));
if (max > 0) {
maxDelay = max * 1000;
maxDelayForRetries = max * 1000;
}
}
if (props.containsKey(MIN_RETRY_PROP)) {
int min = Utilities.parseInt(props.getProperty(MIN_RETRY_PROP));
if (min > 0) {
minRetry = min;
minRetryAttempts = min;
}
}
}
Expand Down Expand Up @@ -289,7 +292,7 @@ private int makeFirstRequest(int retry) {
private int makeFirstRequest(HttpUrl requestUri, String path, int retry) {
Response response = sendRequestOnce(setupRequest(requestUri, path, null).head());
int statusCode = response.code();
if (!retryStatus.contains(statusCode)) {
if (!RETRYABLE_STATUS_CODES.contains(statusCode)) {
closeResponse(response);
return 0;
}
Expand All @@ -298,7 +301,7 @@ private int makeFirstRequest(HttpUrl requestUri, String path, int retry) {
closeResponse(response);

int retryAfter = Utilities.parseInt(retryAfterRaw);
return Math.max(retryAfter, calculateDelay(randRetry, retry));
return Math.max(retryAfter, calculateDelay(retry));
}

private RequestParameters addTemporalProtectionParams(RequestParameters params, String uri, ProtectionLevel level,
Expand Down Expand Up @@ -518,7 +521,7 @@ private Response sendRequestWithRetry(
/*
* This loop is for retrying the request if the service is unavailable
*/
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -537,7 +540,7 @@ private Response sendRequestWithRetry(
);
}
status = response.code();
if (!isRetryable || !retryStatus.contains(status)) {
if (!isRetryable || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);
/*
* If we don't get a service unavailable status or if the request
Expand All @@ -562,13 +565,13 @@ private Response sendRequestWithRetry(
/*
* Calculate the delay before which we shouldn't retry
*/
nextDelay = Math.max(getRetryAfterTime(response), calculateDelay(randRetry, retry));
nextDelay = Math.max(getRetryAfterTime(response), calculateDelay(retry));
}
/*
* If the service is still unavailable after all the retries, we throw a
* FailedRetryException indicating that the service is unavailable.
*/
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -1198,7 +1201,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand Down Expand Up @@ -1238,7 +1241,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
status = response.code();

responseHeaders = response.headers();
if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -1255,9 +1258,9 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -1359,7 +1362,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -1382,7 +1385,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
status = response.code();

responseHeaders = response.headers();
if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -1397,9 +1400,9 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -2092,7 +2095,7 @@ Response getResponse() {
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand Down Expand Up @@ -2121,7 +2124,7 @@ Response getResponse() {

status = response.code();

if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -2132,9 +2135,9 @@ Response getResponse() {

closeResponse(response);

nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -2631,7 +2634,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand Down Expand Up @@ -2693,7 +2696,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,

status = response.code();

if (!retryStatus.contains(status)) {
if (!RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);
break;
}
Expand All @@ -2708,9 +2711,9 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -3064,7 +3067,7 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -3083,7 +3086,7 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
response = doPut(requestBldr, multiPart, hasStreamingPart);
status = response.code();

if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -3098,9 +3101,9 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -3238,7 +3241,7 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -3257,7 +3260,7 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
response = doPost(requestBldr, multiPart, hasStreamingPart);
status = response.code();

if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -3272,9 +3275,9 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -3848,7 +3851,7 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -3870,7 +3873,7 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
response = doPost(requestBldr, multiPart, hasStreamingPart);
status = response.code();

if (transaction != null || !retryStatus.contains(status)) {
if (transaction != null || !RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -3885,9 +3888,9 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
}

int retryAfter = Utilities.parseInt(retryAfterRaw);
nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down Expand Up @@ -4453,17 +4456,17 @@ private String stringJoin(Collection collection, String separator,
return (builder != null) ? builder.toString() : null;
}

private int calculateDelay(Random rand, int i) {
private int calculateDelay(int attempt) {
int min =
(i > 6) ? DELAY_CEILING :
(i == 0) ? DELAY_FLOOR :
DELAY_FLOOR + (1 << i) * DELAY_MULTIPLIER;
(attempt > 6) ? DELAY_CEILING :
(attempt == 0) ? DELAY_FLOOR :
DELAY_FLOOR + (1 << attempt) * DELAY_MULTIPLIER;
int range =
(i > 6) ? DELAY_FLOOR :
(i == 0) ? 2 * DELAY_MULTIPLIER :
(i == 6) ? DELAY_CEILING - min :
(1 << i) * DELAY_MULTIPLIER;
return min + randRetry.nextInt(range);
(attempt > 6) ? DELAY_FLOOR :
(attempt == 0) ? 2 * DELAY_MULTIPLIER :
(attempt == 6) ? DELAY_CEILING - min :
(1 << attempt) * DELAY_MULTIPLIER;
return min + randomForRetryDelay.nextInt(range);
}

static class OkHttpResult {
Expand Down Expand Up @@ -4967,7 +4970,7 @@ public InputStream match(QueryDefinition queryDef,
long startTime = System.currentTimeMillis();
int nextDelay = 0;
int retry = 0;
for (; retry < minRetry || (System.currentTimeMillis() - startTime) < maxDelay; retry++) {
for (; retry < minRetryAttempts || (System.currentTimeMillis() - startTime) < maxDelayForRetries; retry++) {
if (nextDelay > 0) {
try {
Thread.sleep(nextDelay);
Expand All @@ -4987,7 +4990,7 @@ public InputStream match(QueryDefinition queryDef,
}
status = response.code();

if (!retryStatus.contains(status)) {
if (!RETRYABLE_STATUS_CODES.contains(status)) {
if (isFirstRequest()) setFirstRequest(false);

break;
Expand All @@ -4998,9 +5001,9 @@ public InputStream match(QueryDefinition queryDef,

closeResponse(response);

nextDelay = Math.max(retryAfter, calculateDelay(randRetry, retry));
nextDelay = Math.max(retryAfter, calculateDelay(retry));
}
if (retryStatus.contains(status)) {
if (RETRYABLE_STATUS_CODES.contains(status)) {
checkFirstRequest();
closeResponse(response);
throw new FailedRetryException(
Expand Down