Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/core/secure_messaging_udp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,13 @@ auto secure_messaging_udp_client::do_start_impl(
asio::ip::udp::socket udp_socket(*io_context_, asio::ip::udp::v4());

// Create DTLS socket
socket_ = std::make_shared<internal::dtls_socket>(
auto dtls_result = internal::dtls_socket::create(
std::move(udp_socket), ssl_ctx_);
if (dtls_result.is_err()) {
return error<void>(dtls_result.error().code,
dtls_result.error().message, "secure_messaging_udp_client::start");
}
socket_ = dtls_result.value();

// Set peer endpoint
{
Expand Down
6 changes: 5 additions & 1 deletion src/core/secure_messaging_udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,12 @@ auto secure_messaging_udp_server::create_session(
asio::ip::udp::socket client_socket(*io_context_, asio::ip::udp::v4());

auto session = std::make_shared<dtls_session>();
session->socket = std::make_shared<internal::dtls_socket>(
auto dtls_result = internal::dtls_socket::create(
std::move(client_socket), ssl_ctx_);
if (dtls_result.is_err()) {
return;
}
session->socket = dtls_result.value();
session->socket->set_peer_endpoint(client_endpoint);

// Set receive callback
Expand Down
49 changes: 29 additions & 20 deletions src/integration/observability_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,35 @@
// See the LICENSE file in the project root for full license information.

#include "internal/integration/observability_bridge.h"
#include <stdexcept>

namespace kcenon::network::integration {

Result<std::shared_ptr<ObservabilityBridge>> ObservabilityBridge::create(
std::shared_ptr<logger_interface> logger,
std::shared_ptr<monitoring_interface> monitor, BackendType backend_type) {
if (!logger) {
return error<std::shared_ptr<ObservabilityBridge>>(
error_codes::common_errors::invalid_argument,
"ObservabilityBridge requires non-null logger",
"ObservabilityBridge::create");
}
if (!monitor) {
return error<std::shared_ptr<ObservabilityBridge>>(
error_codes::common_errors::invalid_argument,
"ObservabilityBridge requires non-null monitor",
"ObservabilityBridge::create");
}
return ok(std::shared_ptr<ObservabilityBridge>(
new ObservabilityBridge(std::move(logger), std::move(monitor), backend_type)));
}

ObservabilityBridge::ObservabilityBridge(
std::shared_ptr<logger_interface> logger,
std::shared_ptr<monitoring_interface> monitor,
BackendType backend_type)
: logger_(std::move(logger))
, monitor_(std::move(monitor))
, backend_type_(backend_type) {
if (!logger_) {
throw std::invalid_argument("ObservabilityBridge requires non-null logger");
}
if (!monitor_) {
throw std::invalid_argument("ObservabilityBridge requires non-null monitor");
}
}

ObservabilityBridge::~ObservabilityBridge() {
Expand Down Expand Up @@ -144,27 +156,24 @@ ObservabilityBridge::BackendType ObservabilityBridge::get_backend_type() const {
}

#if KCENON_WITH_COMMON_SYSTEM
std::shared_ptr<ObservabilityBridge> ObservabilityBridge::from_common_system(
Result<std::shared_ptr<ObservabilityBridge>> ObservabilityBridge::from_common_system(
std::shared_ptr<::kcenon::common::interfaces::ILogger> logger,
std::shared_ptr<::kcenon::common::interfaces::IMonitor> monitor) {
if (!logger) {
throw std::invalid_argument("ObservabilityBridge::from_common_system requires non-null logger");
return error<std::shared_ptr<ObservabilityBridge>>(
error_codes::common_errors::invalid_argument,
"ObservabilityBridge::from_common_system requires non-null logger",
"ObservabilityBridge::from_common_system");
}
if (!monitor) {
throw std::invalid_argument("ObservabilityBridge::from_common_system requires non-null monitor");
return error<std::shared_ptr<ObservabilityBridge>>(
error_codes::common_errors::invalid_argument,
"ObservabilityBridge::from_common_system requires non-null monitor",
"ObservabilityBridge::from_common_system");
}

// Adapt common_system logger and monitor to network_system interfaces
auto adapted_logger = std::make_shared<common_system_logger_adapter>();

// For monitor, we use basic_monitoring since there's no direct adapter yet
// This is a known limitation - future work could create a proper adapter
auto adapted_monitor = std::make_shared<basic_monitoring>();

return std::make_shared<ObservabilityBridge>(
adapted_logger,
adapted_monitor,
BackendType::CommonSystem);
return create(adapted_logger, adapted_monitor, BackendType::CommonSystem);
}
#endif

Expand Down
49 changes: 31 additions & 18 deletions src/integration/thread_pool_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@
#include "internal/integration/thread_pool_adapters.h"
#endif

#include <stdexcept>

namespace kcenon::network::integration {

Result<std::shared_ptr<ThreadPoolBridge>> ThreadPoolBridge::create(
std::shared_ptr<thread_pool_interface> pool, BackendType backend_type) {
if (!pool) {
return error<std::shared_ptr<ThreadPoolBridge>>(
error_codes::common_errors::invalid_argument,
"ThreadPoolBridge requires non-null thread pool",
"ThreadPoolBridge::create");
}
return ok(std::shared_ptr<ThreadPoolBridge>(
new ThreadPoolBridge(std::move(pool), backend_type)));
}

ThreadPoolBridge::ThreadPoolBridge(
std::shared_ptr<thread_pool_interface> pool,
BackendType backend_type)
: pool_(std::move(pool)), backend_type_(backend_type) {
if (!pool_) {
throw std::invalid_argument("ThreadPoolBridge requires non-null thread pool");
}
}

ThreadPoolBridge::~ThreadPoolBridge() {
Expand Down Expand Up @@ -122,29 +129,35 @@ ThreadPoolBridge::BackendType ThreadPoolBridge::get_backend_type() const {
return backend_type_;
}

std::shared_ptr<ThreadPoolBridge> ThreadPoolBridge::from_thread_system(
Result<std::shared_ptr<ThreadPoolBridge>> ThreadPoolBridge::from_thread_system(
const std::string& pool_name) {
(void)pool_name; // Pool name is informational only in current implementation

(void)pool_name;
auto pool = thread_integration_manager::instance().get_thread_pool();
if (!pool) {
throw std::runtime_error("Failed to get thread pool from thread_integration_manager");
return error<std::shared_ptr<ThreadPoolBridge>>(
error_codes::common_errors::not_initialized,
"Failed to get thread pool from thread_integration_manager",
"ThreadPoolBridge::from_thread_system");
}

return std::make_shared<ThreadPoolBridge>(pool, BackendType::ThreadSystem);
return create(pool, BackendType::ThreadSystem);
}

#if KCENON_WITH_COMMON_SYSTEM
std::shared_ptr<ThreadPoolBridge> ThreadPoolBridge::from_common_system(
Result<std::shared_ptr<ThreadPoolBridge>> ThreadPoolBridge::from_common_system(
std::shared_ptr<::kcenon::common::interfaces::IExecutor> executor) {
if (!executor) {
throw std::invalid_argument("ThreadPoolBridge::from_common_system requires non-null executor");
return error<std::shared_ptr<ThreadPoolBridge>>(
error_codes::common_errors::invalid_argument,
"ThreadPoolBridge::from_common_system requires non-null executor",
"ThreadPoolBridge::from_common_system");
}

// Adapt common_system executor to thread_pool_interface
auto adapted_pool = std::make_shared<common_to_network_thread_adapter>(std::move(executor));

return std::make_shared<ThreadPoolBridge>(adapted_pool, BackendType::CommonSystem);
auto adapter_result = common_to_network_thread_adapter::create(std::move(executor));
if (adapter_result.is_err()) {
return error<std::shared_ptr<ThreadPoolBridge>>(
adapter_result.error().code, adapter_result.error().message,
"ThreadPoolBridge::from_common_system");
}
return create(adapter_result.value(), BackendType::CommonSystem);
}
#endif

Expand Down
17 changes: 12 additions & 5 deletions src/integration/thread_system_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"

#include <stdexcept>
#include <thread> // For std::thread::hardware_concurrency and std::this_thread::sleep_for (fallback)

#include <kcenon/thread/core/thread_worker.h>

namespace kcenon::network::integration {

Result<std::shared_ptr<thread_system_pool_adapter>> thread_system_pool_adapter::create(
std::shared_ptr<kcenon::thread::thread_pool> pool) {
if (!pool) {
return error<std::shared_ptr<thread_system_pool_adapter>>(
error_codes::common_errors::invalid_argument,
"thread_system_pool_adapter: pool is null",
"thread_system_pool_adapter::create");
}
return ok(std::shared_ptr<thread_system_pool_adapter>(
new thread_system_pool_adapter(std::move(pool))));
}

thread_system_pool_adapter::thread_system_pool_adapter(
std::shared_ptr<kcenon::thread::thread_pool> pool)
: pool_(std::move(pool)) {
if (!pool_) {
throw std::invalid_argument("thread_system_pool_adapter: pool is null");
}
// No scheduler thread needed - delayed tasks are handled by thread_pool::submit_delayed
}

thread_system_pool_adapter::~thread_system_pool_adapter() {
Expand Down
55 changes: 33 additions & 22 deletions src/internal/dtls_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,51 @@ namespace

} // anonymous namespace

dtls_socket::dtls_socket(asio::ip::udp::socket socket, SSL_CTX* ssl_ctx)
: socket_(std::move(socket))
, ssl_ctx_(ssl_ctx)
, ssl_(nullptr)
, rbio_(nullptr)
, wbio_(nullptr)
Result<std::shared_ptr<dtls_socket>> dtls_socket::create(
asio::ip::udp::socket socket, SSL_CTX* ssl_ctx)
{
// Create SSL object
ssl_ = SSL_new(ssl_ctx_);
if (!ssl_)
SSL* ssl = SSL_new(ssl_ctx);
if (!ssl)
{
throw std::runtime_error("Failed to create SSL object");
return error<std::shared_ptr<dtls_socket>>(
error_codes::common_errors::internal_error,
"Failed to create SSL object",
"dtls_socket::create");
}

// Create memory BIOs for non-blocking I/O
rbio_ = BIO_new(BIO_s_mem());
wbio_ = BIO_new(BIO_s_mem());
if (!rbio_ || !wbio_)
BIO* rbio = BIO_new(BIO_s_mem());
BIO* wbio = BIO_new(BIO_s_mem());
if (!rbio || !wbio)
{
if (rbio_) BIO_free(rbio_);
if (wbio_) BIO_free(wbio_);
SSL_free(ssl_);
throw std::runtime_error("Failed to create BIO objects");
if (rbio) BIO_free(rbio);
if (wbio) BIO_free(wbio);
SSL_free(ssl);
return error<std::shared_ptr<dtls_socket>>(
error_codes::common_errors::internal_error,
"Failed to create BIO objects",
"dtls_socket::create");
}

// Set BIOs to non-blocking mode
BIO_set_nbio(rbio_, 1);
BIO_set_nbio(wbio_, 1);
BIO_set_nbio(rbio, 1);
BIO_set_nbio(wbio, 1);

// Connect BIOs to SSL object (SSL takes ownership of BIOs)
SSL_set_bio(ssl, rbio, wbio);

// Connect BIOs to SSL object (SSL takes ownership)
SSL_set_bio(ssl_, rbio_, wbio_);
return ok(std::shared_ptr<dtls_socket>(
new dtls_socket(std::move(socket), ssl, rbio, wbio)));
}

// Enable DTLS cookie exchange for servers (DoS protection)
// This is optional and can be configured later
dtls_socket::dtls_socket(asio::ip::udp::socket socket, SSL* ssl, BIO* rbio, BIO* wbio)
: socket_(std::move(socket))
, ssl_ctx_(nullptr)
, ssl_(ssl)
, rbio_(rbio)
, wbio_(wbio)
{
}

dtls_socket::~dtls_socket()
Expand Down
6 changes: 4 additions & 2 deletions src/internal/integration/observability_bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ObservabilityBridge : public INetworkBridge {
* auto bridge = std::make_shared<ObservabilityBridge>(logger, monitor);
* @endcode
*/
explicit ObservabilityBridge(
[[nodiscard]] static Result<std::shared_ptr<ObservabilityBridge>> create(
std::shared_ptr<logger_interface> logger,
std::shared_ptr<monitoring_interface> monitor,
BackendType backend_type = BackendType::Standalone);
Expand Down Expand Up @@ -241,12 +241,14 @@ class ObservabilityBridge : public INetworkBridge {
* bridge->initialize(config);
* @endcode
*/
static std::shared_ptr<ObservabilityBridge> from_common_system(
[[nodiscard]] static Result<std::shared_ptr<ObservabilityBridge>> from_common_system(
std::shared_ptr<::kcenon::common::interfaces::ILogger> logger,
std::shared_ptr<::kcenon::common::interfaces::IMonitor> monitor);
#endif

private:
ObservabilityBridge(std::shared_ptr<logger_interface> logger,
std::shared_ptr<monitoring_interface> monitor, BackendType backend_type);
std::shared_ptr<logger_interface> logger_;
std::shared_ptr<monitoring_interface> monitor_;
BackendType backend_type_;
Expand Down
Loading
Loading