From 772c565e30ee712fd6db4c0111f1313c582adcf5 Mon Sep 17 00:00:00 2001 From: JAEIL1999 Date: Thu, 9 Apr 2026 17:17:52 +0900 Subject: [PATCH 1/9] chore: ignore local IDE and Python cache artifacts --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b617770 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea/ +__pycache__/ +*.py[cod] From d0adac574329b08a15f8627d221778eea25a0ded Mon Sep 17 00:00:00 2001 From: JAEIL1999 Date: Thu, 9 Apr 2026 17:18:40 +0900 Subject: [PATCH 2/9] refactor: flatten backend layout and isolate stress control --- backend/Dockerfile | 4 +- backend/__init__.py | 1 + backend/app.py | 47 +++++++++++ backend/config.py | 9 ++ backend/{src => }/requirements.txt | 2 +- backend/server.py | 5 ++ backend/src/server.py | 127 ----------------------------- backend/stress.py | 86 +++++++++++++++++++ 8 files changed, 151 insertions(+), 130 deletions(-) create mode 100644 backend/__init__.py create mode 100644 backend/app.py create mode 100644 backend/config.py rename backend/{src => }/requirements.txt (87%) create mode 100644 backend/server.py delete mode 100644 backend/src/server.py create mode 100644 backend/stress.py diff --git a/backend/Dockerfile b/backend/Dockerfile index d8d3969..0615082 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -2,9 +2,9 @@ FROM python:3.10-slim WORKDIR /app -COPY src/requirements.txt /app/ +COPY requirements.txt /app/ RUN pip install --no-cache-dir -r /app/requirements.txt -COPY src/ /app/ +COPY . /app/ CMD ["python", "/app/server.py"] diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/backend/__init__.py @@ -0,0 +1 @@ + diff --git a/backend/app.py b/backend/app.py new file mode 100644 index 0000000..06b96b5 --- /dev/null +++ b/backend/app.py @@ -0,0 +1,47 @@ +from flask import Flask, Response, request +from flask_cors import CORS +from prometheus_client import CONTENT_TYPE_LATEST, Counter, generate_latest +from prometheus_flask_exporter import PrometheusMetrics + +from config import DEFAULT_LOAD_DURATION, TARGET_URL +from stress import StressController + + +def create_app() -> Flask: + app = Flask(__name__) + CORS(app) + + PrometheusMetrics(app, group_by="endpoint") + load_request_counter = Counter( + "load_requests_total", + "Total /load POST requests", + ) + stress_controller = StressController(TARGET_URL) + + @app.route("/load", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) + def load_handler(): + load_request_counter.inc() + duration = float(request.args.get("duration", str(DEFAULT_LOAD_DURATION))) + stress_controller.enqueue_cpu_load(duration) + return "ok" + + @app.route("/cpu/toggle", methods=["POST"]) + def cpu_toggle(): + return stress_controller.toggle() + + @app.route("/health", methods=["GET", "POST"]) + def health_check(): + return "OK", 200 + + @app.route("/metrics") + def metrics_handler(): + return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST) + + @app.route("/") + def home(): + return "hello, this is pnu cloud computing term project", 200 + + return app + + +app = create_app() diff --git a/backend/config.py b/backend/config.py new file mode 100644 index 0000000..390e4e8 --- /dev/null +++ b/backend/config.py @@ -0,0 +1,9 @@ +TARGET_URL = "http://load_balancer:8000/load" +DEFAULT_LOAD_DURATION = 0.5 +LOAD_STEP_DURATION = 5 +MAX_RPS = 150 +INITIAL_RPS = 20 +RPS_INCREMENT = 20 +REQUEST_TIMEOUT_SECONDS = 5.0 +THREAD_POOL_WORKERS = 50 +CPU_POOL_WORKERS = 4 diff --git a/backend/src/requirements.txt b/backend/requirements.txt similarity index 87% rename from backend/src/requirements.txt rename to backend/requirements.txt index 1e84286..070f8d0 100644 --- a/backend/src/requirements.txt +++ b/backend/requirements.txt @@ -2,4 +2,4 @@ flask prometheus_client flask-cors prometheus_flask_exporter -requests \ No newline at end of file +requests diff --git a/backend/server.py b/backend/server.py new file mode 100644 index 0000000..1f53e1e --- /dev/null +++ b/backend/server.py @@ -0,0 +1,5 @@ +from app import app + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000, threaded=True) diff --git a/backend/src/server.py b/backend/src/server.py deleted file mode 100644 index ef2da0d..0000000 --- a/backend/src/server.py +++ /dev/null @@ -1,127 +0,0 @@ -from flask import Flask, request, Response -from flask_cors import CORS -from prometheus_flask_exporter import PrometheusMetrics -from prometheus_client import Counter, generate_latest, CONTENT_TYPE_LATEST -from multiprocessing import Process, Manager, cpu_count -import multiprocessing -from http.server import HTTPServer, BaseHTTPRequestHandler -import json -import time -import requests - - -app = Flask(__name__) -health_app = Flask(__name__) -CORS(app) - -metrics = PrometheusMetrics(app, group_by='endpoint') -load_request_counter = Counter("load_requests_total", "Total /load POST requests") - -TARGET_URL = "http://load_balancer:8000/load" - -# 전역 상태 -manager = Manager() -stop_event = manager.Event() -load_process = None - -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor - -# 전역 프로세스 풀 (부하 연산용) -cpu_executor = ProcessPoolExecutor(max_workers=4) - -# 실제 CPU 부하를 주는 작업 (휴식 제거) -def cpu_stress_worker(duration): - # stop_event 제거 (Pool에서 실행하기 위해 단순화) - end = time.time() + duration - while time.time() < end: - # 타이트한 루프 연산 - _ = [i * i for i in range(1000)] - return "done" - -# 백엔드 서버에서 부하 처리 -@app.route('/load', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH']) -def load_handler(): - load_request_counter.inc() - duration = float(request.args.get("duration", "0.5")) - - # 프로세스 풀에 부하 작업 던지기 (비차단) - # 헬스체크 응답을 방해하지 않기 위해 메인 스레드는 즉시 반환하거나 짧게만 대기 - cpu_executor.submit(cpu_stress_worker, duration) - - return "ok" - -# rps만큼 병렬로 POST /load 호출 -def _send_requests(rps, duration_sec, url, stop_event): - if not url: - return - - interval = 1.0 / rps - end_time = time.time() + duration_sec - - with ThreadPoolExecutor(max_workers=50) as executor: - while time.time() < end_time: - if stop_event.is_set(): - break - # 비동기적으로 요청 발사 - executor.submit(requests.post, url, timeout=5.0) - time.sleep(interval) - -# 증가하는 RPS로 부하 주기 루프 -def send_http_load_loop(stop_event): - url = f"{TARGET_URL}?duration=0.5" - step_duration = 5 - max_rps = 150 - rps = 20 - rps_increment = 20 - - print(f"🚀 부하 생성 루프 시작 (Target: {url})") - while not stop_event.is_set(): - print(f"🔥 현재 부하 강도: {rps} RPS") - _send_requests(rps, step_duration, url, stop_event) - if rps < max_rps: - rps += rps_increment - - -@app.route('/cpu/toggle', methods=['POST']) -def cpu_toggle(): - global load_process, stop_event - - if load_process is None or not load_process.is_alive(): - print("📌 부하 시작") - stop_event = manager.Event() - load_process = Process(target=send_http_load_loop, args=(stop_event,)) - load_process.start() - print(f"✅ 부하 프로세스 시작됨: pid={load_process.pid}") - return "started" - else: - print("🛑 부하 중지 요청 받음") - stop_event.set() - load_process.join(timeout=3) - if load_process.is_alive(): - print("⚠️ 프로세스가 살아있어서 강제 종료합니다.") - load_process.terminate() - load_process.join() - else: - print("✅ 프로세스 정상 종료됨.") - load_process = None - return "stopped" - - -@app.route('/health', methods=['GET','POST']) -def health_check(): - return "OK", 200 - -@app.route('/metrics') -def metrics_handler(): - return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST) - -# 기본 루트 -@app.route('/') -def home(): - return "hello, this is pnu cloud computing term project", 200 - -if __name__ == '__main__': - # health_process = multiprocessing.Process(target=run_health_process, daemon=True) - # health_process.start() - - app.run(host='0.0.0.0', port=5000, threaded=True) diff --git a/backend/stress.py b/backend/stress.py new file mode 100644 index 0000000..85d3293 --- /dev/null +++ b/backend/stress.py @@ -0,0 +1,86 @@ +import time +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from multiprocessing import Manager, Process + +import requests + +from config import ( + CPU_POOL_WORKERS, + DEFAULT_LOAD_DURATION, + INITIAL_RPS, + LOAD_STEP_DURATION, + MAX_RPS, + REQUEST_TIMEOUT_SECONDS, + RPS_INCREMENT, + THREAD_POOL_WORKERS, +) + + +def cpu_stress_worker(duration: float): + end = time.time() + duration + while time.time() < end: + _ = [i * i for i in range(1000)] + return "done" + + +def send_requests(rps: int, duration_sec: int, url: str, stop_event) -> None: + if not url: + return + + interval = 1.0 / rps + end_time = time.time() + duration_sec + + with ThreadPoolExecutor(max_workers=THREAD_POOL_WORKERS) as executor: + while time.time() < end_time: + if stop_event.is_set(): + break + executor.submit(requests.post, url, timeout=REQUEST_TIMEOUT_SECONDS) + time.sleep(interval) + + +def send_http_load_loop(stop_event, target_url: str) -> None: + url = f"{target_url}?duration={DEFAULT_LOAD_DURATION}" + rps = INITIAL_RPS + + print(f"Starting load generation loop (target: {url})") + while not stop_event.is_set(): + print(f"Current load level: {rps} RPS") + send_requests(rps, LOAD_STEP_DURATION, url, stop_event) + if rps < MAX_RPS: + rps += RPS_INCREMENT + + +class StressController: + def __init__(self, target_url: str): + self.target_url = target_url + self.manager = Manager() + self.stop_event = self.manager.Event() + self.load_process = None + self.cpu_executor = ProcessPoolExecutor(max_workers=CPU_POOL_WORKERS) + + def enqueue_cpu_load(self, duration: float) -> None: + self.cpu_executor.submit(cpu_stress_worker, duration) + + def toggle(self) -> str: + if self.load_process is None or not self.load_process.is_alive(): + print("Starting stress process") + self.stop_event = self.manager.Event() + self.load_process = Process( + target=send_http_load_loop, + args=(self.stop_event, self.target_url), + ) + self.load_process.start() + print(f"Stress process started with pid={self.load_process.pid}") + return "started" + + print("Received request to stop stress process") + self.stop_event.set() + self.load_process.join(timeout=3) + if self.load_process.is_alive(): + print("Stress process is still alive. Terminating it forcefully.") + self.load_process.terminate() + self.load_process.join() + else: + print("Stress process stopped cleanly.") + self.load_process = None + return "stopped" From 781289102a03380d31998ec9377c90108fffe4f0 Mon Sep 17 00:00:00 2001 From: JAEIL1999 Date: Thu, 9 Apr 2026 17:18:57 +0900 Subject: [PATCH 3/9] refactor: separate autoscaler policy, docker, and cleanup services --- autoscaler/autoscaler.py | 185 ++++---------------------------- autoscaler/cleanup.py | 39 +++++++ autoscaler/config.py | 30 ++++++ autoscaler/docker_manager.py | 88 +++++++++++++++ autoscaler/metrics.py | 120 --------------------- autoscaler/notifier.py | 24 +++++ autoscaler/prometheus_client.py | 28 +++++ autoscaler/scaler.py | 94 ++++++++++++++++ autoscaler/targets.py | 14 +++ 9 files changed, 339 insertions(+), 283 deletions(-) create mode 100644 autoscaler/cleanup.py create mode 100644 autoscaler/config.py create mode 100644 autoscaler/docker_manager.py delete mode 100644 autoscaler/metrics.py create mode 100644 autoscaler/notifier.py create mode 100644 autoscaler/prometheus_client.py create mode 100644 autoscaler/scaler.py create mode 100644 autoscaler/targets.py diff --git a/autoscaler/autoscaler.py b/autoscaler/autoscaler.py index 7d01c13..d5bae08 100644 --- a/autoscaler/autoscaler.py +++ b/autoscaler/autoscaler.py @@ -1,175 +1,34 @@ -import time import logging -import os -import multiprocessing -import signal, sys, docker, json -import requests -from metrics import PrometheusClient, DockerManager, clear_prometheus_targets +from cleanup import register_signal_handlers +from config import load_settings +from scaler import AutoScaler +from targets import clear_prometheus_targets -class AutoScaler: - def __init__( - self, - prom_url: str, - docker_image: str, - label: str = 'autoscale_service', - cpu_threshold: float = 0.7, - min_instances: int = 1, - max_instances: int = 10, - check_interval: int = 10, - load_balancer_url: str = "http://host.docker.internal:8000" - ): - self.prom = PrometheusClient(prom_url) - self.dock = DockerManager() - self.image = docker_image - self.label = label - self.threshold = cpu_threshold - self.min = min_instances - self.max = max_instances - self.interval = check_interval - self.above_since = None - self.below_since = None - - self.load_balancer_url = load_balancer_url - - def notify_load_balancer(self): - """로드밸런서에 서버 목록 갱신 요청""" - try: - refresh_url = f"{self.load_balancer_url}/refresh-servers" - - response = requests.post(refresh_url, timeout=3) - - if response.status_code == 200: - logging.info("✅ Load balancer server refresh triggered") - else: - logging.warning(f"⚠️ Load balancer refresh failed: {response.status_code}") - - except requests.exceptions.ConnectionError: - logging.warning("🔌 Could not connect to load balancer") - except requests.exceptions.Timeout: - logging.warning("⏰ Load balancer request timed out") - except Exception as e: - logging.error(f"❗ Error notifying load balancer: {e}") - - def scale(self) -> None: - containers = self.dock.list_containers(self.label) - autoscaled_containers = [c for c in containers if not self.dock._is_fixed(c)] - count = len(containers) - - # under min instances - if count < self.min: - logging.info(f"Instances below minimum ({count} < {self.min}). Scaling up.") - self.dock.run_container(self.image, self.label) - #서버 갱신 요청 - self.notify_load_balancer() - - self.above_since = None - self.below_since = None - return - - num_cpus = multiprocessing.cpu_count() - usages = [self.dock.get_container_cpu(c) for c in containers] - raw_avg = sum(usages) / count if usages else 0.0 - avg_cpu = raw_avg / num_cpus - - logging.info( - f"Avg CPU: {avg_cpu:.2f}% (per core) " - f"across {count} containers" - ) - - now = time.time() - - if avg_cpu > (self.threshold*100): - if self.above_since is None: - self.above_since = now - logging.debug("CPU above threshold, starting timer for scale-out.") - elif now - self.above_since >= 30 and count < self.max: - logging.info("CPU above threshold for ≥ 2 minutes. Scaling up by 1.") - self.dock.run_container(self.image, self.label) - - # 서버 갱신 요청 - self.notify_load_balancer() - - self.above_since = None - self.below_since = None - else: - self.above_since = None - - if avg_cpu < (self.threshold * 50): - if self.below_since is None: - self.below_since = now - logging.debug("CPU below half-threshold, starting timer for scale-in.") - elif now - self.below_since >= 15 and len(autoscaled_containers) > 0: - target = autoscaled_containers[-1] - logging.info(f"CPU below half-threshold for ≥ 1 minute. Scaling down container: {target.name}") - self.dock.remove_container(target) - - # 서버 갱신 요청 - self.notify_load_balancer() - - self.above_since = None - self.below_since = None - elif now - self.below_since >= 30: - logging.info("CPU below half-threshold, but no removable container found (all fixed).") - else: - self.below_since = None - - def run(self) -> None: - logging.info("Starting AutoScaler loop.") - while True: - try: - self.scale() - except Exception as e: - logging.error(f"Error during scaling: {e}") - time.sleep(self.interval) - - -if __name__ == '__main__': +def main(): logging.basicConfig( level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) - clear_prometheus_targets() - prom_url = os.getenv('PROM_URL', 'http://localhost:9090') - docker_img = os.getenv('DOCKER_IMAGE', '') - min_i = int(os.getenv('MIN_INSTANCES', 1)) - max_i = int(os.getenv('MAX_INSTANCES', 10)) - cpu_th = float(os.getenv('CPU_THRESHOLD', 0.7)) - interval = int(os.getenv('CHECK_INTERVAL', 30)) - - def graceful_shutdown(signum, frame): - print("📦 Shutting down autoscaler...") - - # 1. Docker에서 autoscale_service-* 컨테이너 삭제 - client = docker.from_env() - for container in client.containers.list(all=True): - if container.name.startswith("autoscale_service-"): - print(f"🗑 Removing container {container.name}") - try: - container.remove(force=True) - except Exception as e: - print(f"❌ Failed to remove {container.name}: {e}") - # 2. flask.json 초기화 - try: - flask_json_path = "/app/prometheus/targets/flask.json" - if os.path.exists(flask_json_path): - with open(flask_json_path, "w") as f: - json.dump([], f) - print("🧹 flask.json cleared") - except Exception as e: - print(f"❌ Failed to clear flask.json: {e}") - - sys.exit(0) + register_signal_handlers() + clear_prometheus_targets() + settings = load_settings() scaler = AutoScaler( - prom_url, - docker_img, - min_instances=min_i, - max_instances=max_i, - cpu_threshold=cpu_th, - check_interval=interval + prom_url=settings.prom_url, + docker_image=settings.docker_image, + label=settings.label, + min_instances=settings.min_instances, + max_instances=settings.max_instances, + cpu_threshold=settings.cpu_threshold, + check_interval=settings.check_interval, + load_balancer_url=settings.load_balancer_url, ) scaler.run() + + +if __name__ == "__main__": + main() diff --git a/autoscaler/cleanup.py b/autoscaler/cleanup.py new file mode 100644 index 0000000..0ff24bd --- /dev/null +++ b/autoscaler/cleanup.py @@ -0,0 +1,39 @@ +import json +import os +import signal +import sys + +import docker + + +def cleanup_autoscaled_containers(): + client = docker.from_env() + for container in client.containers.list(all=True): + if container.name.startswith("autoscale_service-"): + print(f"Removing container {container.name}") + try: + container.remove(force=True) + except Exception as error: + print(f"Failed to remove {container.name}: {error}") + + +def clear_local_target_file(): + flask_json_path = "/app/prometheus/targets/flask.json" + if os.path.exists(flask_json_path): + with open(flask_json_path, "w") as file: + json.dump([], file) + print("flask.json cleared") + + +def register_signal_handlers(): + def graceful_shutdown(signum, frame): + print("Shutting down autoscaler...") + cleanup_autoscaled_containers() + try: + clear_local_target_file() + except Exception as error: + print(f"Failed to clear flask.json: {error}") + sys.exit(0) + + signal.signal(signal.SIGINT, graceful_shutdown) + signal.signal(signal.SIGTERM, graceful_shutdown) diff --git a/autoscaler/config.py b/autoscaler/config.py new file mode 100644 index 0000000..dfde555 --- /dev/null +++ b/autoscaler/config.py @@ -0,0 +1,30 @@ +import os +from dataclasses import dataclass + + +@dataclass +class AutoScalerSettings: + prom_url: str + docker_image: str + label: str + cpu_threshold: float + min_instances: int + max_instances: int + check_interval: int + load_balancer_url: str + + +def load_settings() -> AutoScalerSettings: + return AutoScalerSettings( + prom_url=os.getenv("PROM_URL", "http://localhost:9090"), + docker_image=os.getenv("DOCKER_IMAGE", ""), + label=os.getenv("AUTOSCALE_LABEL", "autoscale_service"), + cpu_threshold=float(os.getenv("CPU_THRESHOLD", 0.7)), + min_instances=int(os.getenv("MIN_INSTANCES", 1)), + max_instances=int(os.getenv("MAX_INSTANCES", 10)), + check_interval=int(os.getenv("CHECK_INTERVAL", 30)), + load_balancer_url=os.getenv( + "LOAD_BALANCER_URL", + "http://host.docker.internal:8000", + ), + ) diff --git a/autoscaler/docker_manager.py b/autoscaler/docker_manager.py new file mode 100644 index 0000000..729536d --- /dev/null +++ b/autoscaler/docker_manager.py @@ -0,0 +1,88 @@ +import logging +import os +import time +import uuid + +import docker + +from targets import write_prometheus_targets + + +class DockerManager: + def __init__(self): + self.client = docker.from_env() + + def list_containers(self, label: str): + containers = self.client.containers.list(filters={"label": label}) + for container in containers: + logging.debug( + "Container %s (%s) - fixed=%s", + container.name, + container.short_id, + container.labels.get("fixed"), + ) + return containers + + def run_container(self, image: str, label: str): + project = os.getenv("COMPOSE_PROJECT_NAME", "pnu_cloud_computing") + compose_labels = { + "com.docker.compose.project": project, + "com.docker.compose.service": label, + "com.docker.compose.oneoff": "False", + "autoscale_service": label, + } + labels = {"autoscale_service": label} + existing = self.list_containers(label) + if not any(self.is_fixed(container) for container in existing): + labels["fixed"] = "true" + + container_name = f"{label}-{uuid.uuid4().hex[:5]}" + container = self.client.containers.run( + image, + name=container_name, + labels={**compose_labels, **labels}, + detach=True, + ports={"5000/tcp": None}, + network="pnu_cloud_computing_mynet", + ) + self.update_prometheus_targets(label) + return container + + def remove_container(self, container): + if self.is_fixed(container): + logging.info("Skipping fixed container: %s (%s)", container.name, container.short_id) + return + + logging.info("Removing container: %s (%s)", container.name, container.short_id) + container.stop() + container.remove() + self.update_prometheus_targets(container.labels.get("autoscale_service")) + + def get_container_cpu(self, container): + stats_stream = container.stats(stream=True, decode=True) + first = next(stats_stream) + time.sleep(1) + second = next(stats_stream) + del stats_stream + + cpu_delta = ( + second["cpu_stats"]["cpu_usage"]["total_usage"] + - first["cpu_stats"]["cpu_usage"]["total_usage"] + ) + system_delta = ( + second["cpu_stats"]["system_cpu_usage"] + - first["cpu_stats"]["system_cpu_usage"] + ) + + if system_delta > 0.0 and cpu_delta > 0.0: + num_cpus = len(second["cpu_stats"]["cpu_usage"].get("percpu_usage", [])) or 1 + return (cpu_delta / system_delta) * num_cpus * 100.0 + return 0.0 + + def update_prometheus_targets(self, label: str): + containers = self.list_containers(label) + targets = [f"{container.name}:5000" for container in containers if not self.is_fixed(container)] + write_prometheus_targets(targets) + + def is_fixed(self, container) -> bool: + return str(container.labels.get("fixed", "")).lower() == "true" diff --git a/autoscaler/metrics.py b/autoscaler/metrics.py deleted file mode 100644 index 75ec79d..0000000 --- a/autoscaler/metrics.py +++ /dev/null @@ -1,120 +0,0 @@ -import requests -import docker -import json -import time -import logging -import os -import uuid - -FLASK_TARGET_PATH = '/etc/prometheus/targets/flask.json' - -class PrometheusClient: - def __init__(self, base_url: str): - self.base_url = base_url.rstrip('/') - - def get_metric(self, query: str) -> float: - url = f"{self.base_url}/api/v1/query" - resp = requests.get(url, params={'query': query}) - resp.raise_for_status() - data = resp.json() - if data.get('status') != 'success' or not data['data']['result']: - return 0.0 - return float(data['data']['result'][0]['value'][1]) - - def get_avg_cpu_usage(self, label: str) -> float: - query = ( - 'sum(rate(container_cpu_usage_seconds_total{job="cadvisor"}[1m]) * 0.01)' - ) - return self.get_metric(query) - - def get_container_count(self, label: str) -> int: - query = ( - 'count(' - 'container_memory_usage_bytes' - '{job="cadvisor",container_label_autoscale_service="' + label + '"}' - ')' - ) - return int(self.get_metric(query)) - -class DockerManager: - def __init__(self): - self.client = docker.from_env() - - def list_containers(self, label: str): - containers = self.client.containers.list(filters={'label': label}) - for c in containers: - logging.debug(f"Container {c.name} ({c.short_id}) - fixed={c.labels.get('fixed')}") - return containers - - def run_container(self, image: str, label: str): - project = os.getenv('COMPOSE_PROJECT_NAME', 'pnu_cloud_computing') - service = label - compose_labels = { - 'com.docker.compose.project': project, - 'com.docker.compose.service': service, - 'com.docker.compose.oneoff': 'False', - 'autoscale_service': label, - } - labels = {'autoscale_service': label} - existing = self.list_containers(label) - if not any(self._is_fixed(c) for c in existing): - labels['fixed'] = 'true' - - labels = {**compose_labels, **labels} - container_name = f"{label}-{uuid.uuid4().hex[:5]}" - - container = self.client.containers.run( - image, - name=container_name, - labels=labels, - detach=True, - ports={'5000/tcp': None}, - network='pnu_cloud_computing_mynet' - ) - self.update_prometheus_targets(label) - return container - - def remove_container(self, container): - if self._is_fixed(container): - logging.info(f"Skipping fixed container: {container.name} ({container.short_id})") - return - logging.info(f"Removing container: {container.name} ({container.short_id})") - container.stop() - container.remove() - self.update_prometheus_targets(container.labels.get('autoscale_service')) - - def get_container_cpu(self, container): - stats_stream = container.stats(stream=True, decode=True) - first = next(stats_stream) - time.sleep(1) - second = next(stats_stream) - del stats_stream - - cpu_delta = second['cpu_stats']['cpu_usage']['total_usage'] - first['cpu_stats']['cpu_usage']['total_usage'] - system_delta = second['cpu_stats']['system_cpu_usage'] - first['cpu_stats']['system_cpu_usage'] - - if system_delta > 0.0 and cpu_delta > 0.0: - num_cpus = len(second['cpu_stats']['cpu_usage'].get('percpu_usage', [])) or 1 - return (cpu_delta / system_delta) * num_cpus * 100.0 - return 0.0 - - def update_prometheus_targets(self, label: str): - containers = self.list_containers(label) - targets = [] - - for c in containers: - if not self._is_fixed(c): - targets.append(f"{c.name}:5000") - - os.makedirs(os.path.dirname(FLASK_TARGET_PATH), exist_ok=True) - with open(FLASK_TARGET_PATH, 'w') as f: - json.dump([{"targets": targets, "labels": {"job": "flask-autoscaled"}}], f) - - - def _is_fixed(self, container) -> bool: - return str(container.labels.get('fixed', '')).lower() == 'true' - -def clear_prometheus_targets(): - os.makedirs(os.path.dirname(FLASK_TARGET_PATH), exist_ok=True) - with open(FLASK_TARGET_PATH, 'w') as f: - json.dump([{"targets": [], "labels": {"job": "flask-autoscaled"}}], f) diff --git a/autoscaler/notifier.py b/autoscaler/notifier.py new file mode 100644 index 0000000..f61bf4b --- /dev/null +++ b/autoscaler/notifier.py @@ -0,0 +1,24 @@ +import logging + +import requests + + +class LoadBalancerNotifier: + def __init__(self, load_balancer_url: str): + self.load_balancer_url = load_balancer_url + + def notify_refresh(self) -> None: + try: + refresh_url = f"{self.load_balancer_url}/refresh-servers" + response = requests.post(refresh_url, timeout=3) + + if response.status_code == 200: + logging.info("Load balancer server refresh triggered") + else: + logging.warning("Load balancer refresh failed: %s", response.status_code) + except requests.exceptions.ConnectionError: + logging.warning("Could not connect to load balancer") + except requests.exceptions.Timeout: + logging.warning("Load balancer request timed out") + except Exception as error: + logging.error("Error notifying load balancer: %s", error) diff --git a/autoscaler/prometheus_client.py b/autoscaler/prometheus_client.py new file mode 100644 index 0000000..0c0f5e1 --- /dev/null +++ b/autoscaler/prometheus_client.py @@ -0,0 +1,28 @@ +import requests + + +class PrometheusClient: + def __init__(self, base_url: str): + self.base_url = base_url.rstrip("/") + + def get_metric(self, query: str) -> float: + url = f"{self.base_url}/api/v1/query" + response = requests.get(url, params={"query": query}) + response.raise_for_status() + data = response.json() + if data.get("status") != "success" or not data["data"]["result"]: + return 0.0 + return float(data["data"]["result"][0]["value"][1]) + + def get_avg_cpu_usage(self, label: str) -> float: + query = 'sum(rate(container_cpu_usage_seconds_total{job="cadvisor"}[1m]) * 0.01)' + return self.get_metric(query) + + def get_container_count(self, label: str) -> int: + query = ( + 'count(' + 'container_memory_usage_bytes' + '{job="cadvisor",container_label_autoscale_service="' + label + '"}' + ')' + ) + return int(self.get_metric(query)) diff --git a/autoscaler/scaler.py b/autoscaler/scaler.py new file mode 100644 index 0000000..f6842ec --- /dev/null +++ b/autoscaler/scaler.py @@ -0,0 +1,94 @@ +import logging +import multiprocessing +import time + +from docker_manager import DockerManager +from notifier import LoadBalancerNotifier +from prometheus_client import PrometheusClient + + +class AutoScaler: + def __init__( + self, + prom_url: str, + docker_image: str, + label: str = "autoscale_service", + cpu_threshold: float = 0.7, + min_instances: int = 1, + max_instances: int = 10, + check_interval: int = 10, + load_balancer_url: str = "http://host.docker.internal:8000", + ): + self.prom = PrometheusClient(prom_url) + self.dock = DockerManager() + self.notifier = LoadBalancerNotifier(load_balancer_url) + self.image = docker_image + self.label = label + self.threshold = cpu_threshold + self.min = min_instances + self.max = max_instances + self.interval = check_interval + self.above_since = None + self.below_since = None + + def notify_load_balancer(self) -> None: + self.notifier.notify_refresh() + + def scale(self) -> None: + containers = self.dock.list_containers(self.label) + autoscaled_containers = [container for container in containers if not self.dock.is_fixed(container)] + count = len(containers) + + if count < self.min: + logging.info("Instances below minimum (%s < %s). Scaling up.", count, self.min) + self.dock.run_container(self.image, self.label) + self.notify_load_balancer() + self.above_since = None + self.below_since = None + return + + num_cpus = multiprocessing.cpu_count() + usages = [self.dock.get_container_cpu(container) for container in containers] + raw_avg = sum(usages) / count if usages else 0.0 + avg_cpu = raw_avg / num_cpus + + logging.info("Avg CPU: %.2f%% (per core) across %s containers", avg_cpu, count) + + now = time.time() + if avg_cpu > (self.threshold * 100): + if self.above_since is None: + self.above_since = now + logging.debug("CPU above threshold, starting timer for scale-out.") + elif now - self.above_since >= 30 and count < self.max: + logging.info("CPU above threshold long enough. Scaling up by 1.") + self.dock.run_container(self.image, self.label) + self.notify_load_balancer() + self.above_since = None + self.below_since = None + else: + self.above_since = None + + if avg_cpu < (self.threshold * 50): + if self.below_since is None: + self.below_since = now + logging.debug("CPU below half-threshold, starting timer for scale-in.") + elif now - self.below_since >= 15 and autoscaled_containers: + target = autoscaled_containers[-1] + logging.info("CPU below half-threshold long enough. Scaling down container: %s", target.name) + self.dock.remove_container(target) + self.notify_load_balancer() + self.above_since = None + self.below_since = None + elif now - self.below_since >= 30: + logging.info("CPU below half-threshold, but no removable container found (all fixed).") + else: + self.below_since = None + + def run(self) -> None: + logging.info("Starting AutoScaler loop.") + while True: + try: + self.scale() + except Exception as error: + logging.error("Error during scaling: %s", error) + time.sleep(self.interval) diff --git a/autoscaler/targets.py b/autoscaler/targets.py new file mode 100644 index 0000000..046445e --- /dev/null +++ b/autoscaler/targets.py @@ -0,0 +1,14 @@ +import json +import os + +FLASK_TARGET_PATH = "/etc/prometheus/targets/flask.json" + + +def write_prometheus_targets(targets): + os.makedirs(os.path.dirname(FLASK_TARGET_PATH), exist_ok=True) + with open(FLASK_TARGET_PATH, "w") as file: + json.dump([{"targets": targets, "labels": {"job": "flask-autoscaled"}}], file) + + +def clear_prometheus_targets(): + write_prometheus_targets([]) From 27eb5f87273a47afe9d0bf1ecdad94700be3ecb3 Mon Sep 17 00:00:00 2001 From: JAEIL1999 Date: Thu, 9 Apr 2026 17:19:16 +0900 Subject: [PATCH 4/9] refactor: split load balancer app, proxy, and health workflows --- load_balancer/__init__.py | 1 + load_balancer/app.py | 149 ++++++++++++++++++++++++++ load_balancer/config.py | 7 ++ load_balancer/discovery.py | 47 ++++++++ load_balancer/health_check.py | 114 +++++++------------- load_balancer/init.py | 0 load_balancer/metrics.py | 34 ++++++ load_balancer/proxy.py | 50 +++++++++ load_balancer/server.py | 196 +--------------------------------- 9 files changed, 327 insertions(+), 271 deletions(-) create mode 100644 load_balancer/__init__.py create mode 100644 load_balancer/app.py create mode 100644 load_balancer/config.py create mode 100644 load_balancer/discovery.py delete mode 100644 load_balancer/init.py create mode 100644 load_balancer/metrics.py create mode 100644 load_balancer/proxy.py diff --git a/load_balancer/__init__.py b/load_balancer/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/load_balancer/__init__.py @@ -0,0 +1 @@ + diff --git a/load_balancer/app.py b/load_balancer/app.py new file mode 100644 index 0000000..de30156 --- /dev/null +++ b/load_balancer/app.py @@ -0,0 +1,149 @@ +import time +from contextlib import asynccontextmanager + +import httpx +from fastapi import BackgroundTasks, FastAPI, HTTPException, Request + +import balancer +from config import ( + APP_TITLE, + DEFAULT_HEALTH_CHECK_INTERVAL, + MAX_CONNECTIONS, + MAX_KEEPALIVE_CONNECTIONS, +) +from discovery import discover_containers +from health_check import logger, start_health_check, trigger_server_refresh +from metrics import build_metrics_response +from proxy import proxy_cpu_toggle_request, proxy_load_request + + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("Starting load balancer (FastAPI)...") + app.state.http_client = httpx.AsyncClient( + timeout=10.0, + limits=httpx.Limits( + max_connections=MAX_CONNECTIONS, + max_keepalive_connections=MAX_KEEPALIVE_CONNECTIONS, + ), + ) + health_task = start_health_check(interval=DEFAULT_HEALTH_CHECK_INTERVAL) + yield + await app.state.http_client.aclose() + health_task.cancel() + logger.info("Load balancer shutting down...") + + +def create_app() -> FastAPI: + app = FastAPI(lifespan=lifespan, title=APP_TITLE) + + @app.get("/") + async def home(): + all_servers = await balancer.get_all_servers() + return { + "message": APP_TITLE, + "total_backends": len(all_servers), + "healthy_backends": len([server for server in all_servers if server["status"] == "healthy"]), + "mode": balancer.selection_mode, + } + + @app.api_route("/load", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) + async def route_load(request: Request): + start_time = time.time() + server = await balancer.choose_backend() + if not server: + logger.error("No healthy backend servers available") + raise HTTPException(status_code=503, detail="No healthy servers") + + client = request.app.state.http_client + for attempt in range(2): + try: + response = await proxy_load_request(client, request, server) + duration = round(time.time() - start_time, 3) + logger.info( + "Forwarded to %s - %s in %ss", + server["container_name"], + response.status_code, + duration, + ) + return response + except (httpx.RequestError, httpx.TimeoutException) as error: + logger.warning( + "Backend %s failed (attempt %s): %s", + server["container_name"], + attempt + 1, + error, + ) + server = await balancer.choose_backend() + if not server: + break + + raise HTTPException(status_code=502, detail="All backend servers unavailable") + + @app.post("/refresh-servers") + async def refresh_servers(background_tasks: BackgroundTasks): + logger.info("Priority refresh request received from autoscaler") + trigger_server_refresh() + + async def fast_refresh(): + servers = await discover_containers() + if servers: + for server in servers: + server["status"] = "healthy" + await balancer.update_backend_servers(servers) + logger.info("Fast refresh completed: %s servers found", len(servers)) + + background_tasks.add_task(fast_refresh) + return {"status": "success", "message": "Priority refresh triggered"} + + @app.get("/status") + async def get_status(): + all_servers = await balancer.get_all_servers() + return { + "load_balancer": { + "status": "healthy", + "mode": balancer.selection_mode, + "timestamp": time.time(), + }, + "backend_servers": { + "total": len(all_servers), + "healthy": len([server for server in all_servers if server["status"] == "healthy"]), + "servers": all_servers, + }, + } + + @app.get("/set_mode/{mode}") + async def set_mode(mode: str): + valid_modes = ["round_robin", "latency"] + if mode in valid_modes: + balancer.selection_mode = mode + return {"message": f"Mode set to {mode}"} + raise HTTPException(status_code=400, detail="Invalid mode") + + @app.get("/health") + async def health(): + return "OK" + + @app.post("/cpu/toggle") + async def cpu_toggle_proxy(request: Request): + server = await balancer.choose_backend() + if not server: + raise HTTPException(status_code=503, detail="No healthy servers") + + logger.info("Forwarding /cpu/toggle to %s", server["container_name"]) + return await proxy_cpu_toggle_request(request.app.state.http_client, server) + + @app.get("/metrics") + async def metrics(request: Request): + all_servers = await balancer.get_all_servers() + healthy_servers = [server for server in all_servers if server["status"] == "healthy"] + return await build_metrics_response( + request.app.state.http_client, + healthy_servers, + all_servers, + ) + + return app + + +app = create_app() diff --git a/load_balancer/config.py b/load_balancer/config.py new file mode 100644 index 0000000..f325aeb --- /dev/null +++ b/load_balancer/config.py @@ -0,0 +1,7 @@ +APP_TITLE = "PNU Cloud Load Balancer (Async)" +DEFAULT_HEALTH_CHECK_INTERVAL = 10 +PROXY_TIMEOUT_SECONDS = 10.0 +CPU_TOGGLE_TIMEOUT_SECONDS = 5.0 +METRICS_TIMEOUT_SECONDS = 2.0 +MAX_CONNECTIONS = 1000 +MAX_KEEPALIVE_CONNECTIONS = 100 diff --git a/load_balancer/discovery.py b/load_balancer/discovery.py new file mode 100644 index 0000000..af26888 --- /dev/null +++ b/load_balancer/discovery.py @@ -0,0 +1,47 @@ +import asyncio +import time +from typing import Dict, List + +import docker + + +async def discover_containers(network_name="pnu_cloud_computing_mynet") -> List[Dict]: + loop = asyncio.get_event_loop() + client = await loop.run_in_executor(None, docker.from_env) + + containers = await loop.run_in_executor( + None, + lambda: client.containers.list( + filters={"status": "running", "label": "autoscale_service=backend"} + ), + ) + if not containers: + containers = await loop.run_in_executor( + None, + lambda: client.containers.list( + filters={"status": "running", "ancestor": "backend"} + ), + ) + + servers = [] + for container in containers: + try: + network_settings = container.attrs["NetworkSettings"]["Networks"] + if network_name in network_settings: + ip = network_settings[network_name]["IPAddress"] + if ip: + servers.append( + { + "container_id": container.id[:12], + "container_name": container.name, + "ip": ip, + "host": f"http://{ip}:5000", + "status": "unknown", + "latency": float("inf"), + "_start_time": time.time(), + } + ) + except Exception: + continue + + return servers diff --git a/load_balancer/health_check.py b/load_balancer/health_check.py index cd53f2e..1815f73 100644 --- a/load_balancer/health_check.py +++ b/load_balancer/health_check.py @@ -1,108 +1,66 @@ -import docker import asyncio -import httpx import logging import time -from typing import List, Dict + +import httpx + from balancer import update_backend_servers +from config import DEFAULT_HEALTH_CHECK_INTERVAL +from discovery import discover_containers -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) -# 전역 상태 관리 immediate_check_event = asyncio.Event() -fail_counters = {} # 서버별 연속 실패 횟수 저장 +fail_counters = {} + -async def check_single_server(client: httpx.AsyncClient, server: Dict) -> Dict: - """개별 서버의 상태를 비동기로 체크 (Soft Failure 로직 도입)""" +async def check_single_server(client: httpx.AsyncClient, server): target_url = f"{server['host']}/health" - server_id = server['host'] - - # 실패 카운터 초기화 - if server_id not in fail_counters: - fail_counters[server_id] = 0 + server_id = server["host"] + fail_counters.setdefault(server_id, 0) try: - # 부하 상황을 고려하여 타임아웃을 5초로 연장 response = await client.get(target_url, timeout=5.0) - if response.status_code == 200: - server['status'] = 'healthy' - server['latency'] = round(time.time() - server.get('_start_time', time.time()), 3) - fail_counters[server_id] = 0 # 성공 시 카운터 리셋 + server["status"] = "healthy" + server["latency"] = round(time.time() - server.get("_start_time", time.time()), 3) + fail_counters[server_id] = 0 else: raise Exception(f"Status {response.status_code}") - - except Exception as e: + except Exception: fail_counters[server_id] += 1 - # 3회 연속 실패 전까지는 'healthy' 상태 유지 (단, 레이턴시는 무한대) if fail_counters[server_id] < 3: - server['status'] = 'healthy' - logger.warning(f"⚠️ {server['container_name']} slow/failed ({fail_counters[server_id]}/3). Keeping healthy.") + server["status"] = "healthy" + logger.warning( + "%s slow/failed (%s/3). Keeping healthy.", + server["container_name"], + fail_counters[server_id], + ) else: - server['status'] = 'unhealthy' - logger.error(f"❌ {server['container_name']} marked UNHEALTHY after 3 failures.") - - server['latency'] = float('inf') - + server["status"] = "unhealthy" + logger.error("%s marked UNHEALTHY after 3 failures.", server["container_name"]) + server["latency"] = float("inf") + return server -async def discover_containers(network_name='pnu_cloud_computing_mynet') -> List[Dict]: - """Docker API를 사용하여 백엔드 컨테이너 탐색""" - loop = asyncio.get_event_loop() - client = await loop.run_in_executor(None, docker.from_env) - - servers = [] - containers = await loop.run_in_executor(None, - lambda: client.containers.list(filters={"status": "running", "label": "autoscale_service=backend"}) - ) - - if not containers: - containers = await loop.run_in_executor(None, - lambda: client.containers.list(filters={"status": "running", "ancestor": "backend"}) - ) - - for container in containers: - try: - network_settings = container.attrs['NetworkSettings']['Networks'] - if network_name in network_settings: - ip = network_settings[network_name]['IPAddress'] - if ip: - servers.append({ - 'container_id': container.id[:12], - 'container_name': container.name, - 'ip': ip, - 'host': f'http://{ip}:5000', - 'status': 'unknown', - 'latency': float('inf'), - '_start_time': time.time() - }) - except Exception: - continue - - return servers - -async def health_check_loop(interval: int = 10): - """주기적인 비동기 헬스 체크 루프""" - logger.info(f"🚀 Robust Async Health Check Loop started (interval: {interval}s)") - + +async def health_check_loop(interval: int = DEFAULT_HEALTH_CHECK_INTERVAL): + logger.info("Robust async health check loop started (interval: %ss)", interval) + async with httpx.AsyncClient() as client: while True: try: servers = await discover_containers() - if not servers: await update_backend_servers([]) else: - tasks = [check_single_server(client, server) for server in servers] - checked_servers = await asyncio.gather(*tasks) + checked_servers = await asyncio.gather( + *(check_single_server(client, server) for server in servers) + ) await update_backend_servers(checked_servers) - - healthy_count = len([s for s in checked_servers if s['status'] == 'healthy']) - # logger.info(f"📊 Health Summary: {healthy_count}/{len(checked_servers)} healthy") - - except Exception as e: - logger.error(f"Error in health check loop: {e}") + except Exception as error: + logger.error("Error in health check loop: %s", error) try: await asyncio.wait_for(immediate_check_event.wait(), timeout=interval) @@ -110,6 +68,7 @@ async def health_check_loop(interval: int = 10): except asyncio.TimeoutError: pass + def trigger_server_refresh(): try: loop = asyncio.get_running_loop() @@ -117,5 +76,6 @@ def trigger_server_refresh(): except RuntimeError: pass -def start_health_check(interval: int = 10): + +def start_health_check(interval: int = DEFAULT_HEALTH_CHECK_INTERVAL): return asyncio.create_task(health_check_loop(interval)) diff --git a/load_balancer/init.py b/load_balancer/init.py deleted file mode 100644 index e69de29..0000000 diff --git a/load_balancer/metrics.py b/load_balancer/metrics.py new file mode 100644 index 0000000..2f0278a --- /dev/null +++ b/load_balancer/metrics.py @@ -0,0 +1,34 @@ +import asyncio +import time + +from fastapi.responses import PlainTextResponse + +from config import METRICS_TIMEOUT_SECONDS + + +async def build_metrics_response(client, healthy_servers, all_servers): + lb_metrics = [ + f"backend_servers_total {len(all_servers)}", + f"backend_servers_healthy {len(healthy_servers)}", + f"load_balancer_uptime {time.time()}", + ] + + async def fetch_backend_metrics(server): + try: + response = await client.get( + f"{server['host']}/metrics", + timeout=METRICS_TIMEOUT_SECONDS, + ) + if response.status_code == 200: + return f"\n# Backend: {server['container_name']}\n{response.text}" + except Exception: + return "" + return "" + + backend_outputs = [] + if healthy_servers: + results = await asyncio.gather(*(fetch_backend_metrics(server) for server in healthy_servers)) + backend_outputs = [result for result in results if result] + + full_metrics = "\n".join(lb_metrics) + "\n" + "\n".join(backend_outputs) + return PlainTextResponse(full_metrics) diff --git a/load_balancer/proxy.py b/load_balancer/proxy.py new file mode 100644 index 0000000..cc1f52e --- /dev/null +++ b/load_balancer/proxy.py @@ -0,0 +1,50 @@ +from typing import Dict + +import httpx +from fastapi import HTTPException, Request, Response + +from config import CPU_TOGGLE_TIMEOUT_SECONDS, PROXY_TIMEOUT_SECONDS + +EXCLUDED_HEADERS = { + "host", + "content-length", + "connection", + "upgrade", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", +} + + +def filter_headers(headers): + return {key: value for key, value in headers.items() if key.lower() not in EXCLUDED_HEADERS} + + +async def proxy_load_request(client: httpx.AsyncClient, request: Request, server: Dict) -> Response: + body = await request.body() + target_url = f"{server['host']}/load" + response = await client.request( + method=request.method, + url=target_url, + content=body, + headers=filter_headers(request.headers), + params=request.query_params, + timeout=PROXY_TIMEOUT_SECONDS, + ) + return Response( + content=response.content, + status_code=response.status_code, + headers=filter_headers(response.headers), + ) + + +async def proxy_cpu_toggle_request(client: httpx.AsyncClient, server: Dict) -> Response: + try: + response = await client.post( + f"{server['host']}/cpu/toggle", + timeout=CPU_TOGGLE_TIMEOUT_SECONDS, + ) + return Response(content=response.content, status_code=response.status_code) + except Exception as error: + raise HTTPException(status_code=500, detail=f"Backend error: {error}") from error diff --git a/load_balancer/server.py b/load_balancer/server.py index 7de45f3..ee442e2 100644 --- a/load_balancer/server.py +++ b/load_balancer/server.py @@ -1,199 +1,7 @@ -from fastapi import FastAPI, Request, Response, HTTPException, BackgroundTasks -from fastapi.responses import JSONResponse, PlainTextResponse -from contextlib import asynccontextmanager -import httpx -import time -import logging -import asyncio -import balancer -from health_check import start_health_check, trigger_server_refresh, discover_containers +from app import app -# 로깅 설정 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -# 비동기 HTTP 클라이언트 (타임아웃을 10초로 연장) -client = httpx.AsyncClient(timeout=10.0, limits=httpx.Limits(max_connections=1000, max_keepalive_connections=100)) - -@asynccontextmanager -async def lifespan(app: FastAPI): - """서버 시작/종료 시 실행될 로직""" - logger.info("🚀 Starting Load Balancer (FastAPI)...") - - # 헬스 체크 루프 시작 (10초 주기로 기본 설정) - health_task = start_health_check(interval=10) - - yield - - # 종료 시 클라이언트 닫기 및 헬스 체크 중지 - await client.aclose() - health_task.cancel() - logger.info("👋 Load Balancer shutting down...") - -app = FastAPI(lifespan=lifespan, title="PNU Cloud Load Balancer (Async)") - -EXCLUDED_HEADERS = { - 'host', 'content-length', 'connection', 'upgrade', - 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers' -} - -def filter_headers(headers): - return {k: v for k, v in headers.items() if k.lower() not in EXCLUDED_HEADERS} - -@app.get("/") -async def home(): - all_servers = await balancer.get_all_servers() - return { - "message": "PNU Cloud Computing Load Balancer (FastAPI/Async)", - "total_backends": len(all_servers), - "healthy_backends": len([s for s in all_servers if s['status'] == 'healthy']), - "mode": balancer.selection_mode - } - -@app.api_route("/load", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) -async def route_load(request: Request): - """백엔드로 /load 요청을 비동기 프록시""" - start_time = time.time() - - # 1. 백엔드 선택 - server = await balancer.choose_backend() - if not server: - logger.error("No healthy backend servers available") - raise HTTPException(status_code=503, detail="No healthy servers") - - target_url = f"{server['host']}/load" - - # 2. 요청 전달 (최대 2번 재시도) - for attempt in range(2): - try: - # 요청 바디 읽기 - body = await request.body() - - # 비동기 요청 전송 - resp = await client.request( - method=request.method, - url=target_url, - content=body, - headers=filter_headers(request.headers), - params=request.query_params, - timeout=10.0 - ) - - duration = round(time.time() - start_time, 3) - logger.info(f"Forwarded to {server['container_name']} - {resp.status_code} in {duration}s") - - # 응답 반환 - return Response( - content=resp.content, - status_code=resp.status_code, - headers=filter_headers(resp.headers) - ) - - except (httpx.RequestError, httpx.TimeoutException) as e: - logger.warning(f"Backend {server['container_name']} failed (attempt {attempt+1}): {e}") - # 다른 서버로 재시도 - server = await balancer.choose_backend() - if not server: break - target_url = f"{server['host']}/load" - - raise HTTPException(status_code=502, detail="All backend servers unavailable") - -@app.post("/refresh-servers") -async def refresh_servers(background_tasks: BackgroundTasks): - """오토스케일러로부터의 갱신 요청을 1순위로 즉시 처리""" - logger.info("⚡ Priority Refresh Request received from Autoscaler") - - # 1. 즉시 헬스체크 이벤트 트리거 - trigger_server_refresh() - - # 2. 백그라운드에서 즉시 컨테이너 재탐색 시도 (더 빠른 반영) - async def fast_refresh(): - servers = await discover_containers() - # 일단 존재 여부만 확인해서 밸런서에 반영 (헬스체크는 루프에서 보완) - if servers: - for s in servers: s['status'] = 'healthy' # 새로 뜬 애들은 일단 시도해봄 - await balancer.update_backend_servers(servers) - logger.info(f"✅ Fast refresh completed: {len(servers)} servers found") - - background_tasks.add_task(fast_refresh) - return {"status": "success", "message": "Priority refresh triggered"} - -@app.get("/status") -async def get_status(): - all_servers = await balancer.get_all_servers() - return { - "load_balancer": { - "status": "healthy", - "mode": balancer.selection_mode, - "timestamp": time.time() - }, - "backend_servers": { - "total": len(all_servers), - "healthy": len([s for s in all_servers if s['status'] == 'healthy']), - "servers": all_servers - } - } - -@app.get("/set_mode/{mode}") -async def set_mode(mode: str): - valid_modes = ['round_robin', 'latency'] - if mode in valid_modes: - balancer.selection_mode = mode - return {"message": f"Mode set to {mode}"} - raise HTTPException(status_code=400, detail="Invalid mode") - -@app.get("/health") -async def health(): - return "OK" - -@app.post("/cpu/toggle") -async def cpu_toggle_proxy(): - """백엔드의 /cpu/toggle 엔드포인트를 비동기로 프록시""" - server = await balancer.choose_backend() - if not server: - raise HTTPException(status_code=503, detail="No healthy servers") - - try: - target_url = f"{server['host']}/cpu/toggle" - # POST 요청 전달 - resp = await client.post(target_url, timeout=5.0) - return Response(content=resp.content, status_code=resp.status_code) - except Exception as e: - logger.error(f"Error forwarding /cpu/toggle: {e}") - raise HTTPException(status_code=500, detail=f"Backend error: {str(e)}") - -@app.get("/metrics") -async def metrics(): - """로드밸런서 및 백엔드 메트릭 통합 수집""" - all_servers = await balancer.get_all_servers() - healthy_servers = [s for s in all_servers if s['status'] == 'healthy'] - - # 1. 로드밸런서 자체 메트릭 - lb_metrics = [ - f"backend_servers_total {len(all_servers)}", - f"backend_servers_healthy {len(healthy_servers)}", - f"load_balancer_uptime {time.time()}" - ] - - # 2. 백엔드들로부터 메트릭 수집 (병렬 처리) - backend_outputs = [] - async def fetch_backend_metrics(server): - try: - r = await client.get(f"{server['host']}/metrics", timeout=2.0) - if r.status_code == 200: - return f"\n# Backend: {server['container_name']}\n{r.text}" - except: - return "" - return "" - - if healthy_servers: - results = await asyncio.gather(*(fetch_backend_metrics(s) for s in healthy_servers)) - backend_outputs = [r for r in results if r] - - full_metrics = "\n".join(lb_metrics) + "\n" + "\n".join(backend_outputs) - return PlainTextResponse(full_metrics) if __name__ == "__main__": import uvicorn - # 워커 수를 늘려 동시 처리 성능 극대화 (CPU 코어 수에 맞게 조절 가능) + uvicorn.run(app, host="0.0.0.0", port=8000, workers=1) From b445c3e32f92c3fed8d0f9051aa4250dd4dcdaf0 Mon Sep 17 00:00:00 2001 From: JAEIL1999 Date: Thu, 9 Apr 2026 17:19:33 +0900 Subject: [PATCH 5/9] refactor: separate frontend dashboard config and API services --- fe/src/App.css | 9 -- fe/src/App.tsx | 35 ------- fe/src/assets/react.svg | 1 - fe/src/components/Overview.tsx | 27 +++--- fe/src/components/Sidebar.tsx | 152 ++++++++++++++--------------- fe/src/config/endpoints.ts | 10 ++ fe/src/config/grafana.ts | 20 ++++ fe/src/main.tsx | 2 +- fe/src/pages/Dashboard.tsx | 167 ++++++++++++++++---------------- fe/src/services/loadBalancer.ts | 11 +++ fe/src/types/assets.d.ts | 2 + fe/tsconfig.app.json | 5 +- 12 files changed, 218 insertions(+), 223 deletions(-) delete mode 100644 fe/src/App.css delete mode 100644 fe/src/App.tsx delete mode 100644 fe/src/assets/react.svg create mode 100644 fe/src/config/endpoints.ts create mode 100644 fe/src/config/grafana.ts create mode 100644 fe/src/services/loadBalancer.ts create mode 100644 fe/src/types/assets.d.ts diff --git a/fe/src/App.css b/fe/src/App.css deleted file mode 100644 index 92b61cc..0000000 --- a/fe/src/App.css +++ /dev/null @@ -1,9 +0,0 @@ -@tailwind base; -@tailwind components; -@tailwind utilities; - -#root { - margin: 0; - padding: 0; -} - diff --git a/fe/src/App.tsx b/fe/src/App.tsx deleted file mode 100644 index 3d7ded3..0000000 --- a/fe/src/App.tsx +++ /dev/null @@ -1,35 +0,0 @@ -import { useState } from 'react' -import reactLogo from './assets/react.svg' -import viteLogo from '/vite.svg' -import './App.css' - -function App() { - const [count, setCount] = useState(0) - - return ( - <> - -

Vite + React

-
- -

- Edit src/App.tsx and save to test HMR -

-
-

- Click on the Vite and React logos to learn more -

- - ) -} - -export default App diff --git a/fe/src/assets/react.svg b/fe/src/assets/react.svg deleted file mode 100644 index 6c87de9..0000000 --- a/fe/src/assets/react.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/fe/src/components/Overview.tsx b/fe/src/components/Overview.tsx index 2e99749..606506b 100644 --- a/fe/src/components/Overview.tsx +++ b/fe/src/components/Overview.tsx @@ -1,21 +1,16 @@ +import { GRAFANA_PANELS, buildGrafanaPanelUrl } from '../config/grafana'; + const Overview = () => { return ( -
-