diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9564a87..49934c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -128,6 +128,28 @@ else()
message(WARNING "Install onnxruntime to enable ONNX model serving")
endif()
+# Check for librdkafka (optional, for real Kafka streaming ingestion)
+find_package(PkgConfig QUIET)
+if(PkgConfig_FOUND)
+ pkg_check_modules(RDKAFKA QUIET rdkafka)
+endif()
+if(NOT RDKAFKA_FOUND)
+ find_library(RDKAFKA_LIBRARIES NAMES rdkafka)
+ find_path(RDKAFKA_INCLUDE_DIRS NAMES librdkafka/rdkafka.h)
+ if(RDKAFKA_LIBRARIES AND RDKAFKA_INCLUDE_DIRS)
+ set(RDKAFKA_FOUND TRUE)
+ endif()
+endif()
+if(RDKAFKA_FOUND)
+ target_compile_definitions(GigaVector PRIVATE HAVE_RDKAFKA)
+ target_include_directories(GigaVector PRIVATE ${RDKAFKA_INCLUDE_DIRS})
+ target_link_libraries(GigaVector PRIVATE ${RDKAFKA_LIBRARIES})
+ message(STATUS " Kafka streaming: Enabled (librdkafka found)")
+else()
+ message(STATUS " Kafka streaming: Disabled (librdkafka not found)")
+ message(WARNING "Install librdkafka-dev to enable real Kafka streaming ingestion")
+endif()
+
# SIMD runtime dispatch — compile distance.c with AVX2/AVX512 flags when the
# compiler supports them, without forcing -march=native across the whole build.
include(CheckCCompilerFlag)
diff --git a/src/admin/cluster.c b/src/admin/cluster.c
index 91e65a7..4330db9 100644
--- a/src/admin/cluster.c
+++ b/src/admin/cluster.c
@@ -225,6 +225,49 @@ int cluster_start(GV_Cluster *cluster) {
local->state = GV_NODE_ACTIVE;
}
+ if (cluster->config.seed_nodes && cluster->config.seed_nodes[0] != '\0') {
+ char *seeds = gv_dup_cstr(cluster->config.seed_nodes);
+ if (seeds) {
+ char *saveptr = NULL;
+ char *tok = strtok_r(seeds, ",", &saveptr);
+ while (tok) {
+ while (*tok == ' ' || *tok == '\t') tok++;
+ char *end = tok + strlen(tok) - 1;
+ while (end > tok && (*end == ' ' || *end == '\t')) {
+ *end-- = '\0';
+ }
+ if (*tok != '\0' && cluster->node_count < MAX_NODES) {
+ char seed_id[128];
+ snprintf(seed_id, sizeof(seed_id), "seed-%.122s", tok);
+ for (char *p = seed_id + 5; *p; p++) {
+ if (*p == ':' || *p == '.') *p = '_';
+ }
+ if (!find_node(cluster, seed_id)) {
+ char *nid = gv_dup_cstr(seed_id);
+ char *addr = gv_dup_cstr(tok);
+ if (nid && addr) {
+ NodeEntry *entry = &cluster->nodes[cluster->node_count];
+ entry->node_id = nid;
+ entry->address = addr;
+ entry->role = GV_NODE_DATA;
+ entry->state = GV_NODE_JOINING;
+ entry->shard_ids = NULL;
+ entry->shard_count = 0;
+ entry->last_heartbeat = (uint64_t)time(NULL);
+ entry->load = 0.0;
+ cluster->node_count++;
+ } else {
+ free(nid);
+ free(addr);
+ }
+ }
+ }
+ tok = strtok_r(NULL, ",", &saveptr);
+ }
+ free(seeds);
+ }
+ }
+
pthread_rwlock_unlock(&cluster->rwlock);
/* Start heartbeat thread */
@@ -235,11 +278,6 @@ int cluster_start(GV_Cluster *cluster) {
return -1;
}
- /* Note: Seed-node discovery and the RPC listener are not yet
- * implemented. The cluster currently operates in single-node mode
- * with all management (shard assignment, node health) done locally.
- * A future version will add TCP-based gossip / RPC here. */
-
/* Mark cluster ready */
pthread_mutex_lock(&cluster->state_mutex);
cluster->is_ready = 1;
diff --git a/src/admin/replication.c b/src/admin/replication.c
index 358dcec..a7f9c1c 100644
--- a/src/admin/replication.c
+++ b/src/admin/replication.c
@@ -158,16 +158,27 @@ static void *replication_thread_func(void *arg) {
mgr->leader_id = NULL;
free(mgr->voted_for);
mgr->voted_for = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL;
-
- /* Note: In a real implementation, this would:
- * 1. Request votes from all known replicas
- * 2. Wait for majority response
- * 3. If majority votes received, become leader
- * 4. If another leader is discovered, become follower
- * 5. If election times out, restart election with new term */
}
}
}
+ } else if (mgr->role == GV_REPL_CANDIDATE) {
+ /* Count in-process registered follower DBs as automatic votes: they
+ * are co-located and can trivially accept this node as leader.
+ * Remote-only replicas (follower_dbs[i] == NULL) cannot be asked
+ * without a wire protocol, so they are not counted. */
+ size_t votes = 1; /* self-vote */
+ for (size_t i = 0; i < mgr->replica_count; i++) {
+ if (mgr->follower_dbs[i] != NULL) {
+ votes++;
+ }
+ }
+ size_t quorum = (mgr->replica_count + 1) / 2 + 1;
+ if (votes >= quorum) {
+ mgr->role = GV_REPL_LEADER;
+ free(mgr->leader_id);
+ mgr->leader_id = mgr->node_id ? gv_dup_cstr(mgr->node_id) : NULL;
+ replication_embedded_followers_catch_up_locked(mgr);
+ }
}
pthread_rwlock_unlock(&mgr->rwlock);
@@ -333,22 +344,25 @@ int replication_request_leadership(GV_ReplicationManager *mgr) {
free(mgr->voted_for);
mgr->voted_for = gv_dup_cstr(mgr->node_id);
- /* In a real implementation, we would:
- * 1. Send RequestVote to all replicas
- * 2. Wait for majority
- * 3. Become leader if we win
- */
-
- if (mgr->replica_count == 0) {
+ size_t votes = 1;
+ for (size_t i = 0; i < mgr->replica_count; i++) {
+ if (mgr->follower_dbs[i] != NULL) {
+ votes++;
+ }
+ }
+ size_t quorum = (mgr->replica_count + 1) / 2 + 1;
+ if (votes >= quorum) {
mgr->role = GV_REPL_LEADER;
free(mgr->leader_id);
mgr->leader_id = gv_dup_cstr(mgr->node_id);
+ replication_embedded_followers_catch_up_locked(mgr);
}
+ GV_ReplicationRole result_role = mgr->role;
pthread_rwlock_unlock(&mgr->rwlock);
pthread_mutex_unlock(&mgr->election_mutex);
- return mgr->role == GV_REPL_LEADER ? 0 : -1;
+ return result_role == GV_REPL_LEADER ? 0 : -1;
}
int replication_add_follower(GV_ReplicationManager *mgr, const char *node_id,
diff --git a/src/admin/sso.c b/src/admin/sso.c
index 45f15f1..089be54 100644
--- a/src/admin/sso.c
+++ b/src/admin/sso.c
@@ -1075,6 +1075,24 @@ static GV_SSOToken *decode_jwt_claims(const char *jwt) {
* Extract text content between and from XML.
* Very basic: finds the first occurrence and extracts inner text.
*/
+/* Decode the five predefined XML entities (&amp; &lt; &gt; &quot; &apos;) in
+ * place. Any other '&' sequence is copied through unchanged. */
+static void xml_decode_entities(char *s) {
+    if (!s) return;
+    char *r = s, *w = s; /* read / write cursors; output never outgrows input */
+    while (*r) {
+        if (*r == '&') {
+            if (strncmp(r, "&amp;", 5) == 0) { *w++ = '&'; r += 5; }
+            else if (strncmp(r, "&lt;", 4) == 0) { *w++ = '<'; r += 4; }
+            else if (strncmp(r, "&gt;", 4) == 0) { *w++ = '>'; r += 4; }
+            else if (strncmp(r, "&quot;", 6) == 0) { *w++ = '"'; r += 6; }
+            else if (strncmp(r, "&apos;", 6) == 0) { *w++ = '\''; r += 6; }
+            else { *w++ = *r++; }
+        } else { *w++ = *r++; }
+    }
+    *w = '\0';
+}
+
static int xml_extract_text(const char *xml, const char *tag,
char *out, size_t out_size) {
if (!xml || !tag || !out || out_size == 0) return -1;
@@ -1088,6 +1106,14 @@ static int xml_extract_text(const char *xml, const char *tag,
const char *start = strstr(xml, open_tag);
if (!start) return -1;
+    /* Skip hits where the tag name is only a prefix of a longer one (e.g.
+     * <NameIDPolicy> when searching for <NameID>) and keep scanning. */
+    for (size_t tl = strlen(open_tag); start; start = strstr(start + tl, open_tag)) {
+        char c = start[tl];
+        if (c == '>' || c == '/' || c == ' ' || c == '\t' || c == '\r' || c == '\n') break;
+    }
+    if (!start) return -1;
+
/* Skip to end of opening tag (past the '>' character) */
const char *gt = strchr(start, '>');
if (!gt) return -1;
@@ -1101,15 +1127,44 @@ static int xml_extract_text(const char *xml, const char *tag,
memcpy(out, gt, len);
out[len] = '\0';
+ xml_decode_entities(out);
return 0;
}
-/**
- * Parse a base64-encoded SAML assertion and extract identity claims.
- * This is a basic stub: decodes the base64, then uses simple string
- * matching to find NameID and attribute values. No full XML parser.
- */
+/* Copy the text of the first AttributeValue inside the SAML Attribute whose
+ * Name is attr_name into out (entity-decoded, truncated to fit). 0 on success. */
+static int xml_extract_attribute_value(const char *xml, const char *attr_name,
+                                       char *out, size_t out_size) {
+    if (!xml || !attr_name || !out || out_size == 0) return -1;
+
+    char needle[512];
+    snprintf(needle, sizeof(needle), "Name=\"%s\"", attr_name);
+    const char *attr_elem = strstr(xml, needle);
+    if (!attr_elem) return -1;
+
+    /* Bound the search to this Attribute element (end of input if unclosed). */
+    const char *attr_end = strstr(attr_elem, "</saml:Attribute>");
+    if (!attr_end) attr_end = strstr(attr_elem, "</saml2:Attribute>");
+    if (!attr_end) attr_end = strstr(attr_elem, "</Attribute>");
+    if (!attr_end) attr_end = attr_elem + strlen(attr_elem);
+    const char *val = strstr(attr_elem, "<saml:AttributeValue");
+    if (!val || val >= attr_end) val = strstr(attr_elem, "<saml2:AttributeValue");
+    if (!val || val >= attr_end) val = strstr(attr_elem, "<AttributeValue");
+    if (!val || val >= attr_end) return -1;
+    const char *gt = strchr(val, '>');
+    if (!gt || gt >= attr_end) return -1;
+    gt++;
+    const char *end = strchr(gt, '<');
+    if (!end || end >= attr_end) return -1;
+    size_t len = (size_t)(end - gt);
+    if (len >= out_size) len = out_size - 1;
+    memcpy(out, gt, len);
+    out[len] = '\0';
+    xml_decode_entities(out);
+    return 0;
+}
+
static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) {
if (!b64_assertion) return NULL;
@@ -1159,121 +1214,86 @@ static GV_SSOToken *parse_saml_assertion(const char *b64_assertion) {
char buf[512];
- /* Extract NameID */
if (xml_extract_text(xml, "saml:NameID", buf, sizeof(buf)) == 0 ||
+ xml_extract_text(xml, "saml2:NameID", buf, sizeof(buf)) == 0 ||
xml_extract_text(xml, "NameID", buf, sizeof(buf)) == 0) {
token->subject = gv_dup_cstr(buf);
}
- /* Extract common attributes by searching for AttributeValue elements.
- * SAML attributes follow the pattern:
- * ...
- */
-
/* Email */
- const char *email_attr = strstr(xml, "Name=\"email\"");
- if (!email_attr) email_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress\"");
- if (email_attr) {
- const char *val_start = strstr(email_attr, "');
- if (gt) {
- gt++;
- const char *end = strchr(gt, '<');
- if (end) {
- size_t len = (size_t)(end - gt);
- if (len < sizeof(buf)) {
- memcpy(buf, gt, len);
- buf[len] = '\0';
- token->email = gv_dup_cstr(buf);
- }
- }
- }
- }
+ if (xml_extract_attribute_value(xml, "email", buf, sizeof(buf)) == 0 ||
+ xml_extract_attribute_value(xml,
+ "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress",
+ buf, sizeof(buf)) == 0 ||
+ xml_extract_attribute_value(xml,
+ "urn:oid:0.9.2342.19200300.100.1.3", buf, sizeof(buf)) == 0) {
+ token->email = gv_dup_cstr(buf);
}
/* Display name */
- const char *name_attr = strstr(xml, "Name=\"name\"");
- if (!name_attr) name_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name\"");
- if (name_attr) {
- const char *val_start = strstr(name_attr, "');
- if (gt) {
- gt++;
- const char *end = strchr(gt, '<');
- if (end) {
- size_t len = (size_t)(end - gt);
- if (len < sizeof(buf)) {
- memcpy(buf, gt, len);
- buf[len] = '\0';
- token->name = gv_dup_cstr(buf);
- }
- }
- }
- }
+ if (xml_extract_attribute_value(xml, "name", buf, sizeof(buf)) == 0 ||
+ xml_extract_attribute_value(xml, "displayName", buf, sizeof(buf)) == 0 ||
+ xml_extract_attribute_value(xml,
+ "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name",
+ buf, sizeof(buf)) == 0 ||
+ xml_extract_attribute_value(xml,
+ "urn:oid:2.16.840.1.113730.3.1.241", buf, sizeof(buf)) == 0) {
+ token->name = gv_dup_cstr(buf);
}
- /* Groups - may appear as multiple AttributeValue elements under one Attribute */
- const char *groups_attr = strstr(xml, "Name=\"groups\"");
- if (!groups_attr) groups_attr = strstr(xml, "Name=\"http://schemas.xmlsoap.org/claims/Group\"");
- if (groups_attr) {
- /* Count group values */
- const char *search = groups_attr;
- size_t count = 0;
-
- /* Find the closing to bound our search */
- const char *attr_end = strstr(search, "= attr_end) break;
-
- count++;
- p = vs + 1;
- }
-
- if (count > MAX_GROUPS) count = MAX_GROUPS;
-
- if (count > 0) {
- token->groups = calloc(count, sizeof(char *));
- if (token->groups) {
- /* Second pass: extract values */
- p = search;
- size_t idx = 0;
- while (p < attr_end && idx < count) {
- const char *vs = strstr(p, "= attr_end) break;
-
- const char *gt = strchr(vs, '>');
- if (!gt || gt >= attr_end) break;
- gt++;
-
- const char *end = strchr(gt, '<');
- if (!end || end >= attr_end) break;
-
- size_t len = (size_t)(end - gt);
- if (len < sizeof(buf)) {
- memcpy(buf, gt, len);
- buf[len] = '\0';
- token->groups[idx] = gv_dup_cstr(buf);
- idx++;
- }
-
- p = end;
- }
- token->group_count = idx;
- }
- }
+/* EXTRACT_MULTI(name, arr, count): collect every AttributeValue under the SAML
+ * Attribute whose Name is `name` into a calloc'd array `arr` (at most
+ * MAX_GROUPS entries) and store the number extracted in `count`. Uses the
+ * enclosing function's `xml` and `buf`; values that do not fit in buf are
+ * skipped. */
+#define EXTRACT_MULTI(attr_name_str, dest_arr, dest_count) do { \
+    char needle_m[512]; \
+    snprintf(needle_m, sizeof(needle_m), "Name=\"%s\"", (attr_name_str)); \
+    const char *ae = strstr(xml, needle_m); \
+    if (ae) { \
+        /* Bound both passes to this Attribute element. */ \
+        const char *ae_end = strstr(ae, "</saml:Attribute>"); \
+        if (!ae_end) ae_end = strstr(ae, "</saml2:Attribute>"); \
+        if (!ae_end) ae_end = strstr(ae, "</Attribute>"); \
+        if (!ae_end) ae_end = ae + strlen(ae); \
+        /* First pass: count the values so the array can be sized. */ \
+        const char *pp = ae; size_t cnt = 0; \
+        while (pp < ae_end) { \
+            const char *vs = strstr(pp, "<saml:AttributeValue"); \
+            if (!vs || vs >= ae_end) vs = strstr(pp, "<saml2:AttributeValue"); \
+            if (!vs || vs >= ae_end) vs = strstr(pp, "<AttributeValue"); \
+            if (!vs || vs >= ae_end) break; cnt++; pp = vs + 1; \
+        } \
+        if (cnt > MAX_GROUPS) cnt = MAX_GROUPS; \
+        if (cnt > 0) { \
+            (dest_arr) = calloc(cnt, sizeof(char *)); \
+            if ((dest_arr)) { \
+                /* Second pass: copy out each value's text. */ \
+                pp = ae; size_t ix = 0; \
+                while (pp < ae_end && ix < cnt) { \
+                    const char *vs = strstr(pp, "<saml:AttributeValue"); \
+                    if (!vs || vs >= ae_end) vs = strstr(pp, "<saml2:AttributeValue"); \
+                    if (!vs || vs >= ae_end) vs = strstr(pp, "<AttributeValue"); \
+                    if (!vs || vs >= ae_end) break; \
+                    const char *gtp = strchr(vs, '>'); \
+                    if (!gtp || gtp >= ae_end) break; gtp++; \
+                    const char *ep = strchr(gtp, '<'); \
+                    if (!ep || ep >= ae_end) break; \
+                    size_t vl = (size_t)(ep - gtp); \
+                    if (vl < sizeof(buf)) { \
+                        memcpy(buf, gtp, vl); buf[vl] = '\0'; \
+                        xml_decode_entities(buf); \
+                        (dest_arr)[ix++] = gv_dup_cstr(buf); \
+                    } \
+                    pp = ep; \
+                } \
+                (dest_count) = ix; \
+            } \
+        } \
+    } \
+} while (0)
+
+ if (!token->groups) {
+ EXTRACT_MULTI("groups", token->groups, token->group_count);
+ }
+ if (!token->groups) {
+ EXTRACT_MULTI("http://schemas.xmlsoap.org/claims/Group",
+ token->groups, token->group_count);
}
+ if (!token->groups) {
+ EXTRACT_MULTI("urn:oid:1.3.6.1.4.1.5923.1.5.1.1",
+ token->groups, token->group_count);
+ }
+
+#undef EXTRACT_MULTI
/* Extract timestamps if present in Conditions element */
const char *conditions = strstr(xml, "
+#endif
+
#define GV_STREAM_STACK_VEC_CAP 4096
/* Internal Structures */
@@ -44,6 +46,11 @@ struct GV_StreamConsumer {
/* Offset tracking */
int64_t committed_offset;
+#ifdef HAVE_RDKAFKA
+ rd_kafka_t *rk;
+ rd_kafka_topic_partition_list_t *rk_topics;
+#endif
+
/* Threading */
pthread_t consumer_thread;
int thread_running;
@@ -196,6 +203,107 @@ static void stream_process_embedded_batch(GV_StreamConsumer *consumer) {
}
}
+#ifdef HAVE_RDKAFKA
+/* Poll up to config.batch_size messages from librdkafka, pass each through the
+ * optional handler and the extractor, and add the resulting vector to the
+ * database. Caller holds consumer->mutex. */
+static void stream_process_kafka_batch(GV_StreamConsumer *consumer) {
+    GV_Database *db = consumer->db;
+    size_t dimension = (db && consumer->rk) ? database_dimension(db) : 0;
+    if (dimension == 0) { consumer->state = GV_STREAM_ERROR; return; }
+
+    GV_VectorExtractor extract = consumer->extractor ? consumer->extractor : default_extractor;
+    void *ext_ud = consumer->extractor_user_data;
+    GV_StreamMessageHandler handler = consumer->handler;
+    void *hand_ud = consumer->handler_user_data;
+
+    float stack_vec[GV_STREAM_STACK_VEC_CAP];
+    float *heap_vec = NULL;
+    float *vec = stack_vec;
+    if (dimension > GV_STREAM_STACK_VEC_CAP) {
+        heap_vec = malloc(dimension * sizeof(float));
+        if (!heap_vec) { consumer->state = GV_STREAM_ERROR; return; }
+        vec = heap_vec;
+    }
+
+    int timeout_ms = (int)consumer->config.batch_timeout_ms;
+    size_t batch = consumer->config.batch_size;
+    if (batch == 0) batch = 1;
+    size_t consumed = 0; /* non-error messages polled in this call */
+
+    for (size_t i = 0; i < batch && !consumer->stop_requested; i++) {
+        rd_kafka_message_t *rkmsg = rd_kafka_consumer_poll(consumer->rk, timeout_ms);
+        if (!rkmsg) break;
+        if (rkmsg->err) {
+            if (rkmsg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                consumer->stats.messages_failed++;
+            }
+            rd_kafka_message_destroy(rkmsg);
+            continue;
+        }
+        consumed++;
+
+        GV_StreamMessage msg;
+        memset(&msg, 0, sizeof(msg));
+        msg.value = rkmsg->payload;
+        msg.value_len = rkmsg->len;
+        msg.offset = (int64_t)rkmsg->offset;
+        msg.timestamp = (int64_t)(rd_kafka_message_timestamp(rkmsg, NULL) / 1000);
+        msg.partition = (int)rkmsg->partition;
+        /* NOTE(review): Kafka keys are length-delimited (rkmsg->key_len), not
+         * NUL-terminated; confirm msg.key is never read as a C string. */
+        if (rkmsg->key) msg.key = (const char *)rkmsg->key;
+
+        consumer->stats.messages_received++;
+        consumer->stats.bytes_received += rkmsg->len;
+        consumer->stats.current_offset = msg.offset + 1;
+
+        if (handler && handler(&msg, hand_ud) != 0) {
+            consumer->stats.messages_failed++;
+            rd_kafka_message_destroy(rkmsg);
+            continue;
+        }
+
+        char **mkeys = NULL, **mvals = NULL;
+        size_t mcount = 0;
+        if (extract(&msg, vec, dimension,
+                    (char ***)&mkeys, (char ***)&mvals, &mcount, ext_ud) != 0) {
+            consumer->stats.messages_failed++;
+            rd_kafka_message_destroy(rkmsg);
+            continue;
+        }
+
+        int added = (mcount > 0)
+            ? db_add_vector_with_rich_metadata(db, vec, dimension,
+                                               (const char **)mkeys,
+                                               (const char **)mvals, mcount)
+            : db_add_vector(db, vec, dimension);
+        for (size_t m = 0; m < mcount; m++) {
+            if (mkeys) free(mkeys[m]);
+            if (mvals) free(mvals[m]);
+        }
+        free(mkeys);
+        free(mvals);
+
+        if (added != 0) {
+            consumer->stats.messages_failed++;
+        } else {
+            consumer->stats.messages_processed++;
+            consumer->stats.vectors_ingested++;
+        }
+        rd_kafka_message_destroy(rkmsg);
+    }
+
+    /* Commit only if something was consumed; mirror the offset only on success. */
+    if (consumed > 0 && consumer->config.auto_commit &&
+        rd_kafka_commit(consumer->rk, NULL, 0 /* sync */) == RD_KAFKA_RESP_ERR_NO_ERROR) {
+        consumer->committed_offset = consumer->stats.current_offset;
+    }
+
+    free(heap_vec);
+}
+#endif /* HAVE_RDKAFKA */
+
/* Consumer Thread */
static void *consumer_thread_func(void *arg) {
@@ -221,7 +329,15 @@ static void *consumer_thread_func(void *arg) {
if (consumer->stop_requested) break;
pthread_mutex_lock(&consumer->mutex);
+#ifdef HAVE_RDKAFKA
+ if (consumer->rk) {
+ stream_process_kafka_batch(consumer);
+ } else {
+ stream_process_embedded_batch(consumer);
+ }
+#else
stream_process_embedded_batch(consumer);
+#endif
/* Interruptible sleep: wake early on pause/stop signal */
if (!consumer->stop_requested && !consumer->pause_requested) {
@@ -271,6 +387,74 @@ GV_StreamConsumer *stream_create(GV_Database *db, const GV_StreamConfig *config)
return NULL;
}
+#ifdef HAVE_RDKAFKA
+    /* Real Kafka ingestion: build a librdkafka consumer when brokers and a topic
+     * are configured. On any failure consumer->rk stays NULL and the consumer
+     * thread falls back to the embedded source. */
+    if (consumer->config.source == GV_STREAM_KAFKA &&
+        consumer->config.kafka.brokers && consumer->config.kafka.topic) {
+        char errstr[512];
+        rd_kafka_conf_t *conf = rd_kafka_conf_new();
+
+        /* NULL (unset) options are skipped; a value librdkafka rejects aborts
+         * the setup rather than connecting with a partial configuration. */
+        const char *settings[][2] = {
+            {"bootstrap.servers", consumer->config.kafka.brokers},
+            {"group.id", consumer->config.kafka.consumer_group},
+            {"security.protocol", consumer->config.kafka.security_protocol},
+            {"sasl.mechanism", consumer->config.kafka.sasl_mechanism},
+            {"sasl.username", consumer->config.kafka.sasl_username},
+            {"sasl.password", consumer->config.kafka.sasl_password},
+        };
+        int conf_ok = 1;
+        for (size_t i = 0; conf_ok && i < sizeof(settings) / sizeof(settings[0]); i++) {
+            if (settings[i][1] &&
+                rd_kafka_conf_set(conf, settings[i][0], settings[i][1],
+                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                conf_ok = 0;
+            }
+        }
+
+        consumer->rk = conf_ok
+            ? rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr))
+            : NULL;
+        if (!consumer->rk) {
+            /* rd_kafka_new() takes ownership of conf only on success. */
+            rd_kafka_conf_destroy(conf);
+            /* Fall back to synthetic mode rather than failing hard */
+        } else {
+            rd_kafka_poll_set_consumer(consumer->rk);
+
+            consumer->rk_topics = rd_kafka_topic_partition_list_new(1);
+            int partition = consumer->config.kafka.partition >= 0
+                                ? consumer->config.kafka.partition
+                                : RD_KAFKA_PARTITION_UA;
+            rd_kafka_topic_partition_list_add(consumer->rk_topics,
+                                              consumer->config.kafka.topic,
+                                              partition);
+
+            int64_t start_offset = consumer->config.kafka.start_offset < 0
+                                       ? RD_KAFKA_OFFSET_END
+                                       : consumer->config.kafka.start_offset;
+            rd_kafka_topic_partition_list_set_offset(consumer->rk_topics,
+                                                     consumer->config.kafka.topic,
+                                                     partition, start_offset);
+
+            /* rd_kafka_subscribe() reads only the topic name, so an explicit
+             * partition and start offset must go through rd_kafka_assign(). */
+            rd_kafka_resp_err_t kerr = (partition == RD_KAFKA_PARTITION_UA)
+                ? rd_kafka_subscribe(consumer->rk, consumer->rk_topics)
+                : rd_kafka_assign(consumer->rk, consumer->rk_topics);
+            if (kerr != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                rd_kafka_topic_partition_list_destroy(consumer->rk_topics);
+                rd_kafka_destroy(consumer->rk);
+                consumer->rk = NULL;
+                consumer->rk_topics = NULL;
+            }
+        }
+    }
+#endif /* HAVE_RDKAFKA */
+
return consumer;
}
@@ -279,6 +463,18 @@ void stream_destroy(GV_StreamConsumer *consumer) {
stream_stop(consumer);
+#ifdef HAVE_RDKAFKA
+ if (consumer->rk) {
+ rd_kafka_consumer_close(consumer->rk);
+ if (consumer->rk_topics) {
+ rd_kafka_topic_partition_list_destroy(consumer->rk_topics);
+ consumer->rk_topics = NULL;
+ }
+ rd_kafka_destroy(consumer->rk);
+ consumer->rk = NULL;
+ }
+#endif
+
pthread_cond_destroy(&consumer->state_cond);
pthread_mutex_destroy(&consumer->mutex);
free(consumer);
@@ -434,10 +630,17 @@ int stream_commit(GV_StreamConsumer *consumer) {
pthread_mutex_lock(&consumer->mutex);
consumer->committed_offset = consumer->stats.current_offset;
+#ifdef HAVE_RDKAFKA
+ rd_kafka_t *rk = consumer->rk;
+#endif
pthread_mutex_unlock(&consumer->mutex);
- /* Without a broker, commit only updates the local offset mirror; with
- * librdkafka this would call rd_kafka_commit(). */
+#ifdef HAVE_RDKAFKA
+ if (rk) {
+ rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0 /* sync */);
+ return (err == RD_KAFKA_RESP_ERR_NO_ERROR) ? 0 : -1;
+ }
+#endif
return 0;
}
diff --git a/src/specialized/gpu.c b/src/specialized/gpu.c
index 9e1e8f0..b7c3f10 100644
--- a/src/specialized/gpu.c
+++ b/src/specialized/gpu.c
@@ -16,21 +16,21 @@
/* Check for CUDA availability at compile time */
#ifdef HAVE_CUDA
-extern int cuda_available(void);
-extern int cuda_device_count(void);
-extern int cuda_get_device_info(int device_id, GV_GPUDeviceInfo *info);
-extern GV_GPUContext *cuda_create(const GV_GPUConfig *config);
-extern void cuda_destroy(GV_GPUContext *ctx);
-extern int cuda_synchronize(GV_GPUContext *ctx);
-extern int cuda_compute_distances(GV_GPUContext *ctx, const float *queries,
- size_t num_queries, const float *database,
- size_t num_vectors, size_t dimension,
- GV_GPUDistanceMetric metric, float *distances);
-extern int cuda_knn_search(GV_GPUContext *ctx, const float *queries,
- size_t num_queries, const float *database,
- size_t num_vectors, size_t dimension,
- const GV_GPUSearchParams *params,
- size_t *indices, float *distances);
+extern int gv_cuda_available(void);
+extern int gv_cuda_device_count(void);
+extern int gv_cuda_get_device_info(int device_id, GV_GPUDeviceInfo *info);
+extern GV_GPUContext *gv_cuda_create(const GV_GPUConfig *config);
+extern void gv_cuda_destroy(GV_GPUContext *ctx);
+extern int gv_cuda_synchronize(GV_GPUContext *ctx);
+extern int gv_cuda_compute_distances(GV_GPUContext *ctx, const float *queries,
+ size_t num_queries, const float *database,
+ size_t num_vectors, size_t dimension,
+ GV_GPUDistanceMetric metric, float *distances);
+extern int gv_cuda_knn_search(GV_GPUContext *ctx, const float *queries,
+ size_t num_queries, const float *database,
+ size_t num_vectors, size_t dimension,
+ const GV_GPUSearchParams *params,
+ size_t *indices, float *distances);
#endif
/* Internal Structures */
@@ -85,7 +85,7 @@ void gpu_config_init(GV_GPUConfig *config) {
int gpu_available(void) {
#ifdef HAVE_CUDA
- return cuda_available();
+ return gv_cuda_available();
#else
return 0; /* CPU fallback always available, but no GPU */
#endif
@@ -93,7 +93,7 @@ int gpu_available(void) {
int gpu_device_count(void) {
#ifdef HAVE_CUDA
- return cuda_device_count();
+ return gv_cuda_device_count();
#else
return 0;
#endif
@@ -103,7 +103,7 @@ int gpu_get_device_info(int device_id, GV_GPUDeviceInfo *info) {
if (!info) return -1;
#ifdef HAVE_CUDA
- return cuda_get_device_info(device_id, info);
+ return gv_cuda_get_device_info(device_id, info);
#else
(void)device_id;
/* Return CPU "device" info for fallback */
@@ -123,10 +123,10 @@ GV_GPUContext *gpu_create(const GV_GPUConfig *config) {
ctx->config = config ? *config : DEFAULT_CONFIG;
#ifdef HAVE_CUDA
- ctx->cuda_available = cuda_available();
+ ctx->cuda_available = gv_cuda_available();
if (ctx->cuda_available) {
/* Initialize CUDA context */
- GV_GPUContext *cuda_ctx = cuda_create(&ctx->config);
+ GV_GPUContext *cuda_ctx = gv_cuda_create(&ctx->config);
if (cuda_ctx) {
ctx->cuda_context = cuda_ctx->cuda_context;
ctx->cuda_streams = cuda_ctx->cuda_streams;
@@ -150,7 +150,7 @@ void gpu_destroy(GV_GPUContext *ctx) {
#ifdef HAVE_CUDA
if (ctx->cuda_available) {
- cuda_destroy(ctx);
+ gv_cuda_destroy(ctx);
}
#endif
@@ -162,7 +162,7 @@ int gpu_synchronize(GV_GPUContext *ctx) {
#ifdef HAVE_CUDA
if (ctx->cuda_available) {
- return cuda_synchronize(ctx);
+ return gv_cuda_synchronize(ctx);
}
#endif
@@ -462,7 +462,7 @@ int gpu_compute_distances(GV_GPUContext *ctx, const float *queries,
#ifdef HAVE_CUDA
if (ctx->cuda_available) {
- return cuda_compute_distances(ctx, queries, num_queries, database,
+ return gv_cuda_compute_distances(ctx, queries, num_queries, database,
num_vectors, dimension, metric, distances);
}
#endif
@@ -567,7 +567,7 @@ int gpu_knn_search(GV_GPUContext *ctx, const float *queries,
#ifdef HAVE_CUDA
if (ctx->cuda_available) {
- return cuda_knn_search(ctx, queries, num_queries, database,
+ return gv_cuda_knn_search(ctx, queries, num_queries, database,
num_vectors, dimension, params,
indices, distances);
}
@@ -771,7 +771,7 @@ int gpu_train_ivfpq(GV_GPUContext *ctx, const float *vectors,
/* Use GPU to compute all pairwise distances */
GV_GPUSearchParams params = {0};
params.metric = GV_GPU_METRIC_EUCLIDEAN;
- cuda_compute_distances(ctx, d_vectors, num_vectors,
+ gv_cuda_compute_distances(ctx, d_vectors, num_vectors,
d_centroids, num_centroids,
dimension, params.metric, d_distances);
cudaMemcpy(host_distances, d_distances, dist_size, cudaMemcpyDeviceToHost);