diff --git a/src/datasets/api.rs b/src/datasets/api.rs index f98a99ff..232f1032 100644 --- a/src/datasets/api.rs +++ b/src/datasets/api.rs @@ -1,11 +1,12 @@ use std::collections::HashSet; -use anyhow::{bail, Result}; +use anyhow::{bail, Context, Result}; +use reqwest::header::HeaderMap; use serde::{Deserialize, Serialize}; use serde_json::{json, Map, Value}; use urlencoding::encode; -use crate::http::ApiClient; +use crate::http::{ApiClient, HttpError}; use super::records::DATASET_RECORD_FIELDS; @@ -72,11 +73,51 @@ impl Dataset { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatasetSnapshot { + pub id: String, + pub name: String, + pub dataset_id: String, + pub description: Option, + pub xact_id: String, + pub created: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateDatasetSnapshotResult { + pub dataset_snapshot: DatasetSnapshot, + pub found_existing: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatasetRestorePreview { + pub rows_to_restore: usize, + pub rows_to_delete: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatasetRestoreResult { + pub xact_id: String, + pub rows_restored: usize, + pub rows_deleted: usize, +} + #[derive(Debug, Deserialize)] struct ListResponse { objects: Vec, } +#[derive(Debug, Deserialize)] +struct ListResponseGeneric { + objects: Vec, +} + +#[derive(Debug, Deserialize)] +struct DatasetHeadXactRow { + #[serde(rename = "_xact_id", default)] + xact_id: Option, +} + pub async fn list_datasets(client: &ApiClient, project_id: &str) -> Result> { let path = format!( "/v1/dataset?org_name={}&project_id={}", @@ -203,6 +244,87 @@ pub async fn delete_dataset(client: &ApiClient, dataset_id: &str) -> Result<()> client.delete(&path).await } +pub async fn list_dataset_snapshots( + client: &ApiClient, + dataset_id: &str, +) -> Result> { + let path = format!("/v1/dataset_snapshot?dataset_id={}", encode(dataset_id)); + let list: ListResponseGeneric = client.get(&path).await?; + Ok(list.objects) +} + +pub async fn create_dataset_snapshot( + client: &ApiClient, + dataset_id: &str, + name: &str, + description: Option<&str>, + xact_id: &str, +) -> Result { + let mut body = serde_json::json!({ + "dataset_id": dataset_id, + "name": name, + "xact_id": xact_id, + }); + if let Some(description) = description { + body["description"] = Value::String(description.to_string()); + } + let response = client + .post_with_headers_raw("/v1/dataset_snapshot", &body, &[]) + .await?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(HttpError { status, body }.into()); + } + + let found_existing = found_existing_snapshot_header(response.headers()); + let dataset_snapshot = response.json().await.context("failed to parse response")?; + Ok(CreateDatasetSnapshotResult { + dataset_snapshot, + found_existing, + }) +} + +pub async fn preview_dataset_restore( + client: &ApiClient, + dataset_id: &str, + xact_id: &str, +) -> Result { + let path = format!("/v1/dataset/{}/restore/preview", encode(dataset_id)); + client + .post(&path, &serde_json::json!({ "version": xact_id })) + .await +} + +pub async fn restore_dataset( + client: &ApiClient, + dataset_id: &str, + xact_id: &str, +) -> Result { + let path = format!("/v1/dataset/{}/restore", encode(dataset_id)); + client + .post(&path, &serde_json::json!({ "version": xact_id })) + .await +} + +pub async fn get_dataset_head_xact_id( + client: &ApiClient, + dataset_id: &str, +) -> Result> { + let query = build_dataset_head_xact_query(dataset_id); + let response = client + .btql_structured::(&query) + .await?; + let head = response + .data + .into_iter() + .filter_map(|row| row.xact_id) + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .max_by(compare_xact_ids); + Ok(head) +} + fn resolve_dataset_rows_page_limit(max_rows: Option, loaded_rows: usize) -> Option { match max_rows { None => Some(MAX_DATASET_ROWS_PAGE_LIMIT), @@ -256,6 +378,44 @@ fn dataset_rows_select_fields() -> Vec { .collect() } +fn build_dataset_head_xact_query(dataset_id: &str) -> Value { + json!({ + "select": [{ + "expr": {"op": "ident", "name": ["_xact_id"]}, + "alias": "_xact_id", + }], + "from": { + "op": "function", + "name": {"op": "ident", "name": ["dataset"]}, + "args": [{"op": "literal", "value": dataset_id}] + }, + "filter": { + "op": "ge", + "left": {"op": "ident", "name": ["created"]}, + "right": {"op": "literal", "value": DATASET_ROWS_SINCE} + }, + "sort": [{ + "expr": {"op": "ident", "name": ["_xact_id"]}, + "dir": "desc", + }], + "limit": 1 + }) +} + +fn compare_xact_ids(left: &String, right: &String) -> std::cmp::Ordering { + match (left.parse::(), right.parse::()) { + (Ok(left), Ok(right)) => left.cmp(&right), + _ => left.cmp(right), + } +} + +fn found_existing_snapshot_header(headers: &HeaderMap) -> bool { + headers + .get("x-bt-found-existing") + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value.eq_ignore_ascii_case("true") || value == "1") +} + #[cfg(test)] mod tests { use super::*; @@ -338,6 +498,99 @@ mod tests { ); } + #[test] + fn dataset_head_query_includes_required_filter_and_limit() { + let query = build_dataset_head_xact_query("dataset-id"); + assert_eq!( + query, + serde_json::json!({ + "select": [{ + "expr": {"op": "ident", "name": ["_xact_id"]}, + "alias": "_xact_id", + }], + "from": { + "op": "function", + "name": {"op": "ident", "name": ["dataset"]}, + "args": [{"op": "literal", "value": "dataset-id"}] + }, + "filter": { + "op": "ge", + "left": {"op": "ident", "name": ["created"]}, + "right": {"op": "literal", "value": "1970-01-01T00:00:00Z"} + }, + "sort": [{ + "expr": {"op": "ident", "name": ["_xact_id"]}, + "dir": "desc", + }], + "limit": 1 + }) + ); + } + + #[test] + fn dataset_head_query_keeps_dataset_id_as_literal() { + let query = build_dataset_head_xact_query("dataset'with-quote"); + assert_eq!( + query.pointer("/from/args/0/value").and_then(Value::as_str), + Some("dataset'with-quote") + ); + } + + #[test] + fn compare_xact_ids_prefers_numeric_order_when_possible() { + assert_eq!( + compare_xact_ids(&"10".to_string(), &"2".to_string()), + std::cmp::Ordering::Greater + ); + assert_eq!( + compare_xact_ids(&"b".to_string(), &"a".to_string()), + std::cmp::Ordering::Greater + ); + } + + #[test] + fn dataset_snapshot_deserializes_service_schema() { + let snapshot: DatasetSnapshot = serde_json::from_value(serde_json::json!({ + "id": "01926568-8088-7109-99ab-123456789abc", + "dataset_id": "01926568-8088-7109-99ab-abcdef012345", + "name": "baseline", + "description": null, + "xact_id": "1000192656880881099", + "created": null + })) + .expect("deserialize snapshot"); + + assert_eq!(snapshot.dataset_id, "01926568-8088-7109-99ab-abcdef012345"); + assert_eq!(snapshot.name, "baseline"); + assert!(snapshot.description.is_none()); + assert_eq!(snapshot.xact_id, "1000192656880881099"); + assert!(snapshot.created.is_none()); + } + + #[test] + fn dataset_restore_preview_deserializes_count_fields() { + let preview: DatasetRestorePreview = serde_json::from_value(serde_json::json!({ + "rows_to_restore": 7, + "rows_to_delete": 2 + })) + .expect("deserialize preview"); + assert_eq!(preview.rows_to_restore, 7); + assert_eq!(preview.rows_to_delete, 2); + } + + #[test] + fn dataset_restore_result_deserializes_count_fields() { + let result: DatasetRestoreResult = serde_json::from_value(serde_json::json!({ + "xact_id": "1000192656880881099", + "rows_restored": 7, + "rows_deleted": 2 + })) + .expect("deserialize result"); + assert_eq!(result.xact_id, "1000192656880881099"); + assert_eq!(result.rows_restored, 7); + assert_eq!(result.rows_deleted, 2); + } + #[test] fn dataset_rows_page_limit_defaults_to_api_max() { assert_eq!( @@ -356,4 +609,23 @@ mod tests { fn dataset_rows_page_limit_stops_when_limit_reached() { assert_eq!(resolve_dataset_rows_page_limit(Some(200), 200), None); } + + #[test] + fn found_existing_snapshot_header_accepts_true_and_one() { + let mut headers = HeaderMap::new(); + headers.insert("x-bt-found-existing", "true".parse().expect("header")); + assert!(found_existing_snapshot_header(&headers)); + + headers.insert("x-bt-found-existing", "1".parse().expect("header")); + assert!(found_existing_snapshot_header(&headers)); + } + + #[test] + fn found_existing_snapshot_header_rejects_missing_or_false() { + assert!(!found_existing_snapshot_header(&HeaderMap::new())); + + let mut headers = HeaderMap::new(); + headers.insert("x-bt-found-existing", "false".parse().expect("header")); + assert!(!found_existing_snapshot_header(&headers)); + } } diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs index eb59226d..91f408c0 100644 --- a/src/datasets/mod.rs +++ b/src/datasets/mod.rs @@ -16,6 +16,7 @@ mod create; mod delete; mod list; mod records; +mod snapshots; mod update; mod utils; mod view; @@ -86,6 +87,13 @@ struct DatasetInputArgs { bt datasets update my-dataset --file records.jsonl bt datasets add my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]' bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id + bt datasets snapshots list my-dataset + bt datasets snapshots create my-dataset + bt datasets snapshots create my-dataset baseline + bt datasets snapshots create my-dataset baseline --xact-id 1000192656880881099 + bt datasets snapshots restore my-dataset + bt datasets snapshots restore my-dataset --name baseline + bt datasets snapshots restore my-dataset --snapshot 1000192656880881099 --force bt datasets view my-dataset bt datasets delete my-dataset "#)] @@ -107,6 +115,9 @@ enum DatasetsCommands { View(ViewArgs), /// Delete a dataset Delete(DeleteArgs), + /// Manage dataset snapshots + #[command(visible_aliases = ["versions", "version"])] + Snapshots(snapshots::SnapshotsArgs), } #[derive(Debug, Clone, Args)] @@ -296,14 +307,20 @@ pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> { Some(DatasetsCommands::Delete(delete_args)) => { delete::run(&ctx, delete_args.name(), delete_args.force).await } + Some(DatasetsCommands::Snapshots(snapshot_args)) => { + snapshots::run(&ctx, &base, snapshot_args).await + } } } fn datasets_command_is_read_only(command: Option<&DatasetsCommands>) -> bool { - matches!( - command, - None | Some(DatasetsCommands::List) | Some(DatasetsCommands::View(_)) - ) + match command { + None | Some(DatasetsCommands::List) | Some(DatasetsCommands::View(_)) => true, + Some(DatasetsCommands::Snapshots(args)) => snapshots::command_is_read_only(args), + Some(DatasetsCommands::Create(_)) + | Some(DatasetsCommands::Update(_)) + | Some(DatasetsCommands::Delete(_)) => false, + } } fn resolve_view_row_limit(args: &ViewArgs) -> Option { @@ -320,6 +337,10 @@ mod tests { use clap::{Parser, Subcommand}; + use super::snapshots::{ + SnapshotCreateArgs, SnapshotDatasetArgs, SnapshotListArgs, SnapshotNameArgs, + SnapshotRestoreArgs, SnapshotsArgs, SnapshotsCommands, + }; use super::*; #[derive(Debug, Parser)] @@ -560,6 +581,16 @@ mod tests { all_rows: false, }) ))); + assert!(datasets_command_is_read_only(Some( + &DatasetsCommands::Snapshots(SnapshotsArgs { + command: SnapshotsCommands::List(SnapshotListArgs { + dataset: SnapshotDatasetArgs { + dataset_positional: Some("dataset".to_string()), + dataset_flag: None, + }, + }), + }) + ))); } #[test] @@ -600,5 +631,172 @@ mod tests { force: true, }) ))); + assert!(!datasets_command_is_read_only(Some( + &DatasetsCommands::Snapshots(SnapshotsArgs { + command: SnapshotsCommands::Create(SnapshotCreateArgs { + dataset: SnapshotDatasetArgs { + dataset_positional: Some("dataset".to_string()), + dataset_flag: None, + }, + snapshot: SnapshotNameArgs { + name_positional: Some("baseline".to_string()), + name_flag: None, + }, + xact_id: None, + description: None, + }), + }) + ))); + assert!(!datasets_command_is_read_only(Some( + &DatasetsCommands::Snapshots(SnapshotsArgs { + command: SnapshotsCommands::Restore(SnapshotRestoreArgs { + dataset: SnapshotDatasetArgs { + dataset_positional: Some("dataset".to_string()), + dataset_flag: None, + }, + name: Some("baseline".to_string()), + snapshot: None, + force: false, + }), + }) + ))); + } + + #[test] + fn snapshots_list_parses_dataset_name() { + let parsed = + parse(&["datasets", "snapshots", "list", "my-dataset"]).expect("parse snapshots list"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::List(list) = snapshots.command else { + panic!("expected snapshots list command"); + }; + assert_eq!(list.dataset_name(), Some("my-dataset")); + } + + #[test] + fn version_alias_parses_create_flags_for_compatibility() { + let parsed = parse(&[ + "datasets", + "version", + "create", + "--dataset", + "my-dataset", + "--name", + "baseline", + "--xact-id", + "1000192656880881099", + "--description", + "Initial snapshot", + ]) + .expect("parse version alias create"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::Create(create) = snapshots.command else { + panic!("expected snapshots create command"); + }; + assert_eq!(create.dataset_name(), Some("my-dataset")); + assert_eq!(create.snapshot_name(), Some("baseline")); + assert_eq!(create.xact_id.as_deref(), Some("1000192656880881099")); + assert_eq!(create.description.as_deref(), Some("Initial snapshot")); + } + + #[test] + fn snapshots_create_allows_omitting_xact_id() { + let parsed = parse(&["datasets", "snapshots", "create", "my-dataset", "baseline"]) + .expect("parse snapshots create without xact id"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::Create(create) = snapshots.command else { + panic!("expected snapshots create command"); + }; + assert_eq!(create.dataset_name(), Some("my-dataset")); + assert_eq!(create.snapshot_name(), Some("baseline")); + assert!(create.xact_id.is_none()); + } + + #[test] + fn snapshots_create_allows_omitting_snapshot_name() { + let parsed = parse(&["datasets", "snapshots", "create", "my-dataset"]) + .expect("parse snapshots create without snapshot name"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::Create(create) = snapshots.command else { + panic!("expected snapshots create command"); + }; + assert_eq!(create.dataset_name(), Some("my-dataset")); + assert!(create.snapshot_name().is_none()); + } + + #[test] + fn snapshots_restore_parses_name_target() { + let parsed = parse(&[ + "datasets", + "snapshots", + "restore", + "my-dataset", + "--name", + "baseline", + ]) + .expect("parse snapshots restore --name"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::Restore(restore) = snapshots.command else { + panic!("expected snapshots restore command"); + }; + assert_eq!(restore.dataset_name(), Some("my-dataset")); + assert_eq!(restore.snapshot_name(), Some("baseline")); + assert!(restore.snapshot_xact_id().is_none()); + assert!(!restore.force); + } + + #[test] + fn version_alias_parses_restore_version_alias_and_force() { + let parsed = parse(&[ + "datasets", + "version", + "restore", + "--dataset", + "my-dataset", + "--version", + "1000192656880881099", + "--force", + ]) + .expect("parse version alias restore"); + let DatasetsCommands::Snapshots(snapshots) = parsed.command.expect("subcommand") else { + panic!("expected snapshots command"); + }; + let SnapshotsCommands::Restore(restore) = snapshots.command else { + panic!("expected snapshots restore command"); + }; + assert_eq!(restore.dataset_name(), Some("my-dataset")); + assert_eq!(restore.snapshot_xact_id(), Some("1000192656880881099")); + assert!(restore.snapshot_name().is_none()); + assert!(restore.force); + } + + #[test] + fn snapshots_restore_rejects_name_and_snapshot_together() { + let err = parse(&[ + "datasets", + "snapshots", + "restore", + "my-dataset", + "--name", + "baseline", + "--snapshot", + "1000192656880881099", + ]) + .expect_err("restore target conflict should fail"); + let message = err.to_string(); + assert!( + message.contains("--name") && message.contains("--snapshot"), + "unexpected clap error: {message}" + ); } } diff --git a/src/datasets/snapshots.rs b/src/datasets/snapshots.rs new file mode 100644 index 00000000..75956290 --- /dev/null +++ b/src/datasets/snapshots.rs @@ -0,0 +1,833 @@ +use std::{fmt::Write as _, time::Duration}; + +use anyhow::{anyhow, bail, Result}; +use chrono::{DateTime, Utc}; +use clap::{builder::BoolishValueParser, Args, Subcommand}; +use dialoguer::{console, Confirm}; +use serde_json::json; + +use crate::{ + args::BaseArgs, + ui::{ + apply_column_padding, header, is_interactive, print_command_status, print_with_pager, + styled_table, truncate, with_spinner, with_spinner_visible, CommandStatus, + }, + utils::{pluralize, profile_author_slug, resolve_profile_info, sanitize_name_segment}, +}; + +use super::{ + api::{self, Dataset, DatasetRestorePreview, DatasetRestoreResult, DatasetSnapshot}, + ResolvedContext, +}; + +#[derive(Debug, Clone, Args)] +#[command(after_help = "\ +Examples: + bt datasets snapshots list my-dataset + bt datasets snapshots create my-dataset + bt datasets snapshots create my-dataset baseline + bt datasets snapshots create my-dataset baseline --xact-id 1000192656880881099 + bt datasets snapshots restore my-dataset + bt datasets snapshots restore my-dataset --name baseline + bt datasets snapshots restore my-dataset --snapshot 1000192656880881099 --force +")] +pub(super) struct SnapshotsArgs { + #[command(subcommand)] + pub(super) command: SnapshotsCommands, +} + +#[derive(Debug, Clone, Subcommand)] +pub(super) enum SnapshotsCommands { + /// List snapshots for a dataset + List(SnapshotListArgs), + /// Create a new snapshot for a dataset + Create(SnapshotCreateArgs), + /// Restore a dataset to a saved snapshot + Restore(SnapshotRestoreArgs), +} + +#[derive(Debug, Clone, Args)] +pub(super) struct SnapshotDatasetArgs { + /// Dataset name (positional) + #[arg(value_name = "DATASET")] + pub(super) dataset_positional: Option, + + /// Dataset name (flag) + #[arg(long = "dataset", short = 'd')] + pub(super) dataset_flag: Option, +} + +impl SnapshotDatasetArgs { + pub(super) fn dataset_name(&self) -> Option<&str> { + self.dataset_positional + .as_deref() + .or(self.dataset_flag.as_deref()) + } +} + +#[derive(Debug, Clone, Args)] +pub(super) struct SnapshotNameArgs { + /// Snapshot name (positional) + #[arg(value_name = "SNAPSHOT")] + pub(super) name_positional: Option, + + /// Snapshot name (flag) + #[arg(long = "name", short = 'n')] + pub(super) name_flag: Option, +} + +impl SnapshotNameArgs { + fn name(&self) -> Option<&str> { + self.name_positional + .as_deref() + .or(self.name_flag.as_deref()) + } +} + +#[derive(Debug, Clone, Args)] +pub(super) struct SnapshotListArgs { + #[command(flatten)] + pub(super) dataset: SnapshotDatasetArgs, +} + +impl SnapshotListArgs { + pub(super) fn dataset_name(&self) -> Option<&str> { + self.dataset.dataset_name() + } +} + +#[derive(Debug, Clone, Args)] +pub(super) struct SnapshotCreateArgs { + #[command(flatten)] + pub(super) dataset: SnapshotDatasetArgs, + + #[command(flatten)] + pub(super) snapshot: SnapshotNameArgs, + + /// Transaction id to snapshot. Defaults to the dataset's current head xact. + #[arg( + long = "xact-id", + env = "BT_DATASETS_SNAPSHOT_XACT_ID", + value_name = "XACT_ID" + )] + pub(super) xact_id: Option, + + /// Optional snapshot description + #[arg(long, env = "BT_DATASETS_SNAPSHOT_DESCRIPTION", value_name = "TEXT")] + pub(super) description: Option, +} + +impl SnapshotCreateArgs { + pub(super) fn dataset_name(&self) -> Option<&str> { + self.dataset.dataset_name() + } + + pub(super) fn snapshot_name(&self) -> Option<&str> { + self.snapshot.name() + } +} + +#[derive(Debug, Clone, Args)] +pub(super) struct SnapshotRestoreArgs { + #[command(flatten)] + pub(super) dataset: SnapshotDatasetArgs, + + /// Saved snapshot name to restore + #[arg( + long, + short = 'n', + env = "BT_DATASETS_SNAPSHOT_RESTORE_NAME", + value_name = "NAME", + conflicts_with = "snapshot" + )] + pub(super) name: Option, + + /// Transaction id to restore + #[arg( + long = "snapshot", + visible_alias = "version", + env = "BT_DATASETS_SNAPSHOT_RESTORE_XACT_ID", + value_name = "XACT_ID", + conflicts_with = "name" + )] + pub(super) snapshot: Option, + + /// Skip confirmation after preview and apply the restore + #[arg( + long, + short = 'f', + env = "BT_DATASETS_SNAPSHOT_RESTORE_FORCE", + value_parser = BoolishValueParser::new(), + default_value_t = false + )] + pub(super) force: bool, +} + +impl SnapshotRestoreArgs { + pub(super) fn dataset_name(&self) -> Option<&str> { + self.dataset.dataset_name() + } + + pub(super) fn snapshot_name(&self) -> Option<&str> { + self.name.as_deref() + } + + pub(super) fn snapshot_xact_id(&self) -> Option<&str> { + self.snapshot.as_deref() + } +} + +pub(super) fn command_is_read_only(args: &SnapshotsArgs) -> bool { + matches!(args.command, SnapshotsCommands::List(_)) +} + +pub(super) async fn run(ctx: &ResolvedContext, base: &BaseArgs, args: SnapshotsArgs) -> Result<()> { + match args.command { + SnapshotsCommands::List(list_args) => run_list(ctx, &list_args, base.json).await, + SnapshotsCommands::Create(create_args) => { + run_create(ctx, base, &create_args, base.json).await + } + SnapshotsCommands::Restore(restore_args) => { + run_restore(ctx, &restore_args, base.json).await + } + } +} + +#[derive(Debug, Clone)] +struct RestoreTarget { + name: Option, + xact_id: String, +} + +impl RestoreTarget { + fn display_target(&self) -> String { + match self.name.as_deref() { + Some(name) => format!("snapshot '{name}' (xact {})", self.xact_id), + None => format!("xact {}", self.xact_id), + } + } +} + +async fn run_list(ctx: &ResolvedContext, args: &SnapshotListArgs, json: bool) -> Result<()> { + let dataset = resolve_existing_dataset(ctx, args.dataset_name(), "snapshots list").await?; + + let mut snapshots = with_spinner( + "Loading dataset snapshots...", + api::list_dataset_snapshots(&ctx.client, &dataset.id), + ) + .await?; + sort_snapshots_for_display(&mut snapshots); + + if json { + println!( + "{}", + serde_json::to_string(&json!({ + "dataset": dataset, + "snapshots": snapshots, + }))? + ); + return Ok(()); + } + + let mut output = String::new(); + let count = format!( + "{} {}", + snapshots.len(), + pluralize(snapshots.len(), "snapshot", None) + ); + writeln!( + output, + "{} found for {} {} {} {} {}\n", + console::style(count), + console::style(ctx.client.org_name()).bold(), + console::style("/").dim().bold(), + console::style(&ctx.project.name).bold(), + console::style("/").dim().bold(), + console::style(&dataset.name).bold() + )?; + + let mut table = styled_table(); + table.set_header(vec![ + header("Name"), + header("Description"), + header("Xact"), + header("Created"), + ]); + apply_column_padding(&mut table, (0, 6)); + + for snapshot in &snapshots { + let description = snapshot + .description + .as_deref() + .filter(|description| !description.is_empty()) + .map(|description| truncate(description, 60)) + .unwrap_or_else(|| "-".to_string()); + let xact_id = &snapshot.xact_id; + let created = snapshot + .created + .as_deref() + .map(|created| truncate(created, 10)) + .unwrap_or_else(|| "-".to_string()); + table.add_row(vec![&snapshot.name, &description, &xact_id, &created]); + } + + write!(output, "{table}")?; + print_with_pager(&output)?; + Ok(()) +} + +async fn run_create( + ctx: &ResolvedContext, + base: &BaseArgs, + args: &SnapshotCreateArgs, + json_output: bool, +) -> Result<()> { + let dataset = resolve_existing_dataset(ctx, args.dataset_name(), "snapshots create").await?; + let snapshot_name = resolve_snapshot_name(base, ctx, args.snapshot_name()); + let xact_id = resolve_snapshot_xact_id(ctx, &dataset, args.xact_id.as_deref()).await?; + let description = normalize_optional_text(args.description.as_deref()); + + let create_result = match with_spinner_visible( + "Creating dataset snapshot...", + api::create_dataset_snapshot( + &ctx.client, + &dataset.id, + &snapshot_name, + description.as_deref(), + &xact_id, + ), + Duration::from_millis(300), + ) + .await + { + Ok(create_result) => create_result, + Err(error) => { + print_command_status( + CommandStatus::Error, + &format!( + "Failed to create snapshot '{}' for dataset '{}'", + snapshot_name, dataset.name + ), + ); + return Err(error); + } + }; + let snapshot = create_result.dataset_snapshot; + let found_existing = create_result.found_existing; + + if json_output { + println!( + "{}", + serde_json::to_string(&json!({ + "dataset": dataset, + "snapshot": snapshot, + "found_existing": found_existing, + "mode": "snapshot_create", + }))? + ); + return Ok(()); + } + + if found_existing { + print_command_status( + CommandStatus::Warning, + &format!( + "Snapshot '{}' already exists for '{}' (xact {}).", + snapshot.name, dataset.name, snapshot.xact_id + ), + ); + return Ok(()); + } + + print_command_status( + CommandStatus::Success, + &format!( + "Created snapshot '{}' for '{}' (xact {}).", + snapshot.name, dataset.name, snapshot.xact_id + ), + ); + Ok(()) +} + +async fn run_restore( + ctx: &ResolvedContext, + args: &SnapshotRestoreArgs, + json_output: bool, +) -> Result<()> { + let dataset = resolve_existing_dataset(ctx, args.dataset_name(), "snapshots restore").await?; + let target = resolve_restore_target(ctx, &dataset, args).await?; + + let preview = match with_spinner_visible( + "Previewing dataset restore...", + api::preview_dataset_restore(&ctx.client, &dataset.id, &target.xact_id), + Duration::from_millis(300), + ) + .await + { + Ok(preview) => preview, + Err(error) => { + print_command_status( + CommandStatus::Error, + &format!( + "Failed to preview restore for dataset '{}' to {}", + dataset.name, + target.display_target() + ), + ); + return Err(error); + } + }; + + if json_output { + return run_restore_json(ctx, dataset, target, preview, args.force).await; + } + + print_restore_preview(&dataset, &target, &preview)?; + + if !args.force { + if !is_interactive() { + print_command_status( + CommandStatus::Warning, + &format!( + "Restore preview complete for '{}'. Re-run with --force to apply it non-interactively.", + dataset.name + ), + ); + return Ok(()); + } + + let confirmed = Confirm::new() + .with_prompt(format!( + "Restore dataset '{}' to {}?", + dataset.name, + target.display_target() + )) + .default(false) + .interact()?; + if !confirmed { + print_command_status( + CommandStatus::Warning, + &format!( + "Cancelled restore for '{}' (no changes applied).", + dataset.name + ), + ); + return Ok(()); + } + } + + run_restore_execute(ctx, &dataset, &target).await?; + Ok(()) +} + +async fn run_restore_json( + ctx: &ResolvedContext, + dataset: Dataset, + target: RestoreTarget, + preview: DatasetRestorePreview, + force: bool, +) -> Result<()> { + if !force { + println!( + "{}", + serde_json::to_string(&json!({ + "dataset": dataset, + "target": { + "name": target.name.as_deref(), + "xact_id": target.xact_id, + }, + "preview": preview, + "restored": false, + "mode": "snapshot_restore", + }))? + ); + return Ok(()); + } + + let result = run_restore_execute(ctx, &dataset, &target).await?; + println!( + "{}", + serde_json::to_string(&json!({ + "dataset": dataset, + "target": { + "name": target.name.as_deref(), + "xact_id": target.xact_id, + }, + "preview": preview, + "result": result, + "restored": true, + "mode": "snapshot_restore", + }))? + ); + Ok(()) +} + +async fn run_restore_execute( + ctx: &ResolvedContext, + dataset: &Dataset, + target: &RestoreTarget, +) -> Result { + let result = match with_spinner_visible( + "Restoring dataset...", + api::restore_dataset(&ctx.client, &dataset.id, &target.xact_id), + Duration::from_millis(300), + ) + .await + { + Ok(result) => result, + Err(error) => { + print_command_status( + CommandStatus::Error, + &format!( + "Failed to restore dataset '{}' to {}", + dataset.name, + target.display_target() + ), + ); + return Err(error); + } + }; + + print_command_status( + CommandStatus::Success, + &format!( + "Restored dataset '{}' to {} (xact {}; {} restored, {} deleted).", + dataset.name, + target.display_target(), + result.xact_id.as_str(), + result.rows_restored, + result.rows_deleted + ), + ); + Ok(result) +} + +async fn resolve_existing_dataset( + ctx: &ResolvedContext, + name: Option<&str>, + command: &str, +) -> Result { + match name.map(str::trim).filter(|value| !value.is_empty()) { + Some(name) => with_spinner( + "Loading dataset...", + api::get_dataset_by_name(&ctx.client, &ctx.project.id, name), + ) + .await? + .ok_or_else(|| anyhow!("dataset '{name}' not found")), + None => { + if !is_interactive() { + bail!("dataset name required. Use: bt datasets {command} "); + } + super::select_dataset_interactive(&ctx.client, &ctx.project.id).await + } + } +} + +fn resolve_snapshot_name(base: &BaseArgs, ctx: &ResolvedContext, name: Option<&str>) -> String { + match name.map(str::trim).filter(|value| !value.is_empty()) { + Some(name) => name.to_string(), + None => default_snapshot_name( + resolve_default_snapshot_author(base, ctx) + .as_deref() + .unwrap_or("user"), + Utc::now(), + ), + } +} + +async fn resolve_snapshot_xact_id( + ctx: &ResolvedContext, + dataset: &Dataset, + explicit_xact_id: Option<&str>, +) -> Result { + if let Some(xact_id) = explicit_xact_id.and_then(|value| normalize_optional_text(Some(value))) { + return Ok(xact_id); + } + + let head_xact_id = with_spinner( + "Resolving dataset head xact...", + api::get_dataset_head_xact_id(&ctx.client, &dataset.id), + ) + .await?; + + head_xact_id.ok_or_else(|| { + anyhow!( + "dataset '{}' has no rows, so no head xact could be inferred; pass --xact-id explicitly", + dataset.name + ) + }) +} + +async fn resolve_restore_target( + ctx: &ResolvedContext, + dataset: &Dataset, + args: &SnapshotRestoreArgs, +) -> Result { + if let Some(xact_id) = args + .snapshot_xact_id() + .and_then(|value| normalize_optional_text(Some(value))) + { + return Ok(RestoreTarget { + name: None, + xact_id, + }); + } + + let snapshots = with_spinner( + "Loading dataset snapshots...", + api::list_dataset_snapshots(&ctx.client, &dataset.id), + ) + .await?; + if let Some(snapshot_name) = args + .snapshot_name() + .and_then(|value| normalize_optional_text(Some(value))) + { + return resolve_restore_target_by_name(&snapshots, &dataset.name, &snapshot_name); + } + + if is_interactive() { + return select_restore_target_interactive(&dataset.name, &snapshots); + } + + bail!( + "restore target required. Use: bt datasets snapshots restore (--name | --snapshot )" + ); +} + +fn sort_snapshots_for_display(snapshots: &mut [DatasetSnapshot]) { + snapshots.sort_by(|a, b| b.created.cmp(&a.created).then_with(|| a.name.cmp(&b.name))); +} + +fn select_restore_target_interactive( + dataset_name: &str, + snapshots: &[DatasetSnapshot], +) -> Result { + let mut restorable_snapshots: Vec<&DatasetSnapshot> = snapshots.iter().collect(); + + if restorable_snapshots.is_empty() { + bail!( + "no restorable dataset snapshots found for '{}'", + dataset_name + ); + } + + restorable_snapshots + .sort_by(|a, b| b.created.cmp(&a.created).then_with(|| a.name.cmp(&b.name))); + + let labels: Vec = restorable_snapshots + .iter() + .map(|snapshot| restore_snapshot_label(snapshot)) + .collect(); + let selection = crate::ui::fuzzy_select("Select dataset snapshot", &labels, 0)?; + restore_target_from_snapshot(restorable_snapshots[selection]) +} + +fn restore_snapshot_label(snapshot: &DatasetSnapshot) -> String { + let created = snapshot + .created + .as_deref() + .map(|created| truncate(created, 10)) + .unwrap_or_else(|| "-".to_string()); + + match snapshot + .description + .as_deref() + .filter(|value| !value.is_empty()) + { + Some(description) => format!( + "{} (xact {}, created {}, {})", + snapshot.name, + snapshot.xact_id, + created, + truncate(description, 40) + ), + None => format!( + "{} (xact {}, created {})", + snapshot.name, snapshot.xact_id, created + ), + } +} + +fn restore_target_from_snapshot(snapshot: &DatasetSnapshot) -> Result { + Ok(RestoreTarget { + name: Some(snapshot.name.clone()), + xact_id: snapshot.xact_id.clone(), + }) +} + +fn resolve_restore_target_by_name( + snapshots: &[DatasetSnapshot], + dataset_name: &str, + snapshot_name: &str, +) -> Result { + let mut matches = snapshots + .iter() + .filter(|snapshot| snapshot.name == snapshot_name); + let Some(snapshot) = matches.next() else { + bail!( + "dataset snapshot '{}' was not found for '{}'", + snapshot_name, + dataset_name + ); + }; + if matches.next().is_some() { + bail!( + "multiple dataset snapshots named '{}' were found for '{}'; use --snapshot instead", + snapshot_name, + dataset_name + ); + } + + restore_target_from_snapshot(snapshot) +} + +fn normalize_optional_text(value: Option<&str>) -> Option { + value + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) +} + +fn print_restore_preview( + dataset: &Dataset, + target: &RestoreTarget, + preview: &DatasetRestorePreview, +) -> Result<()> { + let mut output = String::new(); + writeln!( + output, + "Restore preview for {} to {}:\n", + console::style(&dataset.name).bold(), + console::style(target.display_target()).bold() + )?; + writeln!( + output, + "Rows to restore: {}", + console::style(preview.rows_to_restore).bold() + )?; + writeln!( + output, + "Rows to delete: {}", + console::style(preview.rows_to_delete).bold() + )?; + + print_with_pager(&output)?; + Ok(()) +} + +fn resolve_default_snapshot_author(base: &BaseArgs, ctx: &ResolvedContext) -> Option { + if api_key_override_active(base) { + return None; + } + + let profile = resolve_profile_info(base.profile.as_deref(), Some(ctx.client.org_name()))?; + profile_author_slug(&profile) +} + +fn default_snapshot_name(author: &str, now: DateTime) -> String { + let author = sanitize_name_segment(author).unwrap_or_else(|| "user".to_string()); + format!("{author}-{}", now.format("%Y%m%d-%H%M%Sz")) +} + +fn api_key_override_active(base: &BaseArgs) -> bool { + !base.prefer_profile + && base + .api_key + .as_deref() + .is_some_and(|value| !value.trim().is_empty()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dataset_snapshot( + name: &str, + xact_id: &str, + created: &str, + description: Option<&str>, + ) -> DatasetSnapshot { + DatasetSnapshot { + id: format!("snapshot_{name}"), + dataset_id: "dataset_1".to_string(), + name: name.to_string(), + description: description.map(ToOwned::to_owned), + xact_id: xact_id.to_string(), + created: Some(created.to_string()), + } + } + + #[test] + fn default_snapshot_name_formats_author_and_timestamp() { + let now = DateTime::parse_from_rfc3339("2026-04-10T12:34:56Z") + .expect("parse timestamp") + .with_timezone(&Utc); + assert_eq!( + default_snapshot_name("Alice Smith", now), + "alice-smith-20260410-123456z" + ); + } + + #[test] + fn restore_snapshot_label_includes_disambiguating_details() { + let label = restore_snapshot_label(&dataset_snapshot( + "baseline", + "1000192656880881099", + "2026-04-10T12:34:56Z", + Some("Initial snapshot for restore flow"), + )); + assert!(label.contains("baseline")); + assert!(label.contains("1000192656880881099")); + assert!(label.contains(&truncate("2026-04-10T12:34:56Z", 10))); + assert!(label.contains("Initial snapshot for restore flow")); + } + + #[test] + fn resolve_restore_target_by_name_returns_unique_match() { + let snapshots = vec![dataset_snapshot( + "baseline", + "1000192656880881099", + "2026-04-10T00:00:00Z", + None, + )]; + + let target = + resolve_restore_target_by_name(&snapshots, "my-dataset", "baseline").expect("target"); + assert_eq!(target.name.as_deref(), Some("baseline")); + assert_eq!(target.xact_id, "1000192656880881099"); + } + + #[test] + fn resolve_restore_target_by_name_rejects_duplicates() { + let snapshots = vec![ + dataset_snapshot( + "baseline", + "1000192656880881099", + "2026-04-10T00:00:00Z", + None, + ), + dataset_snapshot( + "baseline", + "1000192656880881100", + "2026-04-11T00:00:00Z", + None, + ), + ]; + + let error = resolve_restore_target_by_name(&snapshots, "my-dataset", "baseline") + .expect_err("duplicate snapshot names should fail"); + assert!(error.to_string().contains("use --snapshot ")); + } + + #[test] + fn restore_target_display_uses_name_when_available() { + let target = RestoreTarget { + name: Some("baseline".to_string()), + xact_id: "1000192656880881099".to_string(), + }; + assert_eq!( + target.display_target(), + "snapshot 'baseline' (xact 1000192656880881099)" + ); + } +} diff --git a/src/status.rs b/src/status.rs index 940b90b2..e3411d66 100644 --- a/src/status.rs +++ b/src/status.rs @@ -4,7 +4,7 @@ use serde::Serialize; use crate::args::BaseArgs; use crate::auth; -use crate::config; +use crate::{config, utils::resolve_profile_info}; #[derive(Debug, Clone, Args)] #[command(after_help = "\ @@ -224,37 +224,6 @@ fn cli_flag_value(flags: &[&str]) -> Option { None } -fn resolve_profile_info(profile: Option<&str>, org: Option<&str>) -> Option { - let profiles = auth::list_profiles().ok()?; - - if let Some(p) = profile { - if let Some(profile) = profiles.iter().find(|pi| pi.name == p).cloned() { - return Some(profile); - } - } - - if let Some(o) = org { - if profiles.iter().any(|pi| pi.name == o) { - return profiles.into_iter().find(|pi| pi.name == o); - } - let org_matches: Vec<&auth::ProfileInfo> = profiles - .iter() - .filter(|pi| pi.org_name.as_deref() == Some(o)) - .collect(); - if org_matches.len() == 1 { - let name = org_matches[0].name.clone(); - return profiles.into_iter().find(|pi| pi.name == name); - } - return None; - } - - if profiles.len() == 1 { - return profiles.into_iter().next(); - } - - None -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/sync.rs b/src/sync.rs index 9bf07f82..2f0d4182 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -26,7 +26,6 @@ use crate::experiments::api::create_experiment; use crate::http::ApiClient; use crate::projects::api::{create_project, list_projects, Project}; use crate::ui::{animations_enabled, fuzzy_select, is_quiet}; -use crate::utils::parse_duration_to_seconds; const STATE_SCHEMA_VERSION: u32 = 1; const DEFAULT_PULL_LIMIT: usize = 100; diff --git a/src/traces.rs b/src/traces.rs index 94e0b3df..378fd6f3 100644 --- a/src/traces.rs +++ b/src/traces.rs @@ -36,7 +36,6 @@ use crate::args::BaseArgs; use crate::auth::{self, login}; use crate::http::ApiClient; use crate::ui::{fuzzy_select, is_interactive, with_spinner}; -use crate::utils::parse_duration_to_seconds; const MAX_TRACE_SPANS: usize = 5000; const MAX_BTQL_PAGE_LIMIT: usize = 1000; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index fcc4fcfd..a94eb1b7 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -4,10 +4,11 @@ mod git; mod ids; mod json_object; mod plurals; +mod profile; -pub use duration::parse_duration_to_seconds; pub use fs_atomic::write_text_atomic; pub use git::GitRepo; pub(crate) use ids::new_uuid_id; pub(crate) use json_object::lookup_object_path; pub use plurals::pluralize; +pub(crate) use profile::{profile_author_slug, resolve_profile_info, sanitize_name_segment}; diff --git a/src/utils/profile.rs b/src/utils/profile.rs new file mode 100644 index 00000000..b97812b2 --- /dev/null +++ b/src/utils/profile.rs @@ -0,0 +1,174 @@ +use crate::auth::{self, ProfileInfo}; + +pub(crate) fn resolve_profile_info( + profile: Option<&str>, + org: Option<&str>, +) -> Option { + let profiles = auth::list_profiles().ok()?; + resolve_profile_info_from_profiles(profile, org, profiles) +} + +fn resolve_profile_info_from_profiles( + profile: Option<&str>, + org: Option<&str>, + profiles: Vec, +) -> Option { + if let Some(profile_name) = profile { + if let Some(profile) = profiles + .iter() + .find(|profile| profile.name == profile_name) + .cloned() + { + return Some(profile); + } + } + + if let Some(org_name) = org { + if profiles.iter().any(|profile| profile.name == org_name) { + return profiles + .into_iter() + .find(|profile| profile.name == org_name); + } + + let org_matches: Vec<&ProfileInfo> = profiles + .iter() + .filter(|profile| profile.org_name.as_deref() == Some(org_name)) + .collect(); + if org_matches.len() == 1 { + let profile_name = org_matches[0].name.clone(); + return profiles + .into_iter() + .find(|profile| profile.name == profile_name); + } + return None; + } + + if profiles.len() == 1 { + return profiles.into_iter().next(); + } + + None +} + +pub(crate) fn profile_author_slug(profile: &ProfileInfo) -> Option { + [ + profile.user_name.as_deref(), + profile.email.as_deref().and_then(email_local_part), + Some(profile.name.as_str()), + ] + .into_iter() + .flatten() + .find_map(sanitize_name_segment) +} + +fn email_local_part(email: &str) -> Option<&str> { + email + .split_once('@') + .map(|(local, _)| local) + .or(Some(email)) +} + +pub(crate) fn sanitize_name_segment(value: &str) -> Option { + let mut normalized = String::new(); + let mut last_was_dash = false; + + for ch in value.chars() { + if ch.is_ascii_alphanumeric() { + normalized.push(ch.to_ascii_lowercase()); + last_was_dash = false; + } else if !normalized.is_empty() && !last_was_dash { + normalized.push('-'); + last_was_dash = true; + } + } + + while normalized.ends_with('-') { + normalized.pop(); + } + + if normalized.is_empty() { + None + } else { + Some(normalized) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn profile_info( + name: &str, + org_name: Option<&str>, + user_name: Option<&str>, + email: Option<&str>, + ) -> ProfileInfo { + ProfileInfo { + name: name.to_string(), + org_name: org_name.map(ToOwned::to_owned), + user_name: user_name.map(ToOwned::to_owned), + email: email.map(ToOwned::to_owned), + api_key_hint: None, + } + } + + #[test] + fn resolve_profile_info_prefers_explicit_profile() { + let profile = resolve_profile_info_from_profiles( + Some("work"), + Some("other-org"), + vec![ + profile_info("other", Some("other-org"), None, None), + profile_info("work", Some("work-org"), None, None), + ], + ) + .expect("profile"); + + assert_eq!(profile.name, "work"); + } + + #[test] + fn resolve_profile_info_finds_profile_by_org_name() { + let profile = resolve_profile_info_from_profiles( + None, + Some("work-org"), + vec![profile_info("work", Some("work-org"), None, None)], + ) + .expect("profile"); + + assert_eq!(profile.name, "work"); + } + + #[test] + fn profile_author_slug_prefers_user_name() { + let profile = profile_info("work", None, Some("Alice Smith"), Some("alice@example.com")); + assert_eq!( + profile_author_slug(&profile).as_deref(), + Some("alice-smith") + ); + } + + #[test] + fn profile_author_slug_falls_back_to_email_local_part() { + let profile = profile_info("work", None, None, Some("alice.dev@example.com")); + assert_eq!(profile_author_slug(&profile).as_deref(), Some("alice-dev")); + } + + #[test] + fn profile_author_slug_falls_back_to_profile_name() { + let profile = profile_info("Work Profile", None, None, None); + assert_eq!( + profile_author_slug(&profile).as_deref(), + Some("work-profile") + ); + } + + #[test] + fn sanitize_name_segment_collapses_non_alnum() { + assert_eq!( + sanitize_name_segment(" A/B C__D ").as_deref(), + Some("a-b-c-d") + ); + assert!(sanitize_name_segment("!!!").is_none()); + } +} diff --git a/tests/datasets.rs b/tests/datasets.rs index a0a7570e..bef09c25 100644 --- a/tests/datasets.rs +++ b/tests/datasets.rs @@ -104,12 +104,26 @@ struct MockDataset { created: String, } +#[derive(Debug, Clone)] +struct MockDatasetSnapshot { + id: String, + dataset_id: String, + name: String, + xact_id: String, + description: Option, + created: String, +} + +type MockDatasetRows = BTreeMap>; +type MockDatasetRowsByDataset = BTreeMap; + #[derive(Debug)] struct MockServerState { requests: Mutex>, projects: Mutex>, datasets: Mutex>, - dataset_rows: Mutex>>>, + dataset_snapshots: Mutex>, + dataset_rows: Mutex, btql_dataset_id: Mutex>, } @@ -123,6 +137,7 @@ impl MockServerState { org_id: "org_mock".to_string(), }]), datasets: Mutex::new(Vec::new()), + dataset_snapshots: Mutex::new(Vec::new()), dataset_rows: Mutex::new(BTreeMap::new()), btql_dataset_id: Mutex::new(None), } @@ -148,6 +163,14 @@ impl MockServer { .route("/v1/project", web::get().to(mock_list_projects)) .route("/v1/dataset", web::get().to(mock_list_datasets)) .route("/v1/dataset", web::post().to(mock_create_dataset)) + .route( + "/v1/dataset_snapshot", + web::get().to(mock_list_dataset_snapshots), + ) + .route( + "/v1/dataset_snapshot", + web::post().to(mock_create_dataset_snapshot), + ) .route("/btql", web::post().to(mock_btql)) .route("/version", web::get().to(mock_version)) .route("/logs3", web::post().to(mock_logs3)) @@ -274,6 +297,123 @@ async fn mock_create_dataset( })) } +#[derive(Debug, Deserialize)] +struct CreateDatasetSnapshotRequest { + dataset_id: String, + name: String, + xact_id: String, + #[serde(default)] + description: Option, + #[serde(default)] + dataset_snapshot_name: Option, +} + +async fn mock_create_dataset_snapshot( + state: web::Data>, + req: HttpRequest, + body: web::Json, +) -> HttpResponse { + log_request(state.get_ref(), &req); + + if body.dataset_snapshot_name.is_some() { + return HttpResponse::BadRequest() + .body("dataset_snapshot_name should not be sent to data plane endpoint"); + } + if body.name.trim().is_empty() { + return HttpResponse::BadRequest().body("snapshot name is required"); + } + + let datasets = state.datasets.lock().expect("datasets lock"); + let Some(dataset) = datasets + .iter() + .find(|dataset| dataset.id == body.dataset_id) + .cloned() + else { + return HttpResponse::BadRequest() + .body(format!("unknown dataset id '{}'", body.dataset_id)); + }; + if dataset.name == body.dataset_id { + return HttpResponse::BadRequest().body("snapshot request used dataset name instead of id"); + } + drop(datasets); + + let mut snapshots = state + .dataset_snapshots + .lock() + .expect("dataset snapshots lock"); + if let Some(snapshot) = snapshots + .iter() + .find(|snapshot| snapshot.dataset_id == body.dataset_id && snapshot.xact_id == body.xact_id) + .cloned() + { + return HttpResponse::Ok() + .insert_header(("x-bt-found-existing", "true")) + .json(serde_json::json!({ + "id": snapshot.id, + "dataset_id": snapshot.dataset_id, + "name": snapshot.name, + "xact_id": snapshot.xact_id, + "description": snapshot.description, + "created": snapshot.created + })); + } + + let snapshot = MockDatasetSnapshot { + id: format!("snapshot_{}", snapshots.len() + 1), + dataset_id: body.dataset_id.clone(), + name: body.name.clone(), + xact_id: body.xact_id.clone(), + description: body.description.clone(), + created: "2026-01-02T00:00:00Z".to_string(), + }; + snapshots.push(snapshot.clone()); + + HttpResponse::Ok() + .insert_header(("x-bt-found-existing", "false")) + .json(serde_json::json!({ + "id": snapshot.id, + "dataset_id": snapshot.dataset_id, + "name": snapshot.name, + "xact_id": snapshot.xact_id, + "description": snapshot.description, + "created": snapshot.created + })) +} + +async fn mock_list_dataset_snapshots( + state: web::Data>, + req: HttpRequest, +) -> HttpResponse { + log_request(state.get_ref(), &req); + let query = parse_query(req.query_string()); + let requested_dataset_id = query.get("dataset_id").cloned(); + let snapshots = state + .dataset_snapshots + .lock() + .expect("dataset snapshots lock") + .clone(); + let objects = snapshots + .into_iter() + .filter(|snapshot| { + requested_dataset_id + .as_deref() + .is_none_or(|dataset_id| snapshot.dataset_id == dataset_id) + }) + .map(|snapshot| { + serde_json::json!({ + "id": snapshot.id, + "dataset_id": snapshot.dataset_id, + "name": snapshot.name, + "xact_id": snapshot.xact_id, + "description": snapshot.description, + "created": snapshot.created + }) + }) + .collect::>(); + + HttpResponse::Ok().json(serde_json::json!({ "objects": objects })) +} + async fn mock_btql( state: web::Data>, req: HttpRequest,