Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ else()
message(WARNING "Install onnxruntime to enable ONNX model serving")
endif()

# Check for librdkafka (optional, for real Kafka streaming ingestion).
#
# Detection order:
#   1. pkg-config module `rdkafka`, when pkg-config itself is available.
#   2. Plain find_library()/find_path() search as a fallback.
# When the library is found, HAVE_RDKAFKA is defined for GigaVector and the
# library is linked privately; otherwise the build continues without it.
find_package(PkgConfig QUIET)
if(PkgConfig_FOUND)
  pkg_check_modules(RDKAFKA QUIET rdkafka)
endif()
if(NOT RDKAFKA_FOUND)
  find_library(RDKAFKA_LIBRARIES NAMES rdkafka)
  find_path(RDKAFKA_INCLUDE_DIRS NAMES librdkafka/rdkafka.h)
  if(RDKAFKA_LIBRARIES AND RDKAFKA_INCLUDE_DIRS)
    set(RDKAFKA_FOUND TRUE)
  endif()
endif()
if(RDKAFKA_FOUND)
  # pkg_check_modules() stores bare library names (no directory) in
  # RDKAFKA_LIBRARIES, so linking against them only works when the library
  # lives on the default linker search path. RDKAFKA_LINK_LIBRARIES
  # (CMake 3.12+) carries absolute paths instead; prefer it when it is set and
  # fall back to RDKAFKA_LIBRARIES for the find_library() path and for older
  # CMake versions.
  if(RDKAFKA_LINK_LIBRARIES)
    set(gv_rdkafka_link_libs ${RDKAFKA_LINK_LIBRARIES})
  else()
    set(gv_rdkafka_link_libs ${RDKAFKA_LIBRARIES})
  endif()
  target_compile_definitions(GigaVector PRIVATE HAVE_RDKAFKA)
  target_include_directories(GigaVector PRIVATE ${RDKAFKA_INCLUDE_DIRS})
  target_link_libraries(GigaVector PRIVATE ${gv_rdkafka_link_libs})
  message(STATUS " Kafka streaming: Enabled (librdkafka found)")
else()
  message(STATUS " Kafka streaming: Disabled (librdkafka not found)")
  message(WARNING "Install librdkafka-dev to enable real Kafka streaming ingestion")
endif()

# SIMD runtime dispatch — compile distance.c with AVX2/AVX512 flags when the
# compiler supports them, without forcing -march=native across the whole build.
include(CheckCCompilerFlag)
Expand Down
48 changes: 43 additions & 5 deletions src/admin/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,49 @@ int cluster_start(GV_Cluster *cluster) {
local->state = GV_NODE_ACTIVE;
}

if (cluster->config.seed_nodes && cluster->config.seed_nodes[0] != '\0') {
char *seeds = gv_dup_cstr(cluster->config.seed_nodes);
if (seeds) {
char *saveptr = NULL;
char *tok = strtok_r(seeds, ",", &saveptr);
while (tok) {
while (*tok == ' ' || *tok == '\t') tok++;
char *end = tok + strlen(tok) - 1;
while (end > tok && (*end == ' ' || *end == '\t')) {
*end-- = '\0';
}
if (*tok != '\0' && cluster->node_count < MAX_NODES) {
char seed_id[128];
snprintf(seed_id, sizeof(seed_id), "seed-%.122s", tok);
for (char *p = seed_id + 5; *p; p++) {
if (*p == ':' || *p == '.') *p = '_';
}
if (!find_node(cluster, seed_id)) {
char *nid = gv_dup_cstr(seed_id);
char *addr = gv_dup_cstr(tok);
if (nid && addr) {
NodeEntry *entry = &cluster->nodes[cluster->node_count];
entry->node_id = nid;
entry->address = addr;
entry->role = GV_NODE_DATA;
entry->state = GV_NODE_JOINING;
entry->shard_ids = NULL;
entry->shard_count = 0;
entry->last_heartbeat = (uint64_t)time(NULL);
entry->load = 0.0;
cluster->node_count++;
} else {
free(nid);
free(addr);
}
}
}
tok = strtok_r(NULL, ",", &saveptr);
}
free(seeds);
}
}

pthread_rwlock_unlock(&cluster->rwlock);

/* Start heartbeat thread */
Expand All @@ -235,11 +278,6 @@ int cluster_start(GV_Cluster *cluster) {
return -1;
}

/* Note: Seed-node discovery and the RPC listener are not yet
* implemented. The cluster currently operates in single-node mode
* with all management (shard assignment, node health) done locally.
* A future version will add TCP-based gossip / RPC here. */

/* Mark cluster ready */
pthread_mutex_lock(&cluster->state_mutex);
cluster->is_ready = 1;
Expand Down
44 changes: 29 additions & 15 deletions src/admin/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,27 @@ static void *replication_thread_func(void *arg) {
mgr->leader_id = NULL;
free(mgr->voted_for);
mgr->voted_for = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL;

/* Note: In a real implementation, this would:
* 1. Request votes from all known replicas
* 2. Wait for majority response
* 3. If majority votes received, become leader
* 4. If another leader is discovered, become follower
* 5. If election times out, restart election with new term */
}
}
}
} else if (mgr->role == GV_REPL_CANDIDATE) {
/* Count in-process registered follower DBs as automatic votes: they
* are co-located and can trivially accept this node as leader.
* Remote-only replicas (follower_dbs[i] == NULL) cannot be asked
* without a wire protocol, so they are not counted. */
size_t votes = 1; /* self-vote */
for (size_t i = 0; i < mgr->replica_count; i++) {
if (mgr->follower_dbs[i] != NULL) {
votes++;
}
}
size_t quorum = (mgr->replica_count + 1) / 2 + 1;
if (votes >= quorum) {
mgr->role = GV_REPL_LEADER;
free(mgr->leader_id);
mgr->leader_id = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL;
replication_embedded_followers_catch_up_locked(mgr);
}
}

pthread_rwlock_unlock(&mgr->rwlock);
Expand Down Expand Up @@ -333,22 +344,25 @@ int replication_request_leadership(GV_ReplicationManager *mgr) {
free(mgr->voted_for);
mgr->voted_for = gv_dup_cstr(mgr->node_id);

/* In a real implementation, we would:
* 1. Send RequestVote to all replicas
* 2. Wait for majority
* 3. Become leader if we win
*/

if (mgr->replica_count == 0) {
size_t votes = 1;
for (size_t i = 0; i < mgr->replica_count; i++) {
if (mgr->follower_dbs[i] != NULL) {
votes++;
}
}
size_t quorum = (mgr->replica_count + 1) / 2 + 1;
if (votes >= quorum) {
mgr->role = GV_REPL_LEADER;
free(mgr->leader_id);
mgr->leader_id = gv_dup_cstr(mgr->node_id);
replication_embedded_followers_catch_up_locked(mgr);
}

GV_ReplicationRole result_role = mgr->role;
pthread_rwlock_unlock(&mgr->rwlock);
pthread_mutex_unlock(&mgr->election_mutex);

return mgr->role == GV_REPL_LEADER ? 0 : -1;
return result_role == GV_REPL_LEADER ? 0 : -1;
}

int replication_add_follower(GV_ReplicationManager *mgr, const char *node_id,
Expand Down
Loading
Loading