diff --git a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java index 04b8d004..b60fe276 100644 --- a/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java +++ b/fess-crawler/src/main/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractor.java @@ -15,14 +15,30 @@ */ package org.codelibs.fess.crawler.extractor.impl; +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.commons.io.output.DeferredFileOutputStream; +import org.apache.commons.lang3.SystemUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -35,20 +51,23 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.DateUtils; import org.apache.http.config.RegistryBuilder; +import org.apache.http.entity.ContentType; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import 
org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeader; -import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.codelibs.core.beans.BeanDesc; import org.codelibs.core.beans.PropertyDesc; import org.codelibs.core.beans.factory.BeanDescFactory; +import org.codelibs.core.io.FileUtil; import org.codelibs.core.lang.StringUtil; import org.codelibs.core.timer.TimeoutManager; import org.codelibs.core.timer.TimeoutTask; @@ -59,8 +78,6 @@ import org.codelibs.fess.crawler.entity.ExtractData; import org.codelibs.fess.crawler.exception.ExtractException; -import com.google.common.base.Charsets; - import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -71,6 +88,9 @@ public class ApiExtractor extends AbstractExtractor { private static final Logger logger = LogManager.getLogger(ApiExtractor.class); + /** Parameter key that, if present, overrides the configured URL for a single request. */ + public static final String PARAM_EXTRACTOR_URL = "extractorUrl"; + /** The URL of the API endpoint. */ protected String url; @@ -78,7 +98,7 @@ public class ApiExtractor extends AbstractExtractor { protected Integer accessTimeout; // sec /** The HTTP client used for API calls. */ - protected CloseableHttpClient httpClient; + protected volatile CloseableHttpClient httpClient; /** The connection timeout in milliseconds. */ protected Integer connectionTimeout; @@ -86,6 +106,56 @@ public class ApiExtractor extends AbstractExtractor { /** The socket timeout in milliseconds. */ protected Integer soTimeout; + /** + * Maximum time in milliseconds to wait for a connection to be checked out from the pool. 
+ * Without this bound, requests block indefinitely once {@link #maxConnections} or + * {@link #maxConnectionsPerRoute} is exhausted by slow upstream responses, defeating the + * point of the connect/socket timeouts above. + */ + protected Integer connectionRequestTimeout; + + /** Maximum total connections in the pool. */ + protected int maxConnections = 50; + + /** Maximum connections per route in the pool. */ + protected int maxConnectionsPerRoute = 10; + + /** Maximum size in bytes accepted for an API response body. Default is 100 MiB. */ + protected long maxResponseSize = 100L * 1024L * 1024L; + + /** Maximum size in bytes accepted for an outgoing request body. Default is 100 MiB. */ + protected long maxRequestSize = 100L * 1024L * 1024L; + + /** + * Threshold in bytes at which the request body spills from memory to a temp file. Bodies + * smaller than this stay in a {@code byte[]}; larger ones are written to a temp file via + * {@link DeferredFileOutputStream} so concurrent extractors don't pile up multi-MiB arrays + * on the heap. + */ + protected int requestSpoolThreshold = 1024 * 1024; + + /** Maximum number of retry attempts on transient failures. */ + protected int maxRetries = 2; + + /** Initial backoff in milliseconds between retries (doubled per attempt). */ + protected long retryBackoffMs = 500L; + + /** + * Hard upper bound in milliseconds applied to a server-supplied {@code Retry-After} value. + * Without this cap, a hostile or misbehaving server could stall an extractor thread for + * arbitrary time (e.g. {@code Retry-After: 86400}). Default 60 seconds. + */ + protected long maxRetryAfterMs = 60_000L; + + /** + * When {@code true}, an {@link #PARAM_EXTRACTOR_URL} entry in the per-call params Map overrides + * the configured {@link #url}. Defaults to {@code false} so callers cannot redirect the extractor + * to an arbitrary endpoint unless the operator explicitly enables it. 
When enabling this, the + * caller is responsible for ensuring that the params Map cannot be populated from untrusted + * data — otherwise this becomes an SSRF gadget against internal hosts. + */ + protected boolean allowExtractorUrlOverride = false; + /** The map of authentication scheme providers. */ protected Map authSchemeProviderMap; @@ -107,6 +177,9 @@ public class ApiExtractor extends AbstractExtractor { /** List of request headers */ private final List
requestHeaderList = new ArrayList<>(); + /** Pooled connection manager backing the HTTP client. */ + protected PoolingHttpClientConnectionManager connectionManager; + /** * Constructs a new ApiExtractor. */ @@ -133,10 +206,25 @@ public void init() { if (connectionTimeoutParam != null) { requestConfigBuilder.setConnectTimeout(connectionTimeoutParam); + } else { + // sane default to avoid hanging on dead servers + requestConfigBuilder.setConnectTimeout(5000); } final Integer soTimeoutParam = soTimeout; if (soTimeoutParam != null) { requestConfigBuilder.setSocketTimeout(soTimeoutParam); + } else { + // sane default to mitigate slow loris servers + requestConfigBuilder.setSocketTimeout(30000); + } + final Integer connectionRequestTimeoutParam = connectionRequestTimeout; + if (connectionRequestTimeoutParam != null) { + requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeoutParam); + } else { + // Without an explicit cap HC4 defaults to -1 (wait forever) — once the pool is full + // of slow requests, every new caller blocks indefinitely. Default to the same 5s used + // for connect timeout so socket exhaustion is visible to the caller, not silent. + requestConfigBuilder.setConnectionRequestTimeout(5000); } // AuthSchemeFactory @@ -177,6 +265,18 @@ public void init() { } } + // Pooled connection manager so connections are reused across calls. + connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(maxConnections); + connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute); + httpClientBuilder.setConnectionManager(connectionManager); + // We manage the connection manager's lifecycle in destroy(), so prevent the client from + // shutting it down on close(). This makes destroy ordering explicit and avoids the + // double-close path that previously required a broad catch. + httpClientBuilder.setConnectionManagerShared(true); + // Disable the built-in retry handler; we implement our own classification below. 
+ httpClientBuilder.disableAutomaticRetries(); + final CloseableHttpClient closeableHttpClient = httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()).build(); if (!httpClientPropertyMap.isEmpty()) { final BeanDesc beanDesc = BeanDescFactory.getBeanDesc(closeableHttpClient.getClass()); @@ -204,22 +304,94 @@ public void destroy() { httpClient.close(); } catch (final IOException e) { logger.warn("Failed to close HTTP client for API extractor", e); + } finally { + httpClient = null; + } + } + if (connectionManager != null) { + try { + connectionManager.close(); + } catch (final RuntimeException e) { + logger.warn("Failed to close connection manager for API extractor", e); + } finally { + connectionManager = null; } } } + /** + * Resolves the URL the extractor should POST to for this call. Falls back to the configured + * {@link #url} unless the operator has set {@link #allowExtractorUrlOverride} to {@code true} + * and the params Map carries a non-blank {@link #PARAM_EXTRACTOR_URL} entry whose + * scheme is {@code http} or {@code https}. Disallowed override values are logged and ignored. 
+ */ + protected String resolveTargetUrl(final Map params) { + if (params == null) { + return url; + } + final String override = params.get(PARAM_EXTRACTOR_URL); + if (StringUtil.isBlank(override)) { + return url; + } + if (!allowExtractorUrlOverride) { + if (logger.isDebugEnabled()) { + logger.debug("Ignoring param=[{}] because allowExtractorUrlOverride is disabled.", PARAM_EXTRACTOR_URL); + } + return url; + } + if (!isAllowedOverrideScheme(override)) { + logger.warn("Ignoring param=[{}] with disallowed scheme: override={}", PARAM_EXTRACTOR_URL, override); + return url; + } + if (logger.isInfoEnabled()) { + logger.info("Overriding extractor URL: configured={} override={}", url, override); + } + return override; + } + + private static boolean isAllowedOverrideScheme(final String candidate) { + if (candidate == null) { + return false; + } + for (int i = 0; i < candidate.length(); i++) { + final char c = candidate.charAt(i); + if (c <= 0x20 || c == 0x7F) { + return false; + } + } + try { + final URI uri = new URI(candidate); + final String scheme = uri.getScheme(); + if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) { + return false; + } + if (uri.getHost() == null || uri.getHost().isEmpty()) { + return false; + } + if (uri.getUserInfo() != null) { + return false; + } + return true; + } catch (final URISyntaxException e) { + return false; + } + } + /** * Extracts text from the input stream using the API endpoint. 
* * @param in the input stream to extract text from - * @param params additional parameters + * @param params additional parameters; an {@code extractorUrl} entry overrides the configured URL + * only when {@link #allowExtractorUrlOverride} is {@code true} (default {@code false}) * @return the extracted data * @throws ExtractException if extraction fails */ @Override public ExtractData getText(final InputStream in, final Map params) { + final String targetUrl = resolveTargetUrl(params); + if (logger.isDebugEnabled()) { - logger.debug("Accessing {}", url); + logger.debug("Accessing url={}", targetUrl); } // start @@ -230,38 +402,333 @@ public ExtractData getText(final InputStream in, final Map param accessTimeoutTask = TimeoutManager.getInstance().addTimeoutTarget(accessTimeoutTarget, accessTimeout, false); } - final ExtractData data = new ExtractData(); - final HttpPost httpPost = new HttpPost(url); - final HttpEntity postEntity = MultipartEntityBuilder.create() - .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) - .setCharset(Charset.forName("UTF-8")) - .addBinaryBody("filedata", in) - .build(); - httpPost.setEntity(postEntity); - - try (CloseableHttpResponse response = httpClient.execute(httpPost)) { - if (response.getStatusLine().getStatusCode() != Constants.OK_STATUS_CODE) { - logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}", url, - response.getStatusLine().getStatusCode()); - return null; + try { + // The request body is the supplied input stream; multipart body builders consume it, + // so we can only execute the post once. Retries thus only apply when the request + // can be re-issued — currently only on connection-establishment failures or + // a status-code-only retry path that doesn't consume the entity. To support retries + // we buffer the request stream into memory, bounded by maxRequestSize so a hostile + // or malformed source document cannot OOM the extractor. 
+ return executeWithRetries(in, targetUrl); + } finally { + if (accessTimeoutTarget != null) { + accessTimeoutTarget.stop(); + } + if (accessTimeoutTask != null && !accessTimeoutTask.isCanceled()) { + accessTimeoutTask.cancel(); } + } + } - data.setContent(EntityUtils.toString(response.getEntity(), Charsets.UTF_8)); - final Header[] headers = response.getAllHeaders(); - for (final Header header : headers) { - data.putValue(header.getName(), header.getValue()); + /** + * Executes the API request with bounded retries on transient failures. + * + *

The input stream is spooled into a {@link DeferredFileOutputStream}: small bodies stay + * in memory while bigger ones spill to a temp file, so concurrent extractors do not retain + * multi-MiB arrays on the heap. The total bytes accepted is capped at {@link #maxRequestSize} + * so a hostile or oversized source cannot exhaust memory or disk.

+ * + * @param in the input stream to forward to the API endpoint + * @param targetUrl the URL to call + * @return the extracted data + */ + protected ExtractData executeWithRetries(final InputStream in, final String targetUrl) { + // Spool the input once so we can re-send it on retry. Small bodies stay in memory; larger + // ones spill to a temp file so we don't pin maxRequestSize bytes of heap per concurrent call. + File spoolFile = null; + try (DeferredFileOutputStream dfos = DeferredFileOutputStream.builder() + .setThreshold(requestSpoolThreshold) + .setPrefix("apiExtractor-") + .setSuffix(".tmp") + .setDirectory(SystemUtils.getJavaIoTmpDir()) + .get(); + BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(in).setMaxCount(maxRequestSize + 1L).get()) { + final long copied = IOUtils.copyLarge(bounded, dfos); + dfos.close(); + if (copied > maxRequestSize) { + throw new ExtractException("ApiExtractor request body exceeded limit: limit=" + maxRequestSize); + } + if (!dfos.isInMemory()) { + spoolFile = dfos.getFile(); } + return runRetryLoop(dfos, targetUrl); + } catch (final ExtractException e) { + throw e; } catch (final IOException e) { - throw new ExtractException(e); + throw new ExtractException("Failed to read input stream for API extractor", e); } finally { - if (accessTimeout != null) { - accessTimeoutTarget.stop(); - if (!accessTimeoutTask.isCanceled()) { - accessTimeoutTask.cancel(); + if (spoolFile != null) { + FileUtil.deleteInBackground(spoolFile); + } + } + } + + private ExtractData runRetryLoop(final DeferredFileOutputStream dfos, final String targetUrl) { + IOException lastIoException = null; + for (int attempt = 0; attempt <= maxRetries; attempt++) { + try { + return executeOnce(dfos, targetUrl, attempt); + } catch (final RetryableStatusException e) { + final long sleepMs = computeBackoff(attempt, e.retryAfterMs); + if (attempt >= maxRetries) { + throw new ExtractException("API request failed after " + (attempt + 1) + " attempts: 
status=" + e.statusCode, e); + } + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request (status={}, attempt={}/{}, sleep={}ms)", e.statusCode, attempt + 1, maxRetries + 1, + sleepMs); + } + sleepQuietly(sleepMs); + } catch (final IOException e) { + // M5: skip retries for non-transient errors — retrying will not help. + if (e instanceof UnknownHostException || e instanceof SSLHandshakeException || e instanceof SSLPeerUnverifiedException) { + throw new ExtractException("API request failed (non-transient): url=" + targetUrl, e); + } + lastIoException = e; + if (attempt >= maxRetries) { + throw new ExtractException("API request failed", e); + } + final long sleepMs = computeBackoff(attempt, -1L); + if (logger.isDebugEnabled()) { + logger.debug("Retrying API request after {} (attempt={}/{}, sleep={}ms)", e.getClass().getSimpleName(), attempt + 1, + maxRetries + 1, sleepMs, e); + } + sleepQuietly(sleepMs); + } + } + + // Should not reach here, but defend against it. + throw new ExtractException("API request failed", lastIoException != null ? lastIoException : new IOException("unknown error")); + } + + /** + * Executes a single API call. + * + *

The body is sourced from the spooled {@link DeferredFileOutputStream}: an in-memory copy + * is wrapped in a {@link ByteArrayInputStream}, while a spilled body is streamed from its temp + * file. The InputStream is closed before the method returns, but the underlying spool persists + * across attempts so retries can re-issue the same body without re-reading the original.

+ * + * @param dfos spooled request body produced by {@link #executeWithRetries(InputStream, String)} + * @param targetUrl URL to POST to + * @param attempt current attempt number (0-based) — included for logging + * @return the extracted data on success + * @throws RetryableStatusException when the response status is retryable + * @throws IOException on transport-level failure + */ + protected ExtractData executeOnce(final DeferredFileOutputStream dfos, final String targetUrl, final int attempt) throws IOException { + final CloseableHttpClient client = httpClient; + if (client == null) { + throw new ExtractException("ApiExtractor has been destroyed"); + } + if (attempt > 0 && logger.isDebugEnabled()) { + logger.debug("Executing API request (url={}, attempt={})", targetUrl, attempt); + } + final HttpPost httpPost = new HttpPost(targetUrl); + try (InputStream bodyStream = openSpooledBody(dfos)) { + final HttpEntity postEntity = MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .setCharset(Charset.forName("UTF-8")) + .addBinaryBody("filedata", bodyStream, ContentType.APPLICATION_OCTET_STREAM, "filedata") + .build(); + httpPost.setEntity(postEntity); + try (CloseableHttpResponse response = client.execute(httpPost)) { + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != Constants.OK_STATUS_CODE) { + if (isRetryableStatus(statusCode)) { + final long retryAfterMs = parseRetryAfterMs(response.getFirstHeader("Retry-After")); + // Drain entity so the connection can be reused. Bound the drain so a hostile + // server cannot stream an unbounded body just to keep us reading. 
+ drainBounded(response.getEntity()); + throw new RetryableStatusException(statusCode, retryAfterMs); + } + logger.warn("Failed to access API extractor endpoint: url={}, statusCode={}, attempt={}", targetUrl, statusCode, + attempt); + drainBounded(response.getEntity()); + return null; } + + final ExtractData data = new ExtractData(); + final HttpEntity entity = response.getEntity(); + if (entity != null) { + final Charset charset = resolveCharset(entity); + try (InputStream entityStream = entity.getContent(); + BoundedInputStream bounded = + BoundedInputStream.builder().setInputStream(entityStream).setMaxCount(maxResponseSize + 1L).get()) { + final byte[] body = bounded.readAllBytes(); + if (body.length > maxResponseSize) { + throw new ExtractException("ApiExtractor response exceeded limit: limit=" + maxResponseSize); + } + data.setContent(new String(body, charset)); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("API extractor returned 200 with no response entity: url={}", targetUrl); + } + data.setContent(""); + } + final Header[] headers = response.getAllHeaders(); + for (final Header header : headers) { + data.putValue(header.getName(), header.getValue()); + } + return data; } } - return data; + } + + private InputStream openSpooledBody(final DeferredFileOutputStream dfos) throws IOException { + if (dfos.isInMemory()) { + return new ByteArrayInputStream(dfos.getData()); + } + return new BufferedInputStream(new FileInputStream(dfos.getFile())); + } + + /** + * Returns true if the given status code is considered retryable. + * + * @param statusCode HTTP status code + * @return true when the request should be retried + */ + protected boolean isRetryableStatus(final int statusCode) { + if (statusCode >= 500 && statusCode <= 599) { + return true; + } + return statusCode == 408 || statusCode == 429; + } + + /** + * Parses a {@code Retry-After} header into milliseconds. 
Both delta-seconds and the + * HTTP-date form (RFC 7231) are honored; on parse failure {@code -1L} is returned and the + * caller should fall back to the default backoff schedule. + * + * @param header the {@code Retry-After} header (may be null) + * @return delay in milliseconds, or {@code -1L} if unspecified or invalid + */ + protected long parseRetryAfterMs(final Header header) { + if (header == null) { + return -1L; + } + final String value = header.getValue(); + if (StringUtil.isBlank(value)) { + return -1L; + } + final String trimmed = value.trim(); + // Try delta-seconds first. + try { + final long seconds = Long.parseLong(trimmed); + if (seconds < 0L) { + return -1L; + } + return seconds * 1000L; + } catch (final NumberFormatException ignore) { + // Not a numeric value — try HTTP-date form below. + } + // HTTP-date form per RFC 7231. + final Date when = DateUtils.parseDate(trimmed); + if (when == null) { + return -1L; + } + final long deltaMs = when.getTime() - System.currentTimeMillis(); + if (deltaMs <= 0L) { + return 0L; + } + return deltaMs; + } + + /** + * Computes exponential backoff for a retry attempt. A small random jitter in the range + * {@code [0, retryBackoffMs)} is added to the exponential base to avoid thundering-herd + * retry storms when many extractors hit the same upstream concurrently. Server-supplied + * {@code Retry-After} hints take precedence but are clamped to {@link #maxRetryAfterMs} + * so a hostile or misbehaving upstream cannot pin the extractor thread for arbitrary time + * (e.g. {@code Retry-After: 86400}). 
+ * + * @param attempt 0-based attempt number + * @param retryAfterMs server-suggested delay; non-negative values take precedence + * @return sleep duration in milliseconds + */ + protected long computeBackoff(final int attempt, final long retryAfterMs) { + if (retryAfterMs >= 0L) { + return Math.min(retryAfterMs, Math.max(0L, maxRetryAfterMs)); + } + final long base = retryBackoffMs * (1L << attempt); + final long jitter = retryBackoffMs > 0L ? ThreadLocalRandom.current().nextLong(0L, retryBackoffMs) : 0L; + return base + jitter; + } + + /** + * Sleeps between retries. If the current thread is interrupted (e.g. by + * {@code accessTimeout} via {@code AccessTimeoutTarget}), aborts the retry + * loop by throwing {@link ExtractException} instead of silently continuing + * to the next HTTP attempt. The interrupt flag is preserved so callers + * further up the stack can also observe the cancellation. + * + * @param millis sleep duration in milliseconds + * @throws ExtractException if interrupted while sleeping + */ + protected void sleepQuietly(final long millis) { + if (millis <= 0L) { + return; + } + try { + Thread.sleep(millis); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ExtractException("API retry was interrupted", e); + } + } + + /** + * Drains the response entity content with a hard upper bound of {@link #maxResponseSize} + * bytes so a hostile server cannot trickle out an unbounded body purely to keep us reading. + * If the server sent more than the bound, the under-consumed entity causes HC to discard the + * underlying connection — that is the trade-off for the cap. Errors are intentionally swallowed. 
+ * + * @param entity entity to drain (may be null) + */ + protected void drainBounded(final HttpEntity entity) { + if (entity == null) { + return; + } + try (InputStream content = entity.getContent(); + BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(content).setMaxCount(maxResponseSize).get()) { + final byte[] buf = new byte[8192]; + while (bounded.read(buf) >= 0) { + // discard + } + } catch (final Exception e) { + // ignore — best-effort drain + } + } + + /** + * Resolves the charset advertised by the response entity, falling back to UTF-8. + * + * @param entity response entity (non-null) + * @return charset to decode the response with + */ + protected Charset resolveCharset(final HttpEntity entity) { + try { + final ContentType contentType = ContentType.get(entity); + if (contentType != null && contentType.getCharset() != null) { + return contentType.getCharset(); + } + } catch (final Exception e) { + // ignore and fall back to UTF-8 + } + return Constants.UTF_8_CHARSET; + } + + /** Internal signal that a response was retryable; carries the status and any Retry-After hint. */ + @SuppressWarnings("serial") + protected static class RetryableStatusException extends IOException { + final int statusCode; + final long retryAfterMs; + + RetryableStatusException(final int statusCode, final long retryAfterMs) { + super("retryable status: " + statusCode); + this.statusCode = statusCode; + this.retryAfterMs = retryAfterMs; + } } /** @@ -288,6 +755,16 @@ public void setSoTimeout(final Integer soTimeout) { this.soTimeout = soTimeout; } + /** + * Sets the maximum time in milliseconds to wait for a connection from the pool. Without + * this cap, callers block indefinitely once the pool is exhausted by slow upstream + * responses, defeating the connect/socket timeouts. 
+ * @param connectionRequestTimeout pool checkout timeout in milliseconds + */ + public void setConnectionRequestTimeout(final Integer connectionRequestTimeout) { + this.connectionRequestTimeout = connectionRequestTimeout; + } + /** * Sets the map of authentication scheme providers. * @param authSchemeProviderMap The map of authentication scheme providers. @@ -336,4 +813,109 @@ public void setAccessTimeout(final Integer accessTimeout) { this.accessTimeout = accessTimeout; } + /** + * Sets the maximum total connections in the pool. + * @param maxConnections maximum total pooled connections + */ + public void setMaxConnections(final int maxConnections) { + if (maxConnections <= 0) { + throw new IllegalArgumentException("maxConnections must be > 0: maxConnections=" + maxConnections); + } + this.maxConnections = maxConnections; + } + + /** + * Sets the maximum connections per route in the pool. + * @param maxConnectionsPerRoute maximum pooled connections per route + */ + public void setMaxConnectionsPerRoute(final int maxConnectionsPerRoute) { + if (maxConnectionsPerRoute <= 0) { + throw new IllegalArgumentException("maxConnectionsPerRoute must be > 0: maxConnectionsPerRoute=" + maxConnectionsPerRoute); + } + this.maxConnectionsPerRoute = maxConnectionsPerRoute; + } + + /** + * Sets the maximum size in bytes accepted for an API response body. + * Responses larger than this trigger an {@link ExtractException}. + * @param maxResponseSize maximum accepted response size in bytes + */ + public void setMaxResponseSize(final long maxResponseSize) { + if (maxResponseSize < 0) { + throw new IllegalArgumentException("maxResponseSize must be >= 0: maxResponseSize=" + maxResponseSize); + } + this.maxResponseSize = maxResponseSize; + } + + /** + * Sets the maximum size in bytes accepted for an outgoing request body. + * Inputs larger than this trigger an {@link ExtractException} before any HTTP call is made. 
+ * @param maxRequestSize maximum accepted request body size in bytes + */ + public void setMaxRequestSize(final long maxRequestSize) { + if (maxRequestSize < 0) { + throw new IllegalArgumentException("maxRequestSize must be >= 0: maxRequestSize=" + maxRequestSize); + } + this.maxRequestSize = maxRequestSize; + } + + /** + * Sets the maximum number of retry attempts for transient failures. + * @param maxRetries number of retries (in addition to the initial attempt) + */ + public void setMaxRetries(final int maxRetries) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries must be >= 0: maxRetries=" + maxRetries); + } + this.maxRetries = maxRetries; + } + + /** + * Sets the initial backoff in milliseconds between retries. + * The actual delay doubles per attempt. + * @param retryBackoffMs initial backoff in milliseconds + */ + public void setRetryBackoffMs(final long retryBackoffMs) { + if (retryBackoffMs < 0) { + throw new IllegalArgumentException("retryBackoffMs must be >= 0: retryBackoffMs=" + retryBackoffMs); + } + this.retryBackoffMs = retryBackoffMs; + } + + /** + * Sets the upper bound applied to a server-supplied {@code Retry-After} value. A hostile or + * misbehaving upstream returning {@code Retry-After: 86400} otherwise pins this thread for + * 24 hours; clamping it to a sane ceiling keeps retries responsive. + * @param maxRetryAfterMs maximum effective Retry-After value in milliseconds + */ + public void setMaxRetryAfterMs(final long maxRetryAfterMs) { + if (maxRetryAfterMs < 0) { + throw new IllegalArgumentException("maxRetryAfterMs must be >= 0: maxRetryAfterMs=" + maxRetryAfterMs); + } + this.maxRetryAfterMs = maxRetryAfterMs; + } + + /** + * Sets the byte threshold at which the request body spills from memory to a temp file. + * Bodies smaller than this stay in a {@code byte[]}; larger bodies are written to disk so + * concurrent extractors do not pin {@link #maxRequestSize} bytes of heap each. 
+ * @param requestSpoolThreshold byte threshold for memory-to-file spillover + */ + public void setRequestSpoolThreshold(final int requestSpoolThreshold) { + if (requestSpoolThreshold < 0) { + throw new IllegalArgumentException("requestSpoolThreshold must be >= 0: requestSpoolThreshold=" + requestSpoolThreshold); + } + this.requestSpoolThreshold = requestSpoolThreshold; + } + + /** + * Enables or disables the per-call {@link #PARAM_EXTRACTOR_URL} override. Defaults to {@code false}; + * enable only when the operator can guarantee that the params Map cannot be populated from + * untrusted data, otherwise this becomes an SSRF gadget. + * @param allowExtractorUrlOverride {@code true} to honor the {@code extractorUrl} param + */ + public void setAllowExtractorUrlOverride(final boolean allowExtractorUrlOverride) { + this.allowExtractorUrlOverride = allowExtractorUrlOverride; + } + } diff --git a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java index f0d560c4..66d11615 100644 --- a/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java +++ b/fess-crawler/src/test/java/org/codelibs/fess/crawler/extractor/impl/ApiExtractorTest.java @@ -16,16 +16,26 @@ package org.codelibs.fess.crawler.extractor.impl; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicInteger; import org.codelibs.fess.crawler.entity.ExtractData; import org.codelibs.fess.crawler.exception.CrawlerSystemException; 
+import org.codelibs.fess.crawler.exception.ExtractException; import org.dbflute.utflute.core.PlainTestCase; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.io.Content; @@ -35,6 +45,12 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; /** * @author shinsuke @@ -79,7 +95,1133 @@ public void test_getText() throws Exception { assertEquals(content, text.getContent()); } - // TODO other tests + /** + * Connection reuse / pooling sanity check: 10 sequential calls share the pool without + * exhausting sockets. + */ + @Test + public void test_connectionReuse() throws Exception { + final String testStr = "abc"; + final String expected = ATTR_NAME + "," + testStr; + for (int i = 0; i < 10; i++) { + final ExtractData text = extractor.getText(new ByteArrayInputStream(testStr.getBytes()), new HashMap<>()); + assertEquals(expected, text.getContent()); + } + } + + /** + * Per-call URL override via {@code params.extractorUrl}. + */ + @Test + public void test_extractorUrlOverride() throws Exception { + // Misconfigure the default URL; the override in params should be used instead. 
+        final ApiExtractor overrideExtractor = new ApiExtractor();
+        overrideExtractor.setUrl("http://127.0.0.1:1/does-not-exist");
+        overrideExtractor.setAllowExtractorUrlOverride(true);
+        overrideExtractor.init();
+        try {
+            final Map<String, String> params = new HashMap<>();
+            params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + port + "/");
+            final ExtractData text = overrideExtractor.getText(new ByteArrayInputStream("ovr".getBytes()), params);
+            assertEquals(ATTR_NAME + ",ovr", text.getContent());
+        } finally {
+            overrideExtractor.destroy();
+        }
+    }
+
+    /**
+     * Without {@code setAllowExtractorUrlOverride(true)} the {@code extractorUrl} param must be
+     * ignored: the configured URL answers the request and the override server sees no hits.
+     */
+    @Test
+    public void test_extractorUrlOverride_disabledByDefault_isIgnored() throws Exception {
+        final SimpleHttpServer overrideServer = new SimpleHttpServer();
+        final AtomicInteger overrideHits = new AtomicInteger();
+        overrideServer.setHandler(exchange -> {
+            overrideHits.incrementAndGet();
+            drain(exchange.getRequestBody());
+            exchange.sendResponseHeaders(200, -1);
+            exchange.close();
+        });
+        overrideServer.start();
+        try {
+            final ApiExtractor extractor = new ApiExtractor();
+            extractor.setUrl("http://127.0.0.1:" + port + "/");
+            extractor.init();
+            try {
+                final Map<String, String> params = new HashMap<>();
+                params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + overrideServer.port() + "/");
+                final ExtractData text = extractor.getText(new ByteArrayInputStream("def".getBytes()), params);
+                assertEquals(ATTR_NAME + ",def", text.getContent());
+                assertEquals(0, overrideHits.get());
+            } finally {
+                extractor.destroy();
+            }
+        } finally {
+            overrideServer.stop();
+        }
+    }
+
+    /**
+     * A {@code file:} override must not be followed even when the override is enabled.
+     */
+    @Test
+    public void test_extractorUrlOverride_disallowedSchemeRejected() throws Exception {
+        // Even when override is enabled, non-http(s) schemes must be rejected. The configured URL
+        // is used instead, so the request still succeeds against the in-process server.
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:" + port + "/");
+        extractor.setAllowExtractorUrlOverride(true);
+        extractor.init();
+        try {
+            final Map<String, String> params = new HashMap<>();
+            params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "file:///etc/passwd");
+            final ExtractData text = extractor.getText(new ByteArrayInputStream("rej".getBytes()), params);
+            assertEquals(ATTR_NAME + ",rej", text.getContent());
+        } finally {
+            extractor.destroy();
+        }
+    }
+
+    /**
+     * Response larger than {@code maxResponseSize} must throw {@link ExtractException}.
+     */
+    @Test
+    public void test_responseExceedsMaxSize_throwsExtractException() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        // Always respond with 1 KiB of payload regardless of input.
+        final byte[] payload = new byte[1024];
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (byte) ('a' + (i % 26));
+        }
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            final byte[] body = payload;
+            exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+            exchange.sendResponseHeaders(200, body.length);
+            try (OutputStream out = exchange.getResponseBody()) {
+                out.write(body);
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor capped = new ApiExtractor();
+            capped.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // 64-byte cap against a 1024-byte response.
+            capped.setMaxResponseSize(64L);
+            capped.setMaxRetries(0);
+            capped.init();
+            try {
+                capped.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                fail();
+            } catch (final ExtractException e) {
+                org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("exceeded limit"),
+                        "message should mention limit: " + e.getMessage());
+            } finally {
+                capped.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * Two 5xx responses followed by a 200 must succeed via the retry loop.
+     */
+    @Test
+    public void test_retryOn5xx_succeedsOn3rdAttempt() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            final int n = calls.incrementAndGet();
+            // Calls 1 and 2 answer 503 with no body; call 3 onwards answers 200 "ok".
+            if (n < 3) {
+                exchange.sendResponseHeaders(503, -1);
+                exchange.close();
+            } else {
+                final byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+                exchange.sendResponseHeaders(200, body.length);
+                try (OutputStream out = exchange.getResponseBody()) {
+                    out.write(body);
+                }
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor retrying = new ApiExtractor();
+            retrying.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // The test expects maxRetries=2 to permit three attempts in total (asserted below).
+            retrying.setMaxRetries(2);
+            retrying.setRetryBackoffMs(10L);
+            retrying.init();
+            try {
+                final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                assertNotNull(data);
+                assertEquals("ok", data.getContent());
+                assertEquals(3, calls.get());
+            } finally {
+                retrying.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * 4xx (other than 408/429) must NOT be retried; the extractor returns null after a single call.
+     */
+    @Test
+    public void test_noRetryOn4xx_returnsNullImmediately() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            calls.incrementAndGet();
+            exchange.sendResponseHeaders(404, -1);
+            exchange.close();
+        });
+        simple.start();
+        try {
+            final ApiExtractor retrying = new ApiExtractor();
+            retrying.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // Retries are enabled on purpose: the single observed call proves 404 is not retried.
+            retrying.setMaxRetries(3);
+            retrying.setRetryBackoffMs(10L);
+            retrying.init();
+            try {
+                final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                // Existing contract: non-OK non-retryable status returns null.
+                assertNull(data);
+                assertEquals(1, calls.get());
+            } finally {
+                retrying.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * Honors the {@code Retry-After} header (delta-seconds form) for HTTP 429 responses.
+     */
+    @Test
+    public void test_retryOn429_respectsRetryAfter() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            final int n = calls.incrementAndGet();
+            // First call: 429 with "Retry-After: 0"; every later call: 200 "ok".
+            if (n == 1) {
+                exchange.getResponseHeaders().add("Retry-After", "0");
+                exchange.sendResponseHeaders(429, -1);
+                exchange.close();
+            } else {
+                final byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+                exchange.sendResponseHeaders(200, body.length);
+                try (OutputStream out = exchange.getResponseBody()) {
+                    out.write(body);
+                }
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor retrying = new ApiExtractor();
+            retrying.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            retrying.setMaxRetries(1);
+            // Large default backoff: if Retry-After is ignored, retry would take ~5s.
+            retrying.setRetryBackoffMs(5000L);
+            retrying.init();
+            try {
+                final long t0 = System.currentTimeMillis();
+                final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                final long elapsed = System.currentTimeMillis() - t0;
+                assertNotNull(data);
+                assertEquals("ok", data.getContent());
+                assertEquals(2, calls.get());
+                // Retry-After: 0 must beat the 5s default backoff.
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2500L,
+                        "Retry-After:0 must override default backoff (elapsed=" + elapsed + "ms)");
+            } finally {
+                retrying.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * A server that accepts the connection but never responds must trigger a socket-timeout
+     * surfaced as {@link ExtractException}.
+     */
+    @Test
+    public void test_timeoutOnSlowResponse_throwsExtractException() throws Exception {
+        // ServerSocket-based "black hole" server: accepts and then sleeps.
+        // Port 0 asks the OS for a free ephemeral port; it is read back via getLocalPort().
+        final ServerSocket serverSocket = new ServerSocket(0);
+        serverSocket.setSoTimeout(0);
+        final int p = serverSocket.getLocalPort();
+        final Thread acceptor = new Thread(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    final Socket s = serverSocket.accept();
+                    // Leak the socket on purpose — we just want the client to time out.
+                    // The holder is a daemon thread so its 60s sleep cannot keep the JVM alive
+                    // after the test has finished.
+                    final Thread holder = new Thread(() -> {
+                        try {
+                            Thread.sleep(60_000L);
+                        } catch (final InterruptedException ignore) {
+                            Thread.currentThread().interrupt();
+                        } finally {
+                            try {
+                                s.close();
+                            } catch (final IOException ignore) {
+                                // ignore
+                            }
+                        }
+                    }, "slow-loris-holder");
+                    holder.setDaemon(true);
+                    holder.start();
+                } catch (final IOException e) {
+                    // accept() fails once the server socket is closed by the test: stop accepting.
+                    return;
+                }
+            }
+        }, "slow-loris-acceptor");
+        acceptor.setDaemon(true);
+        acceptor.start();
+        try {
+            final ApiExtractor slow = new ApiExtractor();
+            slow.setUrl("http://127.0.0.1:" + p + "/");
+            slow.setSoTimeout(300);
+            slow.setConnectionTimeout(300);
+            slow.setMaxRetries(0);
+            slow.init();
+            try {
+                slow.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                fail();
+            } catch (final ExtractException e) {
+                // expected
+            } finally {
+                slow.destroy();
+            }
+        } finally {
+            try {
+                serverSocket.close();
+            } catch (final IOException ignore) {
+                // ignore
+            }
+            acceptor.interrupt();
+        }
+    }
+
+    /**
+     * Upload bigger than {@code maxRequestSize} must throw {@link ExtractException} before any
+     * HTTP call is made. Asserts no request reaches the server.
+     */
+    @Test
+    public void test_uploadExceedsMaxRequestSize_throws() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            calls.incrementAndGet();
+            drain(exchange.getRequestBody());
+            exchange.sendResponseHeaders(200, -1);
+            exchange.close();
+        });
+        simple.start();
+        try {
+            final ApiExtractor capped = new ApiExtractor();
+            capped.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // 16-byte cap against the 64-byte upload built below.
+            capped.setMaxRequestSize(16L);
+            capped.setMaxRetries(0);
+            capped.init();
+            try {
+                final byte[] tooBig = new byte[64];
+                for (int i = 0; i < tooBig.length; i++) {
+                    tooBig[i] = (byte) ('a' + (i % 26));
+                }
+                capped.getText(new ByteArrayInputStream(tooBig), new HashMap<>());
+                fail();
+            } catch (final ExtractException e) {
+                org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("request body exceeded limit"),
+                        "message should mention request body limit: " + e.getMessage());
+                assertEquals(0, calls.get());
+            } finally {
+                capped.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * {@code Retry-After} header in the HTTP-date form (RFC 7231) must be parsed correctly: the
+     * extractor must wait approximately the indicated delta before retrying.
+     */
+    @Test
+    public void test_retryAfterHttpDate_parsedCorrectly() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        // Pre-compute an HTTP-date ~3 seconds into the future for the first response.
+        // HTTP-date has 1-second resolution (RFC 7231), so a naive `now + N seconds` value
+        // can truncate to ~(N-1) seconds in the worst case. Round UP to the next whole
+        // second to keep the effective wait close to the intended delta.
+        final SimpleDateFormat httpDateFmt = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
+        httpDateFmt.setTimeZone(TimeZone.getTimeZone("GMT"));
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            final int n = calls.incrementAndGet();
+            if (n == 1) {
+                final long futureMs = System.currentTimeMillis() + 3_000L;
+                // Integer ceil to the next whole second (see the rounding note above).
+                final long futureCeilMs = (futureMs + 999L) / 1000L * 1000L;
+                // httpDateFmt is only used on this first-call branch.
+                final String httpDate = httpDateFmt.format(new Date(futureCeilMs));
+                exchange.getResponseHeaders().add("Retry-After", httpDate);
+                exchange.sendResponseHeaders(429, -1);
+                exchange.close();
+            } else {
+                final byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+                exchange.sendResponseHeaders(200, body.length);
+                try (OutputStream out = exchange.getResponseBody()) {
+                    out.write(body);
+                }
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor retrying = new ApiExtractor();
+            retrying.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            retrying.setMaxRetries(1);
+            retrying.setRetryBackoffMs(10L);
+            retrying.init();
+            try {
+                final long t0 = System.currentTimeMillis();
+                final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                final long elapsed = System.currentTimeMillis() - t0;
+                assertNotNull(data);
+                assertEquals("ok", data.getContent());
+                assertEquals(2, calls.get());
+                // Must wait at least ~2.5s (allow some slack), well above the default ~10ms backoff.
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed >= 2_500L,
+                        "retry must respect HTTP-date Retry-After (elapsed=" + elapsed + "ms)");
+                // And not block longer than is reasonable.
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed < 10_000L, "retry must not stall (elapsed=" + elapsed + "ms)");
+            } finally {
+                retrying.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * Probabilistic check that {@link ApiExtractor#computeBackoff(int, long)} adds a jitter
+     * within {@code [base, 2*base)} for {@code attempt=0}. Runs many iterations so the bounds
+     * are exercised, and asserts at least two distinct values are observed (i.e., it isn't a
+     * constant).
+     */
+    @Test
+    public void test_jitterAppliedToBackoff() throws Exception {
+        final ApiExtractor jitterExtractor = new ApiExtractor();
+        jitterExtractor.setUrl("http://127.0.0.1:1/unused");
+        final long base = 100L;
+        jitterExtractor.setRetryBackoffMs(base);
+        jitterExtractor.init();
+        try {
+            long min = Long.MAX_VALUE;
+            long max = Long.MIN_VALUE;
+            // Typed set: the samples are boxed longs.
+            final java.util.HashSet<Long> distinct = new java.util.HashSet<>();
+            for (int i = 0; i < 200; i++) {
+                // -1L is passed as the second argument; the test treats it as "no Retry-After hint".
+                final long v = jitterExtractor.computeBackoff(0, -1L);
+                if (v < min) {
+                    min = v;
+                }
+                if (v > max) {
+                    max = v;
+                }
+                distinct.add(v);
+            }
+            // Each value must lie within [base, 2*base) — base + jitter where jitter ∈ [0, base).
+            org.junit.jupiter.api.Assertions.assertTrue(min >= base, "min should be >= base: min=" + min + ", base=" + base);
+            org.junit.jupiter.api.Assertions.assertTrue(max < 2L * base, "max should be < 2*base: max=" + max + ", base=" + base);
+            // Probabilistic: with 200 samples and 100 possible values the chance that every
+            // sample is identical is astronomically low — at least 2 distinct values is a near-certainty.
+            org.junit.jupiter.api.Assertions.assertTrue(distinct.size() >= 2,
+                    "jitter should produce varied values, got distinct=" + distinct.size());
+        } finally {
+            jitterExtractor.destroy();
+        }
+    }
+
+    /**
+     * Calling {@code sleepQuietly} on an already-interrupted thread must throw
+     * {@link ExtractException} and leave the thread's interrupt flag set.
+     */
+    @Test
+    public void test_sleepQuietly_interruptedThrowsAndPreservesFlag() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:1/unused");
+        extractor.init();
+        try {
+            // Pre-interrupt the thread; sleepQuietly should observe it and bail.
+            Thread.currentThread().interrupt();
+            try {
+                extractor.sleepQuietly(100L);
+                fail();
+            } catch (final ExtractException e) {
+                org.junit.jupiter.api.Assertions.assertTrue(Thread.currentThread().isInterrupted(),
+                        "interrupt flag must be preserved for upstream observers");
+            }
+        } finally {
+            // Drain the interrupt flag so it does not leak into subsequent tests.
+            Thread.interrupted();
+            extractor.destroy();
+        }
+    }
+
+    /**
+     * When the connection pool is exhausted by a slow upstream, a new caller must surface a
+     * pool-checkout timeout instead of blocking indefinitely. Without
+     * {@link RequestConfig#setConnectionRequestTimeout(int)} the second call would hang for
+     * the full duration of the slow handler.
+     */
+    @Test
+    public void test_connectionRequestTimeout_failsFastWhenPoolExhausted() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        // Slow handler holds the only available connection for several seconds.
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            try {
+                Thread.sleep(3_000L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+            exchange.sendResponseHeaders(200, -1);
+            exchange.close();
+        });
+        simple.start();
+        try {
+            final ApiExtractor capped = new ApiExtractor();
+            capped.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // Pool of exactly one connection, with a 200 ms checkout timeout.
+            capped.setMaxConnections(1);
+            capped.setMaxConnectionsPerRoute(1);
+            capped.setConnectionRequestTimeout(200);
+            capped.setMaxRetries(0);
+            capped.init();
+            try {
+                // First caller occupies the only pooled connection in a background thread.
+                final Thread occupier = new Thread(() -> {
+                    try {
+                        capped.getText(new ByteArrayInputStream("a".getBytes()), new HashMap<>());
+                    } catch (final Exception ignore) {
+                        // ignore
+                    }
+                });
+                occupier.setDaemon(true);
+                occupier.start();
+                // Give the occupier a moment to issue its request and acquire the connection.
+                Thread.sleep(300L);
+
+                // Second caller must fail quickly because the pool is exhausted.
+                final long t0 = System.currentTimeMillis();
+                try {
+                    capped.getText(new ByteArrayInputStream("b".getBytes()), new HashMap<>());
+                    fail();
+                } catch (final ExtractException e) {
+                    final long elapsed = System.currentTimeMillis() - t0;
+                    // 2s bound: comfortably below the handler's 3s sleep.
+                    org.junit.jupiter.api.Assertions.assertTrue(elapsed < 2_000L,
+                            "second call must fail fast on pool exhaustion (was " + elapsed + "ms)");
+                }
+                occupier.interrupt();
+                // Bounded join so a stuck occupier cannot hang the test.
+                occupier.join(2_000L);
+            } finally {
+                capped.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * A request body larger than the spool threshold must succeed (proving the spool-to-disk
+     * path works) and the server must observe the exact uploaded bytes. This exercises the
+     * branch that streams from a temp file rather than a {@code byte[]}.
+     */
+    @Test
+    public void test_requestSpoolThreshold_spillsToFileAndUploadsSuccessfully() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final int payloadSize = 32 * 1024;
+        final byte[] payload = new byte[payloadSize];
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = (byte) ('A' + (i % 26));
+        }
+        // Typed reference: the handler thread publishes the raw request bytes to the test thread.
+        final java.util.concurrent.atomic.AtomicReference<byte[]> received = new java.util.concurrent.atomic.AtomicReference<>();
+        simple.setHandler(exchange -> {
+            // Read the complete multipart body verbatim so the test can verify the payload bytes.
+            final java.io.ByteArrayOutputStream sink = new java.io.ByteArrayOutputStream();
+            final byte[] buf = new byte[4096];
+            int n;
+            while ((n = exchange.getRequestBody().read(buf)) >= 0) {
+                sink.write(buf, 0, n);
+            }
+            received.set(sink.toByteArray());
+            final byte[] body = "spooled".getBytes(StandardCharsets.UTF_8);
+            exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+            exchange.sendResponseHeaders(200, body.length);
+            try (OutputStream out = exchange.getResponseBody()) {
+                out.write(body);
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor spooled = new ApiExtractor();
+            spooled.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // Force every body to spill to a temp file by setting threshold smaller than the payload.
+            spooled.setRequestSpoolThreshold(1024);
+            spooled.setMaxRetries(0);
+            spooled.init();
+            try {
+                final ExtractData data = spooled.getText(new ByteArrayInputStream(payload), new HashMap<>());
+                assertNotNull(data);
+                assertEquals("spooled", data.getContent());
+                final byte[] body = received.get();
+                assertNotNull(body);
+                // Multipart framing wraps the payload, so just verify the payload bytes are embedded
+                // verbatim somewhere inside the multipart body.
+                org.junit.jupiter.api.Assertions.assertTrue(body.length >= payload.length,
+                        "received body must be at least the payload size");
+                org.junit.jupiter.api.Assertions.assertTrue(indexOf(body, payload) >= 0,
+                        "spooled payload bytes must appear in the multipart body");
+            } finally {
+                spooled.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * A hostile {@code Retry-After} value must be clamped to {@code maxRetryAfterMs} so a
+     * 24-hour value cannot stall the extractor thread.
+     */
+    @Test
+    public void test_retryAfterClampedToMaxRetryAfter() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            final int n = calls.incrementAndGet();
+            if (n == 1) {
+                // 24 hours — without clamping this would block the test forever.
+                exchange.getResponseHeaders().add("Retry-After", "86400");
+                exchange.sendResponseHeaders(503, -1);
+                exchange.close();
+            } else {
+                final byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
+                exchange.sendResponseHeaders(200, body.length);
+                try (OutputStream out = exchange.getResponseBody()) {
+                    out.write(body);
+                }
+            }
+        });
+        simple.start();
+        try {
+            final ApiExtractor clamped = new ApiExtractor();
+            clamped.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            clamped.setMaxRetries(1);
+            clamped.setRetryBackoffMs(10L);
+            clamped.setMaxRetryAfterMs(150L);
+            clamped.init();
+            try {
+                final long t0 = System.currentTimeMillis();
+                final ExtractData data = clamped.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                final long elapsed = System.currentTimeMillis() - t0;
+                assertNotNull(data);
+                assertEquals("ok", data.getContent());
+                assertEquals(2, calls.get());
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed < 5_000L,
+                        "Retry-After must be clamped, not honored as 24h (elapsed=" + elapsed + "ms)");
+            } finally {
+                clamped.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * Interrupting the calling thread while the extractor is between retries must abort the
+     * retry loop promptly with an {@link ExtractException}.
+     */
+    @Test
+    public void test_executeWithRetries_interruptStopsRetryLoop() throws Exception {
+        final SimpleHttpServer server = new SimpleHttpServer();
+        final AtomicInteger attempts = new AtomicInteger();
+        try {
+            // Always reply 503 with a long Retry-After so the extractor enters sleepQuietly.
+            server.setHandler(new HttpHandler() {
+                @Override
+                public void handle(final HttpExchange exchange) throws IOException {
+                    attempts.incrementAndGet();
+                    drain(exchange.getRequestBody());
+                    exchange.getResponseHeaders().add("Retry-After", "30");
+                    exchange.sendResponseHeaders(503, 0);
+                    exchange.getResponseBody().close();
+                }
+            });
+            server.start();
+
+            final ApiExtractor extractor = new ApiExtractor();
+            extractor.setUrl("http://127.0.0.1:" + server.port() + "/");
+            extractor.setMaxRetries(5);
+            extractor.setRetryBackoffMs(50L);
+            extractor.init();
+            try {
+                final Thread target = Thread.currentThread();
+                final Thread interrupter = new Thread(() -> {
+                    try {
+                        // Wait for the extractor to issue its first request and start sleeping.
+                        Thread.sleep(500L);
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    target.interrupt();
+                });
+                interrupter.setDaemon(true);
+                interrupter.start();
+
+                final long startMs = System.currentTimeMillis();
+                try {
+                    extractor.getText(new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8)), new HashMap<>());
+                    fail();
+                } catch (final ExtractException e) {
+                    // expected
+                }
+                final long elapsed = System.currentTimeMillis() - startMs;
+
+                // Bail out should be prompt: well under one full Retry-After (30s).
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed < 5_000L,
+                        "interrupt must short-circuit the retry loop, elapsed=" + elapsed + "ms");
+                // At most two HTTP attempts may have reached the server: the assertion tolerates
+                // one extra attempt racing the interrupt.
+                org.junit.jupiter.api.Assertions.assertTrue(attempts.get() <= 2,
+                        "retry loop must stop on interrupt, attempts=" + attempts.get());
+
+                interrupter.join(1_000L);
+            } finally {
+                // Clear any interrupt flag left on the test thread before tearing down.
+                Thread.interrupted();
+                extractor.destroy();
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+    /**
+     * An override URL containing CR/LF must be rejected: the configured URL answers the
+     * request and the override server sees no hits.
+     */
+    @Test
+    public void test_overrideRejects_ControlCharacters() throws Exception {
+        final SimpleHttpServer overrideServer = new SimpleHttpServer();
+        final AtomicInteger overrideHits = new AtomicInteger();
+        overrideServer.setHandler(exchange -> {
+            overrideHits.incrementAndGet();
+            drain(exchange.getRequestBody());
+            exchange.sendResponseHeaders(200, -1);
+            exchange.close();
+        });
+        overrideServer.start();
+        try {
+            final ApiExtractor extractor = new ApiExtractor();
+            extractor.setUrl("http://127.0.0.1:" + port + "/");
+            extractor.setAllowExtractorUrlOverride(true);
+            extractor.init();
+            try {
+                // URL with embedded newline must be rejected even when override is enabled.
+                final Map<String, String> params = new HashMap<>();
+                params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://127.0.0.1:" + overrideServer.port() + "/\r\nX-Injected: yes");
+                final ExtractData text = extractor.getText(new ByteArrayInputStream("ctl".getBytes()), params);
+                assertEquals(ATTR_NAME + ",ctl", text.getContent());
+                assertEquals(0, overrideHits.get());
+            } finally {
+                extractor.destroy();
+            }
+        } finally {
+            overrideServer.stop();
+        }
+    }
+
+    /**
+     * An override URL carrying userinfo ({@code user:pass@host}) must be rejected: the
+     * configured URL answers the request and the override server sees no hits.
+     */
+    @Test
+    public void test_overrideRejects_Userinfo() throws Exception {
+        final SimpleHttpServer overrideServer = new SimpleHttpServer();
+        final AtomicInteger overrideHits = new AtomicInteger();
+        overrideServer.setHandler(exchange -> {
+            overrideHits.incrementAndGet();
+            drain(exchange.getRequestBody());
+            exchange.sendResponseHeaders(200, -1);
+            exchange.close();
+        });
+        overrideServer.start();
+        try {
+            final ApiExtractor extractor = new ApiExtractor();
+            extractor.setUrl("http://127.0.0.1:" + port + "/");
+            extractor.setAllowExtractorUrlOverride(true);
+            extractor.init();
+            try {
+                final Map<String, String> params = new HashMap<>();
+                params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http://user:pass@127.0.0.1:" + overrideServer.port() + "/");
+                final ExtractData text = extractor.getText(new ByteArrayInputStream("ui".getBytes()), params);
+                assertEquals(ATTR_NAME + ",ui", text.getContent());
+                assertEquals(0, overrideHits.get());
+            } finally {
+                extractor.destroy();
+            }
+        } finally {
+            overrideServer.stop();
+        }
+    }
+
+    /**
+     * An opaque override URI ({@code http:opaque}) must be rejected; the request still
+     * succeeds against the configured URL.
+     */
+    @Test
+    public void test_overrideRejects_OpaqueUri() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:" + port + "/");
+        extractor.setAllowExtractorUrlOverride(true);
+        extractor.init();
+        try {
+            final Map<String, String> params = new HashMap<>();
+            params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "http:opaque");
+            final ExtractData text = extractor.getText(new ByteArrayInputStream("op".getBytes()), params);
+            assertEquals(ATTR_NAME + ",op", text.getContent());
+        } finally {
+            extractor.destroy();
+        }
+    }
+
+    /**
+     * An {@code https} override must pass the scheme gate: the call fails while connecting to
+     * the unreachable override instead of silently succeeding against the configured URL.
+     */
+    @Test
+    public void test_overrideAccepts_HttpsScheme() throws Exception {
+        // We can't easily stand up an HTTPS server in the test, so just confirm that the SCHEME
+        // gate accepts https by trying to call an unreachable https URL. The call should fail
+        // attempting to connect (not be silently rejected as disallowed scheme — that would
+        // succeed against the configured URL).
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:" + port + "/");
+        extractor.setAllowExtractorUrlOverride(true);
+        extractor.setMaxRetries(0);
+        extractor.setConnectionTimeout(300);
+        extractor.setSoTimeout(300);
+        extractor.init();
+        try {
+            final Map<String, String> params = new HashMap<>();
+            // Port 1 is reserved; connection refused is expected.
+            params.put(ApiExtractor.PARAM_EXTRACTOR_URL, "https://127.0.0.1:1/blackhole");
+            try {
+                extractor.getText(new ByteArrayInputStream("hs".getBytes()), params);
+                org.junit.jupiter.api.Assertions.fail(
+                        "Override to unreachable https should have raised ExtractException, not silently fallen back to configured URL.");
+            } catch (final ExtractException expected) {
+                // expected: the override scheme passes, so we tried to connect and failed.
+            }
+        } finally {
+            extractor.destroy();
+        }
+    }
+
+    /**
+     * When every attempt answers 503, the extractor must throw an {@link ExtractException}
+     * whose message names the attempt count and status and which carries a cause.
+     */
+    @Test
+    public void test_retryExhausted_5xx_throwsWithCauseAndAttemptCount() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        final AtomicInteger calls = new AtomicInteger();
+        simple.setHandler(exchange -> {
+            calls.incrementAndGet();
+            drain(exchange.getRequestBody());
+            exchange.sendResponseHeaders(503, -1);
+            exchange.close();
+        });
+        simple.start();
+        try {
+            final ApiExtractor retrying = new ApiExtractor();
+            retrying.setUrl("http://127.0.0.1:" + simple.port() + "/");
+            // maxRetries=2 is expected to yield three attempts in total (asserted below).
+            retrying.setMaxRetries(2);
+            retrying.setRetryBackoffMs(10L);
+            retrying.init();
+            try {
+                retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>());
+                org.junit.jupiter.api.Assertions.fail("Expected ExtractException after 3 failed attempts");
+            } catch (final ExtractException e) {
+                org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("3 attempts"),
+                        "message should contain attempt count: " + e.getMessage());
+                org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("status=503"),
+                        "message should contain status code: " + e.getMessage());
+                org.junit.jupiter.api.Assertions.assertNotNull(e.getCause(), "exhaustion ExtractException must carry a cause");
+                assertEquals(3, calls.get());
+            } finally {
+                retrying.destroy();
+            }
+        } finally {
+            simple.stop();
+        }
+    }
+
+    /**
+     * An unresolvable host must fail without retrying: with five retries at a 1s backoff
+     * configured, the call must still finish in under 3s.
+     */
+    @Test
+    public void test_unknownHost_notRetried() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        // .invalid is reserved per RFC 2606 — should 
always be unresolvable.
+        extractor.setUrl("http://this-host-must-not-resolve.invalid/");
+        extractor.setMaxRetries(5);
+        extractor.setRetryBackoffMs(1000L);
+        extractor.setConnectionTimeout(2000);
+        extractor.init();
+        try {
+            final long t0 = System.currentTimeMillis();
+            try {
+                extractor.getText(new ByteArrayInputStream("uh".getBytes()), new HashMap<>());
+                org.junit.jupiter.api.Assertions.fail("Expected ExtractException for unknown host");
+            } catch (final ExtractException e) {
+                final long elapsed = System.currentTimeMillis() - t0;
+                org.junit.jupiter.api.Assertions.assertTrue(elapsed < 3000L,
+                        "UnknownHost should not be retried (elapsed=" + elapsed + "ms)");
+                org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("non-transient"),
+                        "message should mark error as non-transient: " + e.getMessage());
+            }
+        } finally {
+            extractor.destroy();
+        }
+    }
+
+    /**
+     * Calling {@code getText} after {@code destroy()} must throw an {@link ExtractException}
+     * whose message contains "destroyed".
+     */
+    @Test
+    public void test_getTextAfterDestroy_throwsExtractException() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:" + port + "/");
+        extractor.init();
+        extractor.destroy();
+        try {
+            extractor.getText(new ByteArrayInputStream("post-destroy".getBytes()), new HashMap<>());
+            org.junit.jupiter.api.Assertions.fail("Expected ExtractException after destroy()");
+        } catch (final ExtractException e) {
+            org.junit.jupiter.api.Assertions.assertTrue(e.getMessage().contains("destroyed"),
+                    "message should explain why: " + e.getMessage());
+        }
+    }
+
+    /**
+     * A second {@code destroy()} call on the same extractor must not throw.
+     */
+    @Test
+    public void test_destroyIsIdempotent() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        extractor.setUrl("http://127.0.0.1:" + port + "/");
+        extractor.init();
+        extractor.destroy();
+        extractor.destroy(); // must not throw
+    }
+
+    /**
+     * Each numeric setter must reject an out-of-range value with
+     * {@link IllegalArgumentException}: negative for most, zero for the connection limits.
+     */
+    @Test
+    public void test_setterValidation_rejectsNegative() throws Exception {
+        final ApiExtractor extractor = new ApiExtractor();
+        try {
+            extractor.setMaxRetries(-1);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // 
ok
+        }
+        try {
+            extractor.setMaxResponseSize(-1L);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        try {
+            extractor.setMaxRequestSize(-1L);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        // The two connection limits are probed with 0 rather than a negative value.
+        try {
+            extractor.setMaxConnections(0);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        try {
+            extractor.setMaxConnectionsPerRoute(0);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        try {
+            extractor.setRetryBackoffMs(-1L);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        try {
+            extractor.setMaxRetryAfterMs(-1L);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+        try {
+            extractor.setRequestSpoolThreshold(-1);
+            fail();
+        } catch (final IllegalArgumentException expected) {
+            // ok
+        }
+    }
+
+    /**
+     * A 200 response without a body must yield a non-null {@link ExtractData} whose content is
+     * the empty string.
+     */
+    @Test
+    public void test_responseBody_emptyEntity_returnsEmptyContent() throws Exception {
+        final SimpleHttpServer simple = new SimpleHttpServer();
+        simple.setHandler(exchange -> {
+            drain(exchange.getRequestBody());
+            // Status 200 with responseLength -1, i.e. no response body is sent — exercises the empty-response path.
+ exchange.sendResponseHeaders(200, -1); + exchange.close(); + }); + simple.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + simple.port() + "/"); + extractor.setMaxRetries(0); + extractor.init(); + try { + final ExtractData data = extractor.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("", data.getContent()); + } finally { + extractor.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_parseRetryAfter_garbageFallsBackToDefaultBackoff() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + // Garbage Retry-After: not a number, not an HTTP-date. + exchange.getResponseHeaders().add("Retry-After", "tomorrow-please"); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_parseRetryAfter_negativeFallsBackToDefaultBackoff() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final AtomicInteger calls = new AtomicInteger(); + simple.setHandler(exchange 
-> { + drain(exchange.getRequestBody()); + final int n = calls.incrementAndGet(); + if (n == 1) { + exchange.getResponseHeaders().add("Retry-After", "-5"); + exchange.sendResponseHeaders(503, -1); + exchange.close(); + } else { + final byte[] body = "ok".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + }); + simple.start(); + try { + final ApiExtractor retrying = new ApiExtractor(); + retrying.setUrl("http://127.0.0.1:" + simple.port() + "/"); + retrying.setMaxRetries(1); + retrying.setRetryBackoffMs(10L); + retrying.init(); + try { + final ExtractData data = retrying.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals("ok", data.getContent()); + assertEquals(2, calls.get()); + } finally { + retrying.destroy(); + } + } finally { + simple.stop(); + } + } + + @Test + public void test_resolveCharset_shiftJis() throws Exception { + final SimpleHttpServer simple = new SimpleHttpServer(); + final String greeting = "こんにちは"; + final byte[] sjis = greeting.getBytes(java.nio.charset.Charset.forName("Shift_JIS")); + simple.setHandler(exchange -> { + drain(exchange.getRequestBody()); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=Shift_JIS"); + exchange.sendResponseHeaders(200, sjis.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(sjis); + } + }); + simple.start(); + try { + final ApiExtractor extractor = new ApiExtractor(); + extractor.setUrl("http://127.0.0.1:" + simple.port() + "/"); + extractor.setMaxRetries(0); + extractor.init(); + try { + final ExtractData data = extractor.getText(new ByteArrayInputStream("x".getBytes()), new HashMap<>()); + assertNotNull(data); + assertEquals(greeting, data.getContent()); + } finally { + extractor.destroy(); + } + } finally { + simple.stop(); + } + } + + private static void drain(final InputStream in) throws 
IOException { + final byte[] buf = new byte[4096]; + while (in.read(buf) >= 0) { + // discard + } + } + + private static int indexOf(final byte[] haystack, final byte[] needle) { + if (needle.length == 0 || haystack.length < needle.length) { + return -1; + } + outer: for (int i = 0; i <= haystack.length - needle.length; i++) { + for (int j = 0; j < needle.length; j++) { + if (haystack[i + j] != needle[j]) { + continue outer; + } + } + return i; + } + return -1; + } + + /** Lightweight HTTP server used for retry / size / status tests. */ + private static class SimpleHttpServer { + private HttpServer http; + private int boundPort; + + void setHandler(final HttpHandler handler) throws IOException { + http = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); + http.createContext("/", handler); + } + + void start() { + http.start(); + boundPort = http.getAddress().getPort(); + } + + void stop() { + http.stop(0); + } + + int port() { + return boundPort; + } + } static class TestApiExtractorServer { private Server server;