diff --git a/backend/app/agents/channel_exec.py b/backend/app/agents/channel_exec.py
index 4536828..0f41e85 100644
--- a/backend/app/agents/channel_exec.py
+++ b/backend/app/agents/channel_exec.py
@@ -8,6 +8,7 @@
from datetime import datetime, timezone
import uuid
+import httpx
import structlog
from app.config import settings
@@ -40,10 +41,96 @@ async def deploy(self, channel_config: dict, content: dict, assets: dict) -> lis
return [f"google_ad_{uuid.uuid4().hex[:8]}"]
+class ZhihuAdapter:
+ BASE = "https://zhuanlan.zhihu.com"
+
+ def _md_to_html(self, md: str) -> str:
+ import re
+ # Strip residual markdown symbols
+ md = re.sub(r'^#{1,6}\s+', '', md, flags=re.MULTILINE)
+ md = re.sub(r'\*{1,3}([^*]+)\*{1,3}', r'\1', md)
+ md = re.sub(r'`([^`]*)`', r'\1', md)
+ md = re.sub(r'^[-*]\s+', '', md, flags=re.MULTILINE)
+ # Split into paragraphs (separated by blank lines)
+ paragraphs = re.split(r'\n{2,}', md.strip())
+ html_parts = []
+ for para in paragraphs:
+ lines = [l.strip() for l in para.splitlines() if l.strip()]
+ if lines:
+                html_parts.append('<br>'.join(lines))
+        # Paragraphs separated by double <br> for spacing
+        return '<br><br>'.join(html_parts)
+
+ def _xsrf(self) -> str:
+ for part in settings.zhihu_cookie.split(";"):
+ part = part.strip()
+ if part.startswith("_xsrf="):
+ return part[len("_xsrf="):]
+ return ""
+
+ def _headers(self) -> dict:
+ return {
+ "Cookie": settings.zhihu_cookie,
+ "Content-Type": "application/json",
+ "Accept": "application/json, text/plain, */*",
+ "Origin": "https://zhuanlan.zhihu.com",
+ "Referer": "https://zhuanlan.zhihu.com/write",
+ "x-api-version": "3.0.91",
+ "x-requested-with": "fetch",
+ "x-xsrftoken": self._xsrf(),
+ "x-zst-81": settings.zhihu_zst_81,
+ "User-Agent": (
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ "Chrome/146.0.0.0 Safari/537.36"
+ ),
+ }
+
+ async def deploy(self, channel_config: dict, content: dict, assets: dict) -> list[str]:
+ """Save article as Zhihu draft. User reviews and publishes manually."""
+ if not settings.zhihu_cookie:
+ logger.warning("zhihu_no_cookie_configured")
+ return ["zhihu_failed_no_cookie"]
+
+ variant = (content.get("variants") or [{}])[0]
+ title = variant.get("title", "无标题")
+ body_md = variant.get("body", "")
+ body_html = self._md_to_html(body_md)
+
+ logger.info("zhihu_save_draft_start", title=title)
+
+ async with httpx.AsyncClient(timeout=30) as client:
+ # Step 1: POST /api/articles/drafts — create empty draft, get article ID
+ create_resp = await client.post(
+ f"{self.BASE}/api/articles/drafts",
+ headers=self._headers(),
+ json={},
+ )
+ logger.info("zhihu_create_draft", status=create_resp.status_code, body=create_resp.text[:300])
+ create_resp.raise_for_status()
+ article_id = create_resp.json().get("id")
+ if not article_id:
+ raise ValueError(f"No article id in response: {create_resp.text[:200]}")
+
+ # Step 2: PATCH /api/articles/{id}/draft — save title and content
+ patch_resp = await client.patch(
+ f"{self.BASE}/api/articles/{article_id}/draft",
+ headers=self._headers(),
+ json={"title": title, "content": body_html, "table_of_contents": False},
+ )
+ logger.info("zhihu_save_draft", status=patch_resp.status_code, body=patch_resp.text[:300])
+ patch_resp.raise_for_status()
+
+ draft_url = f"https://zhuanlan.zhihu.com/p/{article_id}/edit"
+ logger.info("zhihu_draft_saved", article_id=article_id, draft_url=draft_url)
+ return [draft_url]
+
+
_ADAPTERS = {
"meta": MetaAdapter(),
"tiktok": TikTokAdapter(),
"google": GoogleAdapter(),
+ "zhihu": ZhihuAdapter(),
}
diff --git a/backend/app/agents/content_gen.py b/backend/app/agents/content_gen.py
index 243525e..44ffa05 100644
--- a/backend/app/agents/content_gen.py
+++ b/backend/app/agents/content_gen.py
@@ -6,8 +6,10 @@
Events: ContentGenerated
"""
import json
+import re
import uuid
+import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential
@@ -19,25 +21,141 @@
logger = structlog.get_logger(__name__)
-@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
-async def _call_llm(prompt: str) -> list[dict]:
+async def _get_github_context(url: str) -> str:
+ """
+ Fetch comprehensive project context from GitHub:
+ metadata, directory structure, recent commits, dependency files, README.
+ Falls back gracefully — each section is independently optional.
+ """
+ match = re.match(r"https://github\.com/([^/]+)/([^/\s]+?)(?:\.git)?/?$", url)
+ if not match:
+ logger.warning("github_url_not_repo", url=url)
+ return ""
+
+ owner, repo = match.groups()
+ api_base = f"https://api.github.com/repos/{owner}/{repo}"
+ raw_base = f"https://raw.githubusercontent.com/{owner}/{repo}"
+ api_headers = {"Accept": "application/vnd.github.v3+json"}
+ parts: list[str] = []
+
+ async with httpx.AsyncClient(timeout=30) as client:
+
+ # ── 1. Repository metadata ────────────────────────────────────────
+ try:
+ r = await client.get(api_base, headers=api_headers)
+ if r.status_code == 200:
+ m = r.json()
+ topics = ", ".join(m.get("topics") or []) or "N/A"
+ parts.append(
+ f"[Meta] Stars:{m.get('stargazers_count', 0)} | "
+ f"Forks:{m.get('forks_count', 0)} | "
+ f"Language:{m.get('language', 'N/A')} | "
+ f"Topics:{topics} | "
+ f"Description:{m.get('description', '')}"
+ )
+ logger.info("github_meta_fetched", owner=owner, repo=repo)
+ except Exception as exc:
+ logger.warning("github_meta_failed", error=str(exc))
+
+ # ── 2. Root directory structure ───────────────────────────────────
+ try:
+ r = await client.get(f"{api_base}/git/trees/HEAD", headers=api_headers)
+ if r.status_code == 200:
+ entries = [item["path"] for item in r.json().get("tree", [])]
+ parts.append(f"[Structure] {' | '.join(entries[:50])}")
+ logger.info("github_tree_fetched", entries=len(entries))
+ except Exception as exc:
+ logger.warning("github_tree_failed", error=str(exc))
+
+ # ── 3. Recent commits (last 5) ────────────────────────────────────
+ try:
+ r = await client.get(f"{api_base}/commits?per_page=5", headers=api_headers)
+ if r.status_code == 200:
+ msgs = [c["commit"]["message"].split("\n")[0] for c in r.json()[:5]]
+ parts.append(f"[Recent Commits] {' | '.join(msgs)}")
+ logger.info("github_commits_fetched", count=len(msgs))
+ except Exception as exc:
+ logger.warning("github_commits_failed", error=str(exc))
+
+ # ── 4. Dependency file (first match across branches) ──────────────
+ dep_candidates = [
+ "requirements.txt", "pyproject.toml",
+ "package.json", "go.mod", "Cargo.toml",
+ ]
+ dep_found = False
+ for fname in dep_candidates:
+ if dep_found:
+ break
+ for branch in ("main", "master"):
+ try:
+ r = await client.get(f"{raw_base}/{branch}/{fname}")
+ if r.status_code == 200:
+ parts.append(f"[{fname}]\n{r.text[:800]}")
+ logger.info("github_dep_fetched", file=fname, branch=branch)
+ dep_found = True
+ break
+ except Exception:
+ continue
+
+ # ── 5. README ─────────────────────────────────────────────────────
+ readme = ""
+ for branch in ("main", "master"):
+ try:
+ r = await client.get(f"{raw_base}/{branch}/README.md")
+ if r.status_code == 200:
+ readme = r.text
+ logger.info("github_readme_fetched", branch=branch, length=len(readme))
+ break
+ except Exception as exc:
+ logger.warning("github_readme_fetch_failed", branch=branch, error=str(exc))
+
+ if readme:
+ parts.append(f"[README]\n{readme[:3000]}")
+
+ return "\n\n".join(parts)
+
+
+# NOTE: 2 attempts max — article generation is slow (~90s), 3 retries would hit ARQ 300s job limit.
+@retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=2, max=5))
+async def _call_llm(prompt: str, is_article: bool = False) -> list[dict]:
"""Call LLM to generate copy variants. Retries 3x on transient errors."""
+ system_prompt = (
+ "You are a senior performance marketing copywriter and technical evangelist. "
+ "Return a JSON array of copy variants."
+ )
+
+ if is_article:
+ system_prompt += (
+ " Generate exactly ONE article variant with fields: variant_label (set to 'A'), title, body, channel. "
+ "The body must be plain text only — absolutely NO Markdown syntax: no ##, no **, no -, no `, no >. "
+ "Structure the article like an academic paper: use numbered section headings like '1. 节名', '2. 节名', '2.1 小节名' on their own lines. "
+ "Each section should have multiple paragraphs separated by a blank line. Write in depth — aim for 1500+ Chinese characters. "
+ "IMPORTANT: The title MUST be 15 Chinese characters or fewer — count carefully, this is a hard limit."
+ )
+ else:
+ system_prompt += (
+ " Each variant must have: variant_label (A/B/C), hook, body, cta, channel. "
+ )
+
+ system_prompt += "Output ONLY valid JSON, no markdown outside the JSON structure."
+
raw = await llm_client.chat_completion(
- system=(
- "You are a senior performance marketing copywriter. "
- "Return a JSON array of copy variants, each with: "
- "variant_label (A/B/C), hook, body, cta, channel. "
- "Output ONLY valid JSON, no markdown."
- ),
+ system=system_prompt,
messages=[{"role": "user", "content": prompt}],
+ max_tokens=8192 if is_article else 2048,
)
- # Clean up potential markdown code blocks if the LLM includes them
- if "```json" in raw:
- raw = raw.split("```json")[1].split("```")[0].strip()
- elif "```" in raw:
- raw = raw.split("```")[1].split("```")[0].strip()
+ # Extract JSON array — find outermost [ ... ] to avoid being fooled
+ # by code blocks (```python, ```yaml etc.) inside the article body.
+ start = raw.find('[')
+ end = raw.rfind(']')
+ if start != -1 and end != -1:
+ raw = raw[start:end + 1]
- return json.loads(raw)
+ try:
+ return json.loads(raw)
+ except json.JSONDecodeError as e:
+ logger.error("content_gen_json_parse_error", error=str(e), raw_snippet=raw[:300])
+ raise
async def content_gen_node(state: CampaignState) -> dict:
@@ -49,32 +167,92 @@ async def content_gen_node(state: CampaignState) -> dict:
strategy = state.get("strategy") or {}
channels = strategy.get("channel_plan", [{"channel": "tiktok"}, {"channel": "meta"}])
channel_names = [c["channel"] for c in channels]
+ is_technical_promo = any(ch in ["zhihu", "juejin", "csdn"] for ch in channel_names)
+
+ # Fetch comprehensive project context if a GitHub URL is present in goal
+ repo_context = ""
+ repo_url_match = re.search(r"https://github\.com/[^\s]+", state["goal"])
+ if repo_url_match:
+ repo_url = repo_url_match.group(0)
+ repo_context = await _get_github_context(repo_url)
prompt = (
f"Product goal: {state['goal']}\n"
f"Target channels: {', '.join(channel_names)}\n"
- f"KPI target: {state['kpi']['metric']} = {state['kpi']['target']}\n"
- f"Generate 3 A/B/C copy variants optimized for these channels."
+ f"KPI target: {state.get('kpi', {}).get('metric', 'awareness')} = {state.get('kpi', {}).get('target', 'high')}\n"
)
+ if repo_context:
+ prompt += f"\nProject Context:\n{repo_context}\n"
+
+ if is_technical_promo:
+ prompt += (
+ "\nGenerate ONE comprehensive technical article (variant_label: 'A'). "
+ "Return a JSON array with exactly 1 variant.\n"
+ "CRITICAL RULE — title字数: 标题必须≤15个汉字,例如'DataPulse架构深度解析'(12字)是合法的,"
+ "'DataPulse v3.1深度解析:从多模型到知识图谱'(超过15字)是不合法的。"
+ "生成前请数清楚字数,超过15字必须重新起名。"
+ )
+ else:
+ prompt += "\nGenerate 3 A/B/C copy variants optimized for these channels."
+
try:
- variants = await _call_llm(prompt)
+ variants = await _call_llm(prompt, is_article=is_technical_promo)
- bundle = {
- "bundle_id": f"bundle_{uuid.uuid4().hex[:8]}",
+ bundle_id = uuid.uuid4()
+ bundle_data = {
+ "bundle_id": str(bundle_id),
"variants": variants,
"llm_model": settings.anthropic_model,
}
+ # ── Persistence Layer ─────────────────────────────────────────────
+ from app.database import async_session_factory
+ from app.models.content import ContentBundle, Copy
+
+ async with async_session_factory() as db:
+ # Ensure campaign exists (it might be 'demo' in some contexts,
+ # we should skip persistence for demo or handle it)
+ campaign_id = state.get("campaign_id")
+ DEMO_UUID = uuid.UUID("00000000-0000-0000-0000-000000000001")
+ try:
+ camp_uuid = DEMO_UUID if campaign_id == "demo" else uuid.UUID(campaign_id)
+
+ new_bundle = ContentBundle(
+ id=bundle_id,
+ campaign_id=camp_uuid,
+ llm_model=settings.anthropic_model,
+ generation_params={"tone": state.get("tone", "professional")},
+ )
+ db.add(new_bundle)
+
+ for var in variants:
+ new_copy = Copy(
+ bundle_id=bundle_id,
+ campaign_id=camp_uuid,
+ variant_label=var.get("variant_label", "A"),
+ hook=var.get("title") or var.get("hook"),
+ body=var.get("body"),
+ cta=var.get("cta"),
+ channel=var.get("channel"),
+ status="GENERATED"
+ )
+ db.add(new_copy)
+
+ await db.commit()
+ logger.info("content_gen_persisted", bundle_id=str(bundle_id))
+ except (ValueError, TypeError):
+ logger.warning("content_gen_persistence_skipped", campaign_id=campaign_id)
+
await event_bus.publish(
"ContentGenerated",
- {"bundle": bundle},
+ {"bundle": bundle_data},
state["campaign_id"],
)
logger.info("content_gen_done", variants=len(variants))
return {
- "content": bundle,
+ "content": bundle_data,
"status": "PRODUCTION",
"completed_tasks": ["content_gen"],
}
diff --git a/backend/app/agents/graph.py b/backend/app/agents/graph.py
index d5f553e..620ce55 100644
--- a/backend/app/agents/graph.py
+++ b/backend/app/agents/graph.py
@@ -40,39 +40,39 @@ def build_campaign_graph(checkpointer=None):
graph = StateGraph(CampaignState)
# ── Register nodes ────────────────────────────────────────────
- graph.add_node("planner", planner_node)
- graph.add_node("strategy", strategy_node)
- graph.add_node("content_gen", content_gen_node)
- graph.add_node("multimodal", multimodal_node)
- graph.add_node("channel_exec", channel_exec_node)
- graph.add_node("analysis", analysis_node)
- graph.add_node("optimizer", optimizer_node)
+ graph.add_node("planner_node", planner_node)
+ graph.add_node("strategy_node", strategy_node)
+ graph.add_node("content_gen_node", content_gen_node)
+ graph.add_node("multimodal_node", multimodal_node)
+ graph.add_node("channel_exec_node", channel_exec_node)
+ graph.add_node("analysis_node", analysis_node)
+ graph.add_node("optimizer_node", optimizer_node)
# ── Entry point ───────────────────────────────────────────────
- graph.set_entry_point("planner")
+ graph.set_entry_point("planner_node")
# ── Sequential edges ──────────────────────────────────────────
- graph.add_edge("planner", "strategy")
+ graph.add_edge("planner_node", "strategy_node")
# ── Parallel fan-out: strategy → content_gen AND multimodal ──
# LangGraph executes both nodes concurrently when both are listed as targets
- graph.add_edge("strategy", "content_gen")
- graph.add_edge("strategy", "multimodal")
+ graph.add_edge("strategy_node", "content_gen_node")
+ graph.add_edge("strategy_node", "multimodal_node")
# ── Fan-in: both content_gen and multimodal must finish before channel_exec
- graph.add_edge("content_gen", "channel_exec")
- graph.add_edge("multimodal", "channel_exec")
+ graph.add_edge("content_gen_node", "channel_exec_node")
+ graph.add_edge("multimodal_node", "channel_exec_node")
# ── Continue pipeline ─────────────────────────────────────────
- graph.add_edge("channel_exec", "analysis")
- graph.add_edge("analysis", "optimizer")
+ graph.add_edge("channel_exec_node", "analysis_node")
+ graph.add_edge("analysis_node", "optimizer_node")
# ── Conditional loop edge ─────────────────────────────────────
graph.add_conditional_edges(
- "optimizer",
+ "optimizer_node",
should_loop,
{
- "loop": "strategy", # loop back for optimization
+ "loop": "strategy_node", # loop back for optimization
"done": END,
},
)
diff --git a/backend/app/agents/planner.py b/backend/app/agents/planner.py
index e759faa..0631fd7 100644
--- a/backend/app/agents/planner.py
+++ b/backend/app/agents/planner.py
@@ -5,7 +5,9 @@
Output: state.plan, state.scenario
Events: PlanGenerated
"""
+import json
import structlog
+import uuid
from app.config import settings
from app.core.event_bus import event_bus
@@ -14,72 +16,69 @@
logger = structlog.get_logger(__name__)
-# DAG templates — mirrors Planner.js but is the authoritative Python version
-_TEMPLATES: dict[str, list[dict]] = {
- "NEW_PRODUCT": [
- {"id": "t1", "agent_type": "STRATEGY", "dependencies": [], "parallel_group": None},
- {"id": "t2", "agent_type": "CONTENT_GEN", "dependencies": ["t1"], "parallel_group": "gen"},
- {"id": "t3", "agent_type": "MULTIMODAL", "dependencies": ["t1"], "parallel_group": "gen"},
- {"id": "t4", "agent_type": "CHANNEL_EXEC","dependencies": ["t2", "t3"], "parallel_group": None},
- {"id": "t5", "agent_type": "ANALYSIS", "dependencies": ["t4"], "parallel_group": None},
- {"id": "t6", "agent_type": "OPTIMIZER", "dependencies": ["t5"], "parallel_group": None},
- ],
- "RETENTION": [
- {"id": "t1", "agent_type": "CONTENT_GEN", "dependencies": [], "parallel_group": "gen"},
- {"id": "t2", "agent_type": "STRATEGY", "dependencies": [], "parallel_group": "gen"},
- {"id": "t3", "agent_type": "CHANNEL_EXEC","dependencies": ["t1", "t2"], "parallel_group": None},
- {"id": "t4", "agent_type": "ANALYSIS", "dependencies": ["t3"], "parallel_group": None},
- {"id": "t5", "agent_type": "OPTIMIZER", "dependencies": ["t4"], "parallel_group": None},
- ],
- "BRAND_AWARENESS": [
- {"id": "t1", "agent_type": "MULTIMODAL", "dependencies": [], "parallel_group": None},
- {"id": "t2", "agent_type": "CONTENT_GEN", "dependencies": ["t1"], "parallel_group": None},
- {"id": "t3", "agent_type": "STRATEGY", "dependencies": [], "parallel_group": None},
- {"id": "t4", "agent_type": "CHANNEL_EXEC","dependencies": ["t1", "t2", "t3"], "parallel_group": None},
- {"id": "t5", "agent_type": "ANALYSIS", "dependencies": ["t4"], "parallel_group": None},
- {"id": "t6", "agent_type": "OPTIMIZER", "dependencies": ["t5"], "parallel_group": None},
- ],
- "GROWTH_GENERAL": [
- {"id": "t1", "agent_type": "STRATEGY", "dependencies": [], "parallel_group": None},
- {"id": "t2", "agent_type": "CONTENT_GEN", "dependencies": ["t1"], "parallel_group": "gen"},
- {"id": "t3", "agent_type": "MULTIMODAL", "dependencies": ["t1"], "parallel_group": "gen"},
- {"id": "t4", "agent_type": "CHANNEL_EXEC","dependencies": ["t2", "t3"], "parallel_group": None},
- {"id": "t5", "agent_type": "ANALYSIS", "dependencies": ["t4"], "parallel_group": None},
- {"id": "t6", "agent_type": "OPTIMIZER", "dependencies": ["t5"], "parallel_group": None},
- ],
-}
-
-
-def _detect_scenario(goal: str, constraints: dict) -> str:
- g = goal.lower()
- if any(k in g for k in ["新品", "冷启动", "launch", "new product"]):
- return "NEW_PRODUCT"
- if any(k in g for k in ["复购", "retention", "留存"]):
- return "RETENTION"
- if any(k in g for k in ["品牌", "brand awareness", "曝光"]):
- return "BRAND_AWARENESS"
- return "GROWTH_GENERAL"
-
+def _print_ascii_dag(tasks: list[dict]):
+ """Print a human-readable ASCII representation of the generated DAG."""
+ print("\n" + "="*50)
+ print(" 🗺️ GENERATED AGENT PLAN (DAG)")
+ print("="*50)
+
+ # Simple dependency visualization
+ for task in tasks:
+ deps = ", ".join(task['dependencies']) if task['dependencies'] else "START"
+ parallel = f" [Parallel: {task['parallel_group']}]" if task.get('parallel_group') else ""
+ print(f" {task['id']:<4} | {task['agent_type']:<15} | Deps: {deps:<15}{parallel}")
+
+ print("="*50 + "\n")
async def planner_node(state: CampaignState) -> dict:
"""
- LangGraph node function.
- In production: calls Claude to generate a dynamic DAG.
- Current: rule-based scenario detection + static templates.
+ Planner Agent: Uses LLM to dynamically generate a task DAG based on the goal.
"""
logger.info("planner_start", campaign_id=state["campaign_id"], goal=state["goal"][:60])
+ system_prompt = (
+ "You are a senior AI Solutions Architect. Your task is to decompose a marketing goal into a "
+ "Directed Acyclic Graph (DAG) of specialized agent tasks.\n\n"
+ "Available Agent Types:\n"
+ "- STRATEGY: Budget allocation and channel selection.\n"
+ "- CONTENT_GEN: Copywriting and text generation.\n"
+ "- MULTIMODAL: Visual asset (image/video) generation.\n"
+ "- CHANNEL_EXEC: Deploying content to platforms (Zhihu, TikTok, etc.).\n"
+ "- ANALYSIS: Performance tracking and ROI calculation.\n"
+ "- OPTIMIZER: Strategy refinement and closed-loop decision making.\n\n"
+ "Return a JSON object with 'scenario' (string) and 'tasks' (array of objects).\n"
+ "Each task object must have: id (t1, t2...), agent_type, dependencies (list of IDs), parallel_group (optional string).\n"
+ "Ensure the graph is logical: e.g., CHANNEL_EXEC depends on CONTENT_GEN."
+ )
+
+ user_prompt = f"Product Goal: {state['goal']}\nConstraints: {json.dumps(state.get('constraints', {}))}"
+
try:
- scenario = _detect_scenario(state["goal"], state.get("constraints", {}))
- tasks = _TEMPLATES[scenario]
+ # 1. Dynamic Generation via LLM
+ raw_response = await llm_client.chat_completion(
+ system=system_prompt,
+ messages=[{"role": "user", "content": user_prompt}],
+ max_tokens=2048
+ )
+
+ # Extract JSON (handling potential markdown)
+ start = raw_response.find('{')
+ end = raw_response.rfind('}')
+ plan_data = json.loads(raw_response[start:end+1])
+
+ tasks = plan_data.get("tasks", [])
+ scenario = plan_data.get("scenario", "DYNAMIC_GROWTH")
+
+ # 2. Print ASCII Visualization for the user (in server logs)
+ _print_ascii_dag(tasks)
- import uuid
plan = {
"id": f"plan_{uuid.uuid4().hex[:8]}",
"scenario": scenario,
"tasks": tasks,
}
+ # 3. Notify Frontend
await event_bus.publish(
"PlanGenerated",
{"plan": plan, "scenario": scenario},
@@ -96,7 +95,8 @@ async def planner_node(state: CampaignState) -> dict:
except Exception as exc:
logger.error("planner_error", error=str(exc))
+ # Fallback to a basic template if LLM fails
return {
- "errors": [{"node": "planner", "error": str(exc)}],
+ "errors": [{"node": "planner", "error": f"LLM Planning failed: {str(exc)}"}],
"status": "PLANNING_FAILED",
}
diff --git a/backend/app/api/articles.py b/backend/app/api/articles.py
new file mode 100644
index 0000000..956ceb4
--- /dev/null
+++ b/backend/app/api/articles.py
@@ -0,0 +1,55 @@
+from typing import List, Optional
+from uuid import UUID
+import structlog
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy import select, desc, delete
+from sqlalchemy.ext.asyncio import AsyncSession
+from app.database import get_db
+from app.models.content import Copy
+
+logger = structlog.get_logger(__name__)
+router = APIRouter()
+
+@router.get("", summary="Get historical articles")
+async def list_articles(
+ limit: int = Query(20, ge=1, le=100),
+ offset: int = Query(0, ge=0),
+ db: AsyncSession = Depends(get_db)
+):
+ """Return historical generated articles/copies."""
+ query = (
+ select(Copy)
+ .order_by(desc(Copy.created_at))
+ .offset(offset)
+ .limit(limit)
+ )
+ result = await db.execute(query)
+ items = result.scalars().all()
+
+ return {
+ "items": [
+ {
+ "id": str(item.id),
+ "bundle_id": str(item.bundle_id),
+ "campaign_id": str(item.campaign_id),
+ "variant_label": item.variant_label,
+ "title": item.hook,
+ "body": item.body,
+ "channel": item.channel,
+ "status": item.status,
+ "created_at": item.created_at.isoformat()
+ }
+ for item in items
+ ]
+ }
+
+
+@router.delete("/{article_id}", summary="Delete an article by ID")
+async def delete_article(article_id: UUID, db: AsyncSession = Depends(get_db)):
+ result = await db.execute(select(Copy).where(Copy.id == article_id))
+ item = result.scalar_one_or_none()
+ if item is None:
+ raise HTTPException(status_code=404, detail="Article not found")
+ await db.execute(delete(Copy).where(Copy.id == article_id))
+ await db.commit()
+ return {"deleted": str(article_id)}
diff --git a/backend/app/api/router.py b/backend/app/api/router.py
index 29171e6..c89057c 100644
--- a/backend/app/api/router.py
+++ b/backend/app/api/router.py
@@ -3,10 +3,12 @@
from .campaigns import router as campaigns_router
from .agents import router as agents_router
+from .articles import router as articles_router
from .ws import router as ws_router
api_router = APIRouter()
api_router.include_router(campaigns_router, prefix="/v1/campaigns", tags=["Campaigns"])
api_router.include_router(agents_router, prefix="/v1/agents", tags=["A2A Agents"])
+api_router.include_router(articles_router, prefix="/v1/articles", tags=["Articles"])
api_router.include_router(ws_router, tags=["WebSocket"])
diff --git a/backend/app/config.py b/backend/app/config.py
index 752078a..45f0ea5 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -70,6 +70,10 @@ class Settings(BaseSettings):
google_ads_client_secret: str = Field(default="", alias="GOOGLE_ADS_CLIENT_SECRET")
google_ads_refresh_token: str = Field(default="", alias="GOOGLE_ADS_REFRESH_TOKEN")
+ # ── Zhihu ──────────────────────────────────────────────────────
+ zhihu_cookie: str = Field(default="", alias="ZHIHU_COOKIE")
+ zhihu_zst_81: str = Field(default="", alias="ZHIHU_ZST_81")
+
# ── Image Generation ───────────────────────────────────────────
openai_api_key: str = Field(default="", alias="OPENAI_API_KEY")
stability_api_key: str = Field(default="", alias="STABILITY_API_KEY")
diff --git a/backend/app/core/llm.py b/backend/app/core/llm.py
index 84fd5cb..4133765 100644
--- a/backend/app/core/llm.py
+++ b/backend/app/core/llm.py
@@ -81,16 +81,23 @@ async def chat_completion(
raise ValueError(f"Unsupported provider: {provider}")
async def _anthropic_completion(self, messages, system, model, max_tokens):
+ resolved_model = model or settings.anthropic_model
+ logger.info("llm_request", provider="anthropic", model=resolved_model,
+ system=system, messages=messages)
response = await self.anthropic.messages.create(
- model=model or settings.anthropic_model,
+ model=resolved_model,
max_tokens=max_tokens or settings.anthropic_max_tokens,
system=system,
messages=messages,
)
- return response.content[0].text
+ result = response.content[0].text
+ logger.info("llm_response", provider="anthropic", model=resolved_model,
+ response=result)
+ return result
async def _openai_compatible_completion(self, base_url, api_key, messages, system, model, max_tokens):
- async with httpx.AsyncClient() as client:
+ # NOTE: Timeout must be 180s for long technical articles
+ async with httpx.AsyncClient(timeout=180.0) as client:
full_messages = []
if system:
full_messages.append({"role": "system", "content": system})
@@ -113,9 +120,14 @@ async def _openai_compatible_completion(self, base_url, api_key, messages, syste
else:
url = url.rstrip("/") + "/chat/completions"
- response = await client.post(url, json=payload, headers=headers, timeout=60.0)
+ logger.info("llm_request", provider=url.split("/")[2], model=model,
+ messages=full_messages)
+ response = await client.post(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
- return data["choices"][0]["message"]["content"]
+ result = data["choices"][0]["message"]["content"]
+ logger.info("llm_response", provider=url.split("/")[2], model=model,
+ response=result)
+ return result
llm_client = LLMClient()
diff --git a/backend/app/tasks/agent_tasks.py b/backend/app/tasks/agent_tasks.py
index c971d5f..81630ce 100644
--- a/backend/app/tasks/agent_tasks.py
+++ b/backend/app/tasks/agent_tasks.py
@@ -10,10 +10,25 @@
from arq.connections import RedisSettings
from app.config import settings
+from app.core.event_bus import event_bus
logger = structlog.get_logger(__name__)
+# ── Lifecycle Hooks ──────────────────────────────────────────────────────────
+
+async def startup(ctx: dict):
+ """Initialize resources for the worker process."""
+ await event_bus.connect()
+ logger.info("worker_startup_complete")
+
+
+async def shutdown(ctx: dict):
+ """Cleanup resources."""
+ await event_bus.disconnect()
+ logger.info("worker_shutdown_complete")
+
+
# ── Task Functions (executed by ARQ worker) ───────────────────────────────────
async def run_campaign_pipeline(ctx: dict, campaign_id: str):
@@ -21,6 +36,10 @@ async def run_campaign_pipeline(ctx: dict, campaign_id: str):
Full campaign pipeline: PLANNING → DEPLOYED → MONITORING → OPTIMIZING.
Invokes the LangGraph StateGraph with PostgreSQL checkpointer.
"""
+ # Robustness: ensure event_bus is connected in this worker process
+ if not event_bus._redis:
+ await event_bus.connect()
+
logger.info("campaign_pipeline_start", campaign_id=campaign_id)
from app.database import get_checkpointer, async_session_factory
@@ -50,7 +69,14 @@ async def run_campaign_pipeline(ctx: dict, campaign_id: str):
config = {"configurable": {"thread_id": campaign_id}}
result = await graph.ainvoke(initial_state, config=config)
- logger.info("campaign_pipeline_done", campaign_id=campaign_id, status=result.get("status"))
+ # Final status update to trigger frontend 'COMPLETED' (loop-back) visual
+ final_status = result.get("status", "COMPLETED")
+ if final_status == "OPTIMIZING":
+ # In the logic, OPTIMIZING means we finished a loop and KPI was met
+ # Let's broadcast COMPLETED to trigger the UI loop-back animation
+ await event_bus.publish("StatusChanged", {"old_status": "OPTIMIZING", "new_status": "COMPLETED"}, campaign_id)
+
+ logger.info("campaign_pipeline_done", campaign_id=campaign_id, status=final_status)
return result
@@ -142,6 +168,8 @@ async def cancel_job(task_id: str) -> bool:
class WorkerSettings:
functions = [run_campaign_pipeline, run_agent_node]
+ on_startup = startup
+ on_shutdown = shutdown
redis_settings = RedisSettings.from_dsn(settings.arq_redis_url)
max_jobs = settings.arq_max_jobs
job_timeout = settings.arq_job_timeout
diff --git a/backend/openautogrowth_backend.egg-info/PKG-INFO b/backend/openautogrowth_backend.egg-info/PKG-INFO
new file mode 100644
index 0000000..d23cde0
--- /dev/null
+++ b/backend/openautogrowth_backend.egg-info/PKG-INFO
@@ -0,0 +1,32 @@
+Metadata-Version: 2.4
+Name: openautogrowth-backend
+Version: 1.0.0
+Summary: OpenAutoGrowth — AI Multi-Agent Growth Engine Backend
+Requires-Python: >=3.12
+Requires-Dist: fastapi<0.116,>=0.115
+Requires-Dist: uvicorn[standard]<0.33,>=0.32
+Requires-Dist: pydantic<3.0,>=2.9
+Requires-Dist: pydantic-settings<3.0,>=2.6
+Requires-Dist: sqlalchemy[asyncio]<3.0,>=2.0
+Requires-Dist: asyncpg<0.31,>=0.30
+Requires-Dist: alembic<2.0,>=1.14
+Requires-Dist: langgraph>=0.2
+Requires-Dist: langchain-anthropic>=0.3
+Requires-Dist: langgraph-checkpoint-postgres>=2.0
+Requires-Dist: anthropic<0.50,>=0.40
+Requires-Dist: httpx<0.29,>=0.28
+Requires-Dist: mcp<2.0,>=1.0
+Requires-Dist: redis[hiredis]<6.0,>=5.2
+Requires-Dist: arq<0.27,>=0.26
+Requires-Dist: python-dotenv<2.0,>=1.0
+Requires-Dist: structlog<25.0,>=24.4
+Requires-Dist: tenacity<10.0,>=9.0
+Requires-Dist: python-jose[cryptography]<4.0,>=3.3
+Requires-Dist: passlib[bcrypt]<2.0,>=1.7
+Provides-Extra: dev
+Requires-Dist: pytest>=8.0; extra == "dev"
+Requires-Dist: pytest-asyncio>=0.24; extra == "dev"
+Requires-Dist: pytest-cov>=6.0; extra == "dev"
+Requires-Dist: httpx>=0.28; extra == "dev"
+Requires-Dist: ruff>=0.8; extra == "dev"
+Requires-Dist: mypy>=1.13; extra == "dev"
diff --git a/backend/openautogrowth_backend.egg-info/SOURCES.txt b/backend/openautogrowth_backend.egg-info/SOURCES.txt
new file mode 100644
index 0000000..c351f90
--- /dev/null
+++ b/backend/openautogrowth_backend.egg-info/SOURCES.txt
@@ -0,0 +1,46 @@
+pyproject.toml
+app/__init__.py
+app/config.py
+app/database.py
+app/agents/__init__.py
+app/agents/analysis.py
+app/agents/channel_exec.py
+app/agents/content_gen.py
+app/agents/graph.py
+app/agents/multimodal.py
+app/agents/optimizer.py
+app/agents/planner.py
+app/agents/state.py
+app/agents/strategy.py
+app/api/__init__.py
+app/api/agents.py
+app/api/campaigns.py
+app/api/router.py
+app/api/ws.py
+app/core/__init__.py
+app/core/event_bus.py
+app/core/llm.py
+app/core/memory.py
+app/core/rule_engine.py
+app/models/__init__.py
+app/models/analytics.py
+app/models/campaign.py
+app/models/content.py
+app/models/optimization.py
+app/models/user.py
+app/protocols/__init__.py
+app/protocols/a2a/__init__.py
+app/protocols/a2a/models.py
+app/protocols/mcp/__init__.py
+app/protocols/mcp/tools.py
+app/schemas/__init__.py
+app/schemas/a2a.py
+app/schemas/agent.py
+app/schemas/campaign.py
+app/tasks/__init__.py
+app/tasks/agent_tasks.py
+openautogrowth_backend.egg-info/PKG-INFO
+openautogrowth_backend.egg-info/SOURCES.txt
+openautogrowth_backend.egg-info/dependency_links.txt
+openautogrowth_backend.egg-info/requires.txt
+openautogrowth_backend.egg-info/top_level.txt
\ No newline at end of file
diff --git a/backend/openautogrowth_backend.egg-info/dependency_links.txt b/backend/openautogrowth_backend.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/backend/openautogrowth_backend.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/backend/openautogrowth_backend.egg-info/requires.txt b/backend/openautogrowth_backend.egg-info/requires.txt
new file mode 100644
index 0000000..16d1ba6
--- /dev/null
+++ b/backend/openautogrowth_backend.egg-info/requires.txt
@@ -0,0 +1,28 @@
+fastapi<0.116,>=0.115
+uvicorn[standard]<0.33,>=0.32
+pydantic<3.0,>=2.9
+pydantic-settings<3.0,>=2.6
+sqlalchemy[asyncio]<3.0,>=2.0
+asyncpg<0.31,>=0.30
+alembic<2.0,>=1.14
+langgraph>=0.2
+langchain-anthropic>=0.3
+langgraph-checkpoint-postgres>=2.0
+anthropic<0.50,>=0.40
+httpx<0.29,>=0.28
+mcp<2.0,>=1.0
+redis[hiredis]<6.0,>=5.2
+arq<0.27,>=0.26
+python-dotenv<2.0,>=1.0
+structlog<25.0,>=24.4
+tenacity<10.0,>=9.0
+python-jose[cryptography]<4.0,>=3.3
+passlib[bcrypt]<2.0,>=1.7
+
+[dev]
+pytest>=8.0
+pytest-asyncio>=0.24
+pytest-cov>=6.0
+httpx>=0.28
+ruff>=0.8
+mypy>=1.13
diff --git a/backend/openautogrowth_backend.egg-info/top_level.txt b/backend/openautogrowth_backend.egg-info/top_level.txt
new file mode 100644
index 0000000..b80f0bd
--- /dev/null
+++ b/backend/openautogrowth_backend.egg-info/top_level.txt
@@ -0,0 +1 @@
+app
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 70df77c..c63c097 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -23,8 +23,9 @@ dependencies = [
"alembic>=1.14,<2.0",
# Agent framework
- "langgraph>=0.2,<0.3",
- "langchain-anthropic>=0.3,<0.4",
+ "langgraph>=0.2",
+ "langchain-anthropic>=0.3",
+ "langgraph-checkpoint-postgres>=2.0",
"anthropic>=0.40,<0.50",
# Protocol
diff --git a/index.html b/index.html
index 1ac3b1f..8c16a13 100644
--- a/index.html
+++ b/index.html
@@ -20,7 +20,7 @@
OpenAutoGrowth