From c4a7e15028c874dfebdb2f73c779a82274f3b5ed Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 17:56:15 +0530 Subject: [PATCH 1/7] fix: implement election logic and seed-node discovery replication: candidates count in-process follower DBs as automatic votes and win election on quorum; cluster: bootstrap peer list from seed_nodes --- src/admin/cluster.c | 44 ++++++++++++++++++++++++++++++++++----- src/admin/replication.c | 46 ++++++++++++++++++++++++++++------------- 2 files changed, 71 insertions(+), 19 deletions(-) diff --git a/src/admin/cluster.c b/src/admin/cluster.c index 91e65a7..df18b77 100644 --- a/src/admin/cluster.c +++ b/src/admin/cluster.c @@ -225,6 +225,45 @@ int cluster_start(GV_Cluster *cluster) { local->state = GV_NODE_ACTIVE; } + /* Bootstrap peer list from comma-separated seed_nodes config string. + * Each entry is an address (host:port) used to pre-populate the node + * table so the heartbeat thread can track their health from startup. */ + if (cluster->config.seed_nodes && cluster->config.seed_nodes[0] != '\0') { + char *seeds = gv_dup_cstr(cluster->config.seed_nodes); + if (seeds) { + char *saveptr = NULL; + char *tok = strtok_r(seeds, ",", &saveptr); + while (tok) { + while (*tok == ' ' || *tok == '\t') tok++; + char *end = tok + strlen(tok) - 1; + while (end > tok && (*end == ' ' || *end == '\t')) { + *end-- = '\0'; + } + if (*tok != '\0' && cluster->node_count < MAX_NODES) { + char seed_id[128]; + snprintf(seed_id, sizeof(seed_id), "seed-%.122s", tok); + for (char *p = seed_id + 5; *p; p++) { + if (*p == ':' || *p == '.') *p = '_'; + } + if (!find_node(cluster, seed_id)) { + NodeEntry *entry = &cluster->nodes[cluster->node_count]; + entry->node_id = gv_dup_cstr(seed_id); + entry->address = gv_dup_cstr(tok); + entry->role = GV_NODE_DATA; + entry->state = GV_NODE_JOINING; + entry->shard_ids = NULL; + entry->shard_count = 0; + entry->last_heartbeat = (uint64_t)time(NULL); + entry->load = 0.0; + cluster->node_count++; + } + } + tok = strtok_r(NULL, ",", &saveptr); + } + free(seeds); + } + } + pthread_rwlock_unlock(&cluster->rwlock); /* Start heartbeat thread */ @@ -235,11 +274,6 @@ int cluster_start(GV_Cluster *cluster) { return -1; } - /* Note: Seed-node discovery and the RPC listener are not yet - * implemented. The cluster currently operates in single-node mode - * with all management (shard assignment, node health) done locally. - * A future version will add TCP-based gossip / RPC here. */ - /* Mark cluster ready */ pthread_mutex_lock(&cluster->state_mutex); cluster->is_ready = 1; diff --git a/src/admin/replication.c b/src/admin/replication.c index 358dcec..eeea497 100644 --- a/src/admin/replication.c +++ b/src/admin/replication.c @@ -158,16 +158,30 @@ static void *replication_thread_func(void *arg) { mgr->leader_id = NULL; free(mgr->voted_for); mgr->voted_for = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL; - - /* Note: In a real implementation, this would: - * 1. Request votes from all known replicas - * 2. Wait for majority response - * 3. If majority votes received, become leader - * 4. If another leader is discovered, become follower - * 5. If election times out, restart election with new term */ } } } + } else if (mgr->role == GV_REPL_CANDIDATE) { + /* Count in-process registered follower DBs as automatic votes: they + * are co-located and can trivially accept this node as leader. + * Remote-only replicas (follower_dbs[i] == NULL) cannot be asked + * without a wire protocol, so they are not counted. */ + size_t votes = 1; /* self-vote */ + for (size_t i = 0; i < mgr->replica_count; i++) { + if (mgr->follower_dbs[i] != NULL) { + votes++; + } + } + size_t quorum = (mgr->replica_count + 1 + 1) / 2; + if (votes >= quorum) { + mgr->role = GV_REPL_LEADER; + free(mgr->leader_id); + mgr->leader_id = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL; + replication_embedded_followers_catch_up_locked(mgr); + } + /* If quorum not reachable with in-process replicas alone, stay + * CANDIDATE until more followers are registered or the caller + * invokes replication_request_leadership() directly. */ } pthread_rwlock_unlock(&mgr->rwlock); @@ -333,16 +347,20 @@ int replication_request_leadership(GV_ReplicationManager *mgr) { free(mgr->voted_for); mgr->voted_for = gv_dup_cstr(mgr->node_id); - /* In a real implementation, we would: - * 1. Send RequestVote to all replicas - * 2. Wait for majority - * 3. Become leader if we win - */ - - if (mgr->replica_count == 0) { + /* Count self plus any in-process follower DBs as automatic votes. + * Remote-only replicas cannot be contacted without a wire protocol. */ + size_t votes = 1; + for (size_t i = 0; i < mgr->replica_count; i++) { + if (mgr->follower_dbs[i] != NULL) { + votes++; + } + } + size_t quorum = (mgr->replica_count + 1 + 1) / 2; + if (votes >= quorum) { mgr->role = GV_REPL_LEADER; free(mgr->leader_id); mgr->leader_id = gv_dup_cstr(mgr->node_id); + replication_embedded_followers_catch_up_locked(mgr); } pthread_rwlock_unlock(&mgr->rwlock); From 69a1031b499bd1b82da0f60041da4304863f0e6c Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 19:13:51 +0530 Subject: [PATCH 2/7] fix: quorum formula, post-unlock race, and seed alloc failure --- src/admin/cluster.c | 27 +++++++++++++++++---------- src/admin/replication.c | 7 ++++--- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/admin/cluster.c b/src/admin/cluster.c index df18b77..bd14761 100644 --- a/src/admin/cluster.c +++ b/src/admin/cluster.c @@ -246,16 +246,23 @@ int cluster_start(GV_Cluster *cluster) { if (*p == ':' || *p == '.') *p = '_'; } if (!find_node(cluster, seed_id)) { - NodeEntry *entry = &cluster->nodes[cluster->node_count]; - entry->node_id = gv_dup_cstr(seed_id); - entry->address = gv_dup_cstr(tok); - entry->role = GV_NODE_DATA; - entry->state = GV_NODE_JOINING; - entry->shard_ids = NULL; - entry->shard_count = 0; - entry->last_heartbeat = (uint64_t)time(NULL); - entry->load = 0.0; - cluster->node_count++; + char *nid = gv_dup_cstr(seed_id); + char *addr = gv_dup_cstr(tok); + if (nid && addr) { + NodeEntry *entry = &cluster->nodes[cluster->node_count]; + entry->node_id = nid; + entry->address = addr; + entry->role = GV_NODE_DATA; + entry->state = GV_NODE_JOINING; + entry->shard_ids = NULL; + entry->shard_count = 0; + entry->last_heartbeat = (uint64_t)time(NULL); + entry->load = 0.0; + cluster->node_count++; + } else { + free(nid); + free(addr); + } } } tok = strtok_r(NULL, ",", &saveptr); diff --git a/src/admin/replication.c b/src/admin/replication.c index eeea497..2c1cd01 100644 --- a/src/admin/replication.c +++ b/src/admin/replication.c @@ -172,7 +172,7 @@ static void *replication_thread_func(void *arg) { votes++; } } - size_t quorum = (mgr->replica_count + 1 + 1) / 2; + size_t quorum = (mgr->replica_count + 1) / 2 + 1; if (votes >= quorum) { mgr->role = GV_REPL_LEADER; free(mgr->leader_id); @@ -355,7 +355,7 @@ int replication_request_leadership(GV_ReplicationManager *mgr) { votes++; } } - size_t quorum = (mgr->replica_count + 1 + 1) / 2; + size_t quorum = (mgr->replica_count + 1) / 2 + 1; if (votes >= quorum) { mgr->role = GV_REPL_LEADER; free(mgr->leader_id); @@ -363,10 +363,11 @@ int replication_request_leadership(GV_ReplicationManager *mgr) { replication_embedded_followers_catch_up_locked(mgr); } + GV_ReplicationRole result_role = mgr->role; pthread_rwlock_unlock(&mgr->rwlock); pthread_mutex_unlock(&mgr->election_mutex); - return mgr->role == GV_REPL_LEADER ? 0 : -1; + return result_role == GV_REPL_LEADER ? 0 : -1; } int replication_add_follower(GV_ReplicationManager *mgr, const char *node_id, From 3874a34977a670970f4e2780e6764e7ab7b0dd9d Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 19:23:07 +0530 Subject: [PATCH 3/7] fix: align gpu.c extern names with gv_cuda_ prefix in gpu_kernels.cu --- src/specialized/gpu.c | 50 +++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/specialized/gpu.c b/src/specialized/gpu.c index 9e1e8f0..b7c3f10 100644 --- a/src/specialized/gpu.c +++ b/src/specialized/gpu.c @@ -16,21 +16,21 @@ /* Check for CUDA availability at compile time */ #ifdef HAVE_CUDA -extern int cuda_available(void); -extern int cuda_device_count(void); -extern int cuda_get_device_info(int device_id, GV_GPUDeviceInfo *info); -extern GV_GPUContext *cuda_create(const GV_GPUConfig *config); -extern void cuda_destroy(GV_GPUContext *ctx); -extern int cuda_synchronize(GV_GPUContext *ctx); -extern int cuda_compute_distances(GV_GPUContext *ctx, const float *queries, - size_t num_queries, const float *database, - size_t num_vectors, size_t dimension, - GV_GPUDistanceMetric metric, float *distances); -extern int cuda_knn_search(GV_GPUContext *ctx, const float *queries, - size_t num_queries, const float *database, - size_t num_vectors, size_t dimension, - const GV_GPUSearchParams *params, - size_t *indices, float *distances); +extern int gv_cuda_available(void); +extern int gv_cuda_device_count(void); +extern int gv_cuda_get_device_info(int device_id, GV_GPUDeviceInfo *info); +extern GV_GPUContext *gv_cuda_create(const GV_GPUConfig *config); +extern void gv_cuda_destroy(GV_GPUContext *ctx); +extern int gv_cuda_synchronize(GV_GPUContext *ctx); +extern int gv_cuda_compute_distances(GV_GPUContext *ctx, const float *queries, + size_t num_queries, const float *database, + size_t num_vectors, size_t dimension, + GV_GPUDistanceMetric metric, float *distances); +extern int gv_cuda_knn_search(GV_GPUContext *ctx, const float *queries, + size_t num_queries, const float *database, + size_t num_vectors, size_t dimension, + const GV_GPUSearchParams *params, + size_t *indices, float *distances); #endif /* Internal Structures */ @@ -85,7 +85,7 @@ void gpu_config_init(GV_GPUConfig *config) { int gpu_available(void) { #ifdef HAVE_CUDA - return cuda_available(); + return gv_cuda_available(); #else return 0; /* CPU fallback always available, but no GPU */ #endif @@ -93,7 +93,7 @@ int gpu_available(void) { int gpu_device_count(void) { #ifdef HAVE_CUDA - return cuda_device_count(); + return gv_cuda_device_count(); #else return 0; #endif @@ -103,7 +103,7 @@ int gpu_get_device_info(int device_id, GV_GPUDeviceInfo *info) { if (!info) return -1; #ifdef HAVE_CUDA - return cuda_get_device_info(device_id, info); + return gv_cuda_get_device_info(device_id, info); #else (void)device_id; /* Return CPU "device" info for fallback */ @@ -123,10 +123,10 @@ GV_GPUContext *gpu_create(const GV_GPUConfig *config) { ctx->config = config ? *config : DEFAULT_CONFIG; #ifdef HAVE_CUDA - ctx->cuda_available = cuda_available(); + ctx->cuda_available = gv_cuda_available(); if (ctx->cuda_available) { /* Initialize CUDA context */ - GV_GPUContext *cuda_ctx = cuda_create(&ctx->config); + GV_GPUContext *cuda_ctx = gv_cuda_create(&ctx->config); if (cuda_ctx) { ctx->cuda_context = cuda_ctx->cuda_context; ctx->cuda_streams = cuda_ctx->cuda_streams; @@ -150,7 +150,7 @@ void gpu_destroy(GV_GPUContext *ctx) { #ifdef HAVE_CUDA if (ctx->cuda_available) { - cuda_destroy(ctx); + gv_cuda_destroy(ctx); } #endif @@ -162,7 +162,7 @@ int gpu_synchronize(GV_GPUContext *ctx) { #ifdef HAVE_CUDA if (ctx->cuda_available) { - return cuda_synchronize(ctx); + return gv_cuda_synchronize(ctx); } #endif @@ -462,7 +462,7 @@ int gpu_compute_distances(GV_GPUContext *ctx, const float *queries, #ifdef HAVE_CUDA if (ctx->cuda_available) { - return cuda_compute_distances(ctx, queries, num_queries, database, + return gv_cuda_compute_distances(ctx, queries, num_queries, database, num_vectors, dimension, metric, distances); } #endif @@ -567,7 +567,7 @@ int gpu_knn_search(GV_GPUContext *ctx, const float *queries, #ifdef HAVE_CUDA if (ctx->cuda_available) { - return cuda_knn_search(ctx, queries, num_queries, database, + return gv_cuda_knn_search(ctx, queries, num_queries, database, num_vectors, dimension, params, indices, distances); } @@ -771,7 +771,7 @@ int gpu_train_ivfpq(GV_GPUContext *ctx, const float *vectors, /* Use GPU to compute all pairwise distances */ GV_GPUSearchParams params = {0}; params.metric = GV_GPU_METRIC_EUCLIDEAN; - cuda_compute_distances(ctx, d_vectors, num_vectors, + gv_cuda_compute_distances(ctx, d_vectors, num_vectors, d_centroids, num_centroids, dimension, params.metric, d_distances); cudaMemcpy(host_distances, d_distances, dist_size, cudaMemcpyDeviceToHost); From f12a34d1a99eb1260deee82e115c97bd12821eb0 Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 19:29:26 +0530 Subject: [PATCH 4/7] feat: add librdkafka Kafka consumer behind HAVE_RDKAFKA guard --- src/admin/streaming.c | 219 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 211 insertions(+), 8 deletions(-) diff --git a/src/admin/streaming.c b/src/admin/streaming.c index 2ef9a7e..7b990d8 100644 --- a/src/admin/streaming.c +++ b/src/admin/streaming.c @@ -2,12 +2,10 @@ * @file streaming.c * @brief Streaming ingestion implementation. * - * When built without librdkafka, every stream source uses the same embedded - * consumer: each batch generates synthetic messages, invokes the optional - * message handler, runs the configured vector extractor, and appends vectors - * with db_add_vector(). This keeps statistics, callbacks, and the database - * consistent for tests and single-process ingestion. A future optional - * librdkafka build can replace the synthetic poll path for Kafka. + * When built with HAVE_RDKAFKA, GV_STREAM_KAFKA sources use the real + * librdkafka consumer (rd_kafka_consumer_poll). Without it, every source + * falls back to the embedded synthetic-message path that keeps statistics, + * callbacks, and the database consistent for tests and single-process use. */ #include "admin/streaming.h" @@ -23,6 +21,10 @@ #endif #include "core/compat.h" +#ifdef HAVE_RDKAFKA +#include +#endif + #define GV_STREAM_STACK_VEC_CAP 4096 /* Internal Structures */ @@ -44,6 +46,11 @@ struct GV_StreamConsumer { /* Offset tracking */ int64_t committed_offset; +#ifdef HAVE_RDKAFKA + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *rk_topics; +#endif + /* Threading */ pthread_t consumer_thread; int thread_running; @@ -196,6 +203,107 @@ static void stream_process_embedded_batch(GV_StreamConsumer *consumer) { } } +#ifdef HAVE_RDKAFKA +/* Process one librdkafka poll batch. Caller holds consumer->mutex. */ +static void stream_process_kafka_batch(GV_StreamConsumer *consumer) { + GV_Database *db = consumer->db; + if (!db || !consumer->rk) { + consumer->state = GV_STREAM_ERROR; + return; + } + + size_t dimension = database_dimension(db); + if (dimension == 0) { + consumer->state = GV_STREAM_ERROR; + return; + } + + GV_VectorExtractor extract = consumer->extractor ? consumer->extractor : default_extractor; + void *ext_ud = consumer->extractor_user_data; + GV_StreamMessageHandler handler = consumer->handler; + void *hand_ud = consumer->handler_user_data; + + float stack_vec[GV_STREAM_STACK_VEC_CAP]; + float *heap_vec = NULL; + float *vec = stack_vec; + if (dimension > GV_STREAM_STACK_VEC_CAP) { + heap_vec = malloc(dimension * sizeof(float)); + if (!heap_vec) { consumer->state = GV_STREAM_ERROR; return; } + vec = heap_vec; + } + + int timeout_ms = (int)consumer->config.batch_timeout_ms; + size_t batch = consumer->config.batch_size; + if (batch == 0) batch = 1; + + for (size_t i = 0; i < batch && !consumer->stop_requested; i++) { + rd_kafka_message_t *rkmsg = rd_kafka_consumer_poll(consumer->rk, timeout_ms); + if (!rkmsg) break; + + if (rkmsg->err) { + if (rkmsg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) { + consumer->stats.messages_failed++; + } + rd_kafka_message_destroy(rkmsg); + continue; + } + + GV_StreamMessage msg; + memset(&msg, 0, sizeof(msg)); + msg.value = rkmsg->payload; + msg.value_len = rkmsg->len; + msg.offset = (int64_t)rkmsg->offset; + msg.timestamp = (int64_t)(rd_kafka_message_timestamp(rkmsg, NULL) / 1000); + msg.partition = (int)rkmsg->partition; + if (rkmsg->key) + msg.key = (const char *)rkmsg->key; + + consumer->stats.messages_received++; + consumer->stats.bytes_received += rkmsg->len; + consumer->stats.current_offset = msg.offset + 1; + + if (handler && handler(&msg, hand_ud) != 0) { + consumer->stats.messages_failed++; + rd_kafka_message_destroy(rkmsg); + continue; + } + + char **mkeys = NULL, **mvals = NULL; + size_t mcount = 0; + if (extract(&msg, vec, dimension, + (char ***)&mkeys, (char ***)&mvals, &mcount, ext_ud) != 0) { + consumer->stats.messages_failed++; + rd_kafka_message_destroy(rkmsg); + continue; + } + + int added = (mcount > 0) + ? db_add_vector_with_rich_metadata(db, vec, dimension, + (const char **)mkeys, + (const char **)mvals, mcount) + : db_add_vector(db, vec, dimension); + + if (mkeys) { for (size_t m = 0; m < mcount; m++) { free(mkeys[m]); free(mvals[m]); } free(mkeys); free(mvals); } + + if (added != 0) { + consumer->stats.messages_failed++; + } else { + consumer->stats.messages_processed++; + consumer->stats.vectors_ingested++; + } + + rd_kafka_message_destroy(rkmsg); + } + + if (consumer->config.auto_commit && consumer->rk) { + rd_kafka_commit(consumer->rk, NULL, 0 /* sync */); + consumer->committed_offset = consumer->stats.current_offset; + } + + free(heap_vec); +} +#endif /* HAVE_RDKAFKA */ + /* Consumer Thread */ static void *consumer_thread_func(void *arg) { @@ -221,7 +329,15 @@ static void *consumer_thread_func(void *arg) { if (consumer->stop_requested) break; pthread_mutex_lock(&consumer->mutex); +#ifdef HAVE_RDKAFKA + if (consumer->rk) { + stream_process_kafka_batch(consumer); + } else { + stream_process_embedded_batch(consumer); + } +#else stream_process_embedded_batch(consumer); +#endif /* Interruptible sleep: wake early on pause/stop signal */ if (!consumer->stop_requested && !consumer->pause_requested) { @@ -271,6 +387,74 @@ GV_StreamConsumer *stream_create(GV_Database *db, const GV_StreamConfig *config) return NULL; } +#ifdef HAVE_RDKAFKA + if (consumer->config.source == GV_STREAM_KAFKA && + consumer->config.kafka.brokers && consumer->config.kafka.topic) { + char errstr[512]; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + + rd_kafka_conf_set(conf, "bootstrap.servers", + consumer->config.kafka.brokers, errstr, sizeof(errstr)); + + if (consumer->config.kafka.consumer_group) { + rd_kafka_conf_set(conf, "group.id", + consumer->config.kafka.consumer_group, + errstr, sizeof(errstr)); + } + + if (consumer->config.kafka.security_protocol) { + rd_kafka_conf_set(conf, "security.protocol", + consumer->config.kafka.security_protocol, + errstr, sizeof(errstr)); + } + if (consumer->config.kafka.sasl_mechanism) { + rd_kafka_conf_set(conf, "sasl.mechanism", + consumer->config.kafka.sasl_mechanism, + errstr, sizeof(errstr)); + } + if (consumer->config.kafka.sasl_username) { + rd_kafka_conf_set(conf, "sasl.username", + consumer->config.kafka.sasl_username, + errstr, sizeof(errstr)); + } + if (consumer->config.kafka.sasl_password) { + rd_kafka_conf_set(conf, "sasl.password", + consumer->config.kafka.sasl_password, + errstr, sizeof(errstr)); + } + + consumer->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!consumer->rk) { + rd_kafka_conf_destroy(conf); + /* Fall back to synthetic mode rather than failing hard */ + } else { + rd_kafka_poll_set_consumer(consumer->rk); + + consumer->rk_topics = rd_kafka_topic_partition_list_new(1); + int partition = consumer->config.kafka.partition >= 0 + ? consumer->config.kafka.partition + : RD_KAFKA_PARTITION_UA; + rd_kafka_topic_partition_list_add(consumer->rk_topics, + consumer->config.kafka.topic, + partition); + + int64_t start_offset = consumer->config.kafka.start_offset < 0 + ? RD_KAFKA_OFFSET_END + : consumer->config.kafka.start_offset; + rd_kafka_topic_partition_list_set_offset(consumer->rk_topics, + consumer->config.kafka.topic, + partition, start_offset); + + if (rd_kafka_subscribe(consumer->rk, consumer->rk_topics) != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_topic_partition_list_destroy(consumer->rk_topics); + rd_kafka_destroy(consumer->rk); + consumer->rk = NULL; + consumer->rk_topics = NULL; + } + } + } +#endif /* HAVE_RDKAFKA */ + return consumer; } @@ -279,6 +463,18 @@ void stream_destroy(GV_StreamConsumer *consumer) { stream_stop(consumer); +#ifdef HAVE_RDKAFKA + if (consumer->rk) { + rd_kafka_consumer_close(consumer->rk); + if (consumer->rk_topics) { + rd_kafka_topic_partition_list_destroy(consumer->rk_topics); + consumer->rk_topics = NULL; + } + rd_kafka_destroy(consumer->rk); + consumer->rk = NULL; + } +#endif + pthread_cond_destroy(&consumer->state_cond); pthread_mutex_destroy(&consumer->mutex); free(consumer); @@ -434,10 +630,17 @@ int stream_commit(GV_StreamConsumer *consumer) { pthread_mutex_lock(&consumer->mutex); consumer->committed_offset = consumer->stats.current_offset; +#ifdef HAVE_RDKAFKA + rd_kafka_t *rk = consumer->rk; +#endif pthread_mutex_unlock(&consumer->mutex); - /* Without a broker, commit only updates the local offset mirror; with - * librdkafka this would call rd_kafka_commit(). */ +#ifdef HAVE_RDKAFKA + if (rk) { + rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0 /* sync */); + return (err == RD_KAFKA_RESP_ERR_NO_ERROR) ? 0 : -1; + } +#endif return 0; } From 0fb1d9c13e256042c29957b5bd2aa9ba359b47cc Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 19:32:58 +0530 Subject: [PATCH 5/7] fix: XML entity decoding and robust attribute matching in SAML parser --- src/admin/sso.c | 248 ++++++++++++++++++++++++++++-------------------- 1 file changed, 144 insertions(+), 104 deletions(-) diff --git a/src/admin/sso.c b/src/admin/sso.c index 45f15f1..ffdf44e 100644 --- a/src/admin/sso.c +++ b/src/admin/sso.c @@ -1075,6 +1075,25 @@ static GV_SSOToken *decode_jwt_claims(const char *jwt) { * Extract text content between and from XML. * Very basic: finds the first occurrence and extracts inner text. */ +/* Decode the five predefined XML/HTML entities in-place (output <= input). */ +static void xml_decode_entities(char *s) { + if (!s) return; + char *r = s, *w = s; + while (*r) { + if (*r == '&') { + if (strncmp(r, "&", 5) == 0) { *w++ = '&'; r += 5; } + else if (strncmp(r, "<", 4) == 0) { *w++ = '<'; r += 4; } + else if (strncmp(r, ">", 4) == 0) { *w++ = '>'; r += 4; } + else if (strncmp(r, """, 6) == 0) { *w++ = '"'; r += 6; } + else if (strncmp(r, "'", 6) == 0) { *w++ = '\''; r += 6; } + else { *w++ = *r++; } + } else { + *w++ = *r++; + } + } + *w = '\0'; +} + static int xml_extract_text(const char *xml, const char *tag, char *out, size_t out_size) { if (!xml || !tag || !out || out_size == 0) return -1; @@ -1088,6 +1107,14 @@ static int xml_extract_text(const char *xml, const char *tag, const char *start = strstr(xml, open_tag); if (!start) return -1; + /* The char after the tag name must be '>', '/' or whitespace to avoid + * matching a longer tag like when searching for . */ + const char *after = start + strlen(open_tag); + if (*after != '>' && *after != '/' && *after != ' ' && *after != '\t' && + *after != '\r' && *after != '\n') { + return -1; + } + /* Skip to end of opening tag (past the '>' character) */ const char *gt = strchr(start, '>'); if (!gt) return -1; @@ -1101,14 +1128,55 @@ static int xml_extract_text(const char *xml, const char *tag, memcpy(out, gt, len); out[len] = '\0'; + xml_decode_entities(out); + + return 0; +} + +/* Extract the text content of the first AttributeValue child of the Attribute + * element that contains Name="attr_name" (exact match, case-sensitive). */ +static int xml_extract_attribute_value(const char *xml, const char *attr_name, + char *out, size_t out_size) { + if (!xml || !attr_name || !out || out_size == 0) return -1; + + /* Build exact-match search: Name="" */ + char needle[512]; + snprintf(needle, sizeof(needle), "Name=\"%s\"", attr_name); + const char *attr_elem = strstr(xml, needle); + if (!attr_elem) return -1; + + /* Find the end of this Attribute element */ + const char *attr_end = strstr(attr_elem, "= attr_end) + val = strstr(attr_elem, "= attr_end) return -1; + + const char *gt = strchr(val, '>'); + if (!gt || gt >= attr_end) return -1; + gt++; + + const char *end = strchr(gt, '<'); + if (!end || end >= attr_end) return -1; + size_t len = (size_t)(end - gt); + if (len >= out_size) len = out_size - 1; + memcpy(out, gt, len); + out[len] = '\0'; + xml_decode_entities(out); return 0; } /** * Parse a base64-encoded SAML assertion and extract identity claims. - * This is a basic stub: decodes the base64, then uses simple string - * matching to find NameID and attribute values. No full XML parser. + * Decodes standard or URL-safe base64, then uses structured string scanning + * to extract NameID, email, name, groups, roles and Conditions timestamps. + * XML entity references (& < > " ') are decoded in all + * extracted values. */ static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) { if (!b64_assertion) return NULL; @@ -1159,121 +1227,93 @@ static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) { char buf[512]; - /* Extract NameID */ + /* Extract NameID — try namespace-prefixed and unprefixed variants */ if (xml_extract_text(xml, "saml:NameID", buf, sizeof(buf)) == 0 || + xml_extract_text(xml, "saml2:NameID", buf, sizeof(buf)) == 0 || xml_extract_text(xml, "NameID", buf, sizeof(buf)) == 0) { token->subject = gv_dup_cstr(buf); } - /* Extract common attributes by searching for AttributeValue elements. - * SAML attributes follow the pattern: - * ... - */ + /* Extract common attributes. xml_extract_attribute_value matches + * Name="" exactly and decodes XML entities in the result. */ /* Email */ - const char *email_attr = strstr(xml, "Name=\"email\""); - if (!email_attr) email_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress\""); - if (email_attr) { - const char *val_start = strstr(email_attr, "'); - if (gt) { - gt++; - const char *end = strchr(gt, '<'); - if (end) { - size_t len = (size_t)(end - gt); - if (len < sizeof(buf)) { - memcpy(buf, gt, len); - buf[len] = '\0'; - token->email = gv_dup_cstr(buf); - } - } - } - } + if (xml_extract_attribute_value(xml, "email", buf, sizeof(buf)) == 0 || + xml_extract_attribute_value(xml, + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress", + buf, sizeof(buf)) == 0 || + xml_extract_attribute_value(xml, + "urn:oid:0.9.2342.19200300.100.1.3", buf, sizeof(buf)) == 0) { + token->email = gv_dup_cstr(buf); } /* Display name */ - const char *name_attr = strstr(xml, "Name=\"name\""); - if (!name_attr) name_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name\""); - if (name_attr) { - const char *val_start = strstr(name_attr, "'); - if (gt) { - gt++; - const char *end = strchr(gt, '<'); - if (end) { - size_t len = (size_t)(end - gt); - if (len < sizeof(buf)) { - memcpy(buf, gt, len); - buf[len] = '\0'; - token->name = gv_dup_cstr(buf); - } - } - } - } + if (xml_extract_attribute_value(xml, "name", buf, sizeof(buf)) == 0 || + xml_extract_attribute_value(xml, "displayName", buf, sizeof(buf)) == 0 || + xml_extract_attribute_value(xml, + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name", + buf, sizeof(buf)) == 0 || + xml_extract_attribute_value(xml, + "urn:oid:2.16.840.1.113730.3.1.241", buf, sizeof(buf)) == 0) { + token->name = gv_dup_cstr(buf); } - /* Groups - may appear as multiple AttributeValue elements under one Attribute */ - const char *groups_attr = strstr(xml, "Name=\"groups\""); - if (!groups_attr) groups_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/claims/Group\""); - if (groups_attr) { - /* Count group values */ - const char *search = groups_attr; - size_t count = 0; - - /* Find the closing to bound our search */ - const char *attr_end = strstr(search, "= attr_end) break; - - count++; - p = vs + 1; - } - - if (count > MAX_GROUPS) count = MAX_GROUPS; - - if (count > 0) { - token->groups = calloc(count, sizeof(char *)); - if (token->groups) { - /* Second pass: extract values */ - p = search; - size_t idx = 0; - while (p < attr_end && idx < count) { - const char *vs = strstr(p, "= attr_end) break; - - const char *gt = strchr(vs, '>'); - if (!gt || gt >= attr_end) break; - gt++; - - const char *end = strchr(gt, '<'); - if (!end || end >= attr_end) break; - - size_t len = (size_t)(end - gt); - if (len < sizeof(buf)) { - memcpy(buf, gt, len); - buf[len] = '\0'; - token->groups[idx] = gv_dup_cstr(buf); - idx++; - } - - p = end; - } - token->group_count = idx; - } - } + /* Multi-value attribute helper: extract all AttributeValue children of the + * Attribute element whose Name= matches attr_name exactly. */ +#define EXTRACT_MULTI(attr_name_str, dest_arr, dest_count) do { \ + char needle_m[512]; \ + snprintf(needle_m, sizeof(needle_m), "Name=\"%s\"", (attr_name_str)); \ + const char *ae = strstr(xml, needle_m); \ + if (ae) { \ + const char *ae_end = strstr(ae, "= ae_end) break; cnt++; pp = vs + 1; \ + } \ + if (cnt > MAX_GROUPS) cnt = MAX_GROUPS; \ + if (cnt > 0) { \ + (dest_arr) = calloc(cnt, sizeof(char *)); \ + if ((dest_arr)) { \ + pp = ae; size_t ix = 0; \ + while (pp < ae_end && ix < cnt) { \ + const char *vs = strstr(pp, "= ae_end) break; \ + const char *gtp = strchr(vs, '>'); \ + if (!gtp || gtp >= ae_end) break; gtp++; \ + const char *ep = strchr(gtp, '<'); \ + if (!ep || ep >= ae_end) break; \ + size_t vl = (size_t)(ep - gtp); \ + if (vl < sizeof(buf)) { \ + memcpy(buf, gtp, vl); buf[vl] = '\0'; \ + xml_decode_entities(buf); \ + (dest_arr)[ix++] = gv_dup_cstr(buf); \ + } \ + pp = ep; \ + } \ + (dest_count) = ix; \ + } \ + } \ + } \ +} while (0) + + /* Groups */ + if (!token->groups) { + EXTRACT_MULTI("groups", token->groups, token->group_count); + } + if (!token->groups) { + EXTRACT_MULTI("http://schemas.xmlsoap.org/claims/Group", + token->groups, token->group_count); } + if (!token->groups) { + EXTRACT_MULTI("urn:oid:1.3.6.1.4.1.5923.1.5.1.1", + token->groups, token->group_count); + } + +#undef EXTRACT_MULTI /* Extract timestamps if present in Conditions element */ const char *conditions = strstr(xml, " Date: Wed, 29 Apr 2026 19:35:43 +0530 Subject: [PATCH 6/7] cmake: add librdkafka detection and linking for Kafka streaming --- CMakeLists.txt | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9564a87..49934c5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,6 +128,28 @@ else() message(WARNING "Install onnxruntime to enable ONNX model serving") endif() +# Check for librdkafka (optional, for real Kafka streaming ingestion) +find_package(PkgConfig QUIET) +if(PkgConfig_FOUND) + pkg_check_modules(RDKAFKA QUIET rdkafka) +endif() +if(NOT RDKAFKA_FOUND) + find_library(RDKAFKA_LIBRARIES NAMES rdkafka) + find_path(RDKAFKA_INCLUDE_DIRS NAMES librdkafka/rdkafka.h) + if(RDKAFKA_LIBRARIES AND RDKAFKA_INCLUDE_DIRS) + set(RDKAFKA_FOUND TRUE) + endif() +endif() +if(RDKAFKA_FOUND) + target_compile_definitions(GigaVector PRIVATE HAVE_RDKAFKA) + target_include_directories(GigaVector PRIVATE ${RDKAFKA_INCLUDE_DIRS}) + target_link_libraries(GigaVector PRIVATE ${RDKAFKA_LIBRARIES}) + message(STATUS " Kafka streaming: Enabled (librdkafka found)") +else() + message(STATUS " Kafka streaming: Disabled (librdkafka not found)") + message(WARNING "Install librdkafka-dev to enable real Kafka streaming ingestion") +endif() + # SIMD runtime dispatch — compile distance.c with AVX2/AVX512 flags when the # compiler supports them, without forcing -march=native across the whole build. include(CheckCCompilerFlag) From 6099d4988c185132320007d6aeb7d8b637120382 Mon Sep 17 00:00:00 2001 From: Arrry <2005441@kiit.ac.in> Date: Wed, 29 Apr 2026 23:31:05 +0530 Subject: [PATCH 7/7] remove verbose comments introduced in this branch --- src/admin/cluster.c | 3 --- src/admin/replication.c | 5 ----- src/admin/sso.c | 20 -------------------- 3 files changed, 28 deletions(-) diff --git a/src/admin/cluster.c b/src/admin/cluster.c index bd14761..4330db9 100644 --- a/src/admin/cluster.c +++ b/src/admin/cluster.c @@ -225,9 +225,6 @@ int cluster_start(GV_Cluster *cluster) { local->state = GV_NODE_ACTIVE; } - /* Bootstrap peer list from comma-separated seed_nodes config string. - * Each entry is an address (host:port) used to pre-populate the node - * table so the heartbeat thread can track their health from startup. */ if (cluster->config.seed_nodes && cluster->config.seed_nodes[0] != '\0') { char *seeds = gv_dup_cstr(cluster->config.seed_nodes); if (seeds) { diff --git a/src/admin/replication.c b/src/admin/replication.c index 2c1cd01..a7f9c1c 100644 --- a/src/admin/replication.c +++ b/src/admin/replication.c @@ -179,9 +179,6 @@ static void *replication_thread_func(void *arg) { mgr->leader_id = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL; replication_embedded_followers_catch_up_locked(mgr); } - /* If quorum not reachable with in-process replicas alone, stay - * CANDIDATE until more followers are registered or the caller - * invokes replication_request_leadership() directly. */ } pthread_rwlock_unlock(&mgr->rwlock); @@ -347,8 +344,6 @@ int replication_request_leadership(GV_ReplicationManager *mgr) { free(mgr->voted_for); mgr->voted_for = gv_dup_cstr(mgr->node_id); - /* Count self plus any in-process follower DBs as automatic votes. - * Remote-only replicas cannot be contacted without a wire protocol. */ size_t votes = 1; for (size_t i = 0; i < mgr->replica_count; i++) { if (mgr->follower_dbs[i] != NULL) { diff --git a/src/admin/sso.c b/src/admin/sso.c index ffdf44e..089be54 100644 --- a/src/admin/sso.c +++ b/src/admin/sso.c @@ -1075,7 +1075,6 @@ static GV_SSOToken *decode_jwt_claims(const char *jwt) { * Extract text content between and from XML. * Very basic: finds the first occurrence and extracts inner text. */ -/* Decode the five predefined XML/HTML entities in-place (output <= input). */ static void xml_decode_entities(char *s) { if (!s) return; char *r = s, *w = s; @@ -1133,24 +1132,19 @@ static int xml_extract_text(const char *xml, const char *tag, return 0; } -/* Extract the text content of the first AttributeValue child of the Attribute - * element that contains Name="attr_name" (exact match, case-sensitive). */ static int xml_extract_attribute_value(const char *xml, const char *attr_name, char *out, size_t out_size) { if (!xml || !attr_name || !out || out_size == 0) return -1; - /* Build exact-match search: Name="" */ char needle[512]; snprintf(needle, sizeof(needle), "Name=\"%s\"", attr_name); const char *attr_elem = strstr(xml, needle); if (!attr_elem) return -1; - /* Find the end of this Attribute element */ const char *attr_end = strstr(attr_elem, "= attr_end) val = strstr(attr_elem, "subject = gv_dup_cstr(buf); } - /* Extract common attributes. xml_extract_attribute_value matches - * Name="" exactly and decodes XML entities in the result. */ - /* Email */ if (xml_extract_attribute_value(xml, "email", buf, sizeof(buf)) == 0 || xml_extract_attribute_value(xml, @@ -1258,8 +1241,6 @@ static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) { token->name = gv_dup_cstr(buf); } - /* Multi-value attribute helper: extract all AttributeValue children of the - * Attribute element whose Name= matches attr_name exactly. */ #define EXTRACT_MULTI(attr_name_str, dest_arr, dest_count) do { \ char needle_m[512]; \ snprintf(needle_m, sizeof(needle_m), "Name=\"%s\"", (attr_name_str)); \ @@ -1300,7 +1281,6 @@ static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) { } \ } while (0) - /* Groups */ if (!token->groups) { EXTRACT_MULTI("groups", token->groups, token->group_count); }