Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 317 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

63 changes: 56 additions & 7 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use std::sync::{Arc, RwLock};
use index::flat::index::FlatIndex;
use index::{IndexType, VectorIndex};
use snapshot::Snapshot;
use storage::rocks_db::RocksDbStorage;
use storage::{StorageEngine, StorageType, VectorPage};
use storage::{StorageEngine, StorageType, VectorPage, create_storage_engine};

use uuid::Uuid;

Expand Down Expand Up @@ -252,10 +251,7 @@ pub fn restore_from_snapshot(config: &DbRestoreConfig) -> Result<VectorDb, DbErr

pub fn init_api(config: DbConfig) -> Result<VectorDb> {
// Initialize the storage engine
let storage = match config.storage_type {
StorageType::RocksDb => Arc::new(RocksDbStorage::new(config.data_path)?),
_ => Arc::new(RocksDbStorage::new(config.data_path)?),
};
let storage = create_storage_engine(config.storage_type, config.data_path)?;

// Initialize the vector index
let index: Arc<RwLock<dyn VectorIndex>> = match config.index_type {
Expand Down Expand Up @@ -290,9 +286,13 @@ mod tests {

// Helper function to create a test database
fn create_test_db() -> (VectorDb, TempDir) {
create_test_db_with_storage(StorageType::RocksDb)
}

fn create_test_db_with_storage(storage_type: StorageType) -> (VectorDb, TempDir) {
let temp_dir = tempdir().unwrap();
let config = DbConfig {
storage_type: StorageType::RocksDb,
storage_type,
index_type: IndexType::Flat,
data_path: temp_dir.path().to_path_buf(),
dimension: 3,
Expand All @@ -301,6 +301,13 @@ mod tests {
(init_api(config).unwrap(), temp_dir)
}

fn test_payload(content: &str) -> Payload {
Payload {
content_type: ContentType::Text,
content: content.to_string(),
}
}

#[test]
fn test_insert_and_get() {
let (db, _temp_dir) = create_test_db();
Expand All @@ -326,6 +333,20 @@ mod tests {
assert_eq!(point.payload.as_ref().unwrap().content, "Test content");
}

#[test]
fn test_insert_and_get_with_in_memory_storage() {
let (db, _temp_dir) = create_test_db_with_storage(StorageType::InMemory);
let vector = vec![1.0, 2.0, 3.0];
let payload = test_payload("Test content");

let id = db.insert(vector.clone(), payload.clone()).unwrap();
let point = db.get(id).unwrap().unwrap();

assert_eq!(point.id, id);
assert_eq!(point.vector, Some(vector));
assert_eq!(point.payload, Some(payload));
}

#[test]
fn test_dimension_mismatch() {
let (db, _temp_dir) = create_test_db();
Expand Down Expand Up @@ -593,6 +614,34 @@ mod tests {
assert!(loaded_db.get(id2).unwrap().unwrap().vector.unwrap() == v2);
}

#[test]
fn test_create_and_load_snapshot_with_in_memory_storage() {
let (old_db, temp_dir) = create_test_db_with_storage(StorageType::InMemory);

let v1 = vec![0.0, 1.0, 2.0];
let v2 = vec![3.0, 4.0, 5.0];
let v3 = vec![6.0, 7.0, 8.0];

let id1 = old_db.insert(v1.clone(), test_payload("one")).unwrap();
let id2 = old_db.insert(v2.clone(), test_payload("two")).unwrap();

let temp_snapshot_dir = tempdir().unwrap();
let snapshot_path = old_db.create_snapshot(temp_snapshot_dir.path()).unwrap();

let id3 = old_db.insert(v3, test_payload("three")).unwrap();

let reload_config = DbRestoreConfig {
data_path: temp_dir.path().to_path_buf(),
snapshot_path,
};

let loaded_db = restore_from_snapshot(&reload_config).unwrap();

assert_eq!(loaded_db.get(id1).unwrap().unwrap().vector, Some(v1));
assert_eq!(loaded_db.get(id2).unwrap().unwrap().vector, Some(v2));
assert!(loaded_db.get(id3).unwrap().is_none());
}

#[test]
fn test_snapshot_engine() {
let (_db, _temp_dir) = create_test_db();
Expand Down
9 changes: 9 additions & 0 deletions crates/grpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ impl From<storage::error::StorageError> for GrpcError {
StorageError::RocksDbFlush { source: _ } => GrpcError::Internal {
message: "flush error".to_string(),
},
StorageError::InMemoryLock {} => GrpcError::Internal {
message: "failed to lock in-memory storage".to_string(),
},
StorageError::InMemoryCheckpoint { msg } => GrpcError::Internal {
message: format!("in-memory checkpoint error: {}", msg),
},
StorageError::InMemoryCheckpointIo { msg, source: _ } => GrpcError::Internal {
message: format!("in-memory checkpoint io error: {}", msg),
},
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ serde_json.workspace = true
storage.workspace = true
tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
axum-test.workspace = true
tempfile.workspace = true
5 changes: 4 additions & 1 deletion crates/http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ fn api_error_to_response(err: &ApiError) -> (StatusCode, String) {
| StorageError::RocksDbFlush { .. }
| StorageError::RocksDbInitialization { .. }
| StorageError::RocksDbCheckpointMsg { .. }
| StorageError::RocksDbCheckpointIo { .. } => {
| StorageError::RocksDbCheckpointIo { .. }
| StorageError::InMemoryLock { .. }
| StorageError::InMemoryCheckpoint { .. }
| StorageError::InMemoryCheckpointIo { .. } => {
(StatusCode::INTERNAL_SERVER_ERROR, source.to_string())
}
},
Expand Down
64 changes: 64 additions & 0 deletions crates/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,67 @@ pub async fn run_http_server(db: Arc<VectorDb>, addr: SocketAddr) -> Result<(),
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use api::DbConfig;
use axum::http::StatusCode;
use axum_test::TestServer;
use defs::Similarity;
use index::IndexType;
use serde_json::json;
use storage::StorageType;

#[tokio::test]
async fn in_memory_storage_http_smoke_test() {
let temp_dir = tempfile::tempdir().unwrap();
let db = api::init_api(DbConfig {
storage_type: StorageType::InMemory,
index_type: IndexType::Flat,
data_path: temp_dir.path().to_path_buf(),
dimension: 3,
similarity: Similarity::Cosine,
})
.unwrap();
let server = TestServer::new(create_router(Arc::new(db))).unwrap();

let insert_response = server
.post("/points")
.json(&json!({
"vector": [1.0, 0.0, 0.0],
"payload": {
"content_type": "Text",
"content": "smoke-test"
}
}))
.await;
insert_response.assert_status(StatusCode::CREATED);
let insert_body: serde_json::Value = insert_response.json();
let point_id = insert_body["point_id"].as_str().unwrap();

let get_response = server.get(&format!("/points/{point_id}")).await;
get_response.assert_status_ok();
let point_body: serde_json::Value = get_response.json();
assert_eq!(point_body["payload"]["content"], "smoke-test");
assert_eq!(point_body["vector"], json!([1.0, 0.0, 0.0]));

let search_response = server
.post("/points/search")
.json(&json!({
"vector": [1.0, 0.0, 0.0],
"similarity": "Cosine",
"limit": 1
}))
.await;
search_response.assert_status_ok();
let search_body: serde_json::Value = search_response.json();
assert_eq!(search_body["results"], json!([point_id]));

let delete_response = server.delete(&format!("/points/{point_id}")).await;
delete_response.assert_status(StatusCode::NO_CONTENT);

let missing_response = server.get(&format!("/points/{point_id}")).await;
missing_response.assert_status(StatusCode::NOT_FOUND);
}
}
10 changes: 3 additions & 7 deletions crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use std::{
time::SystemTime,
};
use storage::{
StorageEngine, StorageType, checkpoint::StorageCheckpoint, rocks_db::RocksDbStorage,
StorageEngine, StorageType, checkpoint::StorageCheckpoint, in_memory::MemoryStorage,
rocks_db::RocksDbStorage,
};
use tar::Archive;
use tempfile::tempdir;
Expand Down Expand Up @@ -201,17 +202,12 @@ impl Snapshot {
));
}

// only rocksdb is supported for snapshots as of now
let mut storage_engine: Box<dyn StorageEngine> = match manifest.storage_type {
StorageType::InMemory => Box::new(MemoryStorage::new()),
StorageType::RocksDb => Box::new(
RocksDbStorage::new(storage_data_path)
.map_err(|e| DbError::StorageError(format!("Could not open storage: {e}")))?,
),
_ => {
return Err(DbError::SnapshotError(
"Unsupported storage type".to_string(),
));
}
};

let id = manifest.id;
Expand Down
1 change: 1 addition & 0 deletions crates/storage/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl StorageCheckpoint {
.0;

let storage_type = match marker {
INMEMORY_CHECKPOINT_FILENAME_MARKER => StorageType::InMemory,
ROCKSDB_CHECKPOINT_FILENAME_MARKER => StorageType::RocksDb,
_ => {
return Err(DbError::StorageCheckpointError(
Expand Down
9 changes: 9 additions & 0 deletions crates/storage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ pub enum StorageError {
#[snafu(display("Failed to iterate over storage: {source}"))]
RocksDbIteration { source: rocksdb::Error },

#[snafu(display("Failed to lock in-memory storage"))]
InMemoryLock {},

#[snafu(display("In-memory checkpoint error: {}", msg))]
InMemoryCheckpoint { msg: String },

#[snafu(display("{} : {}", msg, source))]
InMemoryCheckpointIo { msg: String, source: std::io::Error },

#[snafu(display("Failed to serialize point {id}: {source}"))]
Serialization { id: PointId, source: bincode::Error },

Expand Down
Loading
Loading