Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ uuid = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
tower-http = { workspace = true }
json-patch = "4.1.0"
redbx.workspace = true

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
162 changes: 143 additions & 19 deletions crates/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,21 @@ async fn schema_info() -> Result<impl IntoResponse, StatusCode> {
}

/// GET /v1/:collection
/// Lists all documents in a collection.
/// Lists documents in a collection, chunked via cursor-based pagination.
///
/// Transcodes to JSON if requested; otherwise defaults to MessagePack with named fields.
/// Yeah, it's a full scan — pagination and cursors come in v0.3 Phase B.
/// Query params: `cursor` (string), `limit` (integer).
/// Defaults to returning MessagePack. Transcodes to JSON only if requested.
async fn list_docs(
State(state): State<AppState>,
Path(collection): Path<String>,
axum::extract::Query(params): axum::extract::Query<forge_types::pagination::PaginationParams>,
headers: axum::http::HeaderMap,
) -> Result<impl IntoResponse, StatusCode> {
match state.engine.list(&collection) {
Ok(docs) => {
let limit = params.limit.unwrap_or(50).clamp(1, 100) as usize;
let cursor = params.cursor.as_deref();

match state.engine.list_paginated(&collection, cursor, limit) {
Ok((docs, next_cursor)) => {
let accept = headers
.get(axum::http::header::ACCEPT)
.and_then(|h| h.to_str().ok())
Expand All @@ -140,25 +144,40 @@ async fn list_docs(
serde_json::json!({ "id": id, "doc": doc })
})
.collect();

let has_more = next_cursor.is_some();
let response = forge_types::pagination::PaginatedResponse {
data: json_docs,
next_cursor: next_cursor.clone(),
has_more,
};

Ok((
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "application/json")],
serde_json::to_vec(&json_docs).unwrap_or_default(),
serde_json::to_vec(&response).unwrap_or_default(),
)
.into_response())
} else {
// For MessagePack, we need to wrap the internal documents in a structured array.
// We use serde_json::Value as an intermediate to deserialize the inner doc and
// re-serialize as named msgpack, avoiding strict struct typing.
let mut wrapper = Vec::new();
// For MessagePack, we deserialize the internal payload, wrap it,
// and pack it into a structured array inside a PaginatedResponse.
let mut wrapper = Vec::with_capacity(docs.len());
for (id, bytes) in docs {
if let Ok(val) = rmp_serde::from_slice::<serde_json::Value>(&bytes) {
wrapper.push(serde_json::json!({ "id": id, "doc": val }));
}
}

let has_more = next_cursor.is_some();
let response = forge_types::pagination::PaginatedResponse {
data: wrapper,
next_cursor,
has_more,
};

let resp_bytes =
forge_storage::document::serialize_doc_named(&wrapper).map_err(|e| {
tracing::error!("failed to serialize list to msgpack: {e}");
forge_storage::document::serialize_doc_named(&response).map_err(|e| {
tracing::error!("failed to serialize paginated list to msgpack: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

Expand All @@ -171,7 +190,7 @@ async fn list_docs(
}
}
Err(e) => {
tracing::error!("list failed: {e}");
tracing::error!("list_paginated failed: {e}");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
Expand Down Expand Up @@ -323,12 +342,117 @@ async fn query_docs_stub() -> impl IntoResponse {
}

/// PATCH /v1/:collection/:id
/// Partial update (placeholder).
async fn update_doc() -> impl IntoResponse {
(
StatusCode::NOT_IMPLEMENTED,
"Atomic updates planned for v0.3",
)
/// Partial atomic update using RFC 7396 JSON Merge Patch semantics.
///
/// Converts the inbound patch (JSON or MsgPack) to a generic `Value`, reads the
/// current document, merges the fields atomically, and saves it.
/// Returns the updated document.
async fn update_doc(
State(state): State<AppState>,
Path((collection, id)): Path<(String, String)>,
headers: axum::http::HeaderMap,
body: bytes::Bytes,
) -> Result<impl IntoResponse, StatusCode> {
let content_type = headers
.get(axum::http::header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.unwrap_or("");

// Deserialize inbound patch
let patch_val: serde_json::Value = if content_type.contains("application/json") {
serde_json::from_slice(&body).map_err(|e| {
tracing::warn!("failed to parse JSON patch: {e}");
StatusCode::BAD_REQUEST
})?
} else {
forge_storage::document::deserialize_doc(&body).map_err(|e| {
tracing::warn!("failed to parse MessagePack patch: {e}");
StatusCode::BAD_REQUEST
})?
};

// Serialize it back to bytes for the merge_fn signature (it expects `&[u8]`)
// Actually, we can just close over `patch_val` and ignore the `patch_bytes` param
// to avoid an extra alloc, or pass an empty slice. We'll pass the patch bytes to
// appease the signature, but use our decoded struct.

// However, the signature is `engine.update_doc(..., patch: &[u8], merge_fn)`
// So let's serialize the patch explicitly for the call.
let patch_bytes = forge_storage::document::serialize_doc(&patch_val).map_err(|e| {
tracing::error!("failed to serialize patch intermediate: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

match state.engine.update_doc(
&collection,
&id,
&patch_bytes,
|existing_bytes, _patch_bytes| {
// Read the existing document into a mutable serde_json::Value
let mut doc: serde_json::Value =
forge_storage::document::deserialize_doc(existing_bytes).map_err(|e| {
tracing::error!("corrupted storage doc during update: {e}");
forge_types::ForgeError::Storage(redbx::Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Storage corruption",
)))
})?;

// Apply the RFC 7396 Merge Patch
json_patch::merge(&mut doc, &patch_val);

// Re-encode back to compact internal representation
let final_bytes = forge_storage::document::serialize_doc(&doc).map_err(|e| {
tracing::error!("failed to re-encode merged doc to msgpack: {e}");
forge_types::ForgeError::Storage(redbx::Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Re-encoding failed",
)))
})?;

Ok(final_bytes)
},
) {
Ok(merged_bytes) => {
let accept = headers
.get(axum::http::header::ACCEPT)
.and_then(|h| h.to_str().ok())
.unwrap_or("");

let val: serde_json::Value = forge_storage::document::deserialize_doc(&merged_bytes)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let response_body = serde_json::json!({ "id": id, "doc": val });

if accept.contains("application/json") {
let json_bytes = serde_json::to_vec(&response_body).unwrap_or_default();
Ok((
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "application/json")],
json_bytes,
)
.into_response())
} else {
let msg_bytes = forge_storage::document::serialize_doc_named(&response_body)
.unwrap_or_default();
Ok((
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "application/msgpack")],
msg_bytes,
)
.into_response())
}
}
Err(forge_types::ForgeError::Storage(redbx::Error::Io(e)))
if e.kind() == std::io::ErrorKind::NotFound =>
{
Err(StatusCode::NOT_FOUND)
}
Err(e) => {
tracing::error!("update failed: {e}");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

/// DELETE /v1/:collection/:id
Expand Down
Loading