-
Notifications
You must be signed in to change notification settings - Fork 0
Add delayqueue primitive and tenantstate limits #27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Caution Review failedThe pull request is closed. 📝 WalkthroughWalkthroughReplaces sync CLI chunk execution with scheduler-await, migrates TenantState Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Scheduler
participant DelayChannel
participant DelayQueue
participant Timer
Note over Client,Scheduler: Client schedules delayed insertion
Client->>Scheduler: add(value, delay)
Scheduler->>Timer: schedule timeout(delay)
Timer-->>Scheduler: timeout fires
Scheduler->>DelayChannel: enqueue expired item
Note over Client,DelayChannel: consumer awaits next()
Client->>DelayChannel: await next()
DelayChannel->>DelayQueue: poll for expired items
DelayQueue-->>DelayChannel: return expired value
DelayChannel-->>Client: deliver value
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review detailsConfiguration used: defaults Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
crates/runtime/src/traits/ir/runtime.rs (1)
43-63: Consider simplifying the metatable validation logic.The current structure with an empty
ifblock for the success case is slightly awkward. A minor refactor could improve readability.🔎 Suggested refactor
// Ensure data is either a Object or Nil match data { LuaValue::Table(ref t) => { - if t.metatable().is_none() { - // OK - } else { + if t.metatable().is_some() { return Err(LuaError::FromLuaConversionError { from: "table with metatable", to: "TenantState".to_string(), message: Some("data field must be an object/map with no metatable or nil".to_string()), }); } }, LuaValue::Nil => {}delayqueuetest.luau (1)
4-9: Clarify the expected behavior in the test.The test schedules
task.delay(2, ...)which adds an item after 2 seconds, and then the item itself has a 2-second expiration delay viatimedelta_seconds(2). This meansnext()will block for approximately 4 seconds total.If the intent is to demonstrate the delay queue functionality, consider adding a comment explaining this expected behavior, or simplifying to a single delay for clarity.
crates/runtime/src/core/channel.rs (1)
114-126: Consider adding a maximum delay limit for consistency.
OneshotChannel::recvtimeoutenforces aMAX_TIMEOUTof 7 seconds, butDelayChannel::addandadd_with_handleaccept arbitrary delays without validation. This inconsistency could allow very long delays. Consider whether a similar limit should apply here.🔎 Suggested change if limit is desired
+const MAX_DELAY: Duration = Duration::from_secs(60); // or appropriate limit + /// Inserts a value into the delay channel with the given delay - pub fn add(&self, value: LuaValue, delay: Duration) { + pub fn add(&self, value: LuaValue, delay: Duration) -> Result<(), &'static str> { + if delay > MAX_DELAY { + return Err("Delay cannot exceed maximum allowed duration"); + } self.queue.borrow_mut().insert(value, delay); self.notify.notify_one(); + Ok(()) }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
crates/cli/src/cli.rscrates/cli/src/provider/mod.rscrates/runtime/Cargo.tomlcrates/runtime/src/core/channel.rscrates/runtime/src/traits/ir/runtime.rsdelayqueuetest.luau
🧰 Additional context used
🧬 Code graph analysis (1)
crates/runtime/src/traits/ir/runtime.rs (1)
crates/cli/src/provider/mod.rs (16)
data(156-156)data(161-161)data(162-162)data(164-164)data(165-165)data(166-166)data(167-167)data(194-194)data(199-199)data(200-200)data(202-202)data(203-203)data(204-204)data(205-205)data(417-417)data(422-422)
🔇 Additional comments (13)
crates/runtime/Cargo.toml (1)
14-14: LGTM!The
tokio-utildependency with thetimefeature is correctly added to support the newDelayQueue-based functionality in the channel module.crates/runtime/src/traits/ir/runtime.rs (3)
5-6: LGTM!The
MAX_EVENTSconstant provides a sensible upper bound to prevent runaway event accumulation.
29-37: LGTM!The events length validation provides a good safeguard against application logic errors with a clear error message.
75-87: LGTM!The
IntoLuaimplementation correctly serializes the newdatafield usinglua.to_value().crates/cli/src/cli.rs (1)
216-220: LGTM!The shift to
call_in_scheduleraligns with the async scheduler-based execution model, properly awaiting the chunk execution.crates/cli/src/provider/mod.rs (3)
786-791: LGTM!The local
TenantStatestruct is correctly updated to usedata: serde_json::Value, aligning with the runtime layer changes.
805-820: LGTM!The
get_tenant_stateimplementation correctly maps the newdatafield and provides an appropriate default ofserde_json::Value::Null.
822-830: LGTM!The
set_tenant_statecorrectly propagates thedatafield through serialization.crates/runtime/src/core/channel.rs (5)
72-98: LGTM!The
KeyHandleimplementation provides proper cancellation semantics with appropriate error handling when the queue has been dropped. TheEqmetamethod correctly compares both the key and queue reference.
128-145: LGTM!The
next()implementation correctly usesbiasedintokio::select!to prioritize draining expired items. The loop structure handles notification wakeups properly by recreating the stream wrapper.
148-165: LGTM!The
LuaUserDataimplementation correctly convertsTimeDeltatostd::time::Durationwith proper error handling for invalid conversions.
167-180: LGTM!The
QueueStreamwrapper correctly implementsStreamby borrowing theRefCellonly duringpoll_next, avoiding borrow conflicts with other operations.
189-191: LGTM!The
DelayChannelconstructor is correctly exposed ininit_plugin, making it available to Lua scripts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
crates/runtime/src/core/channel.rs (1)
148-169: Consider adding maximum delay validation for consistency.The
OneshotChannelvalidates that timeouts don't exceedMAX_TIMEOUT(7 seconds), butDelayChannel::add()andaddwithhandle()accept any duration. This inconsistency could allow extremely long delays that may not be intended.🔎 Proposed validation
Add validation in the Lua binding methods:
methods.add_method("add", |_, this, (value, delay): (LuaValue, LuaUserDataRef<TimeDelta>)| { let delay = delay.timedelta.to_std().map_err(LuaError::external)?; + if delay > MAX_TIMEOUT { + return Err(LuaError::external("Delay cannot be greater than the max timeout")); + } Ok(this.add(value, delay)) }); methods.add_method("addwithhandle", |_, this, (value, delay): (LuaValue, LuaUserDataRef<TimeDelta>)| { let delay = delay.timedelta.to_std().map_err(LuaError::external)?; + if delay > MAX_TIMEOUT { + return Err(LuaError::external("Delay cannot be greater than the max timeout")); + } let handle = this.add_with_handle(value, delay); Ok(handle) });Note: If longer delays are intentionally supported, consider documenting this difference or using a separate constant like
MAX_DELAY.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/runtime/src/core/channel.rs
🧰 Additional context used
🧬 Code graph analysis (1)
crates/runtime/src/core/channel.rs (1)
crates/runtime/src/core/datetime.rs (1)
from_secs(19-23)
🔇 Additional comments (6)
crates/runtime/src/core/channel.rs (6)
1-14: LGTM: Imports and type updates support the new DelayQueue implementation.The new imports for
Stream,DelayQueue,Notify, and related types are appropriate for the delay channel functionality. TheMAX_TIMEOUTtype change toDurationaligns with standard Rust async patterns.
16-70: LGTM: OneshotChannel enhanced to support selection patterns.The addition of the
rxfield withRc<RefCell<Option<...>>>wrapping properly enables the receiver to be shared and taken once, which is necessary for theselectoneshotsfunctionality while maintaining correct ownership semantics.
72-98: LGTM: KeyHandle properly implements cancellation with safe borrowing.The use of
Weakreference prevents reference cycles,try_borrow_mut()handles potential borrow conflicts gracefully, and theEqimplementation correctly checks both key and queue identity.
128-145: LGTM: Correct async implementation with biased polling.The loop-and-recreate pattern is appropriate here: recreating the
QueueStreamensures fresh polling after notifications, and thebiasedselect correctly prioritizes expired items over notifications to prevent starvation. The method will block indefinitely if the queue remains empty, which is standard behavior for channel primitives.
171-184: LGTM: Stream wrapper correctly limits borrow scope.The
QueueStreamimplementation properly constrains the mutable borrow to just thepoll_expired()call. The explanatory comment confirms the brief borrow duration, which allows other operations to safely access the queue between polls.
186-238: LGTM: Module properly exposes DelayChannel to Lua.The
DelayChannelconstructor is correctly registered, and the existingselectoneshotsfunction properly integrates with the updatedOneshotChannelstructure.
crates/runtime/src/core/channel.rs
Outdated
| pub fn add(&self, value: LuaValue, delay: Duration) { | ||
| self.queue.borrow_mut().insert(value, delay); | ||
| self.notify.notify_one(); | ||
| } | ||
|
|
||
| /// Inserts a value into the delay channel with the given delay | ||
| /// and returns a handle that can be used to cancel it | ||
| pub fn add_with_handle(&self, value: LuaValue, delay: Duration) -> KeyHandle { | ||
| let key = self.queue.borrow_mut().insert(value, delay); | ||
| self.notify.notify_one(); | ||
| KeyHandle { key, queue: Rc::downgrade(&self.queue) } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential panic risk: use try_borrow_mut() for defensive error handling.
The add() and add_with_handle() methods use borrow_mut() directly, which will panic if the RefCell is already borrowed (e.g., if another operation is accessing the queue concurrently). While the single-threaded Lua scheduler makes this rare, it's better to handle borrow conflicts gracefully, especially since KeyHandle::cancel() already uses the safer try_borrow_mut() pattern.
🔎 Suggested defensive fix
-pub fn add(&self, value: LuaValue, delay: Duration) {
- self.queue.borrow_mut().insert(value, delay);
+pub fn add(&self, value: LuaValue, delay: Duration) -> LuaResult<()> {
+ self.queue.try_borrow_mut()
+ .map_err(|_| LuaError::external("Delay channel is currently being accessed"))?
+ .insert(value, delay);
self.notify.notify_one();
+ Ok(())
}
-pub fn add_with_handle(&self, value: LuaValue, delay: Duration) -> KeyHandle {
- let key = self.queue.borrow_mut().insert(value, delay);
+pub fn add_with_handle(&self, value: LuaValue, delay: Duration) -> LuaResult<KeyHandle> {
+ let key = self.queue.try_borrow_mut()
+ .map_err(|_| LuaError::external("Delay channel is currently being accessed"))?
+ .insert(value, delay);
self.notify.notify_one();
- KeyHandle { key, queue: Rc::downgrade(&self.queue) }
+ Ok(KeyHandle { key, queue: Rc::downgrade(&self.queue) })
}Then update the Lua bindings (lines 150-159) to propagate the Result:
methods.add_method("add", |_, this, (value, delay): (LuaValue, LuaUserDataRef<TimeDelta>)| {
let delay = delay.timedelta.to_std().map_err(LuaError::external)?;
- Ok(this.add(value, delay))
+ this.add(value, delay)
});
methods.add_method("addwithhandle", |_, this, (value, delay): (LuaValue, LuaUserDataRef<TimeDelta>)| {
let delay = delay.timedelta.to_std().map_err(LuaError::external)?;
- let handle = this.add_with_handle(value, delay);
- Ok(handle)
+ this.add_with_handle(value, delay)
});Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/runtime/src/core/channel.rs around lines 115-126, replace the direct
RefCell::borrow_mut() calls with try_borrow_mut() to avoid panics: for add(),
attempt try_borrow_mut(), if Ok insert the value and notify, if Err return early
(or log a warning) without panicking; for add_with_handle() use try_borrow_mut()
and change the function signature to return a Result<KeyHandle, ChannelError>
(define a minimal ChannelError or use existing error type), on Ok construct and
return the KeyHandle, on Err return Err(ChannelError::BorrowFailed); then update
the Lua bindings (around lines 150-159) to propagate the Result instead of
assuming success so borrow failures surface to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
crates/runtime/src/core/channel.rs (2)
137-142: Borrow conflict can still cause panics inadd_with_handle().Similar to
add(), Line 139 usesborrow_mut()which can panic on borrow conflicts. This should usetry_borrow_mut()for consistency withKeyHandle::cancel().🔎 Recommended fix
pub fn add_with_handle(&self, value: LuaValue, delay: Duration) -> LuaResult<KeyHandle> { Self::check_delay(delay)?; - let key = self.queue.borrow_mut().insert(value, delay); + let key = self.queue.try_borrow_mut() + .map_err(|_| LuaError::external("Delay channel is currently being accessed"))? + .insert(value, delay); self.notify.notify_one(); Ok(KeyHandle { key, queue: Rc::downgrade(&self.queue) }) }Based on learnings, past review comments indicated this should use defensive borrowing.
128-133: Borrow conflict can still cause panics inadd().The past review comment flagged that
borrow_mut()on Line 130 can panic if theRefCellis already borrowed. While the method now returnsLuaResult<()>, it still uses the panickingborrow_mut()instead oftry_borrow_mut(). Although the single-threaded Lua scheduler makes this unlikely, theKeyHandle::cancel()already demonstrates the safer pattern withtry_borrow_mut().🔎 Recommended fix
pub fn add(&self, value: LuaValue, delay: Duration) -> LuaResult<()> { Self::check_delay(delay)?; - self.queue.borrow_mut().insert(value, delay); + self.queue.try_borrow_mut() + .map_err(|_| LuaError::external("Delay channel is currently being accessed"))? + .insert(value, delay); self.notify.notify_one(); Ok(()) }Based on learnings, past review comments indicated this should use defensive borrowing.
🧹 Nitpick comments (2)
crates/runtime/src/core/datetime.rs (1)
31-36: Consider simplifying the error propagation.The explicit
matchwithErr(e) => Err(e)can be replaced with the?operator ormapfor more idiomatic Rust.🔎 Proposed simplification
pub const fn from_std(td: Duration) -> Result<TimeDelta, chrono::OutOfRangeError> { - match chrono::Duration::from_std(td) { - Ok(x) => Ok(TimeDelta { timedelta: x }), - Err(e) => Err(e) - } + Ok(TimeDelta { + timedelta: chrono::Duration::from_std(td)? + }) }crates/runtime/src/core/channel.rs (1)
197-202: Consider defensive borrowing inpoll_next().Line 201 uses
borrow_mut()which could panic if the queue is borrowed concurrently. While the borrow scope is tight and the async/await model makes this unlikely, usingtry_borrow_mut()would be more defensive and consistent with theKeyHandle::cancel()pattern.🔎 Proposed defensive fix
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - self.queue.borrow_mut().poll_expired(cx) + match self.queue.try_borrow_mut() { + Ok(mut queue) => queue.poll_expired(cx), + Err(_) => Poll::Pending, // Retry later if borrowed + } }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
crates/runtime/src/core/channel.rscrates/runtime/src/core/datetime.rs
🔇 Additional comments (9)
crates/runtime/src/core/datetime.rs (2)
2-2: LGTM!The
Durationimport is correctly added to support the newfrom_stdconstructor.
25-29: LGTM!The
from_millisconstructor provides a convenient way to createTimeDeltafrom milliseconds and is correctly marked asconst.crates/runtime/src/core/channel.rs (7)
3-10: LGTM!The new imports are necessary for the
DelayChannelimplementation and related types.
14-20: LGTM!The constants define reasonable limits for the delay queue (~793 days max). The 5-second safety margin in
MAX_DURATION_OBJandMAX_DURATION_OBJ_STDprovides a buffer against edge cases.
22-34: LGTM!Adding the
rxfield toOneshotChannelenables theselectoneshotsfunction to access receivers. The wrapping pattern is consistent with the existingtxfield.
78-104: LGTM!The
KeyHandleimplementation is well-designed:
- Uses
Weakreference to avoid cycles- Defensively uses
try_borrow_mut()incancel()to handle borrow conflicts gracefully- Properly checks if the queue is still alive before attempting cancellation
144-161: LGTM!The
next()method correctly usestokio::select!withbiasedto prioritize expired items over notifications, ensuring efficient processing. Recreating theQueueStreamwrapper in the loop is a reasonable approach.
164-188: LGTM!The
LuaUserDataimplementation forDelayChannelproperly exposes the queue functionality to Lua. Good use oftry_borrow()in theLenmeta-method to avoid panics.
212-214: LGTM!The
DelayChannelconstructor is correctly registered in the module, making it available to Lua code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/cli/src/provider/mod.rs (1)
318-377: Remove unusedexpires_atfrom SELECT statements.The
findmethod still includesexpires_atin its SQL SELECT statements (lines 326 and 340), but this field has been systematically removed from all other KV operations in this file. The constructedKvRecordobjects (lines 362-371) don't useexpires_at, confirming it's unused.This inconsistency will cause a SQL error if the
expires_atcolumn has been dropped from thekv_v2table, or waste resources if it still exists.🔎 Proposed fix
let entries = if query == "%%" { // Fast path for querying all keys sqlx::query( - "SELECT id, key, value, created_at, last_updated_at, scopes, expires_at + "SELECT id, key, value, created_at, last_updated_at, scopes FROM kv_v2 WHERE guild_id = $1 AND scopes @> $2 ", ) .bind(self.guild_id.to_string()) .bind(scopes) .fetch_all(&self.pool) .await .map_err(|e| format!("Failed to get key: {e}"))? } else { sqlx::query( - "SELECT id, key, value, created_at, last_updated_at, scopes, expires_at + "SELECT id, key, value, created_at, last_updated_at, scopes FROM kv_v2 WHERE guild_id = $1 AND key ILIKE $2 AND scopes @> $3 ", ) .bind(self.guild_id.to_string()) .bind(query) .bind(scopes) .fetch_all(&self.pool) .await .map_err(|e| format!("Failed to get key: {e}"))? };
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
crates/cli/src/provider/mod.rscrates/runtime/src/plugins/antiraid/kv/mod.rscrates/runtime/src/traits/ir/kv.rscrates/runtime/src/traits/kvprovider.rs
💤 Files with no reviewable changes (2)
- crates/runtime/src/traits/ir/kv.rs
- crates/runtime/src/traits/kvprovider.rs
🧰 Additional context used
🧬 Code graph analysis (1)
crates/runtime/src/plugins/antiraid/kv/mod.rs (1)
crates/runtime/src/traits/kvprovider.rs (1)
exists(23-23)
🔇 Additional comments (4)
crates/cli/src/provider/mod.rs (2)
745-770: LGTM!The tenant state methods have been correctly updated to use the
datafield withserde_json::Value::Nullas the default. The serialization and deserialization logic properly maps between the local and runtimeTenantStaterepresentations.
725-732: Breaking change fromflags: u32todata: serde_json::Valuehas been properly implemented across all code paths.Verification confirms that all TenantState usages—including serialization in
get_tenant_state(lines 750-752), deserialization inset_tenant_state(lines 764-766), and Lua interop in bothFromLuaandIntoLuaimplementations—correctly use the newdatafield. No references to the oldflagsfield remain.crates/runtime/src/plugins/antiraid/kv/mod.rs (2)
148-190: LGTM! Good performance improvement.Changing the
actionandkeyparameters fromStringto&strreduces unnecessary allocations and is more idiomatic for read-only string parameters. The change is applied consistently throughout the method.
376-404: LGTM! Expiry removal is clean.The removal of expiry parameters from the
setmethod is consistent with the broader changes to eliminateexpires_athandling from the KV system. The simplified method signature is cleaner and easier to use.
Summary by CodeRabbit
New Features
Improvements
Chores
Tests
Breaking Changes
✏️ Tip: You can customize this high-level summary in your review settings.