Skip to content

Commit 763104f

Browse files
Jan van Lindtclaude
andcommitted
fix: eliminate deadlocks by separating node_states and callback mutexes
Use std::atomic<bool> for is_connected_ and primary_host_online_ in EdgeNode so Paho callbacks never need the application mutex. Add a dedicated node_states_mutex_ in HostApplication to protect node_states_ independently, allowing validate_message to call log() directly without the PendingLogs indirection. Phase connect()/disconnect() to release mutex_ before blocking waits. Fix missing primary_host_online_ transfer in EdgeNode move assignment. Safer destructors that always attempt disconnect. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4571dd1 commit 763104f

8 files changed

Lines changed: 429 additions & 223 deletions

File tree

include/sparkplug/edge_node.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "sparkplug_b.pb.h"
88
#include "topic.hpp"
99

10+
#include <atomic>
1011
#include <functional>
1112
#include <mutex>
1213
#include <optional>
@@ -465,8 +466,8 @@ class EdgeNode {
465466
// Track state of attached devices (device_id -> state, with heterogeneous lookup)
466467
std::unordered_map<std::string, DeviceState, StringHash, StringEqual> device_states_;
467468

468-
bool is_connected_{false};
469-
bool primary_host_online_{
469+
std::atomic<bool> is_connected_{false};
470+
std::atomic<bool> primary_host_online_{
470471
false}; // True if primary host is online (or no primary host configured)
471472

472473
// Mutex for thread-safe access to all mutable state

include/sparkplug/host_application.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "sparkplug_b.pb.h"
88
#include "topic.hpp"
99

10+
#include <atomic>
1011
#include <functional>
1112
#include <memory>
1213
#include <mutex>
@@ -492,7 +493,7 @@ class HostApplication {
492493
private:
493494
Config config_;
494495
MQTTAsyncHandle client_;
495-
bool is_connected_{false};
496+
std::atomic<bool> is_connected_{false};
496497

497498
// MQTT connection options that must outlive async operations
498499
MQTTAsync_SSLOptions ssl_opts_{};
@@ -539,8 +540,9 @@ class HostApplication {
539540
};
540541

541542
std::unordered_map<NodeKey, NodeState, NodeKeyHash, NodeKeyEqual> node_states_;
543+
mutable std::mutex node_states_mutex_; // Protects node_states_ only
542544

543-
// Mutex for thread-safe access to all mutable state
545+
// Mutex for thread-safe access to config and other mutable state
544546
mutable std::mutex mutex_;
545547

546548
[[nodiscard]] stdx::expected<void, std::string>

src/c_bindings.cpp

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "sparkplug/payload_builder.hpp"
44
#include "sparkplug/sparkplug_c.h"
55

6+
#include <chrono>
67
#include <cstring>
78
#include <format>
89

@@ -34,6 +35,12 @@ copy_metrics_to_builder(sparkplug::PayloadBuilder& builder,
3435
const char* name = metric.has_name() ? metric.name().c_str() : "";
3536
std::optional<uint64_t> alias =
3637
metric.has_alias() ? std::optional<uint64_t>(metric.alias()) : std::nullopt;
38+
uint64_t ts = metric.has_timestamp()
39+
? metric.timestamp()
40+
: static_cast<uint64_t>(
41+
std::chrono::duration_cast<std::chrono::milliseconds>(
42+
std::chrono::system_clock::now().time_since_epoch())
43+
.count());
3744

3845
auto datatype = static_cast<sparkplug::DataType>(metric.datatype());
3946

@@ -42,87 +49,89 @@ copy_metrics_to_builder(sparkplug::PayloadBuilder& builder,
4249
case sparkplug::DataType::Int16:
4350
case sparkplug::DataType::Int32:
4451
if (alias.has_value() && name[0] == '\0') {
45-
builder.add_metric_by_alias(*alias, static_cast<int32_t>(metric.int_value()));
52+
builder.add_metric_by_alias(*alias, static_cast<int32_t>(metric.int_value()), ts);
4653
} else if (alias.has_value()) {
4754
builder.add_metric_with_alias(name, *alias,
48-
static_cast<int32_t>(metric.int_value()));
55+
static_cast<int32_t>(metric.int_value()), ts);
4956
} else {
50-
builder.add_metric(name, static_cast<int32_t>(metric.int_value()));
57+
builder.add_metric(name, static_cast<int32_t>(metric.int_value()), ts);
5158
}
5259
break;
5360

5461
case sparkplug::DataType::Int64:
5562
if (alias.has_value() && name[0] == '\0') {
56-
builder.add_metric_by_alias(*alias, metric.long_value());
63+
builder.add_metric_by_alias(*alias, metric.long_value(), ts);
5764
} else if (alias.has_value()) {
58-
builder.add_metric_with_alias(name, *alias, metric.long_value());
65+
builder.add_metric_with_alias(name, *alias, metric.long_value(), ts);
5966
} else {
60-
builder.add_metric(name, metric.long_value());
67+
builder.add_metric(name, metric.long_value(), ts);
6168
}
6269
break;
6370

6471
case sparkplug::DataType::UInt8:
6572
case sparkplug::DataType::UInt16:
6673
case sparkplug::DataType::UInt32:
6774
if (alias.has_value() && name[0] == '\0') {
68-
builder.add_metric_by_alias(*alias, static_cast<uint32_t>(metric.int_value()));
75+
builder.add_metric_by_alias(*alias, static_cast<uint32_t>(metric.int_value()),
76+
ts);
6977
} else if (alias.has_value()) {
7078
builder.add_metric_with_alias(name, *alias,
71-
static_cast<uint32_t>(metric.int_value()));
79+
static_cast<uint32_t>(metric.int_value()), ts);
7280
} else {
73-
builder.add_metric(name, static_cast<uint32_t>(metric.int_value()));
81+
builder.add_metric(name, static_cast<uint32_t>(metric.int_value()), ts);
7482
}
7583
break;
7684

7785
case sparkplug::DataType::UInt64:
7886
if (alias.has_value() && name[0] == '\0') {
79-
builder.add_metric_by_alias(*alias, static_cast<uint64_t>(metric.long_value()));
87+
builder.add_metric_by_alias(*alias, static_cast<uint64_t>(metric.long_value()),
88+
ts);
8089
} else if (alias.has_value()) {
8190
builder.add_metric_with_alias(name, *alias,
82-
static_cast<uint64_t>(metric.long_value()));
91+
static_cast<uint64_t>(metric.long_value()), ts);
8392
} else {
84-
builder.add_metric(name, static_cast<uint64_t>(metric.long_value()));
93+
builder.add_metric(name, static_cast<uint64_t>(metric.long_value()), ts);
8594
}
8695
break;
8796

8897
case sparkplug::DataType::Float:
8998
if (alias.has_value() && name[0] == '\0') {
90-
builder.add_metric_by_alias(*alias, metric.float_value());
99+
builder.add_metric_by_alias(*alias, metric.float_value(), ts);
91100
} else if (alias.has_value()) {
92-
builder.add_metric_with_alias(name, *alias, metric.float_value());
101+
builder.add_metric_with_alias(name, *alias, metric.float_value(), ts);
93102
} else {
94-
builder.add_metric(name, metric.float_value());
103+
builder.add_metric(name, metric.float_value(), ts);
95104
}
96105
break;
97106

98107
case sparkplug::DataType::Double:
99108
if (alias.has_value() && name[0] == '\0') {
100-
builder.add_metric_by_alias(*alias, metric.double_value());
109+
builder.add_metric_by_alias(*alias, metric.double_value(), ts);
101110
} else if (alias.has_value()) {
102-
builder.add_metric_with_alias(name, *alias, metric.double_value());
111+
builder.add_metric_with_alias(name, *alias, metric.double_value(), ts);
103112
} else {
104-
builder.add_metric(name, metric.double_value());
113+
builder.add_metric(name, metric.double_value(), ts);
105114
}
106115
break;
107116

108117
case sparkplug::DataType::Boolean:
109118
if (alias.has_value() && name[0] == '\0') {
110-
builder.add_metric_by_alias(*alias, metric.boolean_value());
119+
builder.add_metric_by_alias(*alias, metric.boolean_value(), ts);
111120
} else if (alias.has_value()) {
112-
builder.add_metric_with_alias(name, *alias, metric.boolean_value());
121+
builder.add_metric_with_alias(name, *alias, metric.boolean_value(), ts);
113122
} else {
114-
builder.add_metric(name, metric.boolean_value());
123+
builder.add_metric(name, metric.boolean_value(), ts);
115124
}
116125
break;
117126

118127
case sparkplug::DataType::String:
119128
case sparkplug::DataType::Text:
120129
if (alias.has_value() && name[0] == '\0') {
121-
builder.add_metric_by_alias(*alias, metric.string_value());
130+
builder.add_metric_by_alias(*alias, metric.string_value(), ts);
122131
} else if (alias.has_value()) {
123-
builder.add_metric_with_alias(name, *alias, metric.string_value());
132+
builder.add_metric_with_alias(name, *alias, metric.string_value(), ts);
124133
} else {
125-
builder.add_metric(name, metric.string_value());
134+
builder.add_metric(name, metric.string_value(), ts);
126135
}
127136
break;
128137

0 commit comments

Comments
 (0)