From 52c1f3d9e153492557972772c35f7d75f0942dc3 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 1 Jul 2026 16:46:58 -0400 Subject: [PATCH 1/2] billing: add `--dry-run` mode and always send manual invoices * Add `--dry-run`: classify every invoice against the livemode Stripe API and report what would happen without creating or modifying anything. The previous testmode trial run misreported manual bills that already had invoices as phantom creates, because the sandbox lacked livemode state. Manual bills already invoiced in an earlier run now report as `AlreadyProcessed` instead of erroring. * Manual invoices are always sent as invoices rather than charged to a stored payment method, even when one is on file. Stored payment methods cover monthly usage overages; manual bills such as contracts and one-off charges should let the customer choose how to pay. Both changes were part of PR #2849, which was closed while the rest of the branch merged via #2883 and #2902, so they never landed. This reapplies them onto current master. --- crates/billing-integrations/src/publish.rs | 725 ++++++++++++------ crates/billing-integrations/src/send.rs | 6 +- .../billing-integrations/src/stripe_utils.rs | 10 +- 3 files changed, 484 insertions(+), 257 deletions(-) diff --git a/crates/billing-integrations/src/publish.rs b/crates/billing-integrations/src/publish.rs index 5cc6de3f453..011c0ff6b84 100644 --- a/crates/billing-integrations/src/publish.rs +++ b/crates/billing-integrations/src/publish.rs @@ -58,6 +58,10 @@ pub struct PublishInvoice { /// Clean up dangling invoices that are not in the database #[clap(long, default_value_t = false)] pub clean_up: bool, + /// Run in read-only mode: classify all invoices and report what would + /// happen, without creating or modifying anything in Stripe. + #[clap(long, default_value_t = false)] + pub dry_run: bool, } fn parse_date(arg: &str) -> Result { @@ -90,20 +94,32 @@ enum InvoiceResult { FutureTrialStart, NoDataMoved, NoFullPipeline, + AlreadyProcessed, Error, } impl InvoiceResult { - pub fn message(&self) -> String { + pub fn message(&self, dry_run: bool) -> String { match self { InvoiceResult::Created(provider) => { + let verb = if dry_run { + "Would publish" + } else { + "Published" + }; if provider == &PaymentProvider::Stripe { - "Published new invoice".to_string() + format!("{verb} new invoice") } else { - format!("Published new invoice for tenant using {provider:?} provider") + format!("{verb} new invoice for tenant using {provider:?} provider") + } + } + InvoiceResult::Updated => { + if dry_run { + "Would update existing invoice".to_string() + } else { + "Updated existing invoice".to_string() } } - InvoiceResult::Updated => "Updated existing invoice".to_string(), InvoiceResult::LessThanMinimum => { "Skipping invoice for less than the minimum chargable amount ($0.50)".to_string() } @@ -117,11 +133,31 @@ impl InvoiceResult { InvoiceResult::NoFullPipeline => { "Skipping invoice for tenant without an active pipeline".to_string() } + InvoiceResult::AlreadyProcessed => { + "Skipping invoice already processed in a previous billing run".to_string() + } InvoiceResult::Error => "Error publishing invoices".to_string(), } } } +/// The outcome of the classify phase: what action should be taken for this invoice. +enum InvoiceAction { + /// Invoice should not be created. Carries the skip reason and the + /// customer (if found) for potential clean-up of stale drafts. + Skip { + result: InvoiceResult, + customer: Option, + }, + /// Create a new invoice. `replace` is set when --recreate-finalized + /// requires deleting an existing invoice first. + Create { replace: Option }, + /// Update an existing draft invoice's line items. + Update { + existing_invoice_id: stripe::InvoiceId, + }, +} + #[derive( Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Copy, sqlx::Type, )] @@ -176,93 +212,215 @@ impl Invoice { Ok(invoice_search.into_iter().next()) } - #[tracing::instrument(skip(self, client, db_client), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] - async fn upsert_invoice( + /// Read-only classification: determines what action should be taken for this + /// invoice without making any writes to Stripe. + #[tracing::instrument(skip(self, client), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] + async fn classify( &self, client: &stripe::Client, - db_client: &Pool, recreate_finalized: bool, - mode: ChargeType, - ) -> anyhow::Result { + ) -> anyhow::Result { + // --- Phase 1: Cheap local checks (no Stripe calls) --- + match (&self.invoice_type, &self.extra) { (InvoiceType::Preview, _) => { bail!("Should not create Stripe invoices for preview invoices") } - (InvoiceType::Final, Some(extra)) => { - // If we have a payment method, don't skip the invoice - // If `has_payment_method` is Some, then there is a stripe customer to check - let validated_has_payment_method = - if let Some(has_payment_method) = self.has_payment_method { - // The Stripe capture in the database has been known to be unreliable. - // Let's double-check with Stripe to make sure it agrees that we really - // do not have a payment method set. - let real_default_payment_method = get_or_create_customer_for_tenant( - client, - db_client, - self.billed_prefix.to_owned(), - false, // If there's no customer, there's no way there can be a payment method - ) - .await? - .and_then(|customer| customer.invoice_settings) - .and_then(|i| i.default_payment_method); - - if has_payment_method != real_default_payment_method.is_some() { - tracing::warn!( - ?has_payment_method, - stripe_payment_method = real_default_payment_method.is_some(), - "Inconsistent payment method state" - ); - } - - real_default_payment_method.is_some() - } else { - false - }; - - let unwrapped_extra = extra.clone().0.expect( - "This is just a sqlx quirk, if the outer Option is Some then this will be Some", - ); - - if !validated_has_payment_method { - if unwrapped_extra.processed_data_gb.unwrap_or_default() == 0.0 - && !matches!(&self.invoice_type, InvoiceType::Manual) - { - return Ok(InvoiceResult::NoDataMoved); - } - - if !self.has_full_pipeline && !matches!(&self.invoice_type, InvoiceType::Manual) - { - return Ok(InvoiceResult::NoFullPipeline); - } - } - } (InvoiceType::Final, None) => { bail!("Invoice should have extra") } _ => {} }; - // An invoice should be generated in Stripe if the tenant is on a paid plan, which means: - // * The tenant has a free trial start date - // * The tenant's free trial start date is before the invoice period's end date if let InvoiceType::Final = self.invoice_type { match self.tenant_trial_start { Some(trial_start) if self.date_end < trial_start => { - return Ok(InvoiceResult::FutureTrialStart); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::FutureTrialStart, + customer: None, + }); } None => { - return Ok(InvoiceResult::FreeTier); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::FreeTier, + customer: None, + }); } _ => {} } } - // The minimum chargable amount of USD in Stripe is $0.50. - // https://stripe.com/docs/currencies#minimum-and-maximum-charge-amounts if self.subtotal < 50 { - return Ok(InvoiceResult::LessThanMinimum); + return Ok(InvoiceAction::Skip { + result: InvoiceResult::LessThanMinimum, + customer: None, + }); } + // --- Phase 2: Stripe calls (only for invoices that survived Phase 1) --- + + // For Final invoices, verify the payment method state with Stripe. + // The DB capture has been known to be unreliable, so Stripe is the + // source of truth. If the tenant has no payment method, skip on + // NoDataMoved / NoFullPipeline. + let mut found_customer: Option> = None; + + if let (InvoiceType::Final, Some(extra)) = (&self.invoice_type, &self.extra) { + let validated_has_payment_method = + if let Some(has_payment_method) = self.has_payment_method { + let customer = find_customer(client, &self.billed_prefix).await?; + let real_has_pm = customer + .as_ref() + .and_then(|c| c.invoice_settings.as_ref()) + .and_then(|i| i.default_payment_method.as_ref()) + .is_some(); + + if has_payment_method != real_has_pm { + tracing::warn!( + ?has_payment_method, + stripe_payment_method = real_has_pm, + "Inconsistent payment method state" + ); + } + + found_customer = Some(customer); + real_has_pm + } else { + false + }; + + if !validated_has_payment_method { + let unwrapped_extra = extra.clone().0.expect( + "This is just a sqlx quirk, if the outer Option is Some then this will be Some", + ); + + if unwrapped_extra.processed_data_gb.unwrap_or_default() == 0.0 { + return Ok(InvoiceAction::Skip { + result: InvoiceResult::NoDataMoved, + customer: found_customer.flatten(), + }); + } + + if !self.has_full_pipeline { + return Ok(InvoiceAction::Skip { + result: InvoiceResult::NoFullPipeline, + customer: found_customer.flatten(), + }); + } + } + } + + // Look up customer (reuse if already fetched during payment method validation) + let customer = match found_customer { + Some(c) => c, + None => find_customer(client, &self.billed_prefix).await?, + }; + + let customer = match customer { + Some(c) => c, + // No customer in Stripe means no existing invoice is possible + None => return Ok(InvoiceAction::Create { replace: None }), + }; + + let customer_id = customer.id.to_string(); + + // Search for an existing invoice in Stripe + if let Some(invoice) = self + .get_stripe_invoice(client, customer_id.as_str()) + .await? + { + match invoice.status { + Some(stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft) + if recreate_finalized => + { + Ok(InvoiceAction::Create { + replace: Some(invoice.id), + }) + } + Some(stripe::InvoiceStatus::Draft) => { + tracing::debug!( + "Found existing draft invoice {id}", + id = invoice.id.to_string() + ); + Ok(InvoiceAction::Update { + existing_invoice_id: invoice.id, + }) + } + Some(stripe::InvoiceStatus::Open) + if matches!(self.invoice_type, InvoiceType::Manual) => + { + tracing::debug!( + "Manual invoice {id} already open, skipping", + id = invoice.id.to_string() + ); + Ok(InvoiceAction::Skip { + result: InvoiceResult::AlreadyProcessed, + customer: Some(customer), + }) + } + Some(stripe::InvoiceStatus::Open) => { + bail!( + "Found open invoice {id}. Pass --recreate-finalized to delete and recreate this invoice.", + id = invoice.id.to_string() + ) + } + Some( + status @ (stripe::InvoiceStatus::Paid + | stripe::InvoiceStatus::Void + | stripe::InvoiceStatus::Uncollectible), + ) if matches!(self.invoice_type, InvoiceType::Manual) => { + tracing::debug!( + "Manual invoice {id} already in state {status}, skipping", + id = invoice.id.to_string(), + status = status + ); + Ok(InvoiceAction::Skip { + result: InvoiceResult::AlreadyProcessed, + customer: Some(customer), + }) + } + Some(status) => { + bail!( + "Found invoice {id} in unsupported state {status}, skipping.", + id = invoice.id.to_string(), + status = status + ); + } + None => { + bail!( + "Unexpected missing status from invoice {id}", + id = invoice.id.to_string() + ); + } + } + } else { + Ok(InvoiceAction::Create { replace: None }) + } + } + + /// Execute the classified action: performs all Stripe writes (customer creation, + /// invoice creation/update, line item management, verification). + #[tracing::instrument(skip(self, client, db_client, action), fields(tenant=self.billed_prefix, invoice_type=format!("{:?}",self.invoice_type), subtotal=format!("${:.2}", self.subtotal as f64 / 100.0)))] + async fn execute( + &self, + client: &stripe::Client, + db_client: &Pool, + action: InvoiceAction, + mode: ChargeType, + ) -> anyhow::Result { + let (is_update, replace, existing_invoice_id) = match action { + InvoiceAction::Skip { result, .. } => return Ok(result), + InvoiceAction::Create { replace, .. } => (false, replace, None), + InvoiceAction::Update { + existing_invoice_id, + .. + } => (true, None, Some(existing_invoice_id)), + }; + + // Ensure customer exists and has an email (required for invoicing) + let customer = + ensure_customer_for_invoicing(client, db_client, &self.billed_prefix).await?; + // Anything before 12:00:00 renders as the previous day in Stripe let date_start_secs = self .date_start @@ -287,114 +445,97 @@ impl Invoice { let date_start_repr = self.date_start.format("%F").to_string(); let date_end_repr = self.date_end.format("%F").to_string(); - let customer = get_or_create_customer_for_tenant( - client, - db_client, - self.billed_prefix.to_owned(), - true, - ) - .await? - .expect("Should never return None"); - let customer_id = customer.id.to_string(); - - let maybe_invoice = if let Some(invoice) = self - .get_stripe_invoice(&client, customer_id.as_str()) - .await? - { - match invoice.status { - Some(state @ (stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft)) - if recreate_finalized => - { + // Delete existing invoice if --recreate-finalized was used + if let Some(ref replace_id) = replace { + // Re-verify the invoice status before deleting (guard against race conditions) + let existing = stripe::Invoice::retrieve(client, replace_id, &[]).await?; + match existing.status { + Some(state @ (stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft)) => { tracing::warn!( - "Found invoice {id} in state {state} deleting and recreating", - id = invoice.id.to_string(), + "Found invoice {id} in state {state}, deleting and recreating", + id = replace_id.to_string(), state = state ); - stripe::Invoice::delete(client, &invoice.id).await?; - None - } - Some(stripe::InvoiceStatus::Draft) => { - tracing::debug!( - "Updating existing invoice {id}", - id = invoice.id.to_string() - ); - Some(invoice) - } - Some(stripe::InvoiceStatus::Open) => { - bail!( - "Found open invoice {id}. Pass --recreate-finalized to delete and recreate this invoice.", - id = invoice.id.to_string() - ) + stripe::Invoice::delete(client, replace_id).await?; } Some(status) => { bail!( - "Found invoice {id} in unsupported state {status}, skipping.", - id = invoice.id.to_string(), + "Invoice {id} changed to state {status} since classification, cannot delete.", + id = replace_id.to_string(), status = status ); } None => { bail!( "Unexpected missing status from invoice {id}", - id = invoice.id.to_string() + id = replace_id.to_string() ); } } + } + + // Create or reuse the invoice + // Manual invoices should always be sent as invoices rather than + // charged to the customer's payment method. + let mode = if self.invoice_type == InvoiceType::Manual { + ChargeType::SendInvoice } else { - None + mode }; - let invoice = match maybe_invoice.clone() { - Some(inv) => inv, - None => { - let invoice = stripe::Invoice::create( - client, - stripe::CreateInvoice { - customer: Some(customer.id.to_owned()), - // Stripe timestamps are measured in _seconds_ since epoch - // Due date must be in the future. Bill net-30, so 30 days from today - due_date: match mode { - ChargeType::SendInvoice => Some((Utc::now() + Duration::days(30)).timestamp()), - ChargeType::AutoCharge => None - }, - description: Some( - format!( - "Your Flow bill for the billing period between {date_start_human} - {date_end_human}. Tenant: {tenant}", - tenant=self.billed_prefix.to_owned() - ) - .as_str(), - ), - collection_method: Some(match mode { - ChargeType::AutoCharge => stripe::CollectionMethod::ChargeAutomatically, - ChargeType::SendInvoice => stripe::CollectionMethod::SendInvoice, - }), - auto_advance: Some(false), - custom_fields: Some(vec![ - stripe::CreateInvoiceCustomFields { - name: "Billing Period Start".to_string(), - value: date_start_human.to_owned(), - }, - stripe::CreateInvoiceCustomFields { - name: "Billing Period End".to_string(), - value: date_end_human.to_owned(), - }, - ]), - metadata: Some( - InvoiceMetadata { - tenant: self.billed_prefix.to_owned(), - invoice_type: self.invoice_type, - period_start: date_start_repr, - period_end: date_end_repr, - } - .to_metadata_map(), - ), - ..Default::default() + let invoice = if let Some(existing_id) = existing_invoice_id { + tracing::debug!( + "Updating existing invoice {id}", + id = existing_id.to_string() + ); + stripe::Invoice::retrieve(client, &existing_id, &[]).await? + } else { + let description_text = format!( + "Your Flow bill for the billing period between {date_start_human} - {date_end_human}. Tenant: {tenant}", + tenant = self.billed_prefix + ); + let invoice = stripe::Invoice::create( + client, + stripe::CreateInvoice { + customer: Some(customer.id.to_owned()), + due_date: match mode { + ChargeType::SendInvoice => { + Some((Utc::now() + Duration::days(30)).timestamp()) + } + ChargeType::AutoCharge => None, }, - ) - .await.context("Creating a new invoice")?; - tracing::debug!("Created a new invoice {id}", id = invoice.id); - invoice - } + description: Some(description_text.as_str()), + collection_method: Some(match mode { + ChargeType::AutoCharge => stripe::CollectionMethod::ChargeAutomatically, + ChargeType::SendInvoice => stripe::CollectionMethod::SendInvoice, + }), + auto_advance: Some(false), + custom_fields: Some(vec![ + stripe::CreateInvoiceCustomFields { + name: "Billing Period Start".to_string(), + value: date_start_human.to_owned(), + }, + stripe::CreateInvoiceCustomFields { + name: "Billing Period End".to_string(), + value: date_end_human.to_owned(), + }, + ]), + metadata: Some( + InvoiceMetadata { + tenant: self.billed_prefix.to_owned(), + invoice_type: self.invoice_type, + period_start: date_start_repr, + period_end: date_end_repr, + } + .to_metadata_map(), + ), + ..Default::default() + }, + ) + .await + .context("Creating a new invoice")?; + tracing::debug!("Created a new invoice {id}", id = invoice.id); + invoice }; // Clear out line items from invoice, if there are any @@ -447,11 +588,10 @@ impl Invoice { ); } - // Let's double-check that the invoice total matches the desired total + // Re-fetch invoice and customer for fresh data (balance may have changed) let check_invoice = stripe::Invoice::retrieve(client, &invoice.id, &[]).await?; - - // Customers can have an invoice credit balance, so let's make sure we take that into account. - let credit_balance = customer.balance.unwrap_or(0); + let fresh_customer = stripe::Customer::retrieve(client, &customer.id, &[]).await?; + let credit_balance = fresh_customer.balance.unwrap_or(0); let expected = (self.subtotal + (diff.ceil() as i64) + credit_balance).max(0); @@ -463,10 +603,10 @@ impl Invoice { ) } - if maybe_invoice.is_some() { - return Ok(InvoiceResult::Updated); + if is_update { + Ok(InvoiceResult::Updated) } else { - return Ok(InvoiceResult::Created(self.payment_provider)); + Ok(InvoiceResult::Created(self.payment_provider)) } } } @@ -601,76 +741,85 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { .or_default() += 1; }); - tracing::info!( - "Processing {usage} usage-based invoices, and {manual} manually-entered invoices.", - usage = invoice_type_counter - .remove(&InvoiceType::Final) - .unwrap_or_default(), - manual = invoice_type_counter - .remove(&InvoiceType::Manual) - .unwrap_or_default(), - ); + if cmd.dry_run { + tracing::info!( + "[DRY RUN] Classifying {usage} usage-based invoices and {manual} manually-entered invoices without making any changes to Stripe.", + usage = invoice_type_counter + .remove(&InvoiceType::Final) + .unwrap_or_default(), + manual = invoice_type_counter + .remove(&InvoiceType::Manual) + .unwrap_or_default(), + ); + } else { + tracing::info!( + "Processing {usage} usage-based invoices, and {manual} manually-entered invoices.", + usage = invoice_type_counter + .remove(&InvoiceType::Final) + .unwrap_or_default(), + manual = invoice_type_counter + .remove(&InvoiceType::Manual) + .unwrap_or_default(), + ); + } let invoice_futures: Vec<_> = invoices .iter() .map(|response| { let client = stripe_client.clone(); let db_pool = db_pool.clone(); + + let annotation = match response.invoice_type { + InvoiceType::Manual => Some(format!( + "[manual: {} - {}]", + response.date_start.format("%Y-%m-%d"), + response.date_end.format("%Y-%m-%d") + )), + _ => None, + }; + async move { - let res = response - .upsert_invoice( - &client, - &db_pool, - cmd.recreate_finalized, - cmd.charge_type, - ) + let action = response + .classify(&client, cmd.recreate_finalized) .await; - match res { + + match action { Err(err) => { let formatted = format!( - "Error publishing {invoice_type:?} invoice for {tenant}", + "Error classifying {invoice_type:?} invoice for {tenant}", tenant = response.billed_prefix, invoice_type = response.invoice_type ); - Err(anyhow::anyhow!(format!( - "{}: {err:#}", - formatted, - err = err - ))) + Err(anyhow::anyhow!("{formatted}: {err:#}")) } - Ok(res) => { + Ok(InvoiceAction::Skip { result, customer }) => { tracing::debug!( tenant = response.billed_prefix, invoice_type = format!("{:?}", response.invoice_type), subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), "{}", - res.message() + result.message(cmd.dry_run) ); - match res { - InvoiceResult::Created(_) - | InvoiceResult::Updated - | InvoiceResult::Error => {} - // Remove any incorrectly created invoices that are now skipped for whatever reason - _ if cmd.clean_up => { - let task_res: Result<(), anyhow::Error> = async move { - let customer = match get_or_create_customer_for_tenant( - &client, - &db_pool, - response.billed_prefix.to_owned(), - false, - ) - .await? - { - Some(c) => c, - None => return Ok(()), - }; - - let customer_id = customer.id.to_string(); - - if let Some(invoice) = - response.get_stripe_invoice(&client, &customer_id).await? - { - if let Some(InvoiceStatus::Draft) = invoice.status { + + if cmd.clean_up { + let task_res: Result<(), anyhow::Error> = async { + let customer = match customer { + Some(c) => c, + None => return Ok(()), + }; + let customer_id = customer.id.to_string(); + + if let Some(invoice) = + response.get_stripe_invoice(&client, &customer_id).await? + { + if let Some(InvoiceStatus::Draft) = invoice.status { + if cmd.dry_run { + tracing::warn!( + tenant = response.billed_prefix.to_string(), + "[dry-run] Would delete stale draft invoice {}", + invoice.id + ); + } else { tracing::warn!( tenant = response.billed_prefix.to_string(), "Deleting draft invoice!" @@ -678,18 +827,67 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { stripe::Invoice::delete(&client, &invoice.id).await?; } } - - Ok(()) } - .await; + Ok(()) + } + .await; - if let Err(e) = task_res { - tracing::warn!("Failed to check for or clear potential leaked draft invoices for {}, this is probably not a problem: {e:#}", response.billed_prefix.to_owned()); - } - }, - _ => {} + if let Err(e) = task_res { + tracing::warn!("Failed to check for or clear potential leaked draft invoices for {}, this is probably not a problem: {e:#}", response.billed_prefix.to_owned()); + } + } + + Ok((result, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } + Ok(action) if cmd.dry_run => { + let result = match &action { + InvoiceAction::Create { replace: Some(id), .. } => { + tracing::info!( + tenant = response.billed_prefix, + "[dry-run] Would delete existing invoice {} and recreate", + id + ); + InvoiceResult::Created(response.payment_provider) + } + InvoiceAction::Create { .. } => { + InvoiceResult::Created(response.payment_provider) + } + InvoiceAction::Update { .. } => InvoiceResult::Updated, + InvoiceAction::Skip { .. } => unreachable!(), + }; + tracing::debug!( + tenant = response.billed_prefix, + invoice_type = format!("{:?}", response.invoice_type), + subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), + "[dry-run] {}", + result.message(cmd.dry_run) + ); + Ok((result, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } + Ok(action) => { + let res = response + .execute(&client, &db_pool, action, cmd.charge_type) + .await; + match res { + Err(err) => { + let formatted = format!( + "Error publishing {invoice_type:?} invoice for {tenant}", + tenant = response.billed_prefix, + invoice_type = response.invoice_type + ); + Err(anyhow::anyhow!("{formatted}: {err:#}")) + } + Ok(res) => { + tracing::debug!( + tenant = response.billed_prefix, + invoice_type = format!("{:?}", response.invoice_type), + subtotal = format!("${:.2}", response.subtotal as f64 / 100.0), + "{}", + res.message(cmd.dry_run) + ); + Ok((res, response.subtotal, response.billed_prefix.to_owned(), annotation)) + } } - Ok((res, response.subtotal, response.billed_prefix.to_owned())) } } } @@ -700,22 +898,22 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { let total = invoice_futures.len(); - let collected: HashMap)> = + let collected: HashMap)>)> = futures::stream::iter(invoice_futures) .buffer_unordered(cmd.concurrency) .or_else(|(err, invoice)| async move { if !cmd.fail_fast { tracing::error!("[{}]: {err:#}", invoice.billed_prefix); - Ok((InvoiceResult::Error, 0, invoice.billed_prefix)) + Ok((InvoiceResult::Error, 0, invoice.billed_prefix, None)) } else { Err(err) } }) .try_fold( HashMap::new(), - |mut map, (res, subtotal, tenant)| async move { + |mut map, (res, subtotal, tenant, annotation)| async move { let overall_count = map.values().map(|(_, count, _)| *count).sum::() + 1; - let msg = res.message(); + let msg = res.message(cmd.dry_run); let (subtotal_sum, count_for_result_type, tenants) = map.entry(res).or_insert((0, 0, vec![])); @@ -723,7 +921,7 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { *count_for_result_type += 1; tracing::info!("[{overall_count}/{total}, {tenant}]: {msg}"); - tenants.push((tenant, subtotal)); + tenants.push((tenant, subtotal, annotation)); Ok(map) }, ) @@ -733,7 +931,7 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { tracing::info!( "[{:4} invoices]: {:70}${:.2}", count, - status.message(), + status.message(cmd.dry_run), *subtotal_agg as f64 / 100.0 ); let limit = match status { @@ -741,18 +939,24 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { InvoiceResult::NoDataMoved | InvoiceResult::NoFullPipeline | InvoiceResult::LessThanMinimum - | InvoiceResult::FreeTier => 0, + | InvoiceResult::FreeTier + | InvoiceResult::AlreadyProcessed => 0, _ => 10, }; let sorted_tenants = tenants .iter() - .sorted_by(|(_, a), (_, b)| b.cmp(a)) + .sorted_by(|(_, a, _), (_, b, _)| b.cmp(a)) .collect_vec(); let (displayed_tenants, remainder_tenants) = sorted_tenants.split_at(limit.min(tenants.len())); - for (tenant, subtotal) in displayed_tenants { - tracing::info!(" - {:} ${:.2}", tenant, *subtotal as f64 / 100.0); + for (tenant, subtotal, annotation) in displayed_tenants { + match annotation { + Some(note) => { + tracing::info!(" - {} ${:.2} {}", tenant, *subtotal as f64 / 100.0, note) + } + None => tracing::info!(" - {} ${:.2}", tenant, *subtotal as f64 / 100.0), + } } if limit > 0 && remainder_tenants.len() > 0 { tracing::info!(" - ... {} Others", remainder_tenants.len(),); @@ -762,35 +966,51 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { Ok(()) } -#[tracing::instrument(skip(client, db_client))] -async fn get_or_create_customer_for_tenant( +/// Read-only: search Stripe for an existing customer by tenant metadata. +#[tracing::instrument(skip(client))] +async fn find_customer( client: &stripe::Client, - db_client: &Pool, - tenant: String, - create: bool, + tenant: &str, ) -> anyhow::Result> { - let billing_row = sqlx::query!( - r#"SELECT billing_email, billing_address FROM tenants WHERE tenant = $1"#, - tenant, - ) - .fetch_optional(db_client) - .await?; - - let customers: Vec = stripe_search( + let mut customers: Vec = stripe_search( client, "customers", SearchParams { - query: customer_search_query(&tenant), + query: customer_search_query(tenant), ..Default::default() }, ) .await .context(format!("Searching for tenant {tenant}"))?; - let customer = if let Some(customer) = customers.into_iter().next() { + if let Some(customer) = customers.drain(..).next() { tracing::debug!("Found existing customer {id}", id = customer.id.to_string()); + Ok(Some(customer)) + } else { + Ok(None) + } +} + +/// Ensures a Stripe customer exists for this tenant and is ready for invoicing. +/// Finds an existing customer or creates a new one (carrying the tenant's billing +/// email and address from the control-plane DB), then ensures the customer has an +/// email set (falling back to the earliest admin on the tenant if needed). +#[tracing::instrument(skip(client, db_client))] +async fn ensure_customer_for_invoicing( + client: &stripe::Client, + db_client: &Pool, + tenant: &str, +) -> anyhow::Result { + let billing_row = sqlx::query!( + r#"SELECT billing_email, billing_address FROM tenants WHERE tenant = $1"#, + tenant, + ) + .fetch_optional(db_client) + .await?; + + let customer = if let Some(customer) = find_customer(client, tenant).await? { customer - } else if create { + } else { tracing::debug!("Creating new customer"); // Match the deterministic Idempotency-Key used by the GraphQL path so // a setup-intent flow racing against billing automation can't produce @@ -798,7 +1018,7 @@ async fn get_or_create_customer_for_tenant( let create_client = client .clone() .with_strategy(stripe::RequestStrategy::Idempotent( - customer_create_idempotency_key(&tenant), + customer_create_idempotency_key(tenant), )); let billing_email = billing_row @@ -811,15 +1031,14 @@ async fn get_or_create_customer_for_tenant( .transpose() .context("deserializing billing_address")?; + let description = format!("Represents the billing entity for Flow tenant '{tenant}'"); let new_customer = stripe::Customer::create( &create_client, stripe::CreateCustomer { - name: Some(tenant.as_str()), + name: Some(tenant), email: billing_email, address: billing_address, - description: Some( - format!("Represents the billing entity for Flow tenant '{tenant}'").as_str(), - ), + description: Some(description.as_str()), metadata: Some(HashMap::from([ (TENANT_METADATA_KEY.to_string(), tenant.to_string()), ( @@ -838,8 +1057,6 @@ async fn get_or_create_customer_for_tenant( // Waking it during an invoicing run would add a Stripe lookup per new // customer for no contact change. new_customer - } else { - return Ok(None); }; if customer.email.is_none() { @@ -896,5 +1113,5 @@ async fn get_or_create_customer_for_tenant( } } } - Ok(Some(customer)) + Ok(customer) } diff --git a/crates/billing-integrations/src/send.rs b/crates/billing-integrations/src/send.rs index bd626870726..7fb1f58221c 100644 --- a/crates/billing-integrations/src/send.rs +++ b/crates/billing-integrations/src/send.rs @@ -183,13 +183,15 @@ async fn update_draft_collection_methods( stripe_client: &Client, mut to_update: Vec, ) -> anyhow::Result> { - // Identify invoices that are `charge_automatically` but don't have a default payment method + // Identify invoices that need to be switched to `send_invoice`: + // - Manual invoices should always be sent as invoices, never auto-charged + // - Auto-charge invoices without a payment method on file must be sent as invoices let needs_update: HashSet = to_update .iter() .filter(|inv| { inv.collection_method().map_or(false, |cm| { cm == stripe::CollectionMethod::ChargeAutomatically - }) && !inv.has_cc() + }) && (inv.is_manual() || !inv.has_cc()) }) .map(|inv| inv.id().clone()) .collect::>(); diff --git a/crates/billing-integrations/src/stripe_utils.rs b/crates/billing-integrations/src/stripe_utils.rs index fa35f97f562..37bb20ea0ca 100644 --- a/crates/billing-integrations/src/stripe_utils.rs +++ b/crates/billing-integrations/src/stripe_utils.rs @@ -1,4 +1,4 @@ -use billing_types::{InvoiceMetadata, SearchParams, stripe_search}; +use billing_types::{InvoiceMetadata, InvoiceType, SearchParams, stripe_search}; use num_format::{Locale, ToFormattedString}; use std::ops::{Deref, DerefMut}; @@ -84,6 +84,14 @@ impl Invoice { self.0.status.clone() } + pub fn is_manual(&self) -> bool { + self.0 + .metadata + .as_ref() + .and_then(InvoiceMetadata::from_metadata_map) + .is_some_and(|m| m.invoice_type == InvoiceType::Manual) + } + pub fn period_start(&self) -> Option { self.0 .metadata From 5810d413bc787d42102a27e74a7b4f3f76e0eb81 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 1 Jul 2026 22:05:35 -0400 Subject: [PATCH 2/2] billing: restore --clean_up draft removal and exclude manual invoices from --recreate-finalized The classify/execute split returns no Stripe customer for skips resolved in the read-only Phase 1, which silently disabled --clean_up for those tenants. Separately, --recreate-finalized still applied to manual invoices, conflicting with the new `AlreadyProcessed` handling that makes manual reruns idempotent. * --clean_up falls back to a `find_customer` lookup when the `Skip` action carries no customer, so it again removes stale draft invoices for `FreeTier`, `FutureTrialStart`, and `LessThanMinimum` tenants. Phase 1 stays free of Stripe calls, so the lookup only runs under --clean_up. * Manual invoices are excluded from --recreate-finalized so an already-sent open manual invoice is not voided and reissued. An open manual invoice reports `AlreadyProcessed`, and a manual draft is refreshed through the normal Update path. --- crates/billing-integrations/src/publish.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/billing-integrations/src/publish.rs b/crates/billing-integrations/src/publish.rs index 011c0ff6b84..52cde1e2944 100644 --- a/crates/billing-integrations/src/publish.rs +++ b/crates/billing-integrations/src/publish.rs @@ -330,8 +330,11 @@ impl Invoice { .await? { match invoice.status { + // Manual invoices are excluded from --recreate-finalized: an + // already-sent (open) manual invoice must not be voided and + // reissued, and a manual draft is refreshed via the Update arm below. Some(stripe::InvoiceStatus::Open | stripe::InvoiceStatus::Draft) - if recreate_finalized => + if recreate_finalized && !matches!(self.invoice_type, InvoiceType::Manual) => { Ok(InvoiceAction::Create { replace: Some(invoice.id), @@ -803,9 +806,20 @@ pub async fn do_publish_invoices(cmd: &PublishInvoice) -> anyhow::Result<()> { if cmd.clean_up { let task_res: Result<(), anyhow::Error> = async { + // FreeTier / FutureTrialStart / LessThanMinimum skip in + // classify's Phase 1 without a Stripe lookup, so their Skip + // action carries no customer. Look one up here so --clean-up + // can still remove a stale draft for those tenants. let customer = match customer { Some(c) => c, - None => return Ok(()), + None => { + match find_customer(&client, &response.billed_prefix) + .await? + { + Some(c) => c, + None => return Ok(()), + } + } }; let customer_id = customer.id.to_string();