Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ validation/results/sweep_boost_*/
# 벤치마크 결과 (재실행 가능 — repo에 보관 X)
bench_ragas.json
bench_*.json
bench_*.md

# 시뮬레이션 임시 결과 (루트 디렉토리)
sim_*.json
Expand Down
382 changes: 382 additions & 0 deletions backend/scripts/eval/run_all_agents_v7.py

Large diffs are not rendered by default.

156 changes: 156 additions & 0 deletions backend/scripts/eval/seed_eval_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""v7 평가용 시뮬 batch — Redis 캐시(v2:population, v2:market 등)를 채우는 스크립트.

배경:
population_node / market_analyst_node 가 v2 prefix 로 raw 데이터까지 캐시하도록
변경됐지만, 기존 캐시는 v1 (raw 없음) 이라 평가 불가. 새 시뮬을 batch 호출해서
v2 캐시를 채워야 함.

사용:
cd backend
python -m scripts.eval.seed_eval_cache

전제:
- 백엔드가 떠있고 (http://localhost:8000)
- 노드 코드 변경 후 재시작됨
- DB / Redis 연결 정상
"""

from __future__ import annotations

import asyncio
import io
import sys
import time

# Force UTF-8 on stdout when the console uses another codec, so the Korean
# messages and the ✓/✗/❌ marks printed by this script cannot fail to encode.
# reconfigure() switches the codec in place and keeps the stream's current
# buffering settings; re-wrapping sys.stdout.buffer is only the fallback for
# stream objects that do not offer reconfigure().
if sys.stdout.encoding and sys.stdout.encoding.lower() != "utf-8":
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    else:
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")

import httpx

# Base URL of the backend that every request in this script is sent to.
BACKEND = "http://localhost:8000"

# Diversity — spread the cases over brands, districts (dong) and business types
# to enrich the evaluation fixtures.
# Each case takes roughly 60-90 s (full TCN + ML + LLM + SHAP pipeline).
CASES: list[dict] = [
    {"target": "서교동", "brand": "메가엠지씨커피", "biz": "cafe"},
    {"target": "합정동", "brand": "이디야커피", "biz": "cafe"},
    {"target": "연남동", "brand": "빽다방", "biz": "cafe"},
    {"target": "망원1동", "brand": "스타벅스", "biz": "cafe"},
    {"target": "성산2동", "brand": "컴포즈커피", "biz": "cafe"},
    {"target": "공덕동", "brand": "빽다방", "biz": "cafe"},
    {"target": "아현동", "brand": "메가엠지씨커피", "biz": "cafe"},
    {"target": "도화동", "brand": "이디야커피", "biz": "cafe"},
]


def _build_payload(case: dict) -> dict:
"""SimulationInput 최소 페이로드 — 평가에 필요한 필드만."""
return {
"business_type": case["biz"],
"brand_name": case["brand"],
"target_district": case["target"],
"target_districts": [case["target"]],
"existing_stores": [],
"monthly_rent": 2_000_000,
"scenarios": [],
"store_area": 15.0,
"target_price_range": "5to10k",
"operating_hours": ["점심", "저녁"],
"initial_capital": 50_000_000,
"population_weight": True,
"commercial_radius": 500,
}


async def _wait_done(client: httpx.AsyncClient, status_url: str, timeout_s: float = 240) -> dict:
"""job status 폴링 — done 또는 timeout 까지."""
start = time.time()
while time.time() - start < timeout_s:
try:
r = await client.get(status_url, timeout=10)
data = r.json()
status = (data.get("status") or "").lower()
progress = data.get("progress", 0) or 0
stage = data.get("stage", "")
if status in {"done", "success", "completed"}:
return {"ok": True, "elapsed": time.time() - start, "stage": stage}
if status == "error":
return {"ok": False, "elapsed": time.time() - start, "error": data.get("error", "")}
print(f" progress={progress:.0%} stage={stage}", end="\r", flush=True)
except Exception as e:
print(f" polling error: {e}", end="\r")
await asyncio.sleep(3)
return {"ok": False, "elapsed": timeout_s, "error": "timeout"}


async def run_one(client: httpx.AsyncClient, case: dict, idx: int, total: int) -> dict:
    """Run one evaluation case: start both async jobs and wait for them to end.

    Args:
        client: Shared HTTP client.
        case: Entry providing the ``target``, ``brand`` and ``biz`` keys.
        idx: Zero-based position of the case, used in the progress header.
        total: Total number of cases, used in the progress header.

    Returns:
        A dict with ``case`` (label) and ``ok``. When the jobs could not be
        started it also carries ``error``. Otherwise it carries the ``predict``
        and ``analyze`` polling results, plus ``error`` naming each job that
        did not succeed.
    """
    payload = _build_payload(case)
    case_id = f"{case['target']}/{case['brand']}/{case['biz']}"
    print(f"\n[{idx + 1}/{total}] {case_id}")

    # Submit /predict/async and /analyze/llm/async back to back; the two jobs
    # are then polled concurrently below (original note: the server manages
    # the two queues separately).
    try:
        pred_resp = await client.post(f"{BACKEND}/predict/async", json=payload, timeout=30)
        pred_job = pred_resp.json().get("job_id")
        ana_resp = await client.post(f"{BACKEND}/analyze/llm/async", json=payload, timeout=30)
        ana_job = ana_resp.json().get("job_id")
    except Exception as e:
        return {"case": case_id, "ok": False, "error": f"start: {e}"}

    if not pred_job or not ana_job:
        # Include the HTTP status codes so a rejected payload is distinguishable
        # from a response that merely lacks the field.
        return {
            "case": case_id,
            "ok": False,
            "error": (
                f"no job_id (predict HTTP {pred_resp.status_code}, "
                f"analyze HTTP {ana_resp.status_code})"
            ),
        }

    # str() guards against a non-string job id (e.g. an integer).
    print(f" predict={str(pred_job)[:8]} analyze={str(ana_job)[:8]} — 대기…")

    pred_res, ana_res = await asyncio.gather(
        _wait_done(client, f"{BACKEND}/predict/{pred_job}/status"),
        _wait_done(client, f"{BACKEND}/analyze/llm/{ana_job}/status"),
    )
    ok = pred_res["ok"] and ana_res["ok"]
    # Only show the success mark when both jobs actually succeeded.
    mark = "✓" if ok else "✗"
    print(f" {mark} predict({pred_res['elapsed']:.0f}s) analyze({ana_res['elapsed']:.0f}s)")

    result = {
        "case": case_id,
        "ok": ok,
        "predict": pred_res,
        "analyze": ana_res,
    }
    if not ok:
        # Surface the nested failure reasons at the top level, where the final
        # summary looks for "error".
        failures = [
            f"{name}: {res.get('error') or 'failed'}"
            for name, res in (("predict", pred_res), ("analyze", ana_res))
            if not res["ok"]
        ]
        result["error"] = "; ".join(failures)
    return result


async def main() -> None:
    """Run every entry of ``CASES`` against the backend and print a summary.

    The backend's ``/health`` endpoint is checked first; the function returns
    early when the request fails or the answer is not HTTP 200. Cases are then
    executed one at a time, in list order.
    """
    rule = "=" * 78
    print(rule)
    print(f"v7 평가용 시뮬 batch — {len(CASES)} 케이스")
    print(rule)

    async with httpx.AsyncClient() as client:
        # Backend health check — stop before submitting any job.
        try:
            health = await client.get(f"{BACKEND}/health", timeout=5)
            if health.status_code != 200:
                print(f"❌ 백엔드 health 응답 비정상: {health.status_code}")
                return
        except Exception as exc:
            print(f"❌ 백엔드 연결 실패: {exc}\n uvicorn 띄우고 다시 시도하세요.")
            return

        print("✓ 백엔드 정상")
        started = time.time()
        # Awaited one by one, so the cases run sequentially.
        outcomes = [
            await run_one(client, case, position, len(CASES))
            for position, case in enumerate(CASES)
        ]

    elapsed = time.time() - started
    n_ok = sum(bool(outcome.get("ok")) for outcome in outcomes)
    print()
    print(rule)
    print(f"완료 — {n_ok}/{len(CASES)} 성공 ({elapsed:.0f}s 소요)")
    print(rule)
    for outcome in outcomes:
        if outcome.get("ok"):
            mark, err = "✓", ""
        else:
            mark, err = "✗", f" — {outcome.get('error', '')}"
        print(f" {mark} {outcome['case']}{err}")
    print()
    print("다음 단계: python -m scripts.eval.run_all_agents_v7 로 v7 재측정")


# Run the batch only when executed as a script
# (e.g. `python -m scripts.eval.seed_eval_cache`), not on import.
if __name__ == "__main__":
    asyncio.run(main())
28 changes: 27 additions & 1 deletion backend/src/agents/nodes/market_analyst.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ async def market_analyst_node(state: AgentState) -> dict:
print(f"--- [MARKET ANALYST] {target_district} 실데이터 분석 시작 ---")

# Redis 캐시 조회 (예진 synthesis 패턴 — 조회 실패 시 연결 누수 방지)
cache_key = f"market:{target_district}:{business_type}"
# v2: raw_inputs(qoq_growth_pct/saturation_level) 추가 — v7 grade 분류 평가용.
cache_key = f"v2:market:{target_district}:{business_type}"
_redis = None
try:
_redis = aioredis.from_url(settings.redis_url, decode_responses=True)
Expand Down Expand Up @@ -177,13 +178,38 @@ async def market_analyst_node(state: AgentState) -> dict:
# Redis 캐시 저장 (finally로 연결 누수 방지)
if _redis is not None:
try:
# v7 평가용 raw_inputs — 룰엔진이 expected_grade 산출에 사용.
# qoq_growth_pct: pop_data.qoq_growth (% → 비율: 12 → 0.12)
# saturation_level: comp_data 명시 필드 또는 competitor_count 기반 추론.
_qoq_raw = pop_data.get("qoq_growth")
_qoq_pct = (float(_qoq_raw) / 100.0) if _qoq_raw is not None else None
_comp_count = comp_data.get("competitor_count", 0) or 0
_sat_level = comp_data.get("saturation_level")
if not _sat_level:
# competitor_count → saturation_level 추론 (반경 500m 기준)
if _comp_count >= 16:
_sat_level = "saturated"
elif _comp_count >= 11:
_sat_level = "high"
elif _comp_count >= 7:
_sat_level = "medium"
elif _comp_count >= 3:
_sat_level = "low"
else:
_sat_level = "sparse"
raw_inputs = {
"qoq_growth_pct": _qoq_pct,
"saturation_level": _sat_level,
"competitor_count": _comp_count,
}
await _redis.set(
cache_key,
json.dumps(
{
"market_report": market_summary,
"market_data": real_market_data,
"metrics": final_metrics,
"raw_inputs": raw_inputs, # v7 평가 — 룰엔진 expected_grade 산출용
},
ensure_ascii=False,
default=str,
Expand Down
33 changes: 31 additions & 2 deletions backend/src/agents/nodes/population.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async def population_analyst_node(state: AgentState) -> dict:
logger.info(f"--- [POPULATION ANALYST] {target_district} 입동인구 분석 시작 ---")

# Redis 캐시 조회
cache_key = f"population:{target_district}:{business_type}"
# v2: raw_metrics(age/gender/time distribution) 캐시 추가 — v7 정확도 평가용.
cache_key = f"v2:population:{target_district}:{business_type}"
_redis = None
try:
_redis = aioredis.from_url(settings.redis_url, decode_responses=True)
Expand Down Expand Up @@ -183,10 +184,31 @@ async def _fetch_sgis_data() -> dict | None:
"peak_time": result.peak_time,
}

# v7 정확도 평가용 — LLM 출력과 함께 raw distribution 도 캐시 저장.
# _expected_top_age / _expected_top_gender / _expected_peak 가
# 정답 라벨 산출에 사용. 캐시 prefix v2 로 schema 변경 표시.
raw_metrics = {
"age_distribution": {
"20": int(demographics.get("20대", 0)),
"30": int(demographics.get("30대", 0)),
"40": int(demographics.get("40대", 0)),
}
if "error" not in demo_data
else {},
"gender_distribution": {
"male": int(demographics.get("남성", 0)),
"female": int(demographics.get("여성", 0)),
}
if "error" not in demo_data
else {},
"time_peak": real_peak_time or "",
}

except Exception as e:
logger.error(f"[POPULATION ANALYST ERROR] !!! {str(e)}")
population_report = f"{target_district} 인구 분석 중 오류가 발생했습니다."
new_metrics = {}
raw_metrics = {}

analysis_results = state.get("analysis_results", {})
analysis_results["population_report"] = population_report
Expand All @@ -196,7 +218,14 @@ async def _fetch_sgis_data() -> dict | None:
try:
await _redis.set(
cache_key,
json.dumps({"population_report": population_report, "metrics": new_metrics}, ensure_ascii=False),
json.dumps(
{
"population_report": population_report,
"metrics": new_metrics,
"raw_metrics": raw_metrics, # v7 평가용 raw distribution
},
ensure_ascii=False,
),
ex=_CACHE_TTL,
)
logger.info(f"[population_analyst] 캐시 저장: {cache_key} (TTL: {_CACHE_TTL}s)")
Expand Down
4 changes: 1 addition & 3 deletions backend/src/evaluation/competitor_intel_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ async def run_one(self, case: dict) -> dict:
# 없으면 실제 노드 호출 — 비용 큰 작업이라 별도 진입점 필요.
if "simulated_output" in case:
return case["simulated_output"]
raise NotImplementedError(
"case 에 'simulated_output' 미포함 — 실제 시뮬 호출 진입점 별도 구현 필요"
)
raise NotImplementedError("case 에 'simulated_output' 미포함 — 실제 시뮬 호출 진입점 별도 구현 필요")

def score(self, case: dict, output: Any) -> EvalResult:
# output 은 competitor_intel 결과 dict — market_entry_signal + cannibalization + competition_500m 보유.
Expand Down
Loading
Loading