diff --git a/apps/increment/src/main.rs b/apps/increment/src/main.rs index d2407ae..5a83f21 100644 --- a/apps/increment/src/main.rs +++ b/apps/increment/src/main.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf}; use clap::{Args, Parser, Subcommand}; use void_app_node::{ Mode, Options, - oracle::{ObserverOracle, Oracle, OracleStorageType}, + oracle::{ObserverOracle, Oracle, OracleMode, OracleStorageType}, signing::get_signer, }; use void_toolkit::load_config::load_config; @@ -62,27 +62,25 @@ async fn main() { let cli = Cli::parse(); let mode = match cli.command { - Commands::Publisher(publisher) => Mode::Publisher( - void_app_node::Publisher { - signer: get_signer(publisher.key).unwrap(), - oracle: Oracle { - oracle_config: load_config(&publisher.oracle_config).unwrap(), - oracle_bind_address: publisher.oracle_bind_address, - oracle_storage: cli - .oracle_db_path - .map_or(OracleStorageType::Memory, OracleStorageType::Db), - }, - node_network_bind_address: publisher.node_network_bind_address, - } - .into(), - ), + Commands::Publisher(publisher) => Mode::Publisher(void_app_node::Publisher { + signer: get_signer(publisher.key).unwrap(), + oracle: OracleMode::Publisher(Oracle { + oracle_config: load_config(&publisher.oracle_config).unwrap(), + oracle_bind_address: publisher.oracle_bind_address, + oracle_storage: cli + .oracle_db_path + .map_or(OracleStorageType::Memory, OracleStorageType::Db), + }), + node_network_bind_address: publisher.node_network_bind_address, + }), Commands::Observer(observer) => Mode::Observer(void_app_node::Observer { - oracle: ObserverOracle { + oracle: OracleMode::Observer(ObserverOracle { oracle_config: load_config(&observer.oracle_config).unwrap(), oracle_storage: cli .oracle_db_path .map_or(OracleStorageType::Memory, OracleStorageType::Db), - }, + }), + signer: None, node_network_endpoint: observer.node_network_endpoint, }), }; diff --git a/apps/increment/tests/tests.rs b/apps/increment/tests/tests.rs index 9c685ef..b31fa84 100644 --- a/apps/increment/tests/tests.rs +++ b/apps/increment/tests/tests.rs @@ -18,7 +18,7 @@ use tokio_util::{ io::StreamReader, }; use void_app_node::{ - oracle::{ObserverOracle, Oracle, OracleStorageType}, + oracle::{ObserverOracle, Oracle, OracleMode, OracleStorageType}, proof::SignedProof, }; use void_toolkit::types::Height; @@ -125,13 +125,13 @@ async fn test_api(nodes: Option) { }; let config = void_toolkit::oracle_types::config::Config { query, block }; - let oracle = Oracle { + let oracle = OracleMode::Publisher(Oracle { oracle_config: config, oracle_bind_address: SocketAddr::from(([127, 0, 0, 1], 4000)), oracle_storage: nodes.as_ref().map_or(OracleStorageType::Memory, |nodes| { OracleStorageType::Db(nodes.publisher.oracle_db_file.clone()) }), - }; + }); let increment_endpoint = "127.0.0.1:3500"; @@ -144,14 +144,11 @@ async fn test_api(nodes: Option) { tracing: true, server_bind_address: increment_endpoint.parse().unwrap(), db_path, - mode: void_app_node::Mode::Publisher( - void_app_node::Publisher { - oracle, - signer, - node_network_bind_address: SocketAddr::from(([127, 0, 0, 1], 5000)), - } - .into(), - ), + mode: void_app_node::Mode::Publisher(void_app_node::Publisher { + oracle, + signer, + node_network_bind_address: SocketAddr::from(([127, 0, 0, 1], 5000)), + }), }; increment::run_signing_node(options).await.unwrap(); panic!("increment app closed unexpectedly"); @@ -177,12 +174,12 @@ async fn test_api(nodes: Option) { // let oracle_db_file = observer_db_path.path().join("oracle.db"); // let data_type = increment::DataType::Db(db_file.to_str().unwrap().to_string()); // let oracle_db_path = oracle_db_file.to_str().unwrap().to_string(); - let oracle = ObserverOracle { + let oracle = OracleMode::Observer(ObserverOracle { oracle_config: config, oracle_storage: nodes.as_ref().map_or(OracleStorageType::Memory, |nodes| { OracleStorageType::Db(nodes.observer.oracle_db_file.clone()) }), - }; + }); let db_path = nodes.as_ref().map(|nodes| nodes.observer.db_file.clone()); tokio::spawn({ @@ -194,6 +191,7 @@ async fn test_api(nodes: Option) { db_path, mode: void_app_node::Mode::Observer(void_app_node::Observer { oracle, + signer: None, node_network_endpoint: "http://localhost:5000".to_string(), }), }; @@ -344,11 +342,11 @@ async fn test_run_for_front_end() { let config = void_toolkit::oracle_types::config::Config { query, block }; - let oracle = Oracle { + let oracle = OracleMode::Publisher(Oracle { oracle_config: config, oracle_bind_address: SocketAddr::from(([127, 0, 0, 1], 4000)), oracle_storage: OracleStorageType::Memory, - }; + }); let increment_endpoint = "127.0.0.1:3500"; @@ -360,14 +358,11 @@ async fn test_run_for_front_end() { tracing: true, server_bind_address: increment_endpoint.parse().unwrap(), db_path: None, - mode: void_app_node::Mode::Publisher( - void_app_node::Publisher { - oracle, - signer, - node_network_bind_address: SocketAddr::from(([127, 0, 0, 1], 5000)), - } - .into(), - ), + mode: void_app_node::Mode::Publisher(void_app_node::Publisher { + oracle, + signer, + node_network_bind_address: SocketAddr::from(([127, 0, 0, 1], 5000)), + }), }; increment::run_signing_node(options).await.unwrap(); panic!("increment app closed unexpectedly"); diff --git a/apps/transfers/src/main.rs b/apps/transfers/src/main.rs index a6b7937..f6ec4d2 100644 --- a/apps/transfers/src/main.rs +++ b/apps/transfers/src/main.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf}; use clap::{Args, Parser, Subcommand}; use void_app_node::{ Mode, Options, - oracle::{ObserverOracle, Oracle, OracleStorageType}, + oracle::{ObserverOracle, Oracle, OracleMode, OracleStorageType}, signing::get_signer, }; use void_toolkit::load_config::load_config; @@ -62,27 +62,25 @@ async fn main() { let cli = Cli::parse(); let mode = match cli.command { - Commands::Publisher(publisher) => Mode::Publisher( - void_app_node::Publisher { - signer: get_signer(publisher.key).unwrap(), - oracle: Oracle { - oracle_config: load_config(&publisher.oracle_config).unwrap(), - oracle_bind_address: publisher.oracle_bind_address, - oracle_storage: cli - .oracle_db_path - .map_or(OracleStorageType::Memory, OracleStorageType::Db), - }, - node_network_bind_address: publisher.node_network_bind_address, - } - .into(), - ), + Commands::Publisher(publisher) => Mode::Publisher(void_app_node::Publisher { + signer: get_signer(publisher.key).unwrap(), + oracle: OracleMode::Publisher(Oracle { + oracle_config: load_config(&publisher.oracle_config).unwrap(), + oracle_bind_address: publisher.oracle_bind_address, + oracle_storage: cli + .oracle_db_path + .map_or(OracleStorageType::Memory, OracleStorageType::Db), + }), + node_network_bind_address: publisher.node_network_bind_address, + }), Commands::Observer(observer) => Mode::Observer(void_app_node::Observer { - oracle: ObserverOracle { + oracle: OracleMode::Observer(ObserverOracle { oracle_config: load_config(&observer.oracle_config).unwrap(), oracle_storage: cli .oracle_db_path .map_or(OracleStorageType::Memory, OracleStorageType::Db), - }, + }), + signer: None, node_network_endpoint: observer.node_network_endpoint, }), }; diff --git a/apps/transfers/tests/tests.rs b/apps/transfers/tests/tests.rs index 6a5c1aa..9f8bd52 100644 --- a/apps/transfers/tests/tests.rs +++ b/apps/transfers/tests/tests.rs @@ -9,7 +9,7 @@ use alloy::{ use std::str::FromStr; use void_app_node::{ Mode, Options, Publisher, - oracle::{Oracle, OracleStorageType}, + oracle::{Oracle, OracleMode, OracleStorageType}, }; use void_toolkit::oracle_types::config::{ BlockConfig, ChainContractLogsConfig, Config, ConnectionType, QueryConfig, StreamConfig, @@ -120,19 +120,16 @@ async fn test_complete_bridge_flow() { let transfers_endpoint = transfers_endpoint.to_string(); let signer = signer.clone(); let oracle_config = oracle_config.clone(); - let oracle = Oracle { + let oracle = OracleMode::Publisher(Oracle { oracle_config, oracle_bind_address: transfers_oracle_endpoint.parse().unwrap(), oracle_storage: OracleStorageType::Memory, - }; - let mode = Mode::Publisher( - Publisher { - signer, - oracle, - node_network_bind_address: transfers_node_network_endpoint.parse().unwrap(), - } - .into(), - ); + }); + let mode = Mode::Publisher(Publisher { + signer, + oracle, + node_network_bind_address: transfers_node_network_endpoint.parse().unwrap(), + }); let options = Options { tracing: true, server_bind_address: transfers_endpoint.parse().unwrap(), diff --git a/node/src/lib.rs b/node/src/lib.rs index f38c77e..e0d9cd9 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,7 +1,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use alloy::signers::local::PrivateKeySigner; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use tokio::sync::mpsc; use void_toolkit::{ app::{Notification, UpdateLatestBlock, VoidStream}, @@ -13,7 +13,7 @@ use void_toolkit::{ use crate::{ db::{DbDataConstraints, InitDb}, - oracle::{ObserverOracle, Oracle}, + oracle::OracleMode, proof::{DigestFromProof, Proof, SignedProof, ZkProof}, server::AddHandlers, storage::{ @@ -228,21 +228,28 @@ pub struct Options { } pub enum Mode { - Publisher(Box), + Publisher(Publisher), Observer(Observer), + Optimistic(Optimistic), } pub struct Publisher { pub signer: PrivateKeySigner, - pub oracle: Oracle, + pub oracle: OracleMode, pub node_network_bind_address: SocketAddr, } pub struct Observer { - pub oracle: ObserverOracle, + pub signer: Option, + pub oracle: OracleMode, pub node_network_endpoint: String, } +pub struct Optimistic { + pub signer: Option, + pub oracle: OracleMode, +} + pub struct ElfCode { pub state_elf_code: Vec, pub recursive_elf_code: Vec, @@ -411,11 +418,15 @@ where node, observer.node_network_endpoint, observer.oracle, + observer.signer, stf, api_update, ) .await } + Mode::Optimistic(optimistic) => { + run_optimistic(node, optimistic.oracle, optimistic.signer, stf, api_update).await + } } }); let (server_res, node_res) = futures::future::try_join(server_jh, node_jh).await?; @@ -517,11 +528,15 @@ where node, observer.node_network_endpoint, observer.oracle, + observer.signer, stf, api_update, ) .await } + Mode::Optimistic(optimistic) => { + run_optimistic(node, optimistic.oracle, optimistic.signer, stf, api_update).await + } } }); let (server_res, node_res) = futures::future::try_join(server_jh, node_jh).await?; @@ -617,7 +632,7 @@ pub async fn run_signing_publisher( node: Node, DbData>, node_bind_address: SocketAddr, signer: PrivateKeySigner, - oracle: Oracle, + oracle: OracleMode, stf: Stf, api_update: ApiU, ) -> anyhow::Result<()> @@ -653,7 +668,7 @@ where async fn run_signing_publisher_stream( node: Node, DbData>, signer: PrivateKeySigner, - oracle: Oracle, + oracle: OracleMode, stf: Stf, api_update: ApiU, ) -> anyhow::Result<()> @@ -668,13 +683,6 @@ where Vec: From>, for<'a> as TryFrom<&'a [u8]>>::Error: Into, { - let Oracle { - oracle_config, - oracle_bind_address, - oracle_storage, - } = oracle; - let oracle_storage = oracle::oracle_storage(oracle_storage)?; - let (last_parent_height, last_parent_hash) = node .storage .get_current_block_height_and_hash() @@ -684,34 +692,29 @@ where let stf = Arc::new(stf); let api_update = Arc::new(api_update); - publisher_oracle( - oracle_config, - oracle_storage, - last_parent_height.map_or(0, |h| h + 1), - oracle_bind_address, - 10, - Some(signer.clone()), - ) - .map_err(|e| anyhow::anyhow!("Oracle error: {}", e)) - .block_height_parent(last_parent_height, last_parent_hash) - .and_then(|block| { - state_transition_with_storage(block, &node.storage, stf.clone(), api_update.clone()) - }) - .push_notification(node.state_notification.clone()) - .sign(signer, signing::sign) - .try_for_each(|signed_proof| async { - let signed_proof = Height::new(signed_proof.data.block_height, signed_proof.into()); - node.replicator.update(signed_proof).await?; - Ok(()) - }) - .await?; + let oracle_stream = create_oracle_stream(oracle, last_parent_height, Some(signer.clone()))?; + + oracle_stream + .block_height_parent(last_parent_height, last_parent_hash) + .and_then(|block| { + state_transition_with_storage(block, &node.storage, stf.clone(), api_update.clone()) + }) + .push_notification(node.state_notification.clone()) + .sign(signer, signing::sign) + .try_for_each(|signed_proof| async { + let signed_proof = Height::new(signed_proof.data.block_height, signed_proof.into()); + node.replicator.update(signed_proof).await?; + Ok(()) + }) + .await?; Ok(()) } async fn run_signing_observer( node: Node, DbData>, node_network_endpoint: String, - oracle: ObserverOracle, + oracle: OracleMode, + signer: Option, stf: Stf, api_update: ApiU, ) -> anyhow::Result<()> @@ -742,6 +745,7 @@ where let observer_jh = tokio::spawn(run_signing_observer_stream( node, oracle, + signer, proof_heights_rx, stf, api_update, @@ -752,9 +756,27 @@ where Ok(()) } +async fn run_optimistic( + node: Node, + oracle: OracleMode, + signer: Option, + stf: Stf, + api_update: ApiU, +) -> anyhow::Result<()> +where + Stf: AppTransition, + ApiU: ApiTransition, + App: Send + 'static, + Api: Send + 'static, + DbData: DbDataConstraints, +{ + run_optimistic_stream(node, oracle, signer, stf, api_update).await +} + async fn run_signing_observer_stream( node: Node, DbData>, - oracle: ObserverOracle, + oracle: OracleMode, + signer: Option, proof_heights: mpsc::Receiver, stf: Stf, api_update: ApiU, @@ -764,13 +786,14 @@ where ApiU: ApiTransition, DbData: DbDataConstraints, { - run_observer_stream(node, oracle, proof_heights, stf, api_update).await + run_observer_stream(node, oracle, signer, proof_heights, stf, api_update).await } async fn run_zk_observer( node: Node, node_network_endpoint: String, - oracle: ObserverOracle, + oracle: OracleMode, + signer: Option, stf: Stf, api_update: ApiU, ) -> anyhow::Result<()> @@ -798,6 +821,7 @@ where let observer_jh = tokio::spawn(run_zk_observer_stream( node, oracle, + signer, proof_heights_rx, stf, api_update, @@ -810,7 +834,8 @@ where async fn run_zk_observer_stream( node: Node, - oracle: ObserverOracle, + oracle: OracleMode, + signer: Option, proof_heights: mpsc::Receiver, stf: Stf, api_update: ApiU, @@ -820,12 +845,13 @@ where ApiU: ApiTransition, DbData: DbDataConstraints, { - run_observer_stream(node, oracle, proof_heights, stf, api_update).await + run_observer_stream(node, oracle, signer, proof_heights, stf, api_update).await } async fn run_observer_stream( node: Node, - oracle: ObserverOracle, + oracle: OracleMode, + signer: Option, proof_heights: mpsc::Receiver, stf: Stf, api_update: ApiU, @@ -843,12 +869,6 @@ where r }); - let ObserverOracle { - oracle_config, - oracle_storage, - } = oracle; - let oracle_storage = oracle::oracle_storage(oracle_storage)?; - let (last_parent_height, last_parent_hash) = node .storage .get_current_block_height_and_hash() @@ -857,8 +877,9 @@ where let stf = Arc::new(stf); let api_update = Arc::new(api_update); - observer_oracle(oracle_config, oracle_storage) - .map_err(|e| anyhow::anyhow!("Oracle error: {}", e)) + let oracle_stream = create_oracle_stream(oracle, last_parent_height, signer)?; + + oracle_stream .block_height_parent(last_parent_height, last_parent_hash) .blocks_await_proofs(proof_heights_stream) .and_then(|block| { @@ -875,6 +896,43 @@ where Ok(()) } +async fn run_optimistic_stream( + node: Node, + oracle: OracleMode, + signer: Option, + stf: Stf, + api_update: ApiU, +) -> anyhow::Result<()> +where + Stf: AppTransition, + ApiU: ApiTransition, + DbData: DbDataConstraints, +{ + let (last_parent_height, last_parent_hash) = node + .storage + .get_current_block_height_and_hash() + .await? + .map_or((None, [0; 32]), |(h, ph)| (Some(h), ph)); + let stf = Arc::new(stf); + let api_update = Arc::new(api_update); + + let oracle_stream = create_oracle_stream(oracle, last_parent_height, signer)?; + + oracle_stream + .block_height_parent(last_parent_height, last_parent_hash) + .and_then(|block| { + observer_state_transition_with_storage( + block, + &node.storage, + stf.clone(), + api_update.clone(), + ) + }) + .push_notification(node.state_notification.clone()) + .try_for_each(|_| std::future::ready(Ok(()))) + .await?; + Ok(()) +} impl Clone for Node { fn clone(&self) -> Self { Self { @@ -889,7 +947,7 @@ pub async fn run_zk_publisher( node: Node, node_bind_address: SocketAddr, signer: PrivateKeySigner, - oracle: Oracle, + oracle: OracleMode, stf: Stf, api_update: ApiU, elf_code: ElfCode, @@ -923,7 +981,7 @@ where async fn run_zk_publisher_stream( node: Node, signer: PrivateKeySigner, - oracle: Oracle, + oracle: OracleMode, stf: Stf, api_update: ApiU, elf_code: ElfCode, @@ -940,12 +998,6 @@ where recursive_elf_code, } = elf_code; let recursive_elf_code = Arc::new(recursive_elf_code); - let Oracle { - oracle_config, - oracle_bind_address, - oracle_storage, - } = oracle; - let oracle_storage = oracle::oracle_storage(oracle_storage)?; let (last_parent_height, last_parent_hash) = node .storage @@ -956,57 +1008,78 @@ where let stf = Arc::new(stf); let api_update = Arc::new(api_update); - publisher_oracle( - oracle_config, - oracle_storage, - last_parent_height.map_or(0, |h| h + 1), - oracle_bind_address, - 10, - Some(signer.clone()), - ) - .map_err(|e| anyhow::anyhow!("Oracle error: {}", e)) - .block_height_parent(last_parent_height, last_parent_hash) - .and_then(|block| { - state_transition_with_storage(block, &node.storage, stf.clone(), api_update.clone()) - }) - .push_notification(node.state_notification.clone()) - .prove(move |block, witness| void_toolkit::proof::host::prove(&block, witness, &state_elf_code)) - .and_then(|zk_state_proof| { - let node = node.clone(); - let recursive_elf_code = recursive_elf_code.clone(); - async move { - let Height { - block_height, - data: zk_state_proof, - } = zk_state_proof; - let zk_state_proof_receipt = serde_json::from_slice(&zk_state_proof)?; - let latest_proof = node.get_latest_proof().await?; - let recursive = match latest_proof { - Some(recursive_receipt) => { - let recursive_receipt = - serde_json::from_slice(&recursive_receipt.data.proof.data)?; - RecursiveArgs::Step(Box::new(recursive_receipt)) - } - None => RecursiveArgs::Init(Init { - this_image_id: risc0_zkvm::compute_image_id(&recursive_elf_code)?.into(), - }), - }; - let input = void_toolkit::proof::host::recursive::Input { - recursive, - state_receipts: vec![zk_state_proof_receipt], - }; - let result = tokio::task::spawn_blocking(move || { - void_toolkit::proof::host::recursive::proof(input, &recursive_elf_code) - }) - .await?; - Ok(Height::new(block_height, result?)) - } - }) - .try_for_each(|zk_proof| async { - let zk_proof = Height::new(zk_proof.block_height, ZkProof { proof: zk_proof }); - node.replicator.update(zk_proof).await?; - Ok(()) - }) - .await?; + let oracle_stream = create_oracle_stream(oracle, last_parent_height, Some(signer.clone()))?; + + oracle_stream + .block_height_parent(last_parent_height, last_parent_hash) + .and_then(|block| { + state_transition_with_storage(block, &node.storage, stf.clone(), api_update.clone()) + }) + .push_notification(node.state_notification.clone()) + .prove(move |block, witness| { + void_toolkit::proof::host::prove(&block, witness, &state_elf_code) + }) + .and_then(|zk_state_proof| { + let node = node.clone(); + let recursive_elf_code = recursive_elf_code.clone(); + async move { + let Height { + block_height, + data: zk_state_proof, + } = zk_state_proof; + let zk_state_proof_receipt = serde_json::from_slice(&zk_state_proof)?; + let latest_proof = node.get_latest_proof().await?; + let recursive = match latest_proof { + Some(recursive_receipt) => { + let recursive_receipt = + serde_json::from_slice(&recursive_receipt.data.proof.data)?; + RecursiveArgs::Step(Box::new(recursive_receipt)) + } + None => RecursiveArgs::Init(Init { + this_image_id: risc0_zkvm::compute_image_id(&recursive_elf_code)?.into(), + }), + }; + let input = void_toolkit::proof::host::recursive::Input { + recursive, + state_receipts: vec![zk_state_proof_receipt], + }; + let result = tokio::task::spawn_blocking(move || { + void_toolkit::proof::host::recursive::proof(input, &recursive_elf_code) + }) + .await?; + Ok(Height::new(block_height, result?)) + } + }) + .try_for_each(|zk_proof| async { + let zk_proof = Height::new(zk_proof.block_height, ZkProof { proof: zk_proof }); + node.replicator.update(zk_proof).await?; + Ok(()) + }) + .await?; Ok(()) } + +fn create_oracle_stream( + oracle: OracleMode, + last_parent_height: Option, + signer: Option, +) -> anyhow::Result>> { + let oracle_storage = oracle::oracle_storage(oracle.storage_type().clone())?; + + let s = match oracle { + OracleMode::Publisher(oracle) => publisher_oracle( + oracle.oracle_config, + oracle_storage, + last_parent_height.map_or(0, |h| h + 1), + oracle.oracle_bind_address, + 10, + signer, + ) + .map_err(|e| anyhow::anyhow!("Oracle error: {}", e)) + .boxed(), + OracleMode::Observer(oracle) => observer_oracle(oracle.oracle_config, oracle_storage) + .map_err(|e| anyhow::anyhow!("Oracle error: {}", e)) + .boxed(), + }; + Ok(s) +} diff --git a/node/src/oracle.rs b/node/src/oracle.rs index 0b9be2e..fe2ac7e 100644 --- a/node/src/oracle.rs +++ b/node/src/oracle.rs @@ -3,6 +3,12 @@ use std::{net::SocketAddr, path::PathBuf}; pub use void_toolkit::oracle_types::config::Config as OracleConfig; use void_toolkit::oracle_types::config::OracleBlocksConfig; +#[derive(Clone, Debug)] +pub enum OracleMode { + Publisher(Oracle), + Observer(ObserverOracle), +} + #[derive(Clone, Debug)] pub struct Oracle { pub oracle_config: OracleConfig, @@ -28,3 +34,12 @@ pub fn oracle_storage(storage_type: OracleStorageType) -> anyhow::Result Ok(void_toolkit::oracle::Db::memory()), } } + +impl OracleMode { + pub fn storage_type(&self) -> &OracleStorageType { + match self { + OracleMode::Publisher(oracle) => &oracle.oracle_storage, + OracleMode::Observer(oracle) => &oracle.oracle_storage, + } + } +}