Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub struct CliContext {
config: config::Config,
/// Selected output format (table / JSON / YAML) for command results.
output: output::Output,
/// Tracks in-flight work units; cloned into preview-next connector drivers.
/// Tracks in-flight work units; cloned into preview connector drivers.
registry: service_kit::Registry,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/flowctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ fn main() -> Result<(), anyhow::Error> {
// Process-global handler registry. Service-kit's trace layer consults this
// for per-handler verbosity overrides, and its event layer records opt-in
// `event!` breadcrumbs onto registered handlers. Both are inert unless a
// command (today: `raw preview-next --debug-port`) actually registers
// handlers and serves the admin surface that lets an operator flip them on.
// command (today: `preview --debug-port`) actually registers handlers and
// serves the admin surface that lets an operator flip them on.
let registry = service_kit::Registry::new();

let env_filter = EnvFilter::builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Capture driver for `flowctl preview-next`.
//! Capture driver for `flowctl preview`.
//!
//! Captures are leaderless: each shard runs its own connector container,
//! RocksDB, publish loop, and transaction loop with no cross-shard coordination.
Expand All @@ -12,7 +12,9 @@
//! interpreting the output as a workload signal requires a range-partitioning
//! connector.

use crate::raw::preview_next::services::Run;
use crate::preview::Controls;
use crate::preview::services::Run;
use anyhow::Context;
use prost::Message;
use proto_flow::{flow, runtime as cruntime};
use runtime_next::proto;
Expand All @@ -24,10 +26,10 @@ pub async fn run_sessions(
run: &Run,
spec: &flow::CaptureSpec,
session_targets: Vec<u32>,
controls: Controls,
stop_token: CancellationToken,
) -> anyhow::Result<()> {
let join_shards =
crate::raw::preview_next::shards::build_capture_join_shards(run.n_shards, spec)?;
let join_shards = crate::preview::shards::build_capture_join_shards(run.n_shards, spec)?;

let mut handles = Vec::with_capacity(run.n_shards as usize);
for i in 0..run.n_shards {
Expand All @@ -37,16 +39,25 @@ pub async fn run_sessions(
// own auto-managed tempdir via RocksDB::open(None).
rocksdb_path: (i == 0).then(|| run.rocksdb_path.clone()),
network: run.network.clone(),
log_handler: run.log_handler,
registry: run.registry.clone(),
};
let spec = spec.clone();
let join_shard = join_shards[i as usize].clone();
let session_targets = session_targets.clone();
let controls = controls.clone();
let stop_token = stop_token.clone();

handles.push(tokio::spawn(async move {
drive_one_shard(run_handle, spec, i, join_shard, session_targets, stop_token).await
drive_one_shard(
run_handle,
spec,
i,
join_shard,
session_targets,
controls,
stop_token,
)
.await
}));
}

Expand Down Expand Up @@ -78,7 +89,6 @@ pub async fn run_sessions(
struct RunHandle {
rocksdb_path: Option<String>,
network: String,
log_handler: fn(&::ops::Log),
registry: service_kit::Registry,
}

Expand All @@ -88,23 +98,18 @@ async fn drive_one_shard(
shard_index: u32,
join_shard: proto::join::Shard,
session_targets: Vec<u32>,
controls: Controls,
stop_token: CancellationToken,
) -> anyhow::Result<()> {
let task_name = format!("preview-capture-{shard_index:03}");

let publisher_factory: gazette::journal::ClientFactory = std::sync::Arc::new({
move |_authz_sub: String, _authz_obj: String| -> gazette::journal::Client {
unreachable!("live Publisher is not used by preview ({_authz_sub}, {_authz_obj})")
}
});

let shard_svc = runtime_next::shard::Service::new(
cruntime::Plane::Local,
run.network,
run.log_handler,
None,
task_name,
publisher_factory,
controls.publisher_factory.clone(),
controls.logger_factory.clone(),
run.registry,
None, // No AuthN+AuthZ signer (local loopback).
);
Expand All @@ -113,6 +118,23 @@ async fn drive_one_shard(
let mut response_rx = shard_svc.spawn_capture(UnboundedReceiverStream::new(request_rx));
let spec_bytes: bytes::Bytes = spec.encode_to_vec().into();

// Seed shard zero's RocksDB with any `--initial-state` before the runtime
// opens it at SessionLoop, so it recovers the state on its first scan.
// Only shard zero carries a tracked `rocksdb_path`.
if let Some(rocksdb_path) = &run.rocksdb_path {
if !controls.initial_state_json.is_empty() {
runtime_next::seed_initial_connector_state(
cruntime::RocksDbDescriptor {
rocksdb_path: rocksdb_path.clone(),
rocksdb_env_memptr: 0,
},
&controls.initial_state_json,
)
.await
.context("seeding --initial-state into shard-zero RocksDB")?;
}
}

let rocksdb_descriptor = run.rocksdb_path.map(|p| cruntime::RocksDbDescriptor {
rocksdb_path: p,
rocksdb_env_memptr: 0,
Expand Down Expand Up @@ -156,7 +178,6 @@ async fn drive_one_shard(
.send(Ok(proto::Capture {
task: Some(proto::Task {
spec: spec_bytes.clone(),
preview: true,
max_transactions: target_txns,
sqlite_vfs_uri: String::new(),
publisher_id: Default::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
//! as the `Task.sqlite_vfs_uri` (production supplies a recorded recovery-log
//! VFS instead).

use crate::raw::preview_next::services::Run;
use crate::preview::Controls;
use crate::preview::services::Run;
use anyhow::Context;
use prost::Message;
use proto_flow::{flow, flow::collection_spec::derivation::ConnectorType, runtime as cruntime};
use runtime_next::proto;
Expand All @@ -17,10 +19,11 @@ pub async fn run_sessions(
run: &Run,
spec: &flow::CollectionSpec,
session_targets: Vec<u32>,
fixture_dirs: Vec<String>,
controls: Controls,
stop_token: CancellationToken,
) -> anyhow::Result<()> {
let join_shards =
crate::raw::preview_next::shards::build_derive_join_shards(run.n_shards, spec)?;
let join_shards = crate::preview::shards::build_derive_join_shards(run.n_shards, spec)?;

// SQLite derivations require a VFS URI; preview supplies a plain tempfile
// path (the connector opens it with SQLite's default file VFS).
Expand All @@ -37,12 +40,13 @@ pub async fn run_sessions(
shuffle_log_dir: run.shuffle_log_dir.clone(),
rocksdb_path: run.rocksdb_path.clone(),
network: run.network.clone(),
log_handler: run.log_handler,
registry: run.registry.clone(),
};
let spec = spec.clone();
let join_shards = join_shards.clone();
let session_targets = session_targets.clone();
let fixture_dirs = fixture_dirs.clone();
let controls = controls.clone();
let stop_token = stop_token.clone();

handles.push(tokio::spawn(async move {
Expand All @@ -53,6 +57,8 @@ pub async fn run_sessions(
is_sqlite,
join_shards,
session_targets,
fixture_dirs,
controls,
stop_token,
)
.await
Expand Down Expand Up @@ -85,7 +91,6 @@ struct RunHandle {
shuffle_log_dir: String,
rocksdb_path: String,
network: String,
log_handler: fn(&::ops::Log),
registry: service_kit::Registry,
}

Expand All @@ -96,25 +101,21 @@ async fn drive_one_shard(
is_sqlite: bool,
join_shards: Vec<proto::join::Shard>,
session_targets: Vec<u32>,
fixture_dirs: Vec<String>,
controls: Controls,
stop_token: CancellationToken,
) -> anyhow::Result<()> {
let (request_tx, request_rx) = mpsc::unbounded_channel::<tonic::Result<proto::Derive>>();

let task_name = format!("preview-derive-{shard_index:03}");

let publisher_factory: gazette::journal::ClientFactory = std::sync::Arc::new({
move |_authz_sub: String, _authz_obj: String| -> gazette::journal::Client {
unreachable!("live Publisher is not used by preview ({_authz_sub}, {_authz_obj})")
}
});

let shard_svc = runtime_next::shard::Service::new(
cruntime::Plane::Local,
run.network.clone(),
run.log_handler,
None,
task_name,
publisher_factory,
controls.publisher_factory.clone(),
controls.logger_factory.clone(),
run.registry,
None, // No AuthN+AuthZ signer (local loopback).
);
Expand All @@ -130,6 +131,20 @@ async fn drive_one_shard(
String::new()
};

// Seed shard zero's RocksDB with any `--initial-state` before the runtime
// opens it at SessionLoop, so it recovers the state on its first scan.
if shard_index == 0 && !controls.initial_state_json.is_empty() {
runtime_next::seed_initial_connector_state(
cruntime::RocksDbDescriptor {
rocksdb_path: run.rocksdb_path.clone(),
rocksdb_env_memptr: 0,
},
&controls.initial_state_json,
)
.await
.context("seeding --initial-state into shard-zero RocksDB")?;
}

let rocksdb_descriptor = if shard_index == 0 {
Some(cruntime::RocksDbDescriptor {
rocksdb_path: run.rocksdb_path.clone(),
Expand All @@ -151,13 +166,20 @@ async fn drive_one_shard(
}
let session_index = idx + 1;

// A fixture preview reads each session from its own directory (fresh
// segments from segment one); live preview shares the run's directory.
let shuffle_directory = fixture_dirs
.get(idx)
.cloned()
.unwrap_or_else(|| run.shuffle_log_dir.clone());

request_tx
.send(Ok(proto::Derive {
join: Some(proto::Join {
etcd_mod_revision: session_index as i64,
shards: join_shards.clone(),
shard_index,
shuffle_directory: run.shuffle_log_dir.clone(),
shuffle_directory,
shuffle_endpoint: run.peer_endpoint.clone(),
leader_endpoint: run.peer_endpoint.clone(),
}),
Expand All @@ -177,10 +199,9 @@ async fn drive_one_shard(
.send(Ok(proto::Derive {
task: Some(proto::Task {
spec: spec_bytes.clone(),
preview: true,
max_transactions: target_txns,
sqlite_vfs_uri: sqlite_vfs_uri.clone(),
publisher_id: Default::default(), // Unused when `preview`.
publisher_id: Default::default(), // The harness forwards no leader producer.
}),
..Default::default()
}))
Expand Down
Loading
Loading