From 701e882287e2d5729cc51df2031507c440864717 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Mon, 18 Aug 2025 18:16:24 +0200 Subject: [PATCH 1/6] compose deployment init --- .env.example | 11 +++ .gitignore | 3 +- Dockerfile | 27 ++++++++ README.md | 27 +++++++- docker-compose.yml | 165 +++++++++++++++++++++++++++++++++++++++++++++ main_docker.py | 101 +++++++++++++++++++++++++++ 6 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 .env.example create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 main_docker.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..1d9da38 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# Optional +# LLM= +# EMB_MODEL= +HUGGINGFACEHUB_API_TOKEN= +LLM_MAX_CONTEXT_LENGTH=32768 +LLM_SWAP_SPACE=16 +LLM_CPU_OFFLOAD_SPACE=8 +DATASET=cinderella +OUT_DIR=result/cinderella_vllm +SAVE_DIR=outputs/cinderella_vllm +OPENAI_API_KEY=DUMMY_KEY diff --git a/.gitignore b/.gitignore index 89011af..72a7d15 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.env # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -89,4 +90,4 @@ Thumbs.db # Other *.swp *.bak -*.tmp \ No newline at end of file +*.tmp diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c4122f7 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,27 @@ +FROM pytorch/pytorch:2.6.0-cuda12.4-cudnn9-runtime + +WORKDIR /app + +# Install essential OS-level dependencies +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install requirements +COPY . . 
+RUN pip install -r requirements.txt + +# Set environment variables +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 +ENV PORT=7373 +ENV HOSTNAME=0.0.0.0 + +# Set CUDA-related environment variables +ENV NVIDIA_VISIBLE_DEVICES=all +ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility,video +ENV HF_HUB_ENABLE_HF_TRANSFER=1 + +# Default command - will be overridden by specific environment Dockerfiles +CMD ["python", "main_docker.py"] diff --git a/README.md b/README.md index 06378b3..5e83790 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,31 @@ netstat -tlnp | grep 8000 curl http://localhost:8000/v1/models ``` +### Method 3: Using Local docker compose deployment (main_docker.py) ⚡ +This method deploys everything needed locally. Namely: +1. vLLM openai server for language model inference +2. 🤗 HugginFace's Text Embeddings Inference +3. como-app + +#### Requirements +- docker +- nvidia device plugin + +#### 1. Configure inference services 📝 + +```bash +cp .env.example .env +``` +After cpoying the example environment file adjust environment variables as wanted. + +#### 2. Spin-up docker compose deployment + +```bash +docker compose up -d && docker compose logs -f +``` + + + ### Comparison of Two Methods 📊 | Feature | OpenAI API (main.py) | vLLM Local (main_vllm.py) | @@ -264,4 +289,4 @@ For questions or suggestions, feel free to submit an Issue or PR. --- ## Acknowledgement 🙏 -We refer to the repository of [HippoRAG](https://github.com/OSU-NLP-Group/HippoRAG) as a skeleton code. \ No newline at end of file +We refer to the repository of [HippoRAG](https://github.com/OSU-NLP-Group/HippoRAG) as a skeleton code. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c20dcf0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,165 @@ +volumes: + hf_cache: + +services: + como-app: + build: . 
+ # image: nvidia/cuda:11.8.0-base-ubuntu22.04 + # The key part for CDI GPU access: + device_cgroup_rules: + - "c 195:* rmw" + - "c 236:* rmw" + devices: + - nvidia.com/gpu=all + env_file: .env + ipc: host + depends_on: + embeddings: + condition: service_healthy + vllm: + condition: service_healthy + + embeddings: + deploy: + replicas: 1 + image: ghcr.io/huggingface/text-embeddings-inference:86-1.8 + volumes: + - hf_cache:/.hf_cache + ports: + - 8011:8080 + ipc: host + container_name: embeddings + # Replace runtime with devices for CDI + devices: + - nvidia.com/gpu=all + device_cgroup_rules: + - "c 195:* rmw" + - "c 236:* rmw" + environment: + - NVIDIA_VISIBLE_DEVICES=all + - NVIDIA_DRIVER_CAPABILITIES=compute,utility + - USE_FLASH_ATTENTION=True + - HF_HUB_ENABLE_HF_TRANSFER=1 + - HF_HOME=/.hf_cache + - RUST_LOG=info + # Performance tuning for embedding models + - OMP_NUM_THREADS=8 + - MKL_NUM_THREADS=8 + - TOKENIZERS_PARALLELISM=true + restart: no + env_file: .env + command: + [ + "--model-id", + "${EMB_MODEL:-nomic-ai/nomic-embed-text-v1.5}", + "--hostname", + "0.0.0.0", + "--port", + "8080", + "--huggingface-hub-cache", + "/.hf_cache", + "--tokenization-workers", + "16", + "--max-concurrent-requests", + "1024", + "--max-batch-tokens", + "32768", + "--max-batch-requests", + "256", + "--max-client-batch-size", + "64", + "--auto-truncate", + "--payload-limit", + "4000000", + ] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 30s + logging: + driver: json-file + options: + max-size: "100m" + max-file: "1" + + vllm: + deploy: + replicas: 1 + volumes: + - hf_cache:/.hf_cache + restart: no + image: vllm/vllm-openai:latest + ports: + - 7373:80 + device_cgroup_rules: + - "c 195:* rmw" + - "c 236:* rmw" + devices: + - nvidia.com/gpu=all + environment: + - HUGGING_FACE_HUB_TOKEN=${HUGGINGFACEHUB_API_TOKEN:-your_hf_api_access_token} + - HF_HOME=/.hf_cache + - NVIDIA_VISIBLE_DEVICES=all 
+ - VLLM_ATTENTION_BACKEND=FLASHINFER + - HF_HUB_ENABLE_HF_TRANSFER=1 + - MAX_PARALLEL_LOADING_WORKERS=4 + ipc: host + env_file: .env + command: [ + "--host", + "0.0.0.0", + "--port", + "80", + "--model", + "${LLM:-solidrust/Mistral-7B-Instruct-v0.3-AWQ}", + "--served-model-name", + "como/lm", + "--max-model-len", + "${LLM_MAX_CONTEXT_LENGTH}", + "--max-num-batched-tokens", + "${LLM_MAX_CONTEXT_LENGTH}", + "--max-seq-len-to-capture", + "${LLM_MAX_CONTEXT_LENGTH}", + "--kv-cache-dtype", + "fp8", + "--quantization", + "awq", + # "--dtype", + # "float16", + "--enable-auto-tool-choice", + "--tool-call-parser", + "mistral", # llama3_json + "--chat-template", + "examples/tool_chat_template_mistral_parallel.jinja", # tool_chat_template_llama3_json + "--cpu-offload-gb", + "${LLM_CPU_OFFLOAD_SPACE}", + "--gpu-memory-utilization", + "0.73", + "--use-v2-block-manager", + # "--block-size", + # "32", + "--swap-space", + "${LLM_SWAP_SPACE}", + # "--trust-remote-code", + "--seed", + "4269", + "--max-num-seqs", + "1", + "--trust-remote-code", + "--enable-prefix-caching", + "--enable-chunked-prefill", + # "--disable-sliding-window", + # "--max-paddings", + # "16", + # "--enable-chunked-prefill", # Not possible together with prfix caching enabled + # "--enforce-eager", + # "--max-parallel-loading-workers", + # "2" + ] + healthcheck: + test: ["CMD", "curl", "-f", "http://vllm:80/health"] + interval: 30s + timeout: 5s + retries: 5 diff --git a/main_docker.py b/main_docker.py new file mode 100644 index 0000000..e9ed52f --- /dev/null +++ b/main_docker.py @@ -0,0 +1,101 @@ +import os +import json +import copy + +from src.comorag.ComoRAG import ComoRAG +from src.comorag.utils.config_utils import BaseConfig +from src.comorag.utils.misc_utils import get_gold_answers + + +def process_dataset(dataset_path, config): + dataset_name = os.path.basename(dataset_path) + corpus_path = os.path.join(dataset_path, "corpus.jsonl") + qas_path = os.path.join(dataset_path, "qas.jsonl") + + with 
open(corpus_path, "r", encoding="utf-8") as f: + corpus = [json.loads(line) for line in f if line.strip()] + docs = [doc["contents"] for doc in corpus] + + with open(qas_path, "r", encoding="utf-8") as f: + samples = [json.loads(line) for line in f if line.strip()] + + all_queries = [s["question"] for s in samples] + config.corpus_len = len(corpus) + + comorag = ComoRAG(global_config=config) + comorag.index(docs) + solutions = comorag.try_answer(all_queries) + + gold_answers = get_gold_answers(samples) + for idx, q in enumerate(solutions): + q.gold_answers = list(gold_answers[idx]) + + result_list = [] + for idx, (q, solution) in enumerate(zip(all_queries, solutions)): + result_list.append( + { + "idx": idx, + "question": q, + "golden_answers": solution.gold_answers, + "output": solution.answer, + } + ) + + folder_path = os.path.join(config.output_dir) + os.makedirs(folder_path, exist_ok=True) + with open(os.path.join(folder_path, "results.json"), "w", encoding="utf-8") as f: + json.dump(result_list, f, ensure_ascii=False, indent=2) + + +def main(): + # 1) Start a vLLM OpenAI-compatible server separately, e.g.: + # vllm serve /path/to/your/model --tensor-parallel-size 1 --max_model_len 4096*2 --gpu-memory-utilization 0.95 + + # 2) Then run this script. We will call the server via base_url. 
+ # Ensure an API key is present for OpenAI-compatible clients + os.environ["OPENAI_API_KEY"] = "your-api-key-here" + + # Optional: select visible GPUs for the client side + os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0") + + base_path = "./dataset/cinderella" + dataset_dirs = [ + d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d)) + ] + dataset_dirs.sort() + dataset_paths = [os.path.join(base_path, d) for d in dataset_dirs] + + vllm_base_url = "http://vllm:8000/v1" + served_model_name = "como/lm" + embedding_model_name = os.environ["EMB_MODEL"] + dataset_name = os.environ["DATASET"] + output_dir = os.environ["OUT_DIR"] + save_dir = os.environ["SAVE_DIR"] + + config = BaseConfig( + llm_base_url=vllm_base_url, + llm_name=served_model_name, + llm_api_key=os.environ["OPENAI_API_KEY"], + dataset=dataset_name, + embedding_base_url="http://embeddings:8080/embed", + embedding_model_name=embedding_model_name, + embedding_batch_size=4, + need_cluster=True, + output_dir=output_dir, + save_dir=save_dir, + max_meta_loop_max_iterations=5, + is_mc=False, + max_tokens_ver=2000, + max_tokens_sem=2000, + max_tokens_epi=2000, + ) + + for dataset_path in dataset_paths: + tempconfig = copy.deepcopy(config) + tempconfig.output_dir += f"/{os.path.basename(dataset_path)}" + tempconfig.save_dir += f"/{os.path.basename(dataset_path)}" + process_dataset(dataset_path, tempconfig) + + +if __name__ == "__main__": + main() From ab2915f2761fec576d0fef9dab64b5acee31f1e7 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Mon, 18 Aug 2025 18:36:06 +0200 Subject: [PATCH 2/6] compose deployment basic docker compose setup. 
integration work needed --- README.md | 86 ++++++++++++++++++++++++++++++++++++++++++++-- docker-compose.yml | 3 +- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5e83790..f6b6650 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,7 @@ curl http://localhost:8000/v1/models ### Method 3: Using Local docker compose deployment (main_docker.py) ⚡ This method deploys everything needed locally. Namely: 1. vLLM openai server for language model inference -2. 🤗 HugginFace's Text Embeddings Inference +2. 🤗 HugginFace's Text Embeddings Inference (HF TEI) 3. como-app #### Requirements @@ -227,13 +227,95 @@ cp .env.example .env ``` After cpoying the example environment file adjust environment variables as wanted. -#### 2. Spin-up docker compose deployment +#### 2. Pull, build and spin-up docker compose deployment ```bash docker compose up -d && docker compose logs -f ``` +#### 4. Check Deployment Services' Status 🔍 +Following the deployment of the application stack. Both HF TEI and vllm need to download models. +The `como-app` is only started once both these are up and running healthy. +```logs +[+] Running 3/3 + ✔ Container comorag-vllm-1 Healthy 1.0s + ✔ Container embeddings Healthy 16.0s + ✔ Container comorag-como-app-1 Started + ``` + +##### Check HF TEI logs +```bash +docker compose logs -f embeddings +``` + +Wait until you see the following: +```logs +... 
+embeddings | 2025-08-18T16:20:13.621603Z INFO text_embeddings_router: router/src/lib.rs:239: Starting model backend +embeddings | 2025-08-18T16:20:13.623965Z INFO text_embeddings_backend: backends/src/lib.rs:516: Downloading `model.safetensors` +embeddings | 2025-08-18T16:20:13.624022Z INFO text_embeddings_backend: backends/src/lib.rs:395: Model weights downloaded in 63.05µs +embeddings | 2025-08-18T16:20:13.998327Z INFO text_embeddings_backend_candle: backends/candle/src/lib.rs:412: Starting FlashNomicBert model on Cuda(CudaDevice(DeviceId(1))) +embeddings | 2025-08-18T16:20:23.549330Z INFO text_embeddings_router: router/src/lib.rs:257: Warming up model +embeddings | 2025-08-18T16:20:23.945749Z INFO text_embeddings_router::http::server: router/src/http/server.rs:1852: Starting HTTP server: 0.0.0.0:8080 +embeddings | 2025-08-18T16:20:23.945762Z INFO text_embeddings_router::http::server: router/src/http/server.rs:1853: Ready +``` + +##### Check vLLM logs +```bash +docker compose logs -f vllm +``` + +Wait until you see the following: +```logs +... 
+vllm-1 | INFO 08-18 09:29:55 [api_server.py:1818] Starting vLLM API server 0 on http://0.0.0.0:80 +vllm-1 | INFO 08-18 09:29:55 [launcher.py:29] Available routes are: +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /openapi.json, Methods: HEAD, GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /docs, Methods: HEAD, GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /docs/oauth2-redirect, Methods: HEAD, GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /redoc, Methods: HEAD, GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /health, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /load, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /ping, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /ping, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /tokenize, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /detokenize, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/models, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /version, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/responses, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/responses/{response_id}, Methods: GET +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/responses/{response_id}/cancel, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/chat/completions, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/completions, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/embeddings, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /pooling, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /classify, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /score, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/score, Methods: POST +vllm-1 | INFO 
08-18 09:29:55 [launcher.py:37] Route: /v1/audio/transcriptions, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/audio/translations, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /rerank, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v1/rerank, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /v2/rerank, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /scale_elastic_ep, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /is_scaling_elastic_ep, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /invocations, Methods: POST +vllm-1 | INFO 08-18 09:29:55 [launcher.py:37] Route: /metrics, Methods: GET +vllm-1 | INFO: Started server process [1] +vllm-1 | INFO: Waiting for application startup. +vllm-1 | INFO: Application startup complete. +``` + +##### ComoRAG +Now attach to the como-app to see its logs. +```bash +docker compose logs -f como-app +``` + +For example: +```logs +``` ### Comparison of Two Methods ๐Ÿ“Š diff --git a/docker-compose.yml b/docker-compose.yml index c20dcf0..b5157d7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,8 @@ services: devices: - nvidia.com/gpu=all env_file: .env + environment: + - EMB_MODEL=${EMB_MODEL} ipc: host depends_on: embeddings: @@ -137,7 +139,6 @@ services: "${LLM_CPU_OFFLOAD_SPACE}", "--gpu-memory-utilization", "0.73", - "--use-v2-block-manager", # "--block-size", # "32", "--swap-space", From 4aa26e9378c7dc25ee37083f8864b7c9ac05ccf3 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Tue, 19 Aug 2025 21:18:24 +0200 Subject: [PATCH 3/6] bbefore openai suggestion --- .env.example | 1 + Dockerfile | 3 +- docker-compose.yml | 3 + main_docker.py | 19 +- src/comorag/ComoRAG.py | 185 ++++++++++---------- src/comorag/embedding_model/HF_TEI.py | 37 ++++ src/comorag/embedding_model/TEITokenizer.py | 37 ++++ src/comorag/embedding_model/__init__.py | 8 +- test_tei.py | 8 + 9 files 
changed, 197 insertions(+), 104 deletions(-) create mode 100644 src/comorag/embedding_model/HF_TEI.py create mode 100644 src/comorag/embedding_model/TEITokenizer.py create mode 100644 test_tei.py diff --git a/.env.example b/.env.example index 1d9da38..341357a 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ # Optional # LLM= # EMB_MODEL= +LOCAL_DOCKER=True HUGGINGFACEHUB_API_TOKEN= LLM_MAX_CONTEXT_LENGTH=32768 LLM_SWAP_SPACE=16 diff --git a/Dockerfile b/Dockerfile index c4122f7..4194ab0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,8 +9,9 @@ RUN apt-get update && \ && rm -rf /var/lib/apt/lists/* # Install requirements -COPY . . +COPY requirements.txt requirements.txt RUN pip install -r requirements.txt +COPY . . # Set environment variables ENV PYTHONPATH=/app diff --git a/docker-compose.yml b/docker-compose.yml index b5157d7..ca5cd8c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ volumes: services: como-app: + container_name: como-app build: . # image: nvidia/cuda:11.8.0-base-ubuntu22.04 # The key part for CDI GPU access: @@ -38,6 +39,7 @@ services: - "c 195:* rmw" - "c 236:* rmw" environment: + - EMB_MODEL=${EMB_MODEL} - NVIDIA_VISIBLE_DEVICES=all - NVIDIA_DRIVER_CAPABILITIES=compute,utility - USE_FLASH_ATTENTION=True @@ -92,6 +94,7 @@ services: volumes: - hf_cache:/.hf_cache restart: no + container_name: vllm image: vllm/vllm-openai:latest ports: - 7373:80 diff --git a/main_docker.py b/main_docker.py index e9ed52f..a2f6874 100644 --- a/main_docker.py +++ b/main_docker.py @@ -1,10 +1,12 @@ import os import json import copy - from src.comorag.ComoRAG import ComoRAG from src.comorag.utils.config_utils import BaseConfig from src.comorag.utils.misc_utils import get_gold_answers +from src.comorag.utils.logging_utils import get_logger + +logger = get_logger(__name__) def process_dataset(dataset_path, config): @@ -48,27 +50,24 @@ def process_dataset(dataset_path, config): def main(): - # 1) Start a vLLM OpenAI-compatible server 
separately, e.g.: - # vllm serve /path/to/your/model --tensor-parallel-size 1 --max_model_len 4096*2 --gpu-memory-utilization 0.95 + os.environ.setdefault("CUDA_VISIBLE_DEVICES", "all") + + # os.environ["OPENAI_API_KEY"] = "your-api-key-here" - # 2) Then run this script. We will call the server via base_url. - # Ensure an API key is present for OpenAI-compatible clients - os.environ["OPENAI_API_KEY"] = "your-api-key-here" + dataset_name = os.environ["DATASET"] - # Optional: select visible GPUs for the client side - os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0") + base_path = f"./dataset/{dataset_name}" - base_path = "./dataset/cinderella" dataset_dirs = [ d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d)) ] + dataset_dirs.sort() dataset_paths = [os.path.join(base_path, d) for d in dataset_dirs] vllm_base_url = "http://vllm:8000/v1" served_model_name = "como/lm" embedding_model_name = os.environ["EMB_MODEL"] - dataset_name = os.environ["DATASET"] output_dir = os.environ["OUT_DIR"] save_dir = os.environ["SAVE_DIR"] diff --git a/src/comorag/ComoRAG.py b/src/comorag/ComoRAG.py index 3b8e6e6..0487d2d 100644 --- a/src/comorag/ComoRAG.py +++ b/src/comorag/ComoRAG.py @@ -17,14 +17,15 @@ import concurrent.futures from transformers import AutoTokenizer from concurrent.futures import ThreadPoolExecutor, as_completed - +from .embedding_model.TEITokenizer import TEITokenizer from .utils.embed_utils import get_similar_summaries from .utils import agents from src.comorag.utils.timeline_utils import TimelineSummarizer from src.comorag.utils.summarization_utils import GPT4SummarizationModel from .llm import _get_llm_class, BaseLLM -from .embedding_model import _get_embedding_model_class, BaseEmbeddingModel +from .embedding_model import _get_embedding_model_class +from .embedding_model.base import BaseEmbeddingModel from .embedding_store import EmbeddingStore from .information_extraction import OpenIE from .information_extraction.openie_vllm_offline 
import VLLMOfflineOpenIE @@ -40,10 +41,12 @@ logger = logging.getLogger(__name__) +LOCAL_DOCKER = os.environ["LOCAL_DOCKER"] +logger.info(f"LOCAL_DOCKER: {LOCAL_DOCKER} {type(LOCAL_DOCKER)}") class ComoRAG: - def __init__(self, global_config=None, - save_dir=None, - llm_model_name=None, + def __init__(self, global_config=None, + save_dir=None, + llm_model_name=None, llm_base_url=None, llm_api_key=None, embedding_model_name=None, @@ -118,22 +121,22 @@ def __init__(self, global_config=None, self.sem_embedding_store = EmbeddingStore(self.embedding_model, os.path.join(self.working_dir, "summary_embeddings"), self.global_config.embedding_batch_size, 'summary') - + self.epi_embedding_store = EmbeddingStore(self.embedding_model, os.path.join(self.working_dir, "timeline_embeddings"), self.global_config.embedding_batch_size, 'timeline') - - + + self.summarization_model = GPT4SummarizationModel(self.global_config.llm_name,self.global_config.llm_base_url,self.global_config.llm_api_key) self.timeline_summarizer = TimelineSummarizer( chunk_embedding_store=self.ver_embedding_store, summary_embedding_store=self.epi_embedding_store, summarization_model=self.summarization_model ) - - if not self.flag_cluster: + + if not self.flag_cluster: self.clustering = ChunkSoftClustering( embedding_store=self.ver_embedding_store, reduction_dimension=10, @@ -146,7 +149,7 @@ def __init__(self, global_config=None, llm_base_url=self.global_config.llm_base_url, llm_api_key=self.global_config.llm_api_key ) - + self.timeline_summarizer.load_all_summaries() self.level_store = self.timeline_summarizer.get_level_embedding_store(0) @@ -156,8 +159,12 @@ def __init__(self, global_config=None, self.max_tokens_epi = self.global_config.max_tokens_epi self.level_store = self.timeline_summarizer.get_level_embedding_store(0) - self.tokenizer = AutoTokenizer.from_pretrained(self.global_config.embedding_model_name) - + if LOCAL_DOCKER: + self.tokenizer = TEITokenizer + else: + self.tokenizer = 
AutoTokenizer.from_pretrained(self.global_config.embedding_model_name) + + def initialize_graph(self): self._graphml_xml_file = os.path.join( self.working_dir, f"graph.graphml" @@ -180,7 +187,7 @@ def initialize_graph(self): def pre_openie(self, docs: List[str]): logger.info(f"Indexing Documents") logger.info(f"Performing OpenIE Offline") - + chunks = self.ver_embedding_store.get_missing_string_hash_ids(docs) all_openie_info, chunk_keys_to_process = self.load_existing_openie(chunks.keys()) @@ -211,12 +218,12 @@ def index(self, docs: List[str]): self.global_config.embedding_batch_size, 'timeline' ) - + self.timeline_summarizer.try_load_or_generate_summaries(timeline_dir) self.timeline_summarizer.load_all_summaries() self.level_store = self.timeline_summarizer.get_level_embedding_store(0) - if self.global_config.need_cluster and not self.flag_cluster: + if self.global_config.need_cluster and not self.flag_cluster: all_summaries, final_summary = self._recursive_clustering( [self.ver_embedding_store.get_row(hash_id)['content'] for hash_id in self.ver_embedding_store.get_all_ids()], max_iterations=5 # Set maximum iteration count @@ -234,12 +241,12 @@ def index(self, docs: List[str]): self.merge_openie_results(all_openie_info, new_openie_rows, new_ner_results_dict, new_triple_results_dict) if self.global_config.save_openie: self.save_openie_results(all_openie_info) - ner_results_dict, triple_results_dict = reformat_openie_results(all_openie_info) + ner_results_dict, triple_results_dict = reformat_openie_results(all_openie_info) assert len(chunks) == len(ner_results_dict) == len(triple_results_dict) # prepare data_store chunk_ids = list(chunks.keys()) - + chunk_triples = [[text_processing(t) for t in triple_results_dict[chunk_id].triples] for chunk_id in chunk_ids] entity_nodes, chunk_triple_entities = extract_entity_nodes(chunk_triples) facts = flatten_facts(chunk_triples) @@ -283,15 +290,15 @@ def meta_control_loop(self, q_idx, query): docs, nodes = 
self.tri_retrieve(retrieve_query, memory_pool) memory_pool = self.mem_encode(query=retrieve_query, docs=docs, memory_pool=memory_pool) - + ver_context = "\n".join([ver for node in memory_pool.get_temp_nodes_by_type(NodeType.VER) for ver in node.original_content]) sem_context = "\n".join([sem for node in memory_pool.get_temp_nodes_by_type(NodeType.SEM) for sem in node.original_content]) epi_context = "\n".join([epi for node in memory_pool.get_temp_nodes_by_type(NodeType.EPI) for epi in node.original_content]) - + historical_infomation = "" - all_steps = [] + all_steps = [] step_answers_local = {} - + for i in range(self.global_config.max_meta_loop_max_iterations+1): step_info = { "step": i + 1, @@ -309,11 +316,11 @@ def meta_control_loop(self, q_idx, query): prompt_user += f"### Semantic Summary\n{sem_context}\n\n" if self.global_config.use_epi: prompt_user += f"### Timeline Summary\n{epi_context}\n\n" - + if i != 0: prompt_user += f"### Historical Information\n{historical_infomation}\n\n" - prompt_user += 'Question: ' + query + '\nThought: ' + prompt_user += 'Question: ' + query + '\nThought: ' if self.global_config.is_mc: if i == 0: qa_message = self.prompt_template_manager.render(name=f'rag_qa_mc', prompt_user=prompt_user) @@ -321,7 +328,7 @@ def meta_control_loop(self, q_idx, query): qa_message = self.prompt_template_manager.render(name=f'rag_qa_mc_memory', prompt_user=prompt_user) else: qa_message = self.prompt_template_manager.render(name=f'rag_qa_narrativeqa', prompt_user=prompt_user) - + result = self.llm_model.infer(qa_message) # try: if result is None: @@ -329,14 +336,14 @@ def meta_control_loop(self, q_idx, query): step_info["error"] = "LLM returned None response" all_steps.append(step_info) continue - + response_content = result[0] if isinstance(result, (list, tuple)) else result if not response_content: logger.error("Empty response content from LLM") step_info["error"] = "Empty response content from LLM" all_steps.append(step_info) continue - + try: 
pred_ans = response_content.split('### Final Answer')[1].strip() except IndexError: @@ -359,7 +366,7 @@ def meta_control_loop(self, q_idx, query): # mem-fusion historical_infomation = memory_pool.create_fusion_content(probe=retrieve_query,top_k_percent=0.5) memory_pool.add_fused_node(probe=retrieve_query, fused_content=historical_infomation, source_nodes=nodes) - + sem_context = "\n".join([node.cue for node in memory_pool.get_temp_nodes_by_type(NodeType.SEM)]) epi_context = "\n".join([node.cue for node in memory_pool.get_temp_nodes_by_type(NodeType.EPI)]) ver_context = "\n".join([node.cue for node in memory_pool.get_temp_nodes_by_type(NodeType.VER)]) @@ -367,20 +374,20 @@ def meta_control_loop(self, q_idx, query): historical_infomation = "" for node in memory_pool.get_temp_nodes_by_type(NodeType.FUSION): historical_infomation += f"probe : {node.probe}\nFinding : {node.cue}\n" - + for node in memory_pool.get_nodes_by_type(NodeType.FUSION): historical_infomation += f"probe : {node.probe}\nFinding : {node.cue}\n" all_steps.append(step_info) else: - all_steps.append(step_info) + all_steps.append(step_info) break - - + + query_solution = QuerySolution(question=query, docs=ver_context, summary=sem_context, timeline=epi_context) query_solution.answer = response_content - + pool_info = { @@ -391,10 +398,10 @@ def meta_control_loop(self, q_idx, query): "total_probes": len(memory_pool.get_all_probes()), "probes": memory_pool.get_all_probes() } - + output_dir = os.path.join(self.global_config.output_dir, 'details') os.makedirs(output_dir, exist_ok=True) - + with open(os.path.join(output_dir, f"pool_info_{q_idx}.json"), 'w', encoding='utf-8') as f: json.dump(pool_info, f, ensure_ascii=False, indent=4) @@ -433,25 +440,25 @@ def try_answer(self, queries: List[str], num_to_retrieve: int = None) -> List[Qu queries_solutions = [] step_answers = {} self.level_store = self.timeline_summarizer.get_level_embedding_store(0) - max_workers = min(16, len(queries)) + max_workers = min(16, 
len(queries)) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_query = { - executor.submit(self.meta_control_loop, q_idx, query): q_idx + executor.submit(self.meta_control_loop, q_idx, query): q_idx for q_idx, query in enumerate(queries) } - + queries_solutions = [None] * len(queries) step_answers = {} for future in tqdm(as_completed(future_to_query), total=len(queries), desc="Processing Queries"): q_idx, query_solution, step_answers_local = future.result() if query_solution: - queries_solutions[q_idx] = query_solution + queries_solutions[q_idx] = query_solution step_answers[q_idx] = step_answers_local queries_solutions = [qs for qs in queries_solutions if qs is not None] return queries_solutions - + #tri-retrieve def tri_retrieve(self, query: str, memory_pool: MemoryPool, ver_top_k: int = None, sem_top_k: int = None, epi_top_k: int = None) -> Tuple[Dict[str, Any], Dict[str, Any]]: ver_top_k = self.global_config.qa_ver_top_k if hasattr(self.global_config, 'qa_ver_top_k') else ver_top_k @@ -463,7 +470,7 @@ def tri_retrieve(self, query: str, memory_pool: MemoryPool, ver_top_k: int = Non sem_hashes = all_hashes.get(NodeType.SEM, []) epi_hashes = all_hashes.get(NodeType.EPI, []) - + if not self.ready_to_retrieve: self.prepare_retrieval_objects() @@ -509,7 +516,7 @@ def tri_retrieve(self, query: str, memory_pool: MemoryPool, ver_top_k: int = Non retrieved_passages = top_k_docs retrieved_passages_sorted = sorted(retrieved_passages,key=lambda doc: hash_id_to_order.get(text_to_hash_id.get(doc), float('inf'))) top_k_docs = retrieved_passages_sorted - + # Semantic Index Retrieval sorted_sem_ids, sorted_sem_scores = self.dense_passage_retrieval(query, need_cluster=True) @@ -520,7 +527,7 @@ def tri_retrieve(self, query: str, memory_pool: MemoryPool, ver_top_k: int = Non if len(sem_hashes) > 0: top_k_sem = [sem for sem in top_k_sem if text_to_hash_id[sem] not in sem_hashes] - + ### Episodic Index Retrieval top_k_epi, sorted_epi_scores = 
get_similar_summaries( @@ -530,17 +537,17 @@ def tri_retrieve(self, query: str, memory_pool: MemoryPool, ver_top_k: int = Non top_k=epi_top_k ) top_k_epi = top_k_epi[:epi_top_k] - + # epi result if len(top_k_epi) > 0: text_to_hash_id = self.level_store.text_to_hash_id top_k_epi_hashes = [text_to_hash_id[doc] for doc in top_k_epi] - + if len(epi_hashes) > 0: top_k_epi = [epi for epi in top_k_epi if text_to_hash_id[epi] not in epi_hashes] hash_id_to_order = self.level_store.get_hash_id_to_order() - text_to_hash_id = self.level_store.text_to_hash_id + text_to_hash_id = self.level_store.text_to_hash_id retrieved_passages = top_k_epi retrieved_passages_sorted = sorted(retrieved_passages, key=lambda doc: hash_id_to_order.get(text_to_hash_id.get(doc), float('inf'))) @@ -563,7 +570,7 @@ def mem_encode(self, query: str, docs: Dict, memory_pool: MemoryPool, probe: str break selected_vers.append(ver) current_tokens += ver_tokens - + selected_sems = [] current_tokens = 0 for sem in docs["semantic"]: @@ -572,7 +579,7 @@ def mem_encode(self, query: str, docs: Dict, memory_pool: MemoryPool, probe: str break selected_sems.append(sem) current_tokens += sem_tokens - + selected_epis = [] current_tokens = 0 for epi in docs["episodic"]: @@ -582,17 +589,17 @@ def mem_encode(self, query: str, docs: Dict, memory_pool: MemoryPool, probe: str selected_epis.append(epi) current_tokens += epi_tokens - + pool_agent = memory_pool.agent ver_cue, sem_cue, epi_cue = pool_agent.fusion( - query=query, - vers="\n".join(selected_vers), - sems="\n".join(selected_sems), - epis="\n".join(selected_epis), + query=query, + vers="\n".join(selected_vers), + sems="\n".join(selected_sems), + epis="\n".join(selected_epis), ) - - + + # Memory Nodes Generation ver_node = MemoryNode( probe=probe if probe else query, @@ -601,7 +608,7 @@ def mem_encode(self, query: str, docs: Dict, memory_pool: MemoryPool, probe: str cue=ver_cue ) ver_node.update_hashes() - + sem_node = MemoryNode( probe=probe if probe else query, 
node_type=NodeType.SEM, @@ -609,19 +616,19 @@ def mem_encode(self, query: str, docs: Dict, memory_pool: MemoryPool, probe: str cue=sem_cue ) sem_node.update_hashes() - + epi_node = MemoryNode( probe=probe if probe else query, node_type=NodeType.EPI, original_content=selected_epis, - cue=epi_cue + cue=epi_cue ) epi_node.update_hashes() - + memory_pool.add_to_temp_pool(ver_node) memory_pool.add_to_temp_pool(sem_node) memory_pool.add_to_temp_pool(epi_node) - + return memory_pool def add_fact_edges(self, chunk_ids: List[str], chunk_triples: List[Tuple]): @@ -655,7 +662,7 @@ def add_passage_edges(self, chunk_ids: List[str], chunk_triple_entities: List[Li current_graph_nodes = set() num_new_chunks = 0 - logger.info(f"Connecting passage nodes to phrase nodes.") + logger.info(f"Connecting passage nodes to phrase nodes.") for idx, chunk_key in tqdm(enumerate(chunk_ids)): if chunk_key not in current_graph_nodes: for chunk_ent in chunk_triple_entities[idx]: @@ -674,7 +681,7 @@ def add_synonymy_edges(self): entity_node_keys = list(self.entity_id_to_row.keys()) logger.info(f"Performing KNN retrieval for each phrase nodes ({len(entity_node_keys)}).") entity_embs = self.entity_embedding_store.get_embeddings(entity_node_keys) - + query_node_key2knn_node_keys = retrieve_knn(query_ids=entity_node_keys, key_ids=entity_node_keys, query_vecs=entity_embs, @@ -935,7 +942,7 @@ def get_query_embeddings(self, queries: List[str] | List[QuerySolution]): self.query_to_embedding['passage'][query] = embedding def get_fact_scores(self, query: str) -> np.ndarray: - + query_embedding = self.query_to_embedding['triple'].get(query, None) if query_embedding is None: query_embedding = self.embedding_model.batch_encode(query, @@ -953,7 +960,7 @@ def dense_passage_retrieval(self, query: str, need_cluster: bool = False) -> Tup query_embedding = self.embedding_model.batch_encode(query, instruction=get_query_instruction('query_to_passage'), norm=True) - + if need_cluster: query_doc_scores = 
np.dot(self.summary_embeddings, query_embedding.T) else: @@ -979,7 +986,7 @@ def get_top_k_weights(self, top_k_phrases = set(linking_score_map.keys()) top_k_phrases_keys = set( [compute_mdhash_id(content=top_k_phrase, prefix="entity-") for top_k_phrase in top_k_phrases]) - + for phrase_key in self.node_name_to_vertex_idx: if phrase_key not in top_k_phrases_keys: phrase_id = self.node_name_to_vertex_idx.get(phrase_key, None) @@ -995,9 +1002,9 @@ def graph_search_with_fact_entities(self, query: str, top_k_facts: List[Tuple], top_k_fact_indices: List[str], passage_node_weight: float = 0.05) -> Tuple[np.ndarray, np.ndarray]: - - linking_score_map = {} # from phrase to the average scores of the facts that contain the phrase - phrase_scores = {} # store all fact scores for each phrase regardless of whether they exist in the knowledge graph or not + + linking_score_map = {} # from phrase to the average scores of the facts that contain the phrase + phrase_scores = {} # store all fact scores for each phrase regardless of whether they exist in the knowledge graph or not phrase_weights = np.zeros(len(self.graph.vs['name'])) passage_weights = np.zeros(len(self.graph.vs['name'])) used_phrases_with_scores = {} @@ -1042,7 +1049,7 @@ def graph_search_with_fact_entities(self, query: str, linking_score_map[passage_node_text] = passage_dpr_score * passage_node_weight node_weights = phrase_weights + passage_weights - + if len(linking_score_map) > 30: linking_score_map = dict(sorted(linking_score_map.items(), key=lambda x: x[1], reverse=True)[:30]) @@ -1082,7 +1089,7 @@ def rerank_facts(self, query: str, query_fact_scores: np.ndarray) -> Tuple[List[ rerank_log = {'facts_before_rerank': candidate_facts, 'facts_after_rerank': top_k_facts} return top_k_fact_indices, top_k_facts, rerank_log - + def run_ppr(self, reset_prob: np.ndarray, damping: float =0.5) -> Tuple[np.ndarray, np.ndarray]: @@ -1103,12 +1110,12 @@ def run_ppr(self, sorted_doc_scores = doc_scores[sorted_doc_ids.tolist()] 
return sorted_doc_ids, sorted_doc_scores - + def _recursive_clustering(self, texts, max_iterations=5, current_iteration=0): # Create temporary folder paths temp_embeddings_dir = os.path.join(self.working_dir, "temp_embeddings") temp_clusters_dir = os.path.join(self.working_dir, "temp_clusters") - + # Define cleanup function def cleanup_temp_folders(): try: @@ -1121,16 +1128,16 @@ def cleanup_temp_folders(): print(f"Deleted temporary clusters folder: {temp_clusters_dir}") except Exception as e: print(f"Error cleaning up temporary folders: {e}") - + # Early return cases if len(texts) <= 1: cleanup_temp_folders() return texts, texts - + if current_iteration >= max_iterations: cleanup_temp_folders() return texts, [texts[0]] - + try: temp_embedding_store = EmbeddingStore( self.embedding_model, @@ -1138,9 +1145,9 @@ def cleanup_temp_folders(): self.global_config.embedding_batch_size, 'temp' ) - + temp_embedding_store.insert_strings(texts) - + clustering = ChunkSoftClustering( embedding_store=temp_embedding_store, reduction_dimension=10, @@ -1153,43 +1160,43 @@ def cleanup_temp_folders(): llm_base_url=self.global_config.llm_base_url, llm_api_key=self.global_config.llm_api_key ) - + clusters = clustering.perform_clustering() - + stats = clustering.get_cluster_stats() print(f"Clustering stats: {stats}") - + summary_texts = [] with concurrent.futures.ThreadPoolExecutor(max_workers=min(32, len(clusters))) as executor: future_to_cluster = { - executor.submit(clustering.create_cluster_summary, cluster.id): cluster + executor.submit(clustering.create_cluster_summary, cluster.id): cluster for cluster in clusters } - + for future in concurrent.futures.as_completed(future_to_cluster): try: summary = future.result() - if summary: + if summary: summary_texts.append(summary) except Exception as e: logger.error(f"error: {str(e)}") - + # Clean up temporary folders for current level cleanup_temp_folders() - + # Recursively process next level if len(summary_texts) == 1: return 
summary_texts, summary_texts - + next_level_summaries, final_summary = self._recursive_clustering( - summary_texts, + summary_texts, max_iterations=max_iterations, current_iteration=current_iteration + 1 ) return summary_texts + next_level_summaries, final_summary - + except Exception as e: # Ensure temporary folders are cleaned up even in case of exceptions print(f"Error during recursive clustering: {e}") cleanup_temp_folders() - raise \ No newline at end of file + raise diff --git a/src/comorag/embedding_model/HF_TEI.py b/src/comorag/embedding_model/HF_TEI.py new file mode 100644 index 0000000..b59a85a --- /dev/null +++ b/src/comorag/embedding_model/HF_TEI.py @@ -0,0 +1,37 @@ +import requests +import logging + +logger = logging.getLogger(__name__) + +class TEITokenizer: + def __init__(self, base_url: str = "http://embeddings:8080", timeout: int = 30): + """ + base_url: Base URL of the TEI service (e.g. http://embeddings:8080 if running locally) + """ + self.base_url = base_url.rstrip("/") + self.timeout = timeout + + def encode(self, text: str) -> list[int]: + """Tokenize a single string and return list of token IDs""" + url = f"{self.base_url}/tokenize" + payload = {"inputs": text} + try: + response = requests.post(url, json=payload, timeout=self.timeout) + response.raise_for_status() + tokens = response.json() # TEI returns tokenized ids + return tokens[0] if isinstance(tokens, list) and tokens else [] + except Exception as e: + logger.error(f"TEI tokenization failed: {e}") + return [] + + def batch_encode(self, texts: list[str]) -> list[list[int]]: + """Tokenize multiple strings at once""" + url = f"{self.base_url}/tokenize" + payload = {"inputs": texts} + try: + response = requests.post(url, json=payload, timeout=self.timeout) + response.raise_for_status() + return response.json() # list of token ID lists + except Exception as e: + logger.error(f"TEI batch tokenization failed: {e}") + return [[] for _ in texts] diff --git 
a/src/comorag/embedding_model/TEITokenizer.py b/src/comorag/embedding_model/TEITokenizer.py new file mode 100644 index 0000000..b59a85a --- /dev/null +++ b/src/comorag/embedding_model/TEITokenizer.py @@ -0,0 +1,37 @@ +import requests +import logging + +logger = logging.getLogger(__name__) + +class TEITokenizer: + def __init__(self, base_url: str = "http://embeddings:8080", timeout: int = 30): + """ + base_url: Base URL of the TEI service (e.g. http://embeddings:8080 if running locally) + """ + self.base_url = base_url.rstrip("/") + self.timeout = timeout + + def encode(self, text: str) -> list[int]: + """Tokenize a single string and return list of token IDs""" + url = f"{self.base_url}/tokenize" + payload = {"inputs": text} + try: + response = requests.post(url, json=payload, timeout=self.timeout) + response.raise_for_status() + tokens = response.json() # TEI returns tokenized ids + return tokens[0] if isinstance(tokens, list) and tokens else [] + except Exception as e: + logger.error(f"TEI tokenization failed: {e}") + return [] + + def batch_encode(self, texts: list[str]) -> list[list[int]]: + """Tokenize multiple strings at once""" + url = f"{self.base_url}/tokenize" + payload = {"inputs": texts} + try: + response = requests.post(url, json=payload, timeout=self.timeout) + response.raise_for_status() + return response.json() # list of token ID lists + except Exception as e: + logger.error(f"TEI batch tokenization failed: {e}") + return [[] for _ in texts] diff --git a/src/comorag/embedding_model/__init__.py b/src/comorag/embedding_model/__init__.py index 9629c2c..7dd7a99 100644 --- a/src/comorag/embedding_model/__init__.py +++ b/src/comorag/embedding_model/__init__.py @@ -1,9 +1,8 @@ -from .base import EmbeddingConfig, BaseEmbeddingModel +from .HF_TEI import TEIEmbeddingModel from .BGEEmbedding import BGEEmbeddingModel from .OpenAI import OpenAIEmbeddingModel from ..utils.logging_utils import get_logger - logger = get_logger(__name__) @@ -13,5 +12,6 @@ def 
_get_embedding_model_class(embedding_model_name: str = "None"): elif "text-embedding-3-small" in embedding_model_name: return OpenAIEmbeddingModel else: - logger.info(f"Unknown embedding model name: {embedding_model_name}, using BGEEmbeddingModel as default") - return \ No newline at end of file + return TEIEmbeddingModel + logger.info(f"Unknown embedding model name: {embedding_model_name}, using TEI as default") + return diff --git a/test_tei.py b/test_tei.py new file mode 100644 index 0000000..481dfde --- /dev/null +++ b/test_tei.py @@ -0,0 +1,8 @@ +import requests + +url = 'http://embeddings:8080/embed' +data = {"inputs": ["What is Deep Learning?", "What is Love?"]} + +x = requests.post(url, json = myobj) + +print(x.text) From bd23140aa9e3421545ec41f6fa713d6c61d012c7 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Tue, 19 Aug 2025 21:44:06 +0200 Subject: [PATCH 4/6] embeddings work functionaly --- src/comorag/ComoRAG.py | 5 +- src/comorag/embedding_model/HF_TEI.py | 54 +++++++++++---------- src/comorag/embedding_model/TEITokenizer.py | 34 +++++++------ src/comorag/embedding_model/__init__.py | 10 +++- src/comorag/utils/config_utils.py | 51 ++++++++++--------- 5 files changed, 85 insertions(+), 69 deletions(-) diff --git a/src/comorag/ComoRAG.py b/src/comorag/ComoRAG.py index 0487d2d..b45cf7e 100644 --- a/src/comorag/ComoRAG.py +++ b/src/comorag/ComoRAG.py @@ -35,14 +35,15 @@ from .utils.misc_utils import * from .utils.embed_utils import retrieve_knn from .utils.typing_utils import Triple -from .utils.config_utils import BaseConfig +from .utils.config_utils import BaseConfig, str_to_bool from .utils.cluster_utils import ChunkSoftClustering from .utils.memory_utils import MemoryNode, MemoryPool, NodeType logger = logging.getLogger(__name__) -LOCAL_DOCKER = os.environ["LOCAL_DOCKER"] +LOCAL_DOCKER = str_to_bool(os.getenv("LOCAL_DOCKER", "false")) logger.info(f"LOCAL_DOCKER: {LOCAL_DOCKER} {type(LOCAL_DOCKER)}") + class ComoRAG: def __init__(self, 
global_config=None, save_dir=None, diff --git a/src/comorag/embedding_model/HF_TEI.py b/src/comorag/embedding_model/HF_TEI.py index b59a85a..ac982f1 100644 --- a/src/comorag/embedding_model/HF_TEI.py +++ b/src/comorag/embedding_model/HF_TEI.py @@ -1,37 +1,39 @@ +import numpy as np import requests import logging logger = logging.getLogger(__name__) -class TEITokenizer: - def __init__(self, base_url: str = "http://embeddings:8080", timeout: int = 30): - """ - base_url: Base URL of the TEI service (e.g. http://embeddings:8080 if running locally) - """ +class HFTEIEmbedding: + def __init__(self, global_config, base_url: str = "http://embeddings:8080", embedding_model_name = "", timeout: int = 30): + logger.debug("Global Config: {global_config}") + logger.warning("input name: {embedding_model_name}. Make sure you did load it with HF TEI!") + + self.global_config = global_config self.base_url = base_url.rstrip("/") self.timeout = timeout - def encode(self, text: str) -> list[int]: - """Tokenize a single string and return list of token IDs""" - url = f"{self.base_url}/tokenize" - payload = {"inputs": text} - try: - response = requests.post(url, json=payload, timeout=self.timeout) - response.raise_for_status() - tokens = response.json() # TEI returns tokenized ids - return tokens[0] if isinstance(tokens, list) and tokens else [] - except Exception as e: - logger.error(f"TEI tokenization failed: {e}") - return [] + def batch_encode(self, texts: list[str], instruction: str = None, norm: bool = True) -> np.ndarray: + """ + Encode a batch of texts using Hugging Face TEI `/embed`. + Returns numpy array of shape (batch_size, embedding_dim). 
+ """ + if not texts: + return np.empty((0, 0), dtype=np.float32) + + url = f"{self.base_url}/embed" + payload = {"inputs": texts, "normalize": norm} + if instruction: + payload["prompt_name"] = instruction - def batch_encode(self, texts: list[str]) -> list[list[int]]: - """Tokenize multiple strings at once""" - url = f"{self.base_url}/tokenize" - payload = {"inputs": texts} try: - response = requests.post(url, json=payload, timeout=self.timeout) - response.raise_for_status() - return response.json() # list of token ID lists + resp = requests.post(url, json=payload, timeout=self.timeout) + resp.raise_for_status() + embeddings = resp.json() # [[float, ...], [float, ...]] + embeddings = [np.array(e, dtype=np.float32) for e in embeddings if e] + if not embeddings: + raise ValueError("Empty embeddings returned from TEI") + return np.vstack(embeddings) except Exception as e: - logger.error(f"TEI batch tokenization failed: {e}") - return [[] for _ in texts] + logger.error(f"TEI embedding failed: {e}") + return np.zeros((len(texts), 1), dtype=np.float32) diff --git a/src/comorag/embedding_model/TEITokenizer.py b/src/comorag/embedding_model/TEITokenizer.py index b59a85a..1312877 100644 --- a/src/comorag/embedding_model/TEITokenizer.py +++ b/src/comorag/embedding_model/TEITokenizer.py @@ -4,34 +4,38 @@ logger = logging.getLogger(__name__) class TEITokenizer: - def __init__(self, base_url: str = "http://embeddings:8080", timeout: int = 30): - """ - base_url: Base URL of the TEI service (e.g. http://embeddings:8080 if running locally) - """ + def __init__(self, global_config, base_url: str = "http://embeddings:8080", timeout: int = 30): + logger.debug("Global Config: {global_config}") + self.global_config = global_config self.base_url = base_url.rstrip("/") self.timeout = timeout def encode(self, text: str) -> list[int]: - """Tokenize a single string and return list of token IDs""" + """ + Tokenize a single string and return list of token IDs. 
+ """ url = f"{self.base_url}/tokenize" - payload = {"inputs": text} try: - response = requests.post(url, json=payload, timeout=self.timeout) - response.raise_for_status() - tokens = response.json() # TEI returns tokenized ids - return tokens[0] if isinstance(tokens, list) and tokens else [] + resp = requests.post(url, json={"inputs": text}, timeout=self.timeout) + resp.raise_for_status() + tokens = resp.json() + if tokens and isinstance(tokens[0], list): + return [tok["id"] for tok in tokens[0]] + return [] except Exception as e: logger.error(f"TEI tokenization failed: {e}") return [] def batch_encode(self, texts: list[str]) -> list[list[int]]: - """Tokenize multiple strings at once""" + """ + Tokenize multiple strings and return list of lists of token IDs. + """ url = f"{self.base_url}/tokenize" - payload = {"inputs": texts} try: - response = requests.post(url, json=payload, timeout=self.timeout) - response.raise_for_status() - return response.json() # list of token ID lists + resp = requests.post(url, json={"inputs": texts}, timeout=self.timeout) + resp.raise_for_status() + all_tokens = resp.json() + return [[tok["id"] for tok in seq] for seq in all_tokens] except Exception as e: logger.error(f"TEI batch tokenization failed: {e}") return [[] for _ in texts] diff --git a/src/comorag/embedding_model/__init__.py b/src/comorag/embedding_model/__init__.py index 7dd7a99..bf51af9 100644 --- a/src/comorag/embedding_model/__init__.py +++ b/src/comorag/embedding_model/__init__.py @@ -1,17 +1,23 @@ -from .HF_TEI import TEIEmbeddingModel +import os +from .HF_TEI import HFTEIEmbedding from .BGEEmbedding import BGEEmbeddingModel from .OpenAI import OpenAIEmbeddingModel from ..utils.logging_utils import get_logger +from ..utils.config_utils import str_to_bool logger = get_logger(__name__) +LOCAL_DOCKER = str_to_bool(os.getenv("LOCAL_DOCKER", "false")) +logger.info(f"LOCAL_DOCKER: {LOCAL_DOCKER} {type(LOCAL_DOCKER)}") + def _get_embedding_model_class(embedding_model_name: str = 
"None"): if "bge-" in embedding_model_name.lower(): return BGEEmbeddingModel elif "text-embedding-3-small" in embedding_model_name: return OpenAIEmbeddingModel + elif LOCAL_DOCKER: + return HFTEIEmbedding else: - return TEIEmbeddingModel logger.info(f"Unknown embedding model name: {embedding_model_name}, using TEI as default") return diff --git a/src/comorag/utils/config_utils.py b/src/comorag/utils/config_utils.py index cf0a76c..5194e85 100644 --- a/src/comorag/utils/config_utils.py +++ b/src/comorag/utils/config_utils.py @@ -13,11 +13,14 @@ logger = get_logger(__name__) +def str_to_bool(val: str) -> bool: + return val.strip().lower() in ("1", "true", "yes", "on") + @dataclass class BaseConfig: """One and only configuration.""" - # LLM specific attributes + # LLM specific attributes llm_name: str = field( default="gpt-4o-mini", metadata={"help": "Class name indicating which LLM model to use."} @@ -66,7 +69,7 @@ class BaseConfig: default=None, metadata={"help": "Azure OpenAI embedding endpoint. e.g. https://YOUR_RESOURCE_NAME.openai.azure.com/"} ) - + ## LLM specific attributes -> Async hyperparameters max_retry_attempts: int = field( default=5, @@ -107,8 +110,8 @@ class BaseConfig: metadata={"help": "Max number of tokens each chunk can contain. If set to None, the whole doc will treated as a single chunk."} ) preprocess_chunk_func: Literal["by_token", "by_word"] = field(default='by_token') - - + + # Information extraction specific attributes information_extraction_model_name: Literal["openie_openai_gpt", ] = field( default="openie_openai_gpt", @@ -122,8 +125,8 @@ class BaseConfig: default=False, metadata={"help": "Whether to skip graph construction or not. 
Set it to be true when running vllm offline indexing for the first time."} ) - - + + # Embedding specific attributes embedding_model_name: str = field( default="nvidia/NV-Embed-v2", @@ -145,9 +148,9 @@ class BaseConfig: default="auto", metadata={"help": "Data type for local embedding model."} ) - - - + + + # Graph construction specific attributes synonymy_edge_topk: int = field( default=2047, @@ -169,9 +172,9 @@ class BaseConfig: default=False, metadata={"help": "Whether the graph is directed or not."} ) - - - + + + # Retrieval specific attributes linking_top_k: int = field( default=5, @@ -185,8 +188,8 @@ class BaseConfig: default=0.5, metadata={"help": "Damping factor for ppr algorithm."} ) - - + + # QA specific attributes max_meta_loop_max_iterations : int = field( default=5, @@ -217,7 +220,7 @@ class BaseConfig: default=50, metadata={"help": "Feeding top k episodics to the QA model for reading."} ) - + is_mc: bool = field( default=False, metadata={"help": "Whether the question is a multiple choice question."} @@ -227,7 +230,7 @@ class BaseConfig: default=True, metadata={"help": "Whether to use the veridical index of the documents in the QA process."} ) - + use_sem: bool = field( default= True, metadata={"help": "Whether to use the semantic index of the documents in the QA process."} @@ -260,14 +263,14 @@ class BaseConfig: default=None, metadata={"help": "Directory to save all related information. If it's given, will overwrite all default save_dir setups. 
If it's not given, then if we're not running specific datasets, default to `outputs`, otherwise, default to a dataset-customized output dir."} ) - + # Output directory for QA results output_dir: str = field( default="./outputs/qa_results", metadata={"help": "Directory to save QA results."} ) - - + + # Dataset running specific attributes ## Dataset running specific attributes -> General dataset: Optional[Literal['hotpotqa', 'hotpotqa_train', 'musique', '2wikimultihopqa']] = field( @@ -276,10 +279,10 @@ class BaseConfig: ) ## Dataset running specific attributes -> Graph graph_type: Literal[ - 'dpr_only', - 'entity', + 'dpr_only', + 'entity', 'passage_entity', 'relation_aware_passage_entity', - 'passage_entity_relation', + 'passage_entity_relation', 'facts_and_sim_passage_node_unidirectional', ] = field( default="facts_and_sim_passage_node_unidirectional", @@ -289,8 +292,8 @@ class BaseConfig: default=None, metadata={"help": "Length of the corpus to use."} ) - - + + def __post_init__(self): if self.save_dir is None: # If save_dir not given if self.dataset is None: self.save_dir = 'outputs' # running freely From 4339f5a55187d08517c7620c077f407d7b716741 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Tue, 19 Aug 2025 21:44:21 +0200 Subject: [PATCH 5/6] embeddings work functionally --- src/comorag/embedding_model/HF_TEI.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/comorag/embedding_model/HF_TEI.py b/src/comorag/embedding_model/HF_TEI.py index ac982f1..2de9364 100644 --- a/src/comorag/embedding_model/HF_TEI.py +++ b/src/comorag/embedding_model/HF_TEI.py @@ -8,7 +8,6 @@ class HFTEIEmbedding: def __init__(self, global_config, base_url: str = "http://embeddings:8080", embedding_model_name = "", timeout: int = 30): logger.debug("Global Config: {global_config}") logger.warning("input name: {embedding_model_name}. 
Make sure you did load it with HF TEI!") - self.global_config = global_config self.base_url = base_url.rstrip("/") self.timeout = timeout From bc2e2f758d20e63cef8c3deee5631847bb4fc8c7 Mon Sep 17 00:00:00 2001 From: l4b4r4b4b4 Date: Tue, 19 Aug 2025 21:54:25 +0200 Subject: [PATCH 6/6] functionally working --- main_docker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main_docker.py b/main_docker.py index a2f6874..3ed87cf 100644 --- a/main_docker.py +++ b/main_docker.py @@ -65,7 +65,7 @@ def main(): dataset_dirs.sort() dataset_paths = [os.path.join(base_path, d) for d in dataset_dirs] - vllm_base_url = "http://vllm:8000/v1" + vllm_base_url = "http://vllm:80/v1" served_model_name = "como/lm" embedding_model_name = os.environ["EMB_MODEL"] output_dir = os.environ["OUT_DIR"]