diff --git a/src/args.rs b/src/args.rs index 46fe801b..27b5ca21 100644 --- a/src/args.rs +++ b/src/args.rs @@ -21,6 +21,9 @@ pub struct BaseArgs { #[arg(long, short = 'v', env = "BRAINTRUST_VERBOSE", global = true, conflicts_with = "quiet", value_parser = clap::builder::BoolishValueParser::new(), default_value_t = false)] pub verbose: bool, + #[arg(skip)] + pub verbose_source: Option, + /// Reduce interactive UI output #[arg(long, short = 'q', env = "BRAINTRUST_QUIET", global = true, value_parser = clap::builder::BoolishValueParser::new(), default_value_t = false)] pub quiet: bool, @@ -118,6 +121,10 @@ impl BaseArgs { pub fn ca_cert(&self) -> Option<&Path> { self.ca_cert.as_deref() } + + pub fn verbose_explicit(&self) -> bool { + self.verbose && self.verbose_source.is_some() + } } pub fn has_explicit_profile_arg(args: &[OsString]) -> bool { diff --git a/src/auth.rs b/src/auth.rs index d3b71594..3b2db07b 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -3251,6 +3251,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false, diff --git a/src/config/mod.rs b/src/config/mod.rs index 32f2f3e0..adc9246a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -431,6 +431,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false, diff --git a/src/functions/push.rs b/src/functions/push.rs index 8fb168dd..ebc1f28d 100644 --- a/src/functions/push.rs +++ b/src/functions/push.rs @@ -3589,6 +3589,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false, diff --git a/src/main.rs b/src/main.rs index 919be714..0a34fb4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -301,6 +301,7 @@ fn try_main() -> Result<()> { } fn apply_base_arg_sources(matches: &ArgMatches, base: &mut BaseArgs) { + base.verbose_source = find_value_source(matches, "verbose").and_then(map_value_source); base.quiet_source = find_value_source(matches, "quiet").and_then(map_value_source); base.api_key_source = find_value_source(matches, "api_key").and_then(map_value_source); } @@ -501,6 +502,37 @@ mod tests { assert_eq!(cli.command.base().api_key_source, None); } + #[test] + fn apply_base_arg_sources_tracks_cli_verbose() { + let matches = Cli::command() + .try_get_matches_from(["bt", "sync", "pull", "--verbose"]) + .expect("matches"); + let mut cli = Cli::from_arg_matches(&matches).expect("cli"); + + apply_base_arg_sources(&matches, cli.command.base_mut()); + + assert_eq!( + cli.command.base().verbose_source, + Some(ArgValueSource::CommandLine) + ); + assert!(cli.command.base().verbose_explicit()); + } + + #[test] + fn default_verbose_output_is_not_explicit_verbose() { + let matches = Cli::command() + .try_get_matches_from(["bt", "sync", "pull"]) + .expect("matches"); + let mut cli = Cli::from_arg_matches(&matches).expect("cli"); + + apply_base_arg_sources(&matches, cli.command.base_mut()); + apply_base_output_defaults(&mut cli.command); + + assert!(cli.command.base().verbose); + assert_eq!(cli.command.base().verbose_source, None); + assert!(!cli.command.base().verbose_explicit()); + } + #[test] fn apply_base_output_defaults_keeps_setup_quiet_by_default() { let matches = Cli::command() diff --git a/src/setup/mod.rs b/src/setup/mod.rs index 747dd2f6..a904f214 100644 --- a/src/setup/mod.rs +++ b/src/setup/mod.rs @@ -4955,6 +4955,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false, diff --git a/src/switch.rs b/src/switch.rs index 9c939554..127455cd 100644 --- a/src/switch.rs +++ b/src/switch.rs @@ -288,6 +288,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false, diff --git a/src/sync.rs b/src/sync.rs index 1aa2138b..b8c2e2b4 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -39,6 +39,7 @@ const BTQL_MAX_ATTEMPTS: usize = 5; const BTQL_RETRY_BASE_DELAY_MS: u64 = 300; const BTQL_MAX_BACKOFF_SECS: u64 = 8; const PULL_OUTPUT_PART_MAX_BYTES: u64 = 128 * 1024 * 1024; +const LINE_COUNT_BUFFER_BYTES: usize = 1024 * 1024; pub(crate) fn default_workers() -> usize { std::thread::available_parallelism() @@ -546,15 +547,8 @@ pub async fn run(base: BaseArgs, args: SyncArgs) -> Result<()> { match command { SyncCommand::Pull(pull) => { - run_pull( - base.json, - base.verbose, - &ctx, - &client, - project.as_deref(), - pull, - ) - .await + let verbose = base.verbose_explicit(); + run_pull(base.json, verbose, &ctx, &client, project.as_deref(), pull).await } SyncCommand::Push(push) => { run_push(base.json, &ctx, &client, project.as_deref(), push).await @@ -1686,17 +1680,11 @@ async fn run_push( let project_id = destination.project_id.clone(); let input_files = resolve_push_input_files(&input_path)?; - let upload_total = upload_total_for_progress(&input_files, &scope, limit)?; + let mut upload_total_task = spawn_upload_total_task(input_files.clone(), scope.clone(), limit); + let mut upload_total: Option = None; - let pb = if let Some(total) = upload_total { - bounded_bar(total as u64, "Uploading rows", "spans") - } else { - spinner_bar("Uploading rows") - }; + let pb = spinner_bar("Uploading rows"); pb.set_prefix("Uploading rows".to_string()); - if let Some(total) = upload_total { - pb.set_position(state.items_done.min(total) as u64); - } let interrupted = Arc::new(AtomicBool::new(false)); let interrupted_signal = Arc::clone(&interrupted); @@ -1846,6 +1834,17 @@ async fn run_push( push_baseline_items_done, push_baseline_bytes_sent, )?; + update_upload_total_if_ready( + &mut upload_total_task, + &mut upload_total, + &state, + &pb, + push_phase_started_at, + push_baseline_roots_done, + push_baseline_items_done, + push_baseline_bytes_sent, + ) + .await?; } } } @@ -1882,6 +1881,17 @@ async fn run_push( push_baseline_items_done, push_baseline_bytes_sent, )?; + update_upload_total_if_ready( + &mut upload_total_task, + &mut upload_total, + &state, + &pb, + push_phase_started_at, + push_baseline_roots_done, + push_baseline_items_done, + push_baseline_bytes_sent, + ) + .await?; } if !pending_results.is_empty() || next_commit_index != next_batch_index { @@ -1892,6 +1902,9 @@ async fn run_push( let was_interrupted = interrupted.load(Ordering::SeqCst); ctrlc_task.abort(); + if let Some(task) = upload_total_task.take() { + task.abort(); + } if was_interrupted { state.status = RunStatus::Interrupted; @@ -2648,6 +2661,60 @@ fn spawn_push_upload_task( }); } +fn spawn_upload_total_task( + input_files: Vec, + scope: ScopeArg, + limit: Option, +) -> Option>>> { + if !push_progress_total_needs_line_count(&scope) { + return None; + } + + Some(tokio::task::spawn_blocking(move || { + upload_total_for_progress(&input_files, &scope, limit) + })) +} + +#[allow(clippy::too_many_arguments)] +async fn update_upload_total_if_ready( + upload_total_task: &mut Option>>>, + upload_total: &mut Option, + state: &PushState, + pb: &ProgressBar, + push_phase_started_at: u64, + push_baseline_roots_done: usize, + push_baseline_items_done: usize, + push_baseline_bytes_sent: u64, +) -> Result<()> { + if upload_total.is_some() + || !upload_total_task + .as_ref() + .is_some_and(tokio::task::JoinHandle::is_finished) + { + return Ok(()); + } + + let total = upload_total_task + .take() + .expect("checked task above") + .await + .context("push row count task failed")??; + if let Some(total) = total { + configure_upload_progress_bar(pb, total); + *upload_total = Some(total); + pb.set_position(state.items_done.min(total) as u64); + pb.set_message(push_progress_message_with_baseline( + state, + push_phase_started_at, + push_baseline_roots_done, + push_baseline_items_done, + push_baseline_bytes_sent, + *upload_total, + )); + } + Ok(()) +} + #[allow(clippy::too_many_arguments)] fn flush_ready_push_results( pending_results: &mut BTreeMap, @@ -4053,29 +4120,73 @@ fn upload_total_for_progress( scope: &ScopeArg, limit: Option, ) -> Result> { - if matches!(scope, ScopeArg::Traces) { + if !push_progress_total_needs_line_count(scope) { return Ok(None); } - let total_lines = count_lines(input_files)?; + let total_lines = count_lines(input_files, limit)?; let capped = limit.map(|l| l.min(total_lines)).unwrap_or(total_lines); Ok(Some(capped)) } -fn count_lines(paths: &[PathBuf]) -> Result { +fn push_progress_total_needs_line_count(scope: &ScopeArg) -> bool { + !matches!(scope, ScopeArg::Traces) +} + +fn count_lines(paths: &[PathBuf], max_lines: Option) -> Result { let mut count = 0usize; for path in paths { - let file = - File::open(path).with_context(|| format!("failed to open {}", path.display()))?; - let reader = BufReader::new(file); - for line in reader.lines() { - line.with_context(|| format!("failed reading {}", path.display()))?; - count += 1; + if max_lines.is_some_and(|max| count >= max) { + break; } + let remaining = max_lines.map(|max| max.saturating_sub(count)); + count += count_lines_in_file(path, remaining)?; } Ok(count) } +fn count_lines_in_file(path: &Path, max_lines: Option) -> Result { + let file = File::open(path).with_context(|| format!("failed to open {}", path.display()))?; + let mut reader = BufReader::with_capacity(LINE_COUNT_BUFFER_BYTES, file); + let mut count = 0usize; + let mut saw_any_byte = false; + let mut last_byte = b'\n'; + let mut reached_max = false; + + loop { + let consumed = { + let buffer = reader + .fill_buf() + .with_context(|| format!("failed reading {}", path.display()))?; + if buffer.is_empty() { + break; + } + saw_any_byte = true; + for byte in buffer { + if *byte == b'\n' { + count += 1; + if max_lines.is_some_and(|max| count >= max) { + reached_max = true; + break; + } + } + } + last_byte = *buffer.last().unwrap_or(&last_byte); + buffer.len() + }; + reader.consume(consumed); + if reached_max { + break; + } + } + + if !reached_max && saw_any_byte && last_byte != b'\n' { + count += 1; + } + + Ok(count) +} + fn collect_seen_roots_until_offset( paths: &[PathBuf], line_offset: usize, @@ -4235,18 +4346,17 @@ fn pull_status_line(show_checkpoint_hint: bool, retry_summary: Option<&str>) -> } } -fn bounded_bar(total: u64, message: &str, unit_label: &str) -> ProgressBar { - if !std::io::stderr().is_terminal() || !animations_enabled() || is_quiet() { - return ProgressBar::hidden(); - } - let pb = ProgressBar::new(total); +fn configure_upload_progress_bar(pb: &ProgressBar, total: usize) { + pb.set_length(total as u64); + configure_bounded_bar_style(pb, "spans"); + pb.set_prefix("Uploading rows".to_string()); +} + +fn configure_bounded_bar_style(pb: &ProgressBar, unit_label: &str) { let template = format!( "{{spinner:.cyan}} {{prefix}} [{{bar:40.cyan/blue}}] {{pos}}/{{len}} {unit_label} ({{percent:>3}}%) | {{msg}}" ); pb.set_style(ProgressStyle::with_template(&template).unwrap()); - pb.set_prefix(message.to_string()); - pb.enable_steady_tick(std::time::Duration::from_millis(80)); - pb } fn spinner_bar(message: &str) -> ProgressBar { @@ -4392,6 +4502,81 @@ mod tests { Ok(()) } + #[test] + fn upload_total_for_traces_does_not_scan_inputs() -> Result<()> { + let missing = PathBuf::from("definitely-missing-sync-input.jsonl"); + + let total = upload_total_for_progress(&[missing], &ScopeArg::Traces, Some(25))?; + + assert_eq!(total, None); + Ok(()) + } + + #[test] + fn upload_total_for_all_counts_lines() -> Result<()> { + let unique = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or_default(); + let path = std::env::temp_dir().join(format!( + "bt-sync-upload-total-all-{}-{}.jsonl", + std::process::id(), + unique + )); + fs::write(&path, "{\"id\":\"1\"}\n{\"id\":\"2\"}\n{\"id\":\"3\"}\n")?; + + let total = upload_total_for_progress(std::slice::from_ref(&path), &ScopeArg::All, None)?; + + assert_eq!(total, Some(3)); + let _ = fs::remove_file(&path); + Ok(()) + } + + #[test] + fn upload_total_for_spans_caps_requested_limit() -> Result<()> { + let unique = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or_default(); + let path = std::env::temp_dir().join(format!( + "bt-sync-upload-total-spans-{}-{}.jsonl", + std::process::id(), + unique + )); + let missing = std::env::temp_dir().join(format!( + "bt-sync-upload-total-spans-missing-{}-{}.jsonl", + std::process::id(), + unique + )); + fs::write(&path, "{\"id\":\"1\"}\n{\"id\":\"2\"}\n")?; + + let total = upload_total_for_progress(&[path.clone(), missing], &ScopeArg::Spans, Some(2))?; + + assert_eq!(total, Some(2)); + let _ = fs::remove_file(&path); + Ok(()) + } + + #[test] + fn count_lines_counts_final_line_without_newline() -> Result<()> { + let unique = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or_default(); + let path = std::env::temp_dir().join(format!( + "bt-sync-count-lines-{}-{}.jsonl", + std::process::id(), + unique + )); + fs::write(&path, "{\"id\":\"1\"}\n{\"id\":\"2\"}")?; + + let total = count_lines_in_file(&path, None)?; + + assert_eq!(total, 2); + let _ = fs::remove_file(&path); + Ok(()) + } + #[test] fn trace_resume_with_empty_fetch_state_reenters_discovery() { let mut state = new_pull_state( diff --git a/src/traces.rs b/src/traces.rs index 1d3f32b7..94e0b3df 100644 --- a/src/traces.rs +++ b/src/traces.rs @@ -6061,6 +6061,7 @@ mod tests { BaseArgs { json: false, verbose: false, + verbose_source: None, quiet: false, quiet_source: None, no_color: false,