Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,960 changes: 1,851 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ members = [
]

[features]
default = ["bin", "nats", "net", "quic", "wasmtime", "web-transport"]
default = ["bin", "nats", "net", "quic", "wasmtime", "web-transport", "zenoh-transport"]

bin = ["bin-bindgen", "bin-wasmtime"]
bin-bindgen = [
Expand All @@ -43,6 +43,7 @@ net = ["wrpc-transport/net"]
quic = ["dep:wrpc-transport-quic"]
wasmtime = ["dep:wrpc-runtime-wasmtime"]
web-transport = ["dep:wrpc-transport-web"]
zenoh-transport = ["dep:zenoh", "dep:wrpc-transport-zenoh", "wrpc-cli/zenoh-transport"]

[[bin]]
name = "wit-bindgen-wrpc"
Expand Down Expand Up @@ -84,6 +85,8 @@ wrpc-transport-nats = { workspace = true, optional = true }
wrpc-transport-quic = { workspace = true, optional = true }
wrpc-transport-web = { workspace = true, optional = true }
wrpc-wasmtime-cli = { workspace = true, optional = true }
wrpc-transport-zenoh = { workspace = true, optional = true }
zenoh = { workspace = true, optional = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -114,8 +117,10 @@ wasmtime-cli-flags = { workspace = true, features = [
"pooling-allocator",
"threads",
] }
wrpc-test = { workspace = true, features = ["nats", "quic", "web-transport"] }
wrpc-test = { workspace = true, features = ["nats", "quic", "web-transport", "zenoh-transport"] }
wrpc-transport = { workspace = true, features = ["net"] }
serial_test = "*"


[workspace.dependencies]
anyhow = { version = "1", default-features = false }
Expand Down Expand Up @@ -182,8 +187,12 @@ wrpc-transport = { version = "0.29", path = "./crates/transport", default-featur
wrpc-transport-nats = { version = "0.31", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.6", path = "./crates/transport-quic", default-features = false }
wrpc-transport-web = { version = "0.3", path = "./crates/transport-web", default-features = false }
wrpc-transport-zenoh = { version = "0.29", path = "./crates/transport-zenoh", default-features = false }
wrpc-wasi-keyvalue = { version = "0.1.1", path = "./crates/wasi-keyvalue", default-features = false }
wrpc-wasi-keyvalue-mem = { version = "0.2", path = "./crates/wasi-keyvalue-mem", default-features = false }
wrpc-wasi-keyvalue-redis = { version = "0.2", path = "./crates/wasi-keyvalue-redis", default-features = false }
wrpc-wasmtime-cli = { version = "0.9", path = "./crates/wasmtime-cli", default-features = false }
wtransport = { version = "0.6.1", default-features = false }
zenoh = {version = "1.7", default-features = false}
flume = {version = "0.11", default-features = false}
postcard = {version = "1.1"}
123 changes: 123 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,128 @@ We will use the following two Rust wRPC applications using [NATS.io] transport:
wrpc-wasmtime nats run --import native ./target/wasm32-wasip2/release/hello-component-client.wasm
```

#### Using [zenoh.io] transport

We will use the following two Rust wRPC applications using [zenoh.io] transport:
- [examples/rust/hello-zenoh-client](examples/rust/hello-zenoh-client)
- [examples/rust/hello-zenoh-server](examples/rust/hello-zenoh-server)

1. Run [zenoh.io] (more thorough documentation available [here](https://docs.nats.io/running-a-nats-service/introduction/running)):

- using standalone binary:
```sh
zenohd
```

- using nix flake develop environment:
```sh
nix develop
zenohd
```

2. Serve Wasm `hello` server via [zenoh.io]

```sh
wrpc-wasmtime zenoh serve --export rust ./target/wasm32-wasip2/release/hello_component_server.wasm
```

- Sample output:
> INFO async_nats: event: connected
>
> INFO wrpc_wasmtime_cli: serving instance function name="hello"

3. Call Wasm `hello` server using a Wasm `hello` client via [zenoh.io]:

```sh
wrpc-wasmtime zenoh run --import rust ./target/wasm32-wasip2/release/hello-component-client.wasm
```

- Sample output in the client:
> INFO async_nats: event: connected
>
>hello from Rust

- Sample output in the server:
> INFO wrpc_wasmtime_cli: serving instance function invocation
>
> INFO wrpc_wasmtime_cli: successfully served instance function invocation

4. Call the Wasm `hello` server using a native wRPC `hello` client via [zenoh.io]:

```sh
cargo run -p hello-zenoh-client rust
```

5. Serve native wRPC `hello` server via [zenoh.io]:

```sh
cargo run -p hello-zenoh-server native
```

6. Call both the native wRPC `hello` server and Wasm `hello` server using native wRPC `hello` client via [zenoh.io]:

```sh
cargo run -p hello-zenoh-client rust native
```

7. Call native wRPC `hello` server using Wasm `hello` client via [zenoh.io]:

```sh
wrpc-wasmtime zenoh run --import native ./target/wasm32-wasip2/release/hello-component-client.wasm
```


To test the transport-zenoh package run:

```sh
cargo test zenoh -- --nocapture
```

- Sample output:
> test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 13 filtered out; finished in 6.17s

### Zenoh Configuration
By default we set Zenoh to run in **client mode**. In this mode it behaves
similarly to a typical NATS client: the application connects to a local
Zenoh router (`zenohd`) that acts as the message broker.

The client attempts to connect to a `zenohd` instance running on the
**default Zenoh port (`7447`) on the same machine**.

Zenohd has been
added to the flake nix file and is available after running nix develop in the
command lnine.

This behavior can be altered by providing a **custom Zenoh configuration
file** and setting the `ZENOH_CONFIG` environment variable to its path.

Example:

``` bash
export ZENOH_CONFIG="/path/to/config/zenoh_conf.json5"
```


When this environment variable is set, Zenoh will load the configuration
from the specified file instead of using the default settings.

Example configuration:

``` json
{
"mode": "client",
"listen": {
"endpoints": ["tcp/0.0.0.0:7447"]
}
}
```


More information about available configuration options and how to confige zenoh in peer mode can be found in
the official Zenoh configuration manual:

https://zenoh.io/docs/manual/configuration/

## Repository structure

This repository contains (for all supported languages):
Expand All @@ -256,3 +378,4 @@ Whether you're a seasoned developer or just getting started, your contributions

[Docker]: https://www.docker.com/
[NATS.io]: https://nats.io/
[zenoh.io]: https://zenoh.io/
2 changes: 1 addition & 1 deletion benches/reactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license.workspace = true
repository.workspace = true

[lib]
crate-type = ["rlib", "cdylib"]
crate-type = ["rlib"]

[dependencies]
wit-bindgen = { workspace = true, features = ["macros", "realloc"] }
5 changes: 4 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ license.workspace = true
repository.workspace = true

[features]
default = ["nats"]
default = ["nats", "zenoh-transport"]
nats = ["async-nats/ring", "dep:async-nats", "dep:tokio", "tokio/sync"]
zenoh-transport = ["dep:zenoh", "dep:tokio", "tokio/sync"]

[dependencies]
anyhow = { workspace = true, features = ["std"] }
Expand All @@ -25,3 +26,5 @@ tracing-subscriber = { workspace = true, features = [
"smallvec",
"tracing-log",
] }
zenoh = { workspace = true, optional = true, features = ["transport_tcp"] }
serde_json = { workspace = true, default-features = false }
2 changes: 2 additions & 0 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "nats")]
pub mod nats;
pub mod tracing;
#[cfg(feature = "zenoh-transport")]
pub mod zenoh;
24 changes: 24 additions & 0 deletions crates/cli/src/zenoh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use serde_json::json;
use zenoh::{Config, Session};

/// Open a regular Zenoh session with configs supplied by an environment variable.
pub async fn connect() -> anyhow::Result<Session> {
let cfg = if let Ok(cfg) = Config::from_env() { cfg } else {
let mut config = Config::default();
// Set mode
config
.insert_json5("mode", &json!("client").to_string())
.unwrap();
config
.insert_json5(
"connect/endpoints",
&json!(["tcp/0.0.0.0:7447"]).to_string(),
)
.unwrap();

config
};

let session = zenoh::open(cfg).await.unwrap();
Ok(session)
}
6 changes: 5 additions & 1 deletion crates/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license.workspace = true
repository.workspace = true

[features]
default = ["nats", "quic", "web-transport"]
default = ["nats", "quic", "web-transport", "zenoh-transport"]
nats = ["dep:async-nats", "async-nats/ring", "wrpc-cli/nats"]
quic = [
"dep:quinn",
Expand All @@ -22,6 +22,7 @@ quic = [
"quinn/rustls",
]
web-transport = ["dep:wtransport", "wtransport/self-signed"]
zenoh-transport = ["dep:zenoh", "dep:wrpc-transport-zenoh", "wrpc-cli/zenoh-transport"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -33,3 +34,6 @@ tokio = { workspace = true, features = ["net", "process", "rt-multi-thread"] }
tracing = { workspace = true }
wrpc-cli = { workspace = true }
wtransport = { workspace = true, features = ["ring"], optional = true }
wrpc-transport-zenoh = { workspace = true, optional = true }
zenoh = { workspace = true, optional = true }
serde_json = { workspace = true }
106 changes: 101 additions & 5 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use rustls::version::TLS13;
use rustls::{ClientConfig, RootCertStore, ServerConfig};
use tokio::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::process::Command;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio::{select, spawn};

pub async fn free_port() -> anyhow::Result<u16> {
Expand Down Expand Up @@ -100,11 +103,9 @@ pub async fn start_nats() -> anyhow::Result<(
"failed to execute nats-server"
};
anyhow::bail!(
"{}. Please install nats-server >= 2.10.20. \
"{error_msg}. Please install nats-server >= 2.10.20. \
See https://docs.nats.io/running-a-nats-service/introduction/installation for installation instructions. \
Original error: {}",
error_msg,
e
Original error: {e}"
);
}

Expand All @@ -120,6 +121,101 @@ pub async fn start_nats() -> anyhow::Result<(
Ok((port, client, server, stop_tx))
}

#[cfg(feature = "zenoh-transport")]
pub async fn start_zenoh() -> anyhow::Result<(
u16,
Arc<zenoh::Session>,
JoinHandle<anyhow::Result<ExitStatus>>,
oneshot::Sender<()>,
)> {
// Check if nats-server is available

use zenoh::Config;
let nats_server_check = Command::new("zenohd").arg("--version").output().await;
if let Err(e) = nats_server_check {
let error_msg = if e.kind() == std::io::ErrorKind::NotFound {
"zenohd is not installed or not in PATH"
} else if e.kind() == std::io::ErrorKind::PermissionDenied {
"zenohd is not executable or permission denied"
} else {
"failed to execute zenohd"
};
anyhow::bail!(
"{error_msg}. Please install zenohd \
See https://zenoh.io/docs/getting-started/installation/ for installation instructions. \
Original error: {e}"
);
}

let port = free_port().await?; // not used in setup -- just return

let listen = format!("tcp/0.0.0.0:{port}");

let (server, stop_tx) = spawn_server(Command::new("zenohd").args(["-l", &listen]))
.await
.context("failed to start zenohd server")?;

let mut ready = false;

// Check that zenohd is ready
for _ in 0..50 {
if TcpStream::connect(("127.0.0.1", port)).await.is_ok() {
ready = true;
break;
}
sleep(Duration::from_millis(100)).await;
}

if !ready {
anyhow::bail!("zenohd did not open port {port}");
}

let cfg = if let Ok(cfg) = Config::from_env() { cfg } else {
use serde_json::json;

let mut config = Config::default();

// Set mode
config
.insert_json5("mode", &json!("client").to_string())
.unwrap();
config
.insert_json5(
"connect/endpoints",
&json!([format!("tcp/127.0.0.1:{port}")]).to_string(),
)
.unwrap();

config
};

let session = zenoh::open(cfg).await.unwrap();

let arc_session = Arc::new(session);

Ok((port, arc_session, server, stop_tx))
}

#[cfg(feature = "zenoh-transport")]
pub async fn with_zenoh<T, Fut>(
f: impl FnOnce(u16, Arc<zenoh::Session>) -> Fut,
) -> anyhow::Result<T>
where
Fut: core::future::Future<Output = anyhow::Result<T>>,
{
let (port, zenoh_session, zenoh_server, stop_tx) = start_zenoh()
.await
.context("failed to start Zenoh server")?;
let res = f(port, zenoh_session).await.context("closure failed")?;

stop_tx.send(()).expect("failed to stop Zenoh server");
zenoh_server
.await
.context("failed to await Zenoh server stop")?
.context("Zenoh server failed to stop")?;
Ok(res)
}

#[cfg(feature = "nats")]
pub async fn with_nats<T, Fut>(f: impl FnOnce(u16, async_nats::Client) -> Fut) -> anyhow::Result<T>
where
Expand Down
Loading
Loading