Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolStateStoreUnavailab
import com.alibaba.opensandbox.sandbox.domain.pool.IdleEntry
import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore
import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters
import com.alibaba.opensandbox.sandbox.domain.pool.TakeIdleResult
import redis.clients.jedis.UnifiedJedis
import redis.clients.jedis.params.SetParams
import java.time.Duration
Expand Down Expand Up @@ -55,11 +56,49 @@ class RedisPoolStateStore(
redis.eval(
TAKE_IDLE_SCRIPT,
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
emptyList<String>(),
listOf("0"),
)
result as? String
decodeTakeIdleResult(result).sandboxId
}

override fun tryTakeIdle(
poolName: String,
minRemainingTtl: Duration,
): TakeIdleResult {
if (minRemainingTtl.isNegative || minRemainingTtl.isZero) {
return TakeIdleResult.of(tryTakeIdle(poolName))
}
val minRemainingTtlMs = minRemainingTtl.toMillis().coerceAtLeast(0).toString()
return execute("tryTakeIdle", poolName) {
val result =
redis.eval(
TAKE_IDLE_SCRIPT,
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
listOf(minRemainingTtlMs),
)
decodeTakeIdleResult(result)
}
}

/**
* Decodes the Lua return value into [TakeIdleResult]. The script returns either nil (empty
* pool with no discarded entries) or a two-element array whose first slot holds the taken
* sandbox id (or an empty string when no entry satisfied the threshold but discarded-alive
* ids still need to be reported) and whose second slot holds the discarded-alive list.
* The empty-string sentinel is needed because Redis cannot return a nil literal inside an
* array reliably across clients.
*/
@Suppress("UNCHECKED_CAST")
private fun decodeTakeIdleResult(result: Any?): TakeIdleResult {
if (result == null) return TakeIdleResult.EMPTY
val list = result as? List<Any?> ?: return TakeIdleResult.of(result as? String)
if (list.isEmpty()) return TakeIdleResult.EMPTY
val takenRaw = list[0] as? String
val taken = takenRaw?.takeIf { it.isNotEmpty() }
val discarded = (list.getOrNull(1) as? List<Any?>)?.mapNotNull { it as? String } ?: emptyList()
return TakeIdleResult(sandboxId = taken, discardedAliveSandboxIds = discarded)
}

override fun putIdle(
poolName: String,
sandboxId: String,
Expand Down Expand Up @@ -132,15 +171,36 @@ class RedisPoolStateStore(
poolName: String,
now: Instant,
) {
execute("reapExpiredIdle", poolName) {
redis.eval(
REAP_EXPIRED_SCRIPT,
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
emptyList<String>(),
)
reapIdle(poolName, "0")
}

override fun reapExpiredIdle(
poolName: String,
now: Instant,
minRemainingTtl: Duration,
): List<String> {
if (minRemainingTtl.isNegative || minRemainingTtl.isZero) {
reapIdle(poolName, "0")
return emptyList()
}
return reapIdle(poolName, minRemainingTtl.toMillis().coerceAtLeast(0).toString())
}

@Suppress("UNCHECKED_CAST")
private fun reapIdle(
poolName: String,
minRemainingTtlMs: String,
): List<String> =
execute("reapExpiredIdle", poolName) {
val result =
redis.eval(
REAP_EXPIRED_SCRIPT,
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
listOf(minRemainingTtlMs),
)
(result as? List<Any?>)?.mapNotNull { it as? String } ?: emptyList()
}

override fun snapshotCounters(poolName: String): StoreCounters =
execute("snapshotCounters", poolName) {
StoreCounters(redis.hlen(idleExpiresKey(poolName)).toInt())
Expand Down Expand Up @@ -240,16 +300,29 @@ class RedisPoolStateStore(
"""
local redis_time = redis.call('TIME')
local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000)
local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0
local cutoff_ms = now_ms + min_remaining_ttl_ms
-- Drop empty entries straight to nil so clients see the empty-pool case clearly.
local discarded_alive = {}
while true do
local sandbox_id = redis.call('LPOP', KEYS[1])
if not sandbox_id then
return nil
if #discarded_alive == 0 then
return nil
end
return {'', discarded_alive}
end
local expires_at = redis.call('HGET', KEYS[2], sandbox_id)
if expires_at then
redis.call('HDEL', KEYS[2], sandbox_id)
if tonumber(expires_at) > now_ms then
return sandbox_id
local exp = tonumber(expires_at)
if exp > cutoff_ms then
return {sandbox_id, discarded_alive}
end
-- Below threshold. Surface alive entries so the caller can kill them; drop
-- already-expired ones silently — the server has reaped them.
if exp > now_ms then
table.insert(discarded_alive, sandbox_id)
end
end
end
Expand Down Expand Up @@ -292,14 +365,22 @@ class RedisPoolStateStore(
"""
local redis_time = redis.call('TIME')
local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000)
local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0
local cutoff_ms = now_ms + min_remaining_ttl_ms
local discarded_alive = {}
local entries = redis.call('HGETALL', KEYS[2])
for i = 1, #entries, 2 do
if tonumber(entries[i + 1]) <= now_ms then
redis.call('HDEL', KEYS[2], entries[i])
redis.call('LREM', KEYS[1], 0, entries[i])
local sandbox_id = entries[i]
local exp = tonumber(entries[i + 1])
if exp <= cutoff_ms then
redis.call('HDEL', KEYS[2], sandbox_id)
redis.call('LREM', KEYS[1], 0, sandbox_id)
if exp > now_ms then
table.insert(discarded_alive, sandbox_id)
end
end
end
return 1
return discarded_alive
"""
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,92 @@ class RedisPoolStateStoreTest {
assertEquals(7, stateStore.getMaxIdle(poolName))
}

@Test
fun `tryTakeIdle surfaces alive entries below the threshold so callers can kill them`() {
val stateStore = requireStore()

// 1s TTL: still alive (server-side TTL has not elapsed) but well below the 60s threshold.
stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1))
stateStore.putIdle(poolName, "id-1")
stateStore.putIdle(poolName, "id-2")

val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
assertNull(result.sandboxId)
assertEquals(setOf("id-1", "id-2"), result.discardedAliveSandboxIds.toSet())
assertEquals(0, stateStore.snapshotCounters(poolName).idleCount)
}

@Test
fun `tryTakeIdle silently drops fully-expired entries`() {
val stateStore = requireStore()

stateStore.setIdleEntryTtl(poolName, Duration.ofMillis(50))
stateStore.putIdle(poolName, "expired")
Thread.sleep(150)
stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
stateStore.putIdle(poolName, "alive")

val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
// expired silently dropped (no kill needed); alive returned.
assertEquals("alive", result.sandboxId)
assertEquals(emptyList<String>(), result.discardedAliveSandboxIds)
}

@Test
fun `reapExpiredIdle with minRemainingTtl returns alive evicted entries`() {
val stateStore = requireStore()

stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1))
stateStore.putIdle(poolName, "id-1")
stateStore.putIdle(poolName, "id-2")

val discardedAlive =
stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60))

assertEquals(setOf("id-1", "id-2"), discardedAlive.toSet())
assertEquals(0, stateStore.snapshotCounters(poolName).idleCount)
}

@Test
fun `reapExpiredIdle with minRemainingTtl preserves entries above the threshold`() {
val stateStore = requireStore()

stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
stateStore.putIdle(poolName, "id-1")

val discardedAlive =
stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60))

assertEquals(emptyList<String>(), discardedAlive)
assertEquals(1, stateStore.snapshotCounters(poolName).idleCount)
}

@Test
fun `tryTakeIdle with minRemainingTtl returns entries that satisfy the threshold`() {
val stateStore = requireStore()

stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
stateStore.putIdle(poolName, "id-1")

val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
assertEquals("id-1", result.sandboxId)
assertEquals(emptyList<String>(), result.discardedAliveSandboxIds)
}

@Test
fun `tryTakeIdle with zero minRemainingTtl behaves like the base call`() {
val stateStore = requireStore()

stateStore.putIdle(poolName, "id-1")
val taken = stateStore.tryTakeIdle(poolName, Duration.ZERO)
assertEquals("id-1", taken.sandboxId)
assertEquals(emptyList<String>(), taken.discardedAliveSandboxIds)

val empty = stateStore.tryTakeIdle(poolName, Duration.ZERO)
assertNull(empty.sandboxId)
assertEquals(emptyList<String>(), empty.discardedAliveSandboxIds)
}

@Test
fun `setIdleEntryTtl validates positive duration`() {
val stateStore = requireStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ import kotlin.math.ceil
* ready (default: 200ms).
* @property acquireHealthCheck Optional custom health check for sandboxes returned by acquire.
* @property acquireSkipHealthCheck When true, skip readiness checks for sandboxes returned by acquire (default: false).
* @property acquireMinRemainingTtl Minimum remaining TTL an idle sandbox must have to be returned
* by acquire. Idle entries closer to expiry than this threshold are discarded so the subsequent
* ready-check and any user-side renew have time to run before server-side expiry. Set to
* [Duration.ZERO] to opt out and restore the pre-existing binary-expiry behavior.
*
* Default is auto-derived from [idleTimeout] so existing users with short idle timeouts are not
* silently broken: 60s when [idleTimeout] > 60s, otherwise `idleTimeout / 2` (rounded down). The
* resolved value is always strictly less than [idleTimeout]. Pass an explicit value to the builder
* to override.
* @property warmupReadyTimeout Max time to wait for a pool-created sandbox to become ready (default: 30s).
* @property warmupHealthCheckPollingInterval Poll interval while waiting for a pool-created sandbox to become ready
* (default: 200ms).
Expand All @@ -65,6 +74,7 @@ class PoolConfig private constructor(
val acquireHealthCheckPollingInterval: Duration,
val acquireHealthCheck: ((Sandbox) -> Boolean)?,
val acquireSkipHealthCheck: Boolean,
val acquireMinRemainingTtl: Duration,
val warmupReadyTimeout: Duration,
val warmupHealthCheckPollingInterval: Duration,
val warmupHealthCheck: ((Sandbox) -> Boolean)?,
Expand All @@ -87,6 +97,11 @@ class PoolConfig private constructor(
require(!acquireHealthCheckPollingInterval.isNegative && !acquireHealthCheckPollingInterval.isZero) {
"acquireHealthCheckPollingInterval must be positive"
}
require(!acquireMinRemainingTtl.isNegative) { "acquireMinRemainingTtl must be non-negative" }
Comment thread
Pangjiping marked this conversation as resolved.
require(acquireMinRemainingTtl < idleTimeout) {
"acquireMinRemainingTtl ($acquireMinRemainingTtl) must be strictly less than " +
"idleTimeout ($idleTimeout); otherwise every warmed idle entry would be rejected"
}
require(!warmupReadyTimeout.isNegative && !warmupReadyTimeout.isZero) { "warmupReadyTimeout must be positive" }
require(!warmupHealthCheckPollingInterval.isNegative && !warmupHealthCheckPollingInterval.isZero) {
"warmupHealthCheckPollingInterval must be positive"
Expand All @@ -101,13 +116,29 @@ class PoolConfig private constructor(
private const val DEFAULT_DEGRADED_THRESHOLD = 3
private val DEFAULT_ACQUIRE_READY_TIMEOUT = Duration.ofSeconds(30)
private val DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200)
private val DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP: Duration = Duration.ofSeconds(60)
private val DEFAULT_WARMUP_READY_TIMEOUT = Duration.ofSeconds(30)
private val DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200)
private val DEFAULT_IDLE_TIMEOUT = Duration.ofHours(24)
private val DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30)

@JvmStatic
fun builder(): Builder = Builder()

/**
* Resolves the default `acquireMinRemainingTtl` from the user's [idleTimeout]:
* `min(60s, idleTimeout / 2)`. The result is always strictly less than [idleTimeout],
* so users with short idle timeouts get an automatically scaled threshold instead of a
* config-time error.
*/
internal fun defaultAcquireMinRemainingTtl(idleTimeout: Duration): Duration {
val half = idleTimeout.dividedBy(2L)
return if (DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP < half) {
DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP
} else {
half
}
}
}

internal fun withMaxIdle(maxIdle: Int): PoolConfig {
Expand All @@ -126,6 +157,7 @@ class PoolConfig private constructor(
acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval,
acquireHealthCheck = acquireHealthCheck,
acquireSkipHealthCheck = acquireSkipHealthCheck,
acquireMinRemainingTtl = acquireMinRemainingTtl,
warmupReadyTimeout = warmupReadyTimeout,
warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval,
warmupHealthCheck = warmupHealthCheck,
Expand All @@ -151,6 +183,7 @@ class PoolConfig private constructor(
private var acquireHealthCheckPollingInterval: Duration = DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL
private var acquireHealthCheck: ((Sandbox) -> Boolean)? = null
private var acquireSkipHealthCheck: Boolean = false
private var acquireMinRemainingTtl: Duration? = null
private var warmupReadyTimeout: Duration = DEFAULT_WARMUP_READY_TIMEOUT
private var warmupHealthCheckPollingInterval: Duration = DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL
private var warmupHealthCheck: ((Sandbox) -> Boolean)? = null
Expand Down Expand Up @@ -229,6 +262,21 @@ class PoolConfig private constructor(
return this
}

/**
* Sets the minimum remaining TTL an idle sandbox must have to be returned by acquire.
* Idle entries closer to expiry than [acquireMinRemainingTtl] are discarded so the
* subsequent ready-check and any user-side renew have time to run before the server-side
* expiry kicks in.
*
* Must be non-negative and strictly less than `idleTimeout`. If not set, the resolved
* default is `min(60s, idleTimeout / 2)`. Pass [Duration.ZERO] to opt out and restore the
* pre-existing binary-expiry behavior.
*/
fun acquireMinRemainingTtl(acquireMinRemainingTtl: Duration): Builder {
this.acquireMinRemainingTtl = acquireMinRemainingTtl
return this
}

fun warmupReadyTimeout(warmupReadyTimeout: Duration): Builder {
this.warmupReadyTimeout = warmupReadyTimeout
return this
Expand Down Expand Up @@ -277,6 +325,8 @@ class PoolConfig private constructor(
val spec = creationSpec ?: throw IllegalArgumentException("creationSpec is required")

val warmup = warmupConcurrency ?: ceil(max * 0.2).toInt().coerceAtLeast(1)
val resolvedAcquireMinRemainingTtl =
acquireMinRemainingTtl ?: defaultAcquireMinRemainingTtl(idleTimeout)

return PoolConfig(
poolName = name,
Expand All @@ -293,6 +343,7 @@ class PoolConfig private constructor(
acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval,
acquireHealthCheck = acquireHealthCheck,
acquireSkipHealthCheck = acquireSkipHealthCheck,
acquireMinRemainingTtl = resolvedAcquireMinRemainingTtl,
warmupReadyTimeout = warmupReadyTimeout,
warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval,
warmupHealthCheck = warmupHealthCheck,
Expand Down
Loading
Loading