From 02e3920fb03d8be9e726f477b69b6c53274c5973 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 07:44:54 +0900 Subject: [PATCH 1/7] fix(extractor): bound ApiExtractor response, pool connections, and retry transients Threat model: admin-configured endpoint, no SSRF concern. The fix targets misbehaving or hostile responses (oversize, slow), socket exhaustion under high crawl rate, and transient network errors. - Bound API response body via BoundedInputStream against a configurable maxResponseSize (default 100 MiB); oversize responses throw ExtractException. - Reuse the HTTP client with a PoolingHttpClientConnectionManager (default 50 total / 10 per route, configurable) so successive crawls share connections. - Add bounded retry on IOException, 5xx, 408, and 429 (honoring Retry-After delta-seconds); fail fast on other 4xx and return null as before. - Default connect/socket timeouts of 5s/30s when none configured to mitigate slow loris servers. - Allow per-call URL override via the new params key extractorUrl. Tests - New tests: response size cap, 5xx retry recovery, no-retry on 4xx, 429 with Retry-After, slow-server timeout, and 10x sequential calls for connection reuse. - All existing ApiExtractorTest cases pass. 
--- .../crawler/extractor/impl/ApiExtractor.java | 355 ++++++++++++++++-- .../extractor/impl/ApiExtractorTest.java | 304 ++++++++++++++- 2 files changed, 634 insertions(+), 25 deletions(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index 04b8d004..3e03619f 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -23,9 +23,11 @@ import java.util.List; import java.util.Map; +import org.apache.commons.io.input.BoundedInputStream; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; +import org.apache.http.NoHttpResponseException; import org.apache.http.auth.AuthScheme; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; @@ -36,14 +38,15 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeader; -import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.codelibs.core.beans.BeanDesc; @@ -59,8 +62,6 @@ import org.codelibs.fess.crawler.entity.ExtractData; import org.codelibs.fess.crawler.exception.ExtractException; -import 
com.google.common.base.Charsets; - import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -71,6 +72,9 @@ public class ApiExtractor extends AbstractExtractor { private static final Logger logger = LogManager.getLogger(ApiExtractor.class); + /** Parameter key that, if present, overrides the configured URL for a single request. */ + public static final String PARAM_EXTRACTOR_URL = "extractorUrl"; + /** The URL of the API endpoint. */ protected String url; @@ -78,7 +82,7 @@ public class ApiExtractor extends AbstractExtractor { protected Integer accessTimeout; // sec /** The HTTP client used for API calls. */ - protected CloseableHttpClient httpClient; + protected volatile CloseableHttpClient httpClient; /** The connection timeout in milliseconds. */ protected Integer connectionTimeout; @@ -86,6 +90,21 @@ public class ApiExtractor extends AbstractExtractor { /** The socket timeout in milliseconds. */ protected Integer soTimeout; + /** Maximum total connections in the pool. */ + protected int maxConnections = 50; + + /** Maximum connections per route in the pool. */ + protected int maxConnectionsPerRoute = 10; + + /** Maximum size in bytes accepted for an API response body. Default is 100 MiB. */ + protected long maxResponseSize = 100L * 1024L * 1024L; + + /** Maximum number of retry attempts on transient failures. */ + protected int maxRetries = 2; + + /** Initial backoff in milliseconds between retries (doubled per attempt). */ + protected long retryBackoffMs = 500L; + /** The map of authentication scheme providers. */ protected Map authSchemeProviderMap; @@ -107,6 +126,9 @@ public class ApiExtractor extends AbstractExtractor { /** List of request headers */ private final List
requestHeaderList = new ArrayList<>(); + /** Pooled connection manager backing the HTTP client. */ + protected PoolingHttpClientConnectionManager connectionManager; + /** * Constructs a new ApiExtractor. */ @@ -133,10 +155,16 @@ public void init() { if (connectionTimeoutParam != null) { requestConfigBuilder.setConnectTimeout(connectionTimeoutParam); + } else { + // sane default to avoid hanging on dead servers + requestConfigBuilder.setConnectTimeout(5000); } final Integer soTimeoutParam = soTimeout; if (soTimeoutParam != null) { requestConfigBuilder.setSocketTimeout(soTimeoutParam); + } else { + // sane default to mitigate slow loris servers + requestConfigBuilder.setSocketTimeout(30000); } // AuthSchemeFactory @@ -177,6 +205,14 @@ public void init() { } } + // Pooled connection manager so connections are reused across calls. + connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(maxConnections); + connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute); + httpClientBuilder.setConnectionManager(connectionManager); + // Disable the built-in retry handler; we implement our own classification below. 
+ httpClientBuilder.disableAutomaticRetries(); + final CloseableHttpClient closeableHttpClient = httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()).build(); if (!httpClientPropertyMap.isEmpty()) { final BeanDesc beanDesc = BeanDescFactory.getBeanDesc(closeableHttpClient.getClass()); @@ -204,6 +240,17 @@ public void destroy() { httpClient.close(); } catch (final IOException e) { logger.warn("Failed to close HTTP client for API extractor", e); + } finally { + httpClient = null; + } + } + if (connectionManager != null) { + try { + connectionManager.close(); + } catch (final Exception e) { + logger.warn("Failed to close connection manager for API extractor", e); + } finally { + connectionManager = null; } } } @@ -212,14 +259,23 @@ public void destroy() { * Extracts text from the input stream using the API endpoint. * * @param in the input stream to extract text from - * @param params additional parameters + * @param params additional parameters (an {@code extractorUrl} entry overrides the configured URL) * @return the extracted data * @throws ExtractException if extraction fails */ @Override public ExtractData getText(final InputStream in, final Map params) { + // Allow per-call URL override. + String targetUrl = url; + if (params != null) { + final String override = params.get(PARAM_EXTRACTOR_URL); + if (StringUtil.isNotBlank(override)) { + targetUrl = override; + } + } + if (logger.isDebugEnabled()) { - logger.debug("Accessing {}", url); + logger.debug("Accessing {}", targetUrl); } // start @@ -230,38 +286,251 @@ public ExtractData getText(final InputStream in, final Map param accessTimeoutTask = TimeoutManager.getInstance().addTimeoutTarget(accessTimeoutTarget, accessTimeout, false); } - final ExtractData data = new ExtractData(); - final HttpPost httpPost = new HttpPost(url); + try { + // The request body is the supplied input stream; multipart body builders consume it, + // so we can only execute the post once. 
Retries thus only apply when the request + // can be re-issued — currently only on connection-establishment failures or + // a status-code-only retry path that doesn't consume the entity. To support retries + // we buffer the request stream into memory (bounded by maxResponseSize as a heuristic + // safeguard against runaway uploads). + return executeWithRetries(in, targetUrl); + } finally { + if (accessTimeout != null) { + accessTimeoutTarget.stop(); + if (!accessTimeoutTask.isCanceled()) { + accessTimeoutTask.cancel(); + } + } + } + } + + /** + * Executes the API request with bounded retries on transient failures. + * + * @param in the input stream to forward to the API endpoint + * @param targetUrl the URL to call + * @return the extracted data + */ + protected ExtractData executeWithRetries(final InputStream in, final String targetUrl) { + // Buffer the input once so we can re-send it on retry. + final byte[] requestBody; + try { + requestBody = in.readAllBytes(); + } catch (final IOException e) { + throw new ExtractException("Failed to read input stream for API extractor", e); + } + + IOException lastIoException = null; + for (int attempt = 0; attempt <= maxRetries; attempt++) { + try { + return executeOnce(requestBody, targetUrl, attempt); + } catch (final RetryableStatusException e) { + final long sleepMs = computeBackoff(attempt, e.retryAfterMs); + if (attempt >= maxRetries) { + throw new ExtractException("API request failed after " + (attempt + 1) + " attempts: status=" + e.statusCode); + } + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request (status={}, attempt={}/{}, sleep={}ms)", e.statusCode, attempt + 1, maxRetries + 1, + sleepMs); + } + sleepQuietly(sleepMs); + } catch (final ConnectTimeoutException e) { + lastIoException = e; + if (attempt >= maxRetries) { + throw new ExtractException("API request failed", e); + } + sleepQuietly(computeBackoff(attempt, -1L)); + } catch (final NoHttpResponseException e) { + lastIoException = e; + if 
(attempt >= maxRetries) { + throw new ExtractException("API request failed", e); + } + sleepQuietly(computeBackoff(attempt, -1L)); + } catch (final IOException e) { + lastIoException = e; + if (attempt >= maxRetries) { + throw new ExtractException("API request failed", e); + } + sleepQuietly(computeBackoff(attempt, -1L)); + } + } + + // Should not reach here, but defend against it. + throw new ExtractException("API request failed", lastIoException != null ? lastIoException : new IOException("unknown error")); + } + + /** + * Executes a single API call. + * + * @param requestBody bytes to upload as the multipart body + * @param targetUrl URL to POST to + * @param attempt current attempt number (0-based) — included for logging + * @return the extracted data on success + * @throws RetryableStatusException when the response status is retryable + * @throws IOException on transport-level failure + */ + protected ExtractData executeOnce(final byte[] requestBody, final String targetUrl, final int attempt) throws IOException { + final HttpPost httpPost = new HttpPost(targetUrl); final HttpEntity postEntity = MultipartEntityBuilder.create() .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) .setCharset(Charset.forName("UTF-8")) - .addBinaryBody("filedata", in) + .addBinaryBody("filedata", requestBody) .build(); httpPost.setEntity(postEntity); try (CloseableHttpResponse response = httpClient.execute(httpPost)) { - if (response.getStatusLine().getStatusCode() != Constants.OK_STATUS_CODE) { - logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}", url, - response.getStatusLine().getStatusCode()); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != Constants.OK_STATUS_CODE) { + if (isRetryableStatus(statusCode)) { + final long retryAfterMs = parseRetryAfterMs(response.getFirstHeader("Retry-After")); + // Drain entity so the connection can be reused. 
+ consumeQuietly(response.getEntity()); + throw new RetryableStatusException(statusCode, retryAfterMs); + } + logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}", targetUrl, statusCode); + consumeQuietly(response.getEntity()); return null; } - data.setContent(EntityUtils.toString(response.getEntity(), Charsets.UTF_8)); + final ExtractData data = new ExtractData(); + final HttpEntity entity = response.getEntity(); + if (entity != null) { + final Charset charset = resolveCharset(entity); + try (InputStream entityStream = entity.getContent(); + BoundedInputStream bounded = + BoundedInputStream.builder().setInputStream(entityStream).setMaxCount(maxResponseSize + 1L).get()) { + final byte[] body = bounded.readAllBytes(); + if (body.length > maxResponseSize) { + throw new ExtractException("ApiExtractor response exceeded limit: limit=" + maxResponseSize); + } + data.setContent(new String(body, charset)); + } + } else { + data.setContent(""); + } final Header[] headers = response.getAllHeaders(); for (final Header header : headers) { data.putValue(header.getName(), header.getValue()); } - } catch (final IOException e) { - throw new ExtractException(e); - } finally { - if (accessTimeout != null) { - accessTimeoutTarget.stop(); - if (!accessTimeoutTask.isCanceled()) { - accessTimeoutTask.cancel(); - } + return data; + } + } + + /** + * Returns true if the given status code is considered retryable. + * + * @param statusCode HTTP status code + * @return true when the request should be retried + */ + protected boolean isRetryableStatus(final int statusCode) { + if (statusCode >= 500 && statusCode <= 599) { + return true; + } + return statusCode == 408 || statusCode == 429; + } + + /** + * Parses a {@code Retry-After} header into milliseconds. + * Only the delta-seconds form is honored; HTTP-date form is ignored as best-effort. 
+ * + * @param header the {@code Retry-After} header (may be null) + * @return delay in milliseconds, or {@code -1L} if unspecified or invalid + */ + protected long parseRetryAfterMs(final Header header) { + if (header == null) { + return -1L; + } + final String value = header.getValue(); + if (StringUtil.isBlank(value)) { + return -1L; + } + try { + final long seconds = Long.parseLong(value.trim()); + if (seconds < 0L) { + return -1L; } + return seconds * 1000L; + } catch (final NumberFormatException e) { + // HTTP-date form not supported; fall through to default backoff. + return -1L; + } + } + + /** + * Computes exponential backoff for a retry attempt. + * + * @param attempt 0-based attempt number + * @param retryAfterMs server-suggested delay; non-negative values take precedence + * @return sleep duration in milliseconds + */ + protected long computeBackoff(final int attempt, final long retryAfterMs) { + if (retryAfterMs >= 0L) { + return retryAfterMs; + } + return retryBackoffMs * (1L << attempt); + } + + /** + * Sleeps without throwing on interruption (preserves the interrupt flag). + * + * @param millis sleep duration in milliseconds + */ + protected void sleepQuietly(final long millis) { + if (millis <= 0L) { + return; + } + try { + Thread.sleep(millis); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Drains the response entity, ignoring errors. Safe even when the entity is null. + * + * @param entity entity to consume (may be null) + */ + protected void consumeQuietly(final HttpEntity entity) { + if (entity == null) { + return; + } + try { + org.apache.http.util.EntityUtils.consumeQuietly(entity); + } catch (final Exception e) { + // ignore + } + } + + /** + * Resolves the charset advertised by the response entity, falling back to UTF-8. 
+ * + * @param entity response entity (non-null) + * @return charset to decode the response with + */ + protected Charset resolveCharset(final HttpEntity entity) { + try { + final org.apache.http.entity.ContentType contentType = org.apache.http.entity.ContentType.get(entity); + if (contentType != null && contentType.getCharset() != null) { + return contentType.getCharset(); + } + } catch (final Exception e) { + // ignore and fall back to UTF-8 + } + return Constants.UTF_8_CHARSET; + } + + /** Internal signal that a response was retryable; carries the status and any Retry-After hint. */ + @SuppressWarnings("serial") + protected static class RetryableStatusException extends IOException { + final int statusCode; + final long retryAfterMs; + + RetryableStatusException(final int statusCode, final long retryAfterMs) { + super("retryable status: " + statusCode); + this.statusCode = statusCode; + this.retryAfterMs = retryAfterMs; } - return data; } /** @@ -336,4 +605,46 @@ public void setAccessTimeout(final Integer accessTimeout) { this.accessTimeout = accessTimeout; } + /** + * Sets the maximum total connections in the pool. + * @param maxConnections maximum total pooled connections + */ + public void setMaxConnections(final int maxConnections) { + this.maxConnections = maxConnections; + } + + /** + * Sets the maximum connections per route in the pool. + * @param maxConnectionsPerRoute maximum pooled connections per route + */ + public void setMaxConnectionsPerRoute(final int maxConnectionsPerRoute) { + this.maxConnectionsPerRoute = maxConnectionsPerRoute; + } + + /** + * Sets the maximum size in bytes accepted for an API response body. + * Responses larger than this trigger an {@link ExtractException}. + * @param maxResponseSize maximum accepted response size in bytes + */ + public void setMaxResponseSize(final long maxResponseSize) { + this.maxResponseSize = maxResponseSize; + } + + /** + * Sets the maximum number of retry attempts for transient failures. 
+ * @param maxRetries number of retries (in addition to the initial attempt) + */ + public void setMaxRetries(final int maxRetries) { + this.maxRetries = maxRetries; + } + + /** + * Sets the initial backoff in milliseconds between retries. + * The actual delay doubles per attempt. + * @param retryBackoffMs initial backoff in milliseconds + */ + public void setRetryBackoffMs(final long retryBackoffMs) { + this.retryBackoffMs = retryBackoffMs; + } + } diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index f0d560c4..0e33873c 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -16,16 +16,22 @@ package org.codelibs.fess.crawler.extractor.impl; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.codelibs.fess.crawler.entity.ExtractData; import org.codelibs.fess.crawler.exception.CrawlerSystemException; +import org.codelibs.fess.crawler.exception.ExtractException; import org.dbflute.utflute.core.PlainTestCase; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.io.Content; @@ -35,6 +41,12 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import 
com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; /** * @author shinsuke @@ -79,7 +91,293 @@ public void test_getText() throws Exception { assertEquals(content, text.getContent()); } - // TODO other tests + /** + * Connection reuse / pooling sanity check: 10 sequential calls share the pool without + * exhausting sockets. + */ + @Test + public void test_connectionReuse() throws Exception { + final String testStr = "abc"; + final String expected = ATTR_NAME + "," + testStr; + for (int i = 0; i < 10; i++) { + final ExtractData text = extractor.getText(new ByteArrayInputStream(testStr.getBytes()), new HashMap<>()); + assertEquals(expected, text.getContent()); + } + } + + /** + * Per-call URL override via {@code params.extractorUrl}. + */ + @Test + public void test_extractorUrlOverride() throws Exception { + // Misconfigure the default URL; the override in params should be used instead. + final ApiExtractor overrideExtractor = new ApiExtractor(); + overrideExtractor.setUrl("http://127.0.0.1:1/does-not-exist"); + overrideExtractor.init(); + try { + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + port + "/"); + final ExtractData text = overrideExtractor.getText(new ByteArrayInputStream("ovr".getBytes()), params); + assertEquals(ATTR_NAME + ",ovr", text.getContent()); + } finally { + overrideExtractor.destroy(); + } + } + + /** + * Response larger than {@code maxResponseSize} must throw {@link ExtractException}. + */ + @Test + public void test_responseExceedsMaxSize_throwsExtractException() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + // Always respond with 1 KiB of payload regardless of input. 
+ final byte[] payload = new byte[1024]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) ('a' + (i % 26)); + } + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final byte[] body = payload; + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + }); + simple.start(); + try { + final ApiExtractor capped = new ApiExtractor(); + capped.setUrl("http://127.0.0.1:" + simple.port() + "/"); + capped.setMaxResponseSize(64L); + capped.setMaxRetries(0); + capped.init(); + try { + capped.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + fail(); + } catch (final ExtractException e) { + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("exceeded limit"), + "message should mention limit: " + e.getMessage()); + } finally { + capped.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * Two 5xx responses followed by a 200 must succeed via the retry loop. 
+ */ + @Test + public void test_retryOn5xx_succeedsOn3rdAttempt() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n < 3) { + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(2); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(3, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * 4xx (other than 408/429) must NOT be retried; the extractor returns null after a single call. 
+ */ + @Test + public void test_noRetryOn4xx_returnsNullImmediately() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + calls.incrementAndGet(); + exchange.sendResponseHeaders(404, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(3); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + // Existing contract: non-OK non-retryable status returns null. + assertNull(data); + assertEquals(1, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * Honors the {@code Retry-After} header (delta-seconds form) for HTTP 429 responses. + */ + @Test + public void test_retryOn429_respectsRetryAfter() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + exchange.getResponseHeaders().add("Retry-After", "0"); + exchange.sendResponseHeaders(429, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final long t0 = System.currentTimeMillis(); + 
final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + final long elapsed = System.currentTimeMillis() - t0; + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + // Retry-After: 0 means we should not artificially delay. + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2000, + "retry should not block when Retry-After=0 (was " + elapsed + "ms)"); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * A server that accepts the connection but never responds must trigger a socket-timeout + * surfaced as {@link ExtractException}. + */ + @Test + public void test_timeoutOnSlowResponse_throwsExtractException() throws Exception { + // ServerSocket-based "black hole" server: accepts and then sleeps. + final ServerSocket serverSocket = new ServerSocket(0); + serverSocket.setSoTimeout(0); + final int p = serverSocket.getLocalPort(); + final Thread acceptor = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + final Socket s = serverSocket.accept(); + // Leak the socket on purpose — we just want the client to time out. 
+ new Thread(() -> { + try { + Thread.sleep(60_000L); + } catch (final InterruptedException ignore) { + Thread.currentThread().interrupt(); + } finally { + try { + s.close(); + } catch (final IOException ignore) { + // ignore + } + } + }).start(); + } catch (final IOException e) { + return; + } + } + }, "slow-loris-acceptor"); + acceptor.setDaemon(true); + acceptor.start(); + try { + final ApiExtractor slow = new ApiExtractor(); + slow.setUrl("http://127.0.0.1:" + p + "/"); + slow.setSoTimeout(300); + slow.setConnectionTimeout(300); + slow.setMaxRetries(0); + slow.init(); + try { + slow.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + fail(); + } catch (final ExtractException e) { + // expected + } finally { + slow.destroy(); + } + } finally { + try { + serverSocket.close(); + } catch (final IOException ignore) { + // ignore + } + acceptor.interrupt(); + } + } + + private static void drain(final InputStream in) throws IOException { + final byte[] buf = new byte[4096]; + while (in.read(buf) >= 0) { + // discard + } + } + + /** Lightweight HTTP server used for retry / size / status tests. 
*/ + private static class SimpleHttpServer { + private HttpServer http; + private int boundPort; + + void setHandler(final HttpHandler handler) throws IOException { + http = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); + http.createContext("/", handler); + } + + void start() { + http.start(); + boundPort = http.getAddress().getPort(); + } + + void stop() { + http.stop(0); + } + + int port() { + return boundPort; + } + } static class TestApiExtractorServer { private Server server; From eef53bcb15fdc19c420b3a480bf4f99f313c4fe8 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 12:37:08 +0900 Subject: [PATCH 2/7] fix(extractor): cap ApiExtractor request body, parse HTTP-date Retry-After, add jitter, bound 5xx drain --- .../crawler/extractor/impl/ApiExtractor.java | 114 ++++++++++++--- .../extractor/impl/ApiExtractorTest.java | 138 ++++++++++++++++++ 2 files changed, 233 insertions(+), 19 deletions(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index 3e03619f..cc6c0716 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -19,9 +19,11 @@ import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.io.input.BoundedInputStream; import org.apache.http.Header; @@ -37,6 +39,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.DateUtils; import org.apache.http.config.RegistryBuilder; import 
org.apache.http.conn.ConnectTimeoutException; import org.apache.http.entity.mime.HttpMultipartMode; @@ -99,6 +102,9 @@ public class ApiExtractor extends AbstractExtractor { /** Maximum size in bytes accepted for an API response body. Default is 100 MiB. */ protected long maxResponseSize = 100L * 1024L * 1024L; + /** Maximum size in bytes accepted for an outgoing request body. Default is 100 MiB. */ + protected long maxRequestSize = 100L * 1024L * 1024L; + /** Maximum number of retry attempts on transient failures. */ protected int maxRetries = 2; @@ -291,8 +297,8 @@ public ExtractData getText(final InputStream in, final Map param // so we can only execute the post once. Retries thus only apply when the request // can be re-issued — currently only on connection-establishment failures or // a status-code-only retry path that doesn't consume the entity. To support retries - // we buffer the request stream into memory (bounded by maxResponseSize as a heuristic - // safeguard against runaway uploads). + // we buffer the request stream into memory, bounded by maxRequestSize so a hostile + // or malformed source document cannot OOM the extractor. return executeWithRetries(in, targetUrl); } finally { if (accessTimeout != null) { @@ -312,13 +318,17 @@ public ExtractData getText(final InputStream in, final Map param * @return the extracted data */ protected ExtractData executeWithRetries(final InputStream in, final String targetUrl) { - // Buffer the input once so we can re-send it on retry. + // Buffer the input once so we can re-send it on retry. Bounded to maxRequestSize so a + // hostile/large source document cannot OOM the extractor. 
final byte[] requestBody; - try { - requestBody = in.readAllBytes(); + try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(in).setMaxCount(maxRequestSize + 1L).get()) { + requestBody = bounded.readAllBytes(); } catch (final IOException e) { throw new ExtractException("Failed to read input stream for API extractor", e); } + if (requestBody.length > maxRequestSize) { + throw new ExtractException("ApiExtractor request body exceeded limit: limit=" + maxRequestSize); + } IOException lastIoException = null; for (int attempt = 0; attempt <= maxRetries; attempt++) { @@ -339,19 +349,34 @@ protected ExtractData executeWithRetries(final InputStream in, final String targ if (attempt >= maxRetries) { throw new ExtractException("API request failed", e); } - sleepQuietly(computeBackoff(attempt, -1L)); + final long sleepMs = computeBackoff(attempt, -1L); + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request after connect timeout (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, + sleepMs, e); + } + sleepQuietly(sleepMs); } catch (final NoHttpResponseException e) { lastIoException = e; if (attempt >= maxRetries) { throw new ExtractException("API request failed", e); } - sleepQuietly(computeBackoff(attempt, -1L)); + final long sleepMs = computeBackoff(attempt, -1L); + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request after no-response (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, sleepMs, + e); + } + sleepQuietly(sleepMs); } catch (final IOException e) { lastIoException = e; if (attempt >= maxRetries) { throw new ExtractException("API request failed", e); } - sleepQuietly(computeBackoff(attempt, -1L)); + final long sleepMs = computeBackoff(attempt, -1L); + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request after I/O error (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, sleepMs, + e); + } + sleepQuietly(sleepMs); } } @@ -370,6 +395,9 @@ protected ExtractData 
executeWithRetries(final InputStream in, final String targ * @throws IOException on transport-level failure */ protected ExtractData executeOnce(final byte[] requestBody, final String targetUrl, final int attempt) throws IOException { + if (attempt > 0 && logger.isDebugEnabled()) { + logger.debug("Executing API request (url={}, attempt={})", targetUrl, attempt); + } final HttpPost httpPost = new HttpPost(targetUrl); final HttpEntity postEntity = MultipartEntityBuilder.create() .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) @@ -383,12 +411,13 @@ protected ExtractData executeOnce(final byte[] requestBody, final String targetU if (statusCode != Constants.OK_STATUS_CODE) { if (isRetryableStatus(statusCode)) { final long retryAfterMs = parseRetryAfterMs(response.getFirstHeader("Retry-After")); - // Drain entity so the connection can be reused. - consumeQuietly(response.getEntity()); + // Drain entity so the connection can be reused. Bound the drain so a hostile + // server cannot stream an unbounded body just to keep us reading. + drainBounded(response.getEntity()); throw new RetryableStatusException(statusCode, retryAfterMs); } - logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}", targetUrl, statusCode); - consumeQuietly(response.getEntity()); + logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}, attempt={}", targetUrl, statusCode, attempt); + drainBounded(response.getEntity()); return null; } @@ -430,8 +459,9 @@ protected boolean isRetryableStatus(final int statusCode) { } /** - * Parses a {@code Retry-After} header into milliseconds. - * Only the delta-seconds form is honored; HTTP-date form is ignored as best-effort. + * Parses a {@code Retry-After} header into milliseconds. Both delta-seconds and the + * HTTP-date form (RFC 7231) are honored; on parse failure {@code -1L} is returned and the + * caller should fall back to the default backoff schedule. 
* * @param header the {@code Retry-After} header (may be null) * @return delay in milliseconds, or {@code -1L} if unspecified or invalid @@ -444,20 +474,33 @@ protected long parseRetryAfterMs(final Header header) { if (StringUtil.isBlank(value)) { return -1L; } + final String trimmed = value.trim(); + // Try delta-seconds first. try { - final long seconds = Long.parseLong(value.trim()); + final long seconds = Long.parseLong(trimmed); if (seconds < 0L) { return -1L; } return seconds * 1000L; - } catch (final NumberFormatException e) { - // HTTP-date form not supported; fall through to default backoff. + } catch (final NumberFormatException ignore) { + // Not a numeric value — try HTTP-date form below. + } + // HTTP-date form per RFC 7231. + final Date when = DateUtils.parseDate(trimmed); + if (when == null) { return -1L; } + final long deltaMs = when.getTime() - System.currentTimeMillis(); + if (deltaMs <= 0L) { + return 0L; + } + return deltaMs; } /** - * Computes exponential backoff for a retry attempt. + * Computes exponential backoff for a retry attempt. A small random jitter in the range + * {@code [0, retryBackoffMs)} is added to the exponential base to avoid thundering-herd + * retry storms when many extractors hit the same upstream concurrently. * * @param attempt 0-based attempt number * @param retryAfterMs server-suggested delay; non-negative values take precedence @@ -467,7 +510,9 @@ protected long computeBackoff(final int attempt, final long retryAfterMs) { if (retryAfterMs >= 0L) { return retryAfterMs; } - return retryBackoffMs * (1L << attempt); + final long base = retryBackoffMs * (1L << attempt); + final long jitter = retryBackoffMs > 0L ? 
ThreadLocalRandom.current().nextLong(0L, retryBackoffMs) : 0L; + return base + jitter; } /** @@ -502,6 +547,28 @@ protected void consumeQuietly(final HttpEntity entity) { } } + /** + * Drains the response entity content with a hard upper bound of {@link #maxResponseSize} + * bytes so a hostile server cannot trickle out an unbounded body purely to keep us reading. + * Errors are intentionally swallowed: the goal is just to make the connection reusable. + * + * @param entity entity to drain (may be null) + */ + protected void drainBounded(final HttpEntity entity) { + if (entity == null) { + return; + } + try (InputStream content = entity.getContent(); + BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(content).setMaxCount(maxResponseSize).get()) { + final byte[] buf = new byte[8192]; + while (bounded.read(buf) >= 0) { + // discard + } + } catch (final Exception e) { + // ignore — best-effort drain + } + } + /** * Resolves the charset advertised by the response entity, falling back to UTF-8. * @@ -630,6 +697,15 @@ public void setMaxResponseSize(final long maxResponseSize) { this.maxResponseSize = maxResponseSize; } + /** + * Sets the maximum size in bytes accepted for an outgoing request body. + * Inputs larger than this trigger an {@link ExtractException} before any HTTP call is made. + * @param maxRequestSize maximum accepted request body size in bytes + */ + public void setMaxRequestSize(final long maxRequestSize) { + this.maxRequestSize = maxRequestSize; + } + /** * Sets the maximum number of retry attempts for transient failures. 
* @param maxRetries number of retries (in addition to the initial attempt) diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index 0e33873c..b63c3e6a 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -24,8 +24,12 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.TimeZone; import java.util.concurrent.atomic.AtomicInteger; import org.codelibs.fess.crawler.entity.ExtractData; @@ -348,6 +352,140 @@ public void test_timeoutOnSlowResponse_throwsExtractException() throws Exception } } + /** + * Upload bigger than {@code maxRequestSize} must throw {@link ExtractException} before any + * HTTP call is made. Asserts no request reaches the server. 
+ */ + @Test + public void test_uploadExceedsMaxRequestSize_throws() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + calls.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor capped = new ApiExtractor(); + capped.setUrl("http://127.0.0.1:" + simple.port() + "/"); + capped.setMaxRequestSize(16L); + capped.setMaxRetries(0); + capped.init(); + try { + final byte[] tooBig = new byte[64]; + for (int i = 0; i < tooBig.length; i++) { + tooBig[i] = (byte) ('a' + (i % 26)); + } + capped.getText(new ByteArrayInputStream(tooBig), new HashMap<>()); + fail(); + } catch (final ExtractException e) { + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("request body exceeded limit"), + "message should mention request body limit: " + e.getMessage()); + assertEquals(0, calls.get()); + } finally { + capped.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * {@code Retry-After} header in the HTTP-date form (RFC 7231) must be parsed correctly: the + * extractor must wait approximately the indicated delta before retrying. + */ + @Test + public void test_retryAfterHttpDate_parsedCorrectly() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + // Pre-compute an HTTP-date 2 seconds into the future for the first response. 
+ final SimpleDateFormat httpDateFmt = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); + httpDateFmt.setTimeZone(TimeZone.getTimeZone("GMT")); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + final String httpDate = httpDateFmt.format(new Date(System.currentTimeMillis() + 2_000L)); + exchange.getResponseHeaders().add("Retry-After", httpDate); + exchange.sendResponseHeaders(429, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final long t0 = System.currentTimeMillis(); + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + final long elapsed = System.currentTimeMillis() - t0; + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + // Must wait at least ~1.5s (allow some slack), well above the default ~10ms backoff. + org.junit.jupiter.api.Assertions.assertTrue(elapsed >= 1_500L, + "retry must respect HTTP-date Retry-After (elapsed=" + elapsed + "ms)"); + // And not block longer than is reasonable. + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 10_000L, "retry must not stall (elapsed=" + elapsed + "ms)"); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * Probabilistic check that {@link ApiExtractor#computeBackoff(int, long)} adds a jitter + * within {@code [base, 2*base)} for {@code attempt=0}. 
Runs many iterations so the bounds + * are exercised, and asserts at least two distinct values are observed (i.e., it isn't a + * constant). + */ + @Test + public void test_jitterAppliedToBackoff() throws Exception { + final ApiExtractor jitterExtractor = new ApiExtractor(); + jitterExtractor.setUrl("http://127.0.0.1:1/unused"); + final long base = 100L; + jitterExtractor.setRetryBackoffMs(base); + jitterExtractor.init(); + try { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + final java.util.HashSet distinct = new java.util.HashSet<>(); + for (int i = 0; i < 200; i++) { + final long v = jitterExtractor.computeBackoff(0, -1L); + if (v < min) { + min = v; + } + if (v > max) { + max = v; + } + distinct.add(v); + } + // Each value must lie within [base, 2*base) — base + jitter where jitter ∈ [0, base). + org.junit.jupiter.api.Assertions.assertTrue(min >= base, "min should be >= base: min=" + min + ", base=" + base); + org.junit.jupiter.api.Assertions.assertTrue(max < 2L * base, "max should be < 2*base: max=" + max + ", base=" + base); + // Probabilistic: with 200 samples over 100 possible values, the chance that every sample + // is the same value is astronomically low — at least 2 distinct values is a near-certainty. + org.junit.jupiter.api.Assertions.assertTrue(distinct.size() >= 2, + "jitter should produce varied values, got distinct=" + distinct.size()); + } finally { + jitterExtractor.destroy(); + } + } + private static void drain(final InputStream in) throws IOException { final byte[] buf = new byte[4096]; while (in.read(buf) >= 0) { From 3f9bb76e10e4f08850b63ccd398e731f08468659 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 19:46:52 +0900 Subject: [PATCH 3/7] fix(extractor): bail out of ApiExtractor retry loop on interrupt sleepQuietly previously suppressed InterruptedException and only restored the interrupt flag, allowing the for-loop in executeWithRetries to proceed to another HTTP attempt.
With AccessTimeoutTarget interrupting the worker thread once per second after accessTimeout fires, that meant a request could continue retrying past the configured timeout window. sleepQuietly now throws ExtractException on interrupt (interrupt flag still preserved for upstream observers), aborting the retry loop immediately. Adds unit tests covering both the helper itself and the loop-level bail-out. --- .../crawler/extractor/impl/ApiExtractor.java | 8 +- .../extractor/impl/ApiExtractorTest.java | 83 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index cc6c0716..e143c83e 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -516,9 +516,14 @@ protected long computeBackoff(final int attempt, final long retryAfterMs) { } /** - * Sleeps without throwing on interruption (preserves the interrupt flag). + * Sleeps between retries. If the current thread is interrupted (e.g. by + * {@code accessTimeout} via {@code AccessTimeoutTarget}), aborts the retry + * loop by throwing {@link ExtractException} instead of silently continuing + * to the next HTTP attempt. The interrupt flag is preserved so callers + * further up the stack can also observe the cancellation. 
* * @param millis sleep duration in milliseconds + * @throws ExtractException if interrupted while sleeping */ protected void sleepQuietly(final long millis) { if (millis <= 0L) { @@ -528,6 +533,7 @@ protected void sleepQuietly(final long millis) { Thread.sleep(millis); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); + throw new ExtractException("API retry was interrupted", e); } } diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index b63c3e6a..cc9e7cf7 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -486,6 +486,89 @@ public void test_jitterAppliedToBackoff() throws Exception { } } + public void test_sleepQuietly_interruptedThrowsAndPreservesFlag() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:1/unused"); + extractor.init(); + try { + // Pre-interrupt the thread; sleepQuietly should observe it and bail. + Thread.currentThread().interrupt(); + try { + extractor.sleepQuietly(100L); + fail(); + } catch (final ExtractException e) { + org.junit.jupiter.api.Assertions.assertTrue(Thread.currentThread().isInterrupted(), + "interrupt flag must be preserved for upstream observers"); + } + } finally { + // Drain the interrupt flag so it does not leak into subsequent tests. + Thread.interrupted(); + extractor.destroy(); + } + } + + public void test_executeWithRetries_interruptStopsRetryLoop() throws Exception { + final SimpleHttpServer server = new SimpleHttpServer(); + final AtomicInteger attempts = new AtomicInteger(); + try { + // Always reply 503 with a long Retry-After so the extractor enters sleepQuietly. 
+ server.setHandler(new HttpHandler() { + @Override + public void handle(final HttpExchange exchange) throws IOException { + attempts.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.getResponseHeaders().add("Retry-After", "30"); + exchange.sendResponseHeaders(503, 0); + exchange.getResponseBody().close(); + } + }); + server.start(); + + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + server.port() + "/"); + extractor.setMaxRetries(5); + extractor.setRetryBackoffMs(50L); + extractor.init(); + try { + final Thread target = Thread.currentThread(); + final Thread interrupter = new Thread(() -> { + try { + // Wait for the extractor to issue its first request and start sleeping. + Thread.sleep(500L); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + target.interrupt(); + }); + interrupter.setDaemon(true); + interrupter.start(); + + final long startMs = System.currentTimeMillis(); + try { + extractor.getText(new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8)), new HashMap()); + fail(); + } catch (final ExtractException e) { + // expected + } + final long elapsed = System.currentTimeMillis() - startMs; + + // Bail out should be prompt: well under one full Retry-After (30s). + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 5_000L, + "interrupt must short-circuit the retry loop, elapsed=" + elapsed + "ms"); + // Normally only one HTTP attempt reaches the server before the interrupt fires; allow at most two as slack.
+ org.junit.jupiter.api.Assertions.assertTrue(attempts.get() <= 2, + "retry loop must stop on interrupt, attempts=" + attempts.get()); + + interrupter.join(1_000L); + } finally { + Thread.interrupted(); + extractor.destroy(); + } + } finally { + server.stop(); + } + } + private static void drain(final InputStream in) throws IOException { final byte[] buf = new byte[4096]; while (in.read(buf) >= 0) { From 90e3cb1ad084c6037e1f54f2f14988fcd7fb8fe7 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 20:47:18 +0900 Subject: [PATCH 4/7] fix(extractor): gate ApiExtractor URL override and run interrupt tests The extractorUrl param introduced in this PR opens a small SSRF surface if a downstream caller ever lets attacker-influenced data into the params Map. Make the override opt-in: - allowExtractorUrlOverride defaults to false; the param is silently ignored unless the operator explicitly enables it - when enabled, the override URL must be http(s); other schemes are logged at WARN and ignored - log applied overrides at INFO so usage is auditable Also add the @Test annotations missing from the two interrupt regression tests so the JUnit Platform actually runs them (test count goes 11 -> 15). --- .../crawler/extractor/impl/ApiExtractor.java | 70 ++++++++++++++++--- .../extractor/impl/ApiExtractorTest.java | 39 +++++++++++ 2 files changed, 100 insertions(+), 9 deletions(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index e143c83e..279606c4 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -111,6 +111,15 @@ public class ApiExtractor extends AbstractExtractor { /** Initial backoff in milliseconds between retries (doubled per attempt). 
*/ protected long retryBackoffMs = 500L; + /** + * When {@code true}, an {@link #PARAM_EXTRACTOR_URL} entry in the per-call params Map overrides + * the configured {@link #url}. Defaults to {@code false} so callers cannot redirect the extractor + * to an arbitrary endpoint unless the operator explicitly enables it. When enabling this, the + * caller is responsible for ensuring that the params Map cannot be populated from untrusted + * data — otherwise this becomes an SSRF gadget against internal hosts. + */ + protected boolean allowExtractorUrlOverride = false; + /** The map of authentication scheme providers. */ protected Map authSchemeProviderMap; @@ -261,24 +270,57 @@ public void destroy() { } } + /** + * Resolves the URL the extractor should POST to for this call. Falls back to the configured + * {@link #url} unless the operator has set {@link #allowExtractorUrlOverride} to {@code true} + * and the params Map carries a non-blank {@link #PARAM_EXTRACTOR_URL} entry whose + * scheme is {@code http} or {@code https}. Disallowed override values are logged and ignored. 
+ */ + protected String resolveTargetUrl(final Map params) { + if (params == null) { + return url; + } + final String override = params.get(PARAM_EXTRACTOR_URL); + if (StringUtil.isBlank(override)) { + return url; + } + if (!allowExtractorUrlOverride) { + if (logger.isDebugEnabled()) { + logger.debug("Ignoring {} param because allowExtractorUrlOverride is disabled.", PARAM_EXTRACTOR_URL); + } + return url; + } + if (!isAllowedOverrideScheme(override)) { + logger.warn("Ignoring {} param with disallowed scheme: {}", PARAM_EXTRACTOR_URL, override); + return url; + } + if (logger.isInfoEnabled()) { + logger.info("Overriding extractor URL: configured={} override={}", url, override); + } + return override; + } + + private static boolean isAllowedOverrideScheme(final String candidate) { + try { + final String scheme = java.net.URI.create(candidate).getScheme(); + return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme); + } catch (final IllegalArgumentException e) { + return false; + } + } + /** * Extracts text from the input stream using the API endpoint. * * @param in the input stream to extract text from - * @param params additional parameters (an {@code extractorUrl} entry overrides the configured URL) + * @param params additional parameters; an {@code extractorUrl} entry overrides the configured URL + * only when {@link #allowExtractorUrlOverride} is {@code true} (default {@code false}) * @return the extracted data * @throws ExtractException if extraction fails */ @Override public ExtractData getText(final InputStream in, final Map params) { - // Allow per-call URL override. 
- String targetUrl = url; - if (params != null) { - final String override = params.get(PARAM_EXTRACTOR_URL); - if (StringUtil.isNotBlank(override)) { - targetUrl = override; - } - } + final String targetUrl = resolveTargetUrl(params); if (logger.isDebugEnabled()) { logger.debug("Accessing {}", targetUrl); @@ -729,4 +771,14 @@ public void setRetryBackoffMs(final long retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; } + /** + * Enables or disables the per-call {@link #PARAM_EXTRACTOR_URL} override. Defaults to {@code false}; + * enable only when the operator can guarantee that the params Map cannot be populated from + * untrusted data, otherwise this becomes an SSRF gadget. + * @param allowExtractorUrlOverride {@code true} to honor the {@code extractorUrl} param + */ + public void setAllowExtractorUrlOverride(final boolean allowExtractorUrlOverride) { + this.allowExtractorUrlOverride = allowExtractorUrlOverride; + } + } diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index cc9e7cf7..9abcdc40 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -117,6 +117,7 @@ public void test_extractorUrlOverride() throws Exception { // Misconfigure the default URL; the override in params should be used instead. 
final ApiExtractor overrideExtractor = new ApiExtractor(); overrideExtractor.setUrl("http://127.0.0.1:1/does-not-exist"); + overrideExtractor.setAllowExtractorUrlOverride(true); overrideExtractor.init(); try { final Map params = new HashMap<>(); @@ -128,6 +129,42 @@ public void test_extractorUrlOverride() throws Exception { } } + @Test + public void test_extractorUrlOverride_disabledByDefault_isIgnored() throws Exception { + // With allowExtractorUrlOverride defaulting to false, the params override must be ignored + // and the configured (working) URL must be used regardless of the bogus override. + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.init(); + try { + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:1/should-be-ignored"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("def".getBytes()), params); + // Successful extraction proves the configured URL was used, not the unreachable override. + assertEquals(ATTR_NAME + ",def", text.getContent()); + } finally { + extractor.destroy(); + } + } + + @Test + public void test_extractorUrlOverride_disallowedSchemeRejected() throws Exception { + // Even when override is enabled, non-http(s) schemes must be rejected. The configured URL + // is used instead, so the request still succeeds against the in-process server. 
+ final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.setAllowExtractorUrlOverride(true); + extractor.init(); + try { + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "file:///etc/passwd"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("rej".getBytes()), params); + assertEquals(ATTR_NAME + ",rej", text.getContent()); + } finally { + extractor.destroy(); + } + } + /** * Response larger than {@code maxResponseSize} must throw {@link ExtractException}. */ @@ -486,6 +523,7 @@ public void test_jitterAppliedToBackoff() throws Exception { } } + @Test public void test_sleepQuietly_interruptedThrowsAndPreservesFlag() throws Exception { final ApiExtractor extractor = new ApiExtractor(); extractor.setUrl("http://127.0.0.1:1/unused"); @@ -507,6 +545,7 @@ public void test_sleepQuietly_interruptedThrowsAndPreservesFlag() throws Excepti } } + @Test public void test_executeWithRetries_interruptStopsRetryLoop() throws Exception { final SimpleHttpServer server = new SimpleHttpServer(); final AtomicInteger attempts = new AtomicInteger(); From 00f5d287f2d20779982277a14b96f436b07972a3 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 21:11:39 +0900 Subject: [PATCH 5/7] fix(test): stabilize Retry-After HTTP-date test against second truncation HTTP-date has 1-second resolution per RFC 7231, so formatting `now + 2s` loses up to ~999ms when the server happens to format late within a second, causing the elapsed wait to fall below the 1.5s threshold (CI saw 1313ms). Round the future timestamp up to the next whole second and bump the target delta to ~3s so the assertion stays above its lower bound. 
--- .../crawler/extractor/impl/ApiExtractorTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index 9abcdc40..699dc8f8 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -437,14 +437,19 @@ public void test_uploadExceedsMaxRequestSize_throws() throws Exception { public void test_retryAfterHttpDate_parsedCorrectly() throws Exception { final SimpleHttpServer simple = new SimpleHttpServer(); final AtomicInteger calls = new AtomicInteger(); - // Pre-compute an HTTP-date 2 seconds into the future for the first response. + // Pre-compute an HTTP-date ~3 seconds into the future for the first response. + // HTTP-date has 1-second resolution (RFC 7231), so a naive `now + N seconds` value + // can truncate to ~(N-1) seconds in the worst case. Round UP to the next whole + // second to keep the effective wait close to the intended delta. 
final SimpleDateFormat httpDateFmt = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); httpDateFmt.setTimeZone(TimeZone.getTimeZone("GMT")); simple.setHandler(exchange -> { drain(exchange.getRequestBody()); final int n = calls.incrementAndGet(); if (n == 1) { - final String httpDate = httpDateFmt.format(new Date(System.currentTimeMillis() + 2_000L)); + final long futureMs = System.currentTimeMillis() + 3_000L; + final long futureCeilMs = (futureMs + 999L) / 1000L * 1000L; + final String httpDate = httpDateFmt.format(new Date(futureCeilMs)); exchange.getResponseHeaders().add("Retry-After", httpDate); exchange.sendResponseHeaders(429, -1); exchange.close(); @@ -471,8 +476,8 @@ public void test_retryAfterHttpDate_parsedCorrectly() throws Exception { assertNotNull(data); assertEquals("ok", data.getContent()); assertEquals(2, calls.get()); - // Must wait at least ~1.5s (allow some slack), well above the default ~10ms backoff. - org.junit.jupiter.api.Assertions.assertTrue(elapsed >= 1_500L, + // Must wait at least ~2.5s (allow some slack), well above the default ~10ms backoff. + org.junit.jupiter.api.Assertions.assertTrue(elapsed >= 2_500L, "retry must respect HTTP-date Retry-After (elapsed=" + elapsed + "ms)"); // And not block longer than is reasonable. org.junit.jupiter.api.Assertions.assertTrue(elapsed < 10_000L, "retry must not stall (elapsed=" + elapsed + "ms)"); From f0dfd40808926fbb6739be4186d432c2406ee433 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Tue, 5 May 2026 21:54:32 +0900 Subject: [PATCH 6/7] fix(extractor): bound ApiExtractor pool wait, spool request body, cap Retry-After Three follow-up gaps in the ApiExtractor robustness PR: - HC4 RequestConfig#connectionRequestTimeout was not set, leaving the default of -1 (wait forever). With maxConnectionsPerRoute=10, slow upstream responses pinning the pool would silently block all later callers. Defaults to 5000ms, matching the connect timeout, with a setter for tuning. 
- The previous fix buffered the entire request body in a byte[] sized by maxRequestSize (default 100 MiB) before each call. Concurrent extractors could pin many such arrays. Switch to DeferredFileOutputStream so small bodies stay in memory while larger ones spill to a temp file, and clean the file up via FileUtil.deleteInBackground after the retry loop. - A hostile or misbehaving server could return Retry-After: 86400 to stall the extractor thread for 24 hours. Cap the effective Retry-After at maxRetryAfterMs (default 60 seconds) inside computeBackoff so the parsed value remains accurate but the actual sleep is bounded. Tests: - test_connectionRequestTimeout_failsFastWhenPoolExhausted: a slow background caller occupies the only pooled connection; a second caller must surface ExtractException in well under the slow-handler duration. - test_requestSpoolThreshold_spillsToFileAndUploadsSuccessfully: forces every body to spill by setting threshold below the payload size, then verifies the server received the exact uploaded bytes. - test_retryAfterClampedToMaxRetryAfter: server responds with Retry-After: 86400; with maxRetryAfterMs=150ms, the retry must complete promptly instead of stalling. 
--- .../crawler/extractor/impl/ApiExtractor.java | 209 +++++++++++++----- .../extractor/impl/ApiExtractorTest.java | 186 ++++++++++++++++ 2 files changed, 345 insertions(+), 50 deletions(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index 279606c4..3b588137 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -15,6 +15,8 @@ */ package org.codelibs.fess.crawler.extractor.impl; +import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -25,7 +27,10 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.BoundedInputStream; +import org.apache.commons.io.output.DeferredFileOutputStream; +import org.apache.commons.lang3.SystemUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -55,6 +60,7 @@ import org.codelibs.core.beans.BeanDesc; import org.codelibs.core.beans.PropertyDesc; import org.codelibs.core.beans.factory.BeanDescFactory; +import org.codelibs.core.io.FileUtil; import org.codelibs.core.lang.StringUtil; import org.codelibs.core.timer.TimeoutManager; import org.codelibs.core.timer.TimeoutTask; @@ -93,6 +99,14 @@ public class ApiExtractor extends AbstractExtractor { /** The socket timeout in milliseconds. */ protected Integer soTimeout; + /** + * Maximum time in milliseconds to wait for a connection to be checked out from the pool. + * Without this bound, requests block indefinitely once {@link #maxConnections} or + * {@link #maxConnectionsPerRoute} is exhausted by slow upstream responses, defeating the + * point of the connect/socket timeouts above. 
+ */ + protected Integer connectionRequestTimeout; + /** Maximum total connections in the pool. */ protected int maxConnections = 50; @@ -105,12 +119,27 @@ public class ApiExtractor extends AbstractExtractor { /** Maximum size in bytes accepted for an outgoing request body. Default is 100 MiB. */ protected long maxRequestSize = 100L * 1024L * 1024L; + /** + * Threshold in bytes at which the request body spills from memory to a temp file. Bodies + * smaller than this stay in a {@code byte[]}; larger ones are written to a temp file via + * {@link DeferredFileOutputStream} so concurrent extractors don't pile up multi-MiB arrays + * on the heap. + */ + protected int requestSpoolThreshold = 1024 * 1024; + /** Maximum number of retry attempts on transient failures. */ protected int maxRetries = 2; /** Initial backoff in milliseconds between retries (doubled per attempt). */ protected long retryBackoffMs = 500L; + /** + * Hard upper bound in milliseconds applied to a server-supplied {@code Retry-After} value. + * Without this cap, a hostile or misbehaving server could stall an extractor thread for + * arbitrary time (e.g. {@code Retry-After: 86400}). Default 60 seconds. + */ + protected long maxRetryAfterMs = 60_000L; + /** * When {@code true}, an {@link #PARAM_EXTRACTOR_URL} entry in the per-call params Map overrides * the configured {@link #url}. Defaults to {@code false} so callers cannot redirect the extractor @@ -181,6 +210,15 @@ public void init() { // sane default to mitigate slow loris servers requestConfigBuilder.setSocketTimeout(30000); } + final Integer connectionRequestTimeoutParam = connectionRequestTimeout; + if (connectionRequestTimeoutParam != null) { + requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeoutParam); + } else { + // Without an explicit cap HC4 defaults to -1 (wait forever) — once the pool is full + // of slow requests, every new caller blocks indefinitely. 
Default to the same 5s used + // for connect timeout so socket exhaustion is visible to the caller, not silent. + requestConfigBuilder.setConnectionRequestTimeout(5000); + } // AuthSchemeFactory final RegistryBuilder authSchemeProviderBuilder = RegistryBuilder.create(); @@ -355,27 +393,51 @@ public ExtractData getText(final InputStream in, final Map param /** * Executes the API request with bounded retries on transient failures. * + *

The input stream is spooled into a {@link DeferredFileOutputStream}: small bodies stay + * in memory while bigger ones spill to a temp file, so concurrent extractors do not retain + * multi-MiB arrays on the heap. The total bytes accepted is capped at {@link #maxRequestSize} + * so a hostile or oversized source cannot exhaust memory or disk.

+ * * @param in the input stream to forward to the API endpoint * @param targetUrl the URL to call * @return the extracted data */ protected ExtractData executeWithRetries(final InputStream in, final String targetUrl) { - // Buffer the input once so we can re-send it on retry. Bounded to maxRequestSize so a - // hostile/large source document cannot OOM the extractor. - final byte[] requestBody; - try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(in).setMaxCount(maxRequestSize + 1L).get()) { - requestBody = bounded.readAllBytes(); + // Spool the input once so we can re-send it on retry. Small bodies stay in memory; larger + // ones spill to a temp file so we don't pin maxRequestSize bytes of heap per concurrent call. + File spoolFile = null; + try (DeferredFileOutputStream dfos = DeferredFileOutputStream.builder() + .setThreshold(requestSpoolThreshold) + .setPrefix("apiExtractor-") + .setSuffix(".tmp") + .setDirectory(SystemUtils.getJavaIoTmpDir()) + .get(); + BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(in).setMaxCount(maxRequestSize + 1L).get()) { + final long copied = IOUtils.copyLarge(bounded, dfos); + dfos.close(); + if (copied > maxRequestSize) { + throw new ExtractException("ApiExtractor request body exceeded limit: limit=" + maxRequestSize); + } + if (!dfos.isInMemory()) { + spoolFile = dfos.getFile(); + } + return runRetryLoop(dfos, targetUrl); + } catch (final ExtractException e) { + throw e; } catch (final IOException e) { throw new ExtractException("Failed to read input stream for API extractor", e); + } finally { + if (spoolFile != null) { + FileUtil.deleteInBackground(spoolFile); + } } - if (requestBody.length > maxRequestSize) { - throw new ExtractException("ApiExtractor request body exceeded limit: limit=" + maxRequestSize); - } + } + private ExtractData runRetryLoop(final DeferredFileOutputStream dfos, final String targetUrl) { IOException lastIoException = null; for (int attempt = 0; attempt 
<= maxRetries; attempt++) { try { - return executeOnce(requestBody, targetUrl, attempt); + return executeOnce(dfos, targetUrl, attempt); } catch (final RetryableStatusException e) { final long sleepMs = computeBackoff(attempt, e.retryAfterMs); if (attempt >= maxRetries) { @@ -429,64 +491,78 @@ protected ExtractData executeWithRetries(final InputStream in, final String targ /** * Executes a single API call. * - * @param requestBody bytes to upload as the multipart body + *

The body is sourced from the spooled {@link DeferredFileOutputStream}: an in-memory copy + * is wrapped in a {@link ByteArrayInputStream}, while a spilled body is streamed from its temp + * file. The InputStream is closed before the method returns, but the underlying spool persists + * across attempts so retries can re-issue the same body without re-reading the original.

+ * + * @param dfos spooled request body produced by {@link #executeWithRetries(InputStream, String)} * @param targetUrl URL to POST to * @param attempt current attempt number (0-based) — included for logging * @return the extracted data on success * @throws RetryableStatusException when the response status is retryable * @throws IOException on transport-level failure */ - protected ExtractData executeOnce(final byte[] requestBody, final String targetUrl, final int attempt) throws IOException { + protected ExtractData executeOnce(final DeferredFileOutputStream dfos, final String targetUrl, final int attempt) throws IOException { if (attempt > 0 && logger.isDebugEnabled()) { logger.debug("Executing API request (url={}, attempt={})", targetUrl, attempt); } final HttpPost httpPost = new HttpPost(targetUrl); - final HttpEntity postEntity = MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .setCharset(Charset.forName("UTF-8")) - .addBinaryBody("filedata", requestBody) - .build(); - httpPost.setEntity(postEntity); - - try (CloseableHttpResponse response = httpClient.execute(httpPost)) { - final int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode != Constants.OK_STATUS_CODE) { - if (isRetryableStatus(statusCode)) { - final long retryAfterMs = parseRetryAfterMs(response.getFirstHeader("Retry-After")); - // Drain entity so the connection can be reused. Bound the drain so a hostile - // server cannot stream an unbounded body just to keep us reading. 
+ try (InputStream bodyStream = openSpooledBody(dfos)) { + final HttpEntity postEntity = MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .setCharset(Charset.forName("UTF-8")) + .addBinaryBody("filedata", bodyStream, org.apache.http.entity.ContentType.APPLICATION_OCTET_STREAM, "filedata") + .build(); + httpPost.setEntity(postEntity); + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != Constants.OK_STATUS_CODE) { + if (isRetryableStatus(statusCode)) { + final long retryAfterMs = parseRetryAfterMs(response.getFirstHeader("Retry-After")); + // Drain entity so the connection can be reused. Bound the drain so a hostile + // server cannot stream an unbounded body just to keep us reading. + drainBounded(response.getEntity()); + throw new RetryableStatusException(statusCode, retryAfterMs); + } + logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}, attempt={}", targetUrl, statusCode, + attempt); drainBounded(response.getEntity()); - throw new RetryableStatusException(statusCode, retryAfterMs); + return null; } - logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}, attempt={}", targetUrl, statusCode, attempt); - drainBounded(response.getEntity()); - return null; - } - final ExtractData data = new ExtractData(); - final HttpEntity entity = response.getEntity(); - if (entity != null) { - final Charset charset = resolveCharset(entity); - try (InputStream entityStream = entity.getContent(); - BoundedInputStream bounded = - BoundedInputStream.builder().setInputStream(entityStream).setMaxCount(maxResponseSize + 1L).get()) { - final byte[] body = bounded.readAllBytes(); - if (body.length > maxResponseSize) { - throw new ExtractException("ApiExtractor response exceeded limit: limit=" + maxResponseSize); + final ExtractData data = new ExtractData(); + final HttpEntity entity = response.getEntity(); 
+ if (entity != null) { + final Charset charset = resolveCharset(entity); + try (InputStream entityStream = entity.getContent(); + BoundedInputStream bounded = + BoundedInputStream.builder().setInputStream(entityStream).setMaxCount(maxResponseSize + 1L).get()) { + final byte[] body = bounded.readAllBytes(); + if (body.length > maxResponseSize) { + throw new ExtractException("ApiExtractor response exceeded limit: limit=" + maxResponseSize); + } + data.setContent(new String(body, charset)); } - data.setContent(new String(body, charset)); + } else { + data.setContent(""); } - } else { - data.setContent(""); - } - final Header[] headers = response.getAllHeaders(); - for (final Header header : headers) { - data.putValue(header.getName(), header.getValue()); + final Header[] headers = response.getAllHeaders(); + for (final Header header : headers) { + data.putValue(header.getName(), header.getValue()); + } + return data; } - return data; } } + private InputStream openSpooledBody(final DeferredFileOutputStream dfos) throws IOException { + if (dfos.isInMemory()) { + return new ByteArrayInputStream(dfos.getData()); + } + return new java.io.BufferedInputStream(new java.io.FileInputStream(dfos.getFile())); + } + /** * Returns true if the given status code is considered retryable. * @@ -542,7 +618,10 @@ protected long parseRetryAfterMs(final Header header) { /** * Computes exponential backoff for a retry attempt. A small random jitter in the range * {@code [0, retryBackoffMs)} is added to the exponential base to avoid thundering-herd - * retry storms when many extractors hit the same upstream concurrently. + * retry storms when many extractors hit the same upstream concurrently. Server-supplied + * {@code Retry-After} hints take precedence but are clamped to {@link #maxRetryAfterMs} + * so a hostile or misbehaving upstream cannot pin the extractor thread for arbitrary time + * (e.g. {@code Retry-After: 86400}). 
* * @param attempt 0-based attempt number * @param retryAfterMs server-suggested delay; non-negative values take precedence @@ -550,7 +629,7 @@ protected long parseRetryAfterMs(final Header header) { */ protected long computeBackoff(final int attempt, final long retryAfterMs) { if (retryAfterMs >= 0L) { - return retryAfterMs; + return Math.min(retryAfterMs, Math.max(0L, maxRetryAfterMs)); } final long base = retryBackoffMs * (1L << attempt); final long jitter = retryBackoffMs > 0L ? ThreadLocalRandom.current().nextLong(0L, retryBackoffMs) : 0L; @@ -672,6 +751,16 @@ public void setSoTimeout(final Integer soTimeout) { this.soTimeout = soTimeout; } + /** + * Sets the maximum time in milliseconds to wait for a connection from the pool. Without + * this cap, callers block indefinitely once the pool is exhausted by slow upstream + * responses, defeating the connect/socket timeouts. + * @param connectionRequestTimeout pool checkout timeout in milliseconds + */ + public void setConnectionRequestTimeout(final Integer connectionRequestTimeout) { + this.connectionRequestTimeout = connectionRequestTimeout; + } + /** * Sets the map of authentication scheme providers. * @param authSchemeProviderMap The map of authentication scheme providers. @@ -771,6 +860,26 @@ public void setRetryBackoffMs(final long retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; } + /** + * Sets the upper bound applied to a server-supplied {@code Retry-After} value. A hostile or + * misbehaving upstream returning {@code Retry-After: 86400} otherwise pins this thread for + * 24 hours; clamping it to a sane ceiling keeps retries responsive. + * @param maxRetryAfterMs maximum effective Retry-After value in milliseconds + */ + public void setMaxRetryAfterMs(final long maxRetryAfterMs) { + this.maxRetryAfterMs = maxRetryAfterMs; + } + + /** + * Sets the byte threshold at which the request body spills from memory to a temp file. 
+ * Bodies smaller than this stay in a {@code byte[]}; larger bodies are written to disk so + * concurrent extractors do not pin {@link #maxRequestSize} bytes of heap each. + * @param requestSpoolThreshold byte threshold for memory-to-file spillover + */ + public void setRequestSpoolThreshold(final int requestSpoolThreshold) { + this.requestSpoolThreshold = requestSpoolThreshold; + } + /** * Enables or disables the per-call {@link #PARAM_EXTRACTOR_URL} override. Defaults to {@code false}; * enable only when the operator can guarantee that the params Map cannot be populated from diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index 699dc8f8..1d8144e8 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -550,6 +550,177 @@ public void test_sleepQuietly_interruptedThrowsAndPreservesFlag() throws Excepti } } + /** + * When the connection pool is exhausted by a slow upstream, a new caller must surface a + * pool-checkout timeout instead of blocking indefinitely. Without + * {@link RequestConfig#setConnectionRequestTimeout(int)} the second call would hang for + * the full duration of the slow handler. + */ + @Test + public void test_connectionRequestTimeout_failsFastWhenPoolExhausted() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + // Slow handler holds the only available connection for several seconds. 
+ simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + try { + Thread.sleep(3_000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } + exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor capped = new ApiExtractor(); + capped.setUrl("http://127.0.0.1:" + simple.port() + "/"); + capped.setMaxConnections(1); + capped.setMaxConnectionsPerRoute(1); + capped.setConnectionRequestTimeout(200); + capped.setMaxRetries(0); + capped.init(); + try { + // First caller occupies the only pooled connection in a background thread. + final Thread occupier = new Thread(() -> { + try { + capped.getText(new ByteArrayInputStream("a".getBytes()), new HashMap<>()); + } catch (final Exception ignore) { + // ignore + } + }); + occupier.setDaemon(true); + occupier.start(); + // Give the occupier a moment to issue its request and acquire the connection. + Thread.sleep(300L); + + // Second caller must fail quickly because the pool is exhausted. + final long t0 = System.currentTimeMillis(); + try { + capped.getText(new ByteArrayInputStream("b".getBytes()), new HashMap<>()); + fail(); + } catch (final ExtractException e) { + final long elapsed = System.currentTimeMillis() - t0; + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2_000L, + "second call must fail fast on pool exhaustion (was " + elapsed + "ms)"); + } + occupier.interrupt(); + occupier.join(2_000L); + } finally { + capped.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * A request body larger than the spool threshold must succeed (proving the spool-to-disk + * path works) and the server must observe the exact uploaded bytes. This exercises the + * branch that streams from a temp file rather than a {@code byte[]}. 
+ */ + @Test + public void test_requestSpoolThreshold_spillsToFileAndUploadsSuccessfully() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final int payloadSize = 32 * 1024; + final byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = (byte) ('A' + (i % 26)); + } + final java.util.concurrent.atomic.AtomicReference received = new java.util.concurrent.atomic.AtomicReference<>(); + simple.setHandler(exchange -> { + // Read the complete multipart body verbatim so the test can verify the payload bytes. + final java.io.ByteArrayOutputStream sink = new java.io.ByteArrayOutputStream(); + final byte[] buf = new byte[4096]; + int n; + while ((n = exchange.getRequestBody().read(buf)) >= 0) { + sink.write(buf, 0, n); + } + received.set(sink.toByteArray()); + final byte[] body = "spooled".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + }); + simple.start(); + try { + final ApiExtractor spooled = new ApiExtractor(); + spooled.setUrl("http://127.0.0.1:" + simple.port() + "/"); + // Force every body to spill to a temp file by setting threshold smaller than the payload. + spooled.setRequestSpoolThreshold(1024); + spooled.setMaxRetries(0); + spooled.init(); + try { + final ExtractData data = spooled.getText(new ByteArrayInputStream(payload), new HashMap<>()); + assertNotNull(data); + assertEquals("spooled", data.getContent()); + final byte[] body = received.get(); + assertNotNull(body); + // Multipart framing wraps the payload, so just verify the payload bytes are embedded + // verbatim somewhere inside the multipart body. 
+ org.junit.jupiter.api.Assertions.assertTrue(body.length >= payload.length, + "received body must be at least the payload size"); + org.junit.jupiter.api.Assertions.assertTrue(indexOf(body, payload) >= 0, + "spooled payload bytes must appear in the multipart body"); + } finally { + spooled.destroy(); + } + } finally { + simple.stop(); + } + } + + /** + * A hostile {@code Retry-After} value must be clamped to {@code maxRetryAfterMs} so a + * 24-hour value cannot stall the extractor thread. + */ + @Test + public void test_retryAfterClampedToMaxRetryAfter() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + // 24 hours — without clamping this would block the test forever. + exchange.getResponseHeaders().add("Retry-After", "86400"); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor clamped = new ApiExtractor(); + clamped.setUrl("http://127.0.0.1:" + simple.port() + "/"); + clamped.setMaxRetries(1); + clamped.setRetryBackoffMs(10L); + clamped.setMaxRetryAfterMs(150L); + clamped.init(); + try { + final long t0 = System.currentTimeMillis(); + final ExtractData data = clamped.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + final long elapsed = System.currentTimeMillis() - t0; + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 5_000L, + "Retry-After must be clamped, not honored as 24h (elapsed=" + elapsed 
+ "ms)"); + } finally { + clamped.destroy(); + } + } finally { + simple.stop(); + } + } + @Test public void test_executeWithRetries_interruptStopsRetryLoop() throws Exception { final SimpleHttpServer server = new SimpleHttpServer(); @@ -620,6 +791,21 @@ private static void drain(final InputStream in) throws IOException { } } + private static int indexOf(final byte[] haystack, final byte[] needle) { + if (needle.length == 0 || haystack.length < needle.length) { + return -1; + } + outer: for (int i = 0; i <= haystack.length - needle.length; i++) { + for (int j = 0; j < needle.length; j++) { + if (haystack[i + j] != needle[j]) { + continue outer; + } + } + return i; + } + return -1; + } + /** Lightweight HTTP server used for retry / size / status tests. */ private static class SimpleHttpServer { private HttpServer http; From c1df1833e9e2c35d31bb18a695d8a81a1775f897 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Sat, 16 May 2026 11:24:03 +0900 Subject: [PATCH 7/7] fix(extractor): harden ApiExtractor URL override, retry classification, and lifecycle - Tighten isAllowedOverrideScheme: reject control characters, userinfo, opaque URIs, and parse with new URI() so URISyntaxException is caught. - Collapse three near-duplicate IOException catches into one; do not retry on non-transient errors (UnknownHostException, SSLHandshakeException, SSLPeerUnverifiedException). - Preserve cause on retry-exhausted ExtractException so the originating RetryableStatusException is recoverable from logs. - Snapshot httpClient in executeOnce and surface a clear ExtractException when getText is called after destroy(). - Set ConnectionManagerShared(true) so destroy() owns the manager's lifecycle; narrow the manager close catch from Exception to RuntimeException. - Null-defensive finally guards for accessTimeoutTarget / accessTimeoutTask. - Validate numeric setters (max retries / sizes / connections / backoffs) with IllegalArgumentException. 
- Log silent 200-with-empty-entity at debug; key=value formatting for new log statements; remove dead consumeQuietly method. - Strengthen test_retryOn429_respectsRetryAfter and the disabled-override test to actually prove the behavior under test (override server hit count). - Add tests for: control-character/userinfo/opaque URI rejection, HTTPS scheme acceptance, retry exhaustion with cause, UnknownHost non-retry, post-destroy guard, destroy idempotency, setter validation, empty 200 entity, Retry-After garbage/negative fallback, and Shift_JIS round-trip. --- .../crawler/extractor/impl/ApiExtractor.java | 146 +++--- .../extractor/impl/ApiExtractorTest.java | 423 +++++++++++++++++- 2 files changed, 495 insertions(+), 74 deletions(-) diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index 3b588137..b60fe276 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -15,10 +15,15 @@ */ package org.codelibs.fess.crawler.extractor.impl; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Date; @@ -27,6 +32,9 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; + import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.output.DeferredFileOutputStream; @@ -34,7 +42,6 @@ import org.apache.http.Header; import 
org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.NoHttpResponseException; import org.apache.http.auth.AuthScheme; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; @@ -46,7 +53,7 @@ import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.DateUtils; import org.apache.http.config.RegistryBuilder; -import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.entity.ContentType; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.impl.client.BasicAuthCache; @@ -263,6 +270,10 @@ public void init() { connectionManager.setMaxTotal(maxConnections); connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute); httpClientBuilder.setConnectionManager(connectionManager); + // We manage the connection manager's lifecycle in destroy(), so prevent the client from + // shutting it down on close(). This makes destroy ordering explicit and avoids the + // double-close path that previously required a broad catch. + httpClientBuilder.setConnectionManagerShared(true); // Disable the built-in retry handler; we implement our own classification below. 
httpClientBuilder.disableAutomaticRetries(); @@ -300,7 +311,7 @@ public void destroy() { if (connectionManager != null) { try { connectionManager.close(); - } catch (final Exception e) { + } catch (final RuntimeException e) { logger.warn("Failed to close connection manager for API extractor", e); } finally { connectionManager = null; @@ -324,12 +335,12 @@ protected String resolveTargetUrl(final Map params) { } if (!allowExtractorUrlOverride) { if (logger.isDebugEnabled()) { - logger.debug("Ignoring {} param because allowExtractorUrlOverride is disabled.", PARAM_EXTRACTOR_URL); + logger.debug("Ignoring param=[{}] because allowExtractorUrlOverride is disabled.", PARAM_EXTRACTOR_URL); } return url; } if (!isAllowedOverrideScheme(override)) { - logger.warn("Ignoring {} param with disallowed scheme: {}", PARAM_EXTRACTOR_URL, override); + logger.warn("Ignoring param=[{}] with disallowed scheme: override={}", PARAM_EXTRACTOR_URL, override); return url; } if (logger.isInfoEnabled()) { @@ -339,10 +350,29 @@ protected String resolveTargetUrl(final Map params) { } private static boolean isAllowedOverrideScheme(final String candidate) { + if (candidate == null) { + return false; + } + for (int i = 0; i < candidate.length(); i++) { + final char c = candidate.charAt(i); + if (c <= 0x20 || c == 0x7F) { + return false; + } + } try { - final String scheme = java.net.URI.create(candidate).getScheme(); - return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme); - } catch (final IllegalArgumentException e) { + final URI uri = new URI(candidate); + final String scheme = uri.getScheme(); + if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { + return false; + } + if (uri.getHost() == null || uri.getHost().isEmpty()) { + return false; + } + if (uri.getUserInfo() != null) { + return false; + } + return true; + } catch (final URISyntaxException e) { return false; } } @@ -361,7 +391,7 @@ public ExtractData getText(final InputStream in, final Map 
param final String targetUrl = resolveTargetUrl(params); if (logger.isDebugEnabled()) { - logger.debug("Accessing {}", targetUrl); + logger.debug("Accessing url={}", targetUrl); } // start @@ -381,11 +411,11 @@ public ExtractData getText(final InputStream in, final Map param // or malformed source document cannot OOM the extractor. return executeWithRetries(in, targetUrl); } finally { - if (accessTimeout != null) { + if (accessTimeoutTarget != null) { accessTimeoutTarget.stop(); - if (!accessTimeoutTask.isCanceled()) { - accessTimeoutTask.cancel(); - } + } + if (accessTimeoutTask != null && !accessTimeoutTask.isCanceled()) { + accessTimeoutTask.cancel(); } } } @@ -441,44 +471,26 @@ private ExtractData runRetryLoop(final DeferredFileOutputStream dfos, final Stri } catch (final RetryableStatusException e) { final long sleepMs = computeBackoff(attempt, e.retryAfterMs); if (attempt >= maxRetries) { - throw new ExtractException("API request failed after " + (attempt + 1) + " attempts: status=" + e.statusCode); + throw new ExtractException("API request failed after " + (attempt + 1) + " attempts: status=" + e.statusCode, e); } if (logger.isDebugEnabled()) { logger.debug("Retrying API request (status={}, attempt={}/{}, sleep={}ms)", e.statusCode, attempt + 1, maxRetries + 1, sleepMs); } sleepQuietly(sleepMs); - } catch (final ConnectTimeoutException e) { - lastIoException = e; - if (attempt >= maxRetries) { - throw new ExtractException("API request failed", e); - } - final long sleepMs = computeBackoff(attempt, -1L); - if (logger.isDebugEnabled()) { - logger.debug("Retrying API request after connect timeout (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, - sleepMs, e); - } - sleepQuietly(sleepMs); - } catch (final NoHttpResponseException e) { - lastIoException = e; - if (attempt >= maxRetries) { - throw new ExtractException("API request failed", e); - } - final long sleepMs = computeBackoff(attempt, -1L); - if (logger.isDebugEnabled()) { - 
logger.debug("Retrying API request after no-response (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, sleepMs, - e); - } - sleepQuietly(sleepMs); } catch (final IOException e) { + // M5: skip retries for non-transient errors — retrying will not help. + if (e instanceof UnknownHostException || e instanceof SSLHandshakeException || e instanceof SSLPeerUnverifiedException) { + throw new ExtractException("API request failed (non-transient): url=" + targetUrl, e); + } lastIoException = e; if (attempt >= maxRetries) { throw new ExtractException("API request failed", e); } final long sleepMs = computeBackoff(attempt, -1L); if (logger.isDebugEnabled()) { - logger.debug("Retrying API request after I/O error (attempt={}/{}, sleep={}ms)", attempt + 1, maxRetries + 1, sleepMs, - e); + logger.debug("Retrying API request after {} (attempt={}/{}, sleep={}ms)", e.getClass().getSimpleName(), attempt + 1, + maxRetries + 1, sleepMs, e); } sleepQuietly(sleepMs); } @@ -504,6 +516,10 @@ private ExtractData runRetryLoop(final DeferredFileOutputStream dfos, final Stri * @throws IOException on transport-level failure */ protected ExtractData executeOnce(final DeferredFileOutputStream dfos, final String targetUrl, final int attempt) throws IOException { + final CloseableHttpClient client = httpClient; + if (client == null) { + throw new ExtractException("ApiExtractor has been destroyed"); + } if (attempt > 0 && logger.isDebugEnabled()) { logger.debug("Executing API request (url={}, attempt={})", targetUrl, attempt); } @@ -512,10 +528,10 @@ protected ExtractData executeOnce(final DeferredFileOutputStream dfos, final Str final HttpEntity postEntity = MultipartEntityBuilder.create() .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) .setCharset(Charset.forName("UTF-8")) - .addBinaryBody("filedata", bodyStream, org.apache.http.entity.ContentType.APPLICATION_OCTET_STREAM, "filedata") + .addBinaryBody("filedata", bodyStream, ContentType.APPLICATION_OCTET_STREAM, "filedata") .build(); 
httpPost.setEntity(postEntity); - try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + try (CloseableHttpResponse response = client.execute(httpPost)) { final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != Constants.OK_STATUS_CODE) { if (isRetryableStatus(statusCode)) { @@ -545,6 +561,9 @@ protected ExtractData executeOnce(final DeferredFileOutputStream dfos, final Str data.setContent(new String(body, charset)); } } else { + if (logger.isDebugEnabled()) { + logger.debug("API extractor returned 200 with no response entity: url={}", targetUrl); + } data.setContent(""); } final Header[] headers = response.getAllHeaders(); @@ -560,7 +579,7 @@ private InputStream openSpooledBody(final DeferredFileOutputStream dfos) throws if (dfos.isInMemory()) { return new ByteArrayInputStream(dfos.getData()); } - return new java.io.BufferedInputStream(new java.io.FileInputStream(dfos.getFile())); + return new BufferedInputStream(new FileInputStream(dfos.getFile())); } /** @@ -658,26 +677,11 @@ protected void sleepQuietly(final long millis) { } } - /** - * Drains the response entity, ignoring errors. Safe even when the entity is null. - * - * @param entity entity to consume (may be null) - */ - protected void consumeQuietly(final HttpEntity entity) { - if (entity == null) { - return; - } - try { - org.apache.http.util.EntityUtils.consumeQuietly(entity); - } catch (final Exception e) { - // ignore - } - } - /** * Drains the response entity content with a hard upper bound of {@link #maxResponseSize} * bytes so a hostile server cannot trickle out an unbounded body purely to keep us reading. - * Errors are intentionally swallowed: the goal is just to make the connection reusable. + * If the server sent more than the bound, the under-consumed entity causes HC to discard the + * underlying connection — that is the trade-off for the cap. Errors are intentionally swallowed. 
* * @param entity entity to drain (may be null) */ @@ -704,7 +708,7 @@ protected void drainBounded(final HttpEntity entity) { */ protected Charset resolveCharset(final HttpEntity entity) { try { - final org.apache.http.entity.ContentType contentType = org.apache.http.entity.ContentType.get(entity); + final ContentType contentType = ContentType.get(entity); if (contentType != null && contentType.getCharset() != null) { return contentType.getCharset(); } @@ -814,6 +818,9 @@ public void setAccessTimeout(final Integer accessTimeout) { * @param maxConnections maximum total pooled connections */ public void setMaxConnections(final int maxConnections) { + if (maxConnections <= 0) { + throw new IllegalArgumentException("maxConnections must be > 0: maxConnections=" + maxConnections); + } this.maxConnections = maxConnections; } @@ -822,6 +829,9 @@ public void setMaxConnections(final int maxConnections) { * @param maxConnectionsPerRoute maximum pooled connections per route */ public void setMaxConnectionsPerRoute(final int maxConnectionsPerRoute) { + if (maxConnectionsPerRoute <= 0) { + throw new IllegalArgumentException("maxConnectionsPerRoute must be > 0: maxConnectionsPerRoute=" + maxConnectionsPerRoute); + } this.maxConnectionsPerRoute = maxConnectionsPerRoute; } @@ -831,6 +841,9 @@ public void setMaxConnectionsPerRoute(final int maxConnectionsPerRoute) { * @param maxResponseSize maximum accepted response size in bytes */ public void setMaxResponseSize(final long maxResponseSize) { + if (maxResponseSize < 0) { + throw new IllegalArgumentException("maxResponseSize must be >= 0: maxResponseSize=" + maxResponseSize); + } this.maxResponseSize = maxResponseSize; } @@ -840,6 +853,9 @@ public void setMaxResponseSize(final long maxResponseSize) { * @param maxRequestSize maximum accepted request body size in bytes */ public void setMaxRequestSize(final long maxRequestSize) { + if (maxRequestSize < 0) { + throw new IllegalArgumentException("maxRequestSize must be >= 0: 
maxRequestSize=" + maxRequestSize); + } this.maxRequestSize = maxRequestSize; } @@ -848,6 +864,9 @@ public void setMaxRequestSize(final long maxRequestSize) { * @param maxRetries number of retries (in addition to the initial attempt) */ public void setMaxRetries(final int maxRetries) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries must be >= 0: maxRetries=" + maxRetries); + } this.maxRetries = maxRetries; } @@ -857,6 +876,9 @@ public void setMaxRetries(final int maxRetries) { * @param retryBackoffMs initial backoff in milliseconds */ public void setRetryBackoffMs(final long retryBackoffMs) { + if (retryBackoffMs < 0) { + throw new IllegalArgumentException("retryBackoffMs must be >= 0: retryBackoffMs=" + retryBackoffMs); + } this.retryBackoffMs = retryBackoffMs; } @@ -867,6 +889,9 @@ public void setRetryBackoffMs(final long retryBackoffMs) { * @param maxRetryAfterMs maximum effective Retry-After value in milliseconds */ public void setMaxRetryAfterMs(final long maxRetryAfterMs) { + if (maxRetryAfterMs < 0) { + throw new IllegalArgumentException("maxRetryAfterMs must be >= 0: maxRetryAfterMs=" + maxRetryAfterMs); + } this.maxRetryAfterMs = maxRetryAfterMs; } @@ -877,6 +902,9 @@ public void setMaxRetryAfterMs(final long maxRetryAfterMs) { * @param requestSpoolThreshold byte threshold for memory-to-file spillover */ public void setRequestSpoolThreshold(final int requestSpoolThreshold) { + if (requestSpoolThreshold < 0) { + throw new IllegalArgumentException("requestSpoolThreshold must be >= 0: requestSpoolThreshold=" + requestSpoolThreshold); + } this.requestSpoolThreshold = requestSpoolThreshold; } diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index 1d8144e8..66d11615 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ 
b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -131,19 +131,30 @@ public void test_extractorUrlOverride() throws Exception { @Test public void test_extractorUrlOverride_disabledByDefault_isIgnored() throws Exception { - // With allowExtractorUrlOverride defaulting to false, the params override must be ignored - // and the configured (working) URL must be used regardless of the bogus override. - final ApiExtractor extractor = new ApiExtractor(); - extractor.setUrl("http://127.0.0.1:" + port + "/"); - extractor.init(); + final SimpleHttpServer overrideServer = new SimpleHttpServer(); + final AtomicInteger overrideHits = new AtomicInteger(); + overrideServer.setHandler(exchange -> { + overrideHits.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + overrideServer.start(); try { - final Map params = new HashMap<>(); - params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:1/should-be-ignored"); - final ExtractData text = extractor.getText(new ByteArrayInputStream("def".getBytes()), params); - // Successful extraction proves the configured URL was used, not the unreachable override. 
- assertEquals(ATTR_NAME + ",def", text.getContent()); + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.init(); + try { + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + overrideServer.port() + "/"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("def".getBytes()), params); + assertEquals(ATTR_NAME + ",def", text.getContent()); + assertEquals(0, overrideHits.get()); + } finally { + extractor.destroy(); + } } finally { - extractor.destroy(); + overrideServer.stop(); } } @@ -309,7 +320,8 @@ public void test_retryOn429_respectsRetryAfter() throws Exception { final ApiExtractor retrying = new ApiExtractor(); retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); retrying.setMaxRetries(1); - retrying.setRetryBackoffMs(10L); + // Large default backoff: if Retry-After is ignored, retry would take ~5s. + retrying.setRetryBackoffMs(5000L); retrying.init(); try { final long t0 = System.currentTimeMillis(); @@ -318,9 +330,9 @@ public void test_retryOn429_respectsRetryAfter() throws Exception { assertNotNull(data); assertEquals("ok", data.getContent()); assertEquals(2, calls.get()); - // Retry-After: 0 means we should not artificially delay. - org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2000, - "retry should not block when Retry-After=0 (was " + elapsed + "ms)"); + // Retry-After: 0 must beat the 5s default backoff. 
+ org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2500L, + "Retry-After:0 must override default backoff (elapsed=" + elapsed + "ms)"); } finally { retrying.destroy(); } @@ -784,6 +796,387 @@ public void handle(final HttpExchange exchange) throws IOException { } } + @Test + public void test_overrideRejects_ControlCharacters() throws Exception { + final SimpleHttpServer overrideServer = new SimpleHttpServer(); + final AtomicInteger overrideHits = new AtomicInteger(); + overrideServer.setHandler(exchange -> { + overrideHits.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + overrideServer.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.setAllowExtractorUrlOverride(true); + extractor.init(); + try { + // URL with embedded newline must be rejected even when override is enabled. + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + overrideServer.port() + "/\r\nX-Injected: yes"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("ctl".getBytes()), params); + assertEquals(ATTR_NAME + ",ctl", text.getContent()); + assertEquals(0, overrideHits.get()); + } finally { + extractor.destroy(); + } + } finally { + overrideServer.stop(); + } + } + + @Test + public void test_overrideRejects_Userinfo() throws Exception { + final SimpleHttpServer overrideServer = new SimpleHttpServer(); + final AtomicInteger overrideHits = new AtomicInteger(); + overrideServer.setHandler(exchange -> { + overrideHits.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + overrideServer.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.setAllowExtractorUrlOverride(true); + extractor.init(); + try { + final Map 
params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://user:pass@127.0.0.1:" + overrideServer.port() + "/"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("ui".getBytes()), params); + assertEquals(ATTR_NAME + ",ui", text.getContent()); + assertEquals(0, overrideHits.get()); + } finally { + extractor.destroy(); + } + } finally { + overrideServer.stop(); + } + } + + @Test + public void test_overrideRejects_OpaqueUri() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.setAllowExtractorUrlOverride(true); + extractor.init(); + try { + final Map params = new HashMap<>(); + params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http:opaque"); + final ExtractData text = extractor.getText(new ByteArrayInputStream("op".getBytes()), params); + assertEquals(ATTR_NAME + ",op", text.getContent()); + } finally { + extractor.destroy(); + } + } + + @Test + public void test_overrideAccepts_HttpsScheme() throws Exception { + // We can't easily stand up an HTTPS server in the test, so just confirm that the SCHEME + // gate accepts https by trying to call an unreachable https URL. The call should fail + // attempting to connect (not be silently rejected as disallowed scheme — that would + // succeed against the configured URL). + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.setAllowExtractorUrlOverride(true); + extractor.setMaxRetries(0); + extractor.setConnectionTimeout(300); + extractor.setSoTimeout(300); + extractor.init(); + try { + final Map params = new HashMap<>(); + // Port 1 is reserved; connection refused is expected. 
+ params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "https://127.0.0.1:1/blackhole"); + try { + extractor.getText(new ByteArrayInputStream("hs".getBytes()), params); + org.junit.jupiter.api.Assertions.fail( + "Override to unreachable https should have raised ExtractException, not silently fallen back to configured URL."); + } catch (final ExtractException expected) { + // expected: the override scheme passes, so we tried to connect and failed. + } + } finally { + extractor.destroy(); + } + } + + @Test + public void test_retryExhausted_5xx_throwsWithCauseAndAttemptCount() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + calls.incrementAndGet(); + drain(exchange.getRequestBody()); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(2); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + org.junit.jupiter.api.Assertions.fail("Expected ExtractException after 3 failed attempts"); + } catch (final ExtractException e) { + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("3 attempts"), + "message should contain attempt count: " + e.getMessage()); + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("status=503"), + "message should contain status code: " + e.getMessage()); + org.junit.jupiter.api.Assertions.assertNotNull(e.getCause(), "exhaustion ExtractException must carry a cause"); + assertEquals(3, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_unknownHost_notRetried() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + // .invalid is reserved per RFC 2606 — should 
always be unresolvable. + extractor.setUrl("http://this-host-must-not-resolve.invalid/"); + extractor.setMaxRetries(5); + extractor.setRetryBackoffMs(1000L); + extractor.setConnectionTimeout(2000); + extractor.init(); + try { + final long t0 = System.currentTimeMillis(); + try { + extractor.getText(new ByteArrayInputStream("uh".getBytes()), new HashMap<>()); + org.junit.jupiter.api.Assertions.fail("Expected ExtractException for unknown host"); + } catch (final ExtractException e) { + final long elapsed = System.currentTimeMillis() - t0; + org.junit.jupiter.api.Assertions.assertTrue(elapsed < 3000L, + "UnknownHost should not be retried (elapsed=" + elapsed + "ms)"); + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("non-transient"), + "message should mark error as non-transient: " + e.getMessage()); + } + } finally { + extractor.destroy(); + } + } + + @Test + public void test_getTextAfterDestroy_throwsExtractException() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.init(); + extractor.destroy(); + try { + extractor.getText(new ByteArrayInputStream("post-destroy".getBytes()), new HashMap<>()); + org.junit.jupiter.api.Assertions.fail("Expected ExtractException after destroy()"); + } catch (final ExtractException e) { + org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("destroyed"), + "message should explain why: " + e.getMessage()); + } + } + + @Test + public void test_destroyIsIdempotent() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + port + "/"); + extractor.init(); + extractor.destroy(); + extractor.destroy(); // must not throw + } + + @Test + public void test_setterValidation_rejectsNegative() throws Exception { + final ApiExtractor extractor = new ApiExtractor(); + try { + extractor.setMaxRetries(-1); + fail(); + } catch (final IllegalArgumentException expected) { + // 
ok + } + try { + extractor.setMaxResponseSize(-1L); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setMaxRequestSize(-1L); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setMaxConnections(0); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setMaxConnectionsPerRoute(0); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setRetryBackoffMs(-1L); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setMaxRetryAfterMs(-1L); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + try { + extractor.setRequestSpoolThreshold(-1); + fail(); + } catch (final IllegalArgumentException expected) { + // ok + } + } + + @Test + public void test_responseBody_emptyEntity_returnsEmptyContent() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + // Status 200 with explicit zero-length body — exercises the entity-present-but-empty path. 
+ exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + simple.port() + "/"); + extractor.setMaxRetries(0); + extractor.init(); + try { + final ExtractData data = extractor.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("", data.getContent()); + } finally { + extractor.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_parseRetryAfter_garbageFallsBackToDefaultBackoff() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + // Garbage Retry-After: not a number, not an HTTP-date. + exchange.getResponseHeaders().add("Retry-After", "tomorrow-please"); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_parseRetryAfter_negativeFallsBackToDefaultBackoff() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange 
-> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + exchange.getResponseHeaders().add("Retry-After", "-5"); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_resolveCharset_shiftJis() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final String greeting = "こんにちは"; + final byte[] sjis = greeting.getBytes(java.nio.charset.Charset.forName("Shift_JIS")); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=Shift_JIS"); + exchange.sendResponseHeaders(200, sjis.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(sjis); + } + }); + simple.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + simple.port() + "/"); + extractor.setMaxRetries(0); + extractor.init(); + try { + final ExtractData data = extractor.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals(greeting, data.getContent()); + } finally { + extractor.destroy(); + } + } finally { + simple.stop(); + } + } + private static void drain(final InputStream in) throws 
IOException { final byte[] buf = new byte[4096]; while (in.read(buf) >= 0) {