-
Notifications
You must be signed in to change notification settings - Fork 1
Modernize Python API and Implement High-Performance Batch Ingestion (v0.1.7) #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -388,7 +388,7 @@ impl CortexaDBStore { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let (guard, _) = cvar | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .wait_timeout(runtime, timeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .expect("sync runtime wait poisoned"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| runtime = guard; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| runtime = guard; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let timed_out = runtime | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .dirty_since | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(|d| d.elapsed() >= max_delay) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -559,6 +559,65 @@ impl CortexaDBStore { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.execute_write_transaction_locked(&mut writer, WriteOp::InsertMemory(effective)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pub fn insert_memories_batch(&self, entries: Vec<MemoryEntry>) -> Result<CommandId> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut writer = self.writer.lock().expect("writer lock poisoned"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let sync_now = matches!(self.sync_policy, SyncPolicy::Strict); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut last_cmd_id = CommandId(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for entry in entries { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut effective = entry; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check for previous state to handle partial updates if necessary | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Ok(prev) = writer.engine.get_state_machine().get_memory(effective.id) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let content_changed = prev.content != effective.content; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if content_changed && effective.embedding.is_none() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(CortexaDBStoreError::MissingEmbeddingOnContentChange(effective.id)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+567
to
+574
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !content_changed && effective.embedding.is_none() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| effective.embedding = prev.embedding.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Validate dimension | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if let Some(embedding) = effective.embedding.as_ref() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if embedding.len() != writer.indexes.vector.dimension() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(crate::index::vector::VectorError::DimensionMismatch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expected: writer.indexes.vector.dimension(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| actual: embedding.len(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .into()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Execute unsynced for the whole batch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| last_cmd_id = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writer.engine.execute_command_unsynced(Command::InsertMemory(effective.clone()))?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Update vector index | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| match effective.embedding { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Some(embedding) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writer.indexes.vector_index_mut().index_in_namespace( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| &effective.namespace, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| effective.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| embedding, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| None => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let _ = writer.indexes.vector_index_mut().remove(effective.id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+591
to
+605
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Execute unsynced for the whole batch | |
| last_cmd_id = | |
| writer.engine.execute_command_unsynced(Command::InsertMemory(effective.clone()))?; | |
| // Update vector index | |
| match effective.embedding { | |
| Some(embedding) => { | |
| writer.indexes.vector_index_mut().index_in_namespace( | |
| &effective.namespace, | |
| effective.id, | |
| embedding, | |
| )?; | |
| } | |
| None => { | |
| let _ = writer.indexes.vector_index_mut().remove(effective.id); | |
| // Capture data needed for indexing before moving `effective` into the command | |
| let id = effective.id; | |
| let namespace = effective.namespace.clone(); | |
| let embedding_for_index = effective.embedding.clone(); | |
| // Execute unsynced for the whole batch | |
| last_cmd_id = | |
| writer.engine.execute_command_unsynced(Command::InsertMemory(effective))?; | |
| // Update vector index | |
| match embedding_for_index { | |
| Some(embedding) => { | |
| writer.indexes.vector_index_mut().index_in_namespace( | |
| &namespace, | |
| id, | |
| embedding, | |
| )?; | |
| } | |
| None => { | |
| let _ = writer.indexes.vector_index_mut().remove(id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remember_batch()returns the last command ID (last_cmd_id.0) rather than an inserted memory ID or list of IDs. This is inconsistent withremember*()methods that return the inserted memory ID, and it’s easy for callers to misinterpret (as the Rust example does). Consider returning the inserted IDs (or at least the last inserted memory ID), or rename/retag the return value/type to make it unambiguous (e.g.,CommandId).