diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..16c9531 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,9 @@ +.git +__pycache__ +*.pyc +*.pyo +*.pyd +venv +build +*.db + diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..713385c --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +LINE_CHANNEL_ACCESS_TOKEN= +LINE_CHANNEL_SECRET= +DIFY_API_KEY= +DIFY_BASE_URL= +DIFY_USER= +# Optional: enable verbose logging +LINEDIFY_VERBOSE=false +# Optional: limit bot to a specific room +TARGET_ROOM_ID= +# Optional: path to an image file for tests +DIFY_IMAGE_PATH=tests/resources/catgirl.png +# Optional: port for the FastAPI server +PORT=8443 diff --git a/.gitignore b/.gitignore index cce6885..1a41f6f 100644 --- a/.gitignore +++ b/.gitignore @@ -159,7 +159,6 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ -examples/ pytest.ini .DS_store *.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..77a4958 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +ENV PORT=18080 +EXPOSE 18080 + +CMD ["sh", "-c", "uvicorn examples.run:app --host 0.0.0.0 --port ${PORT}"] + diff --git a/README.md b/README.md index f07d8ba..2f6d9e8 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ - Built on FastAPI for high performance and easy scaling - Asynchronous processing for smooth operations + - Compatible with Dify API v1.6.0 streaming events ## đŸ“Ļ Install @@ -38,6 +39,7 @@ By passing the HTTP request body and signature to `line_dify.process_request`, t from contextlib import asynccontextmanager from fastapi import FastAPI, Request, BackgroundTasks from linedify import LineDify +import os # LINE Bot - Dify Agent Integrator line_dify = LineDify( @@ -48,6 +50,13 @@ line_dify = LineDify( dify_user=DIFY_USER ) +TARGET_ROOM_ID = os.getenv("TARGET_ROOM_ID") + +@line_dify.validate_event +async def validate_event(event): + if TARGET_ROOM_ID and event.source.type == "room" and event.source.room_id != TARGET_ROOM_ID: + return [] + # FastAPI @asynccontextmanager async def lifespan(app: FastAPI): @@ -74,6 +83,46 @@ uvicorn run:app NOTE: You have to expose the host:port to where the LINE API server can access. +## 🔧 Environment Variables + +Copy `.env.example` to `.env` and set the following variables: + +- `LINE_CHANNEL_ACCESS_TOKEN` +- `LINE_CHANNEL_SECRET` +- `DIFY_API_KEY` +- `DIFY_BASE_URL` +- `DIFY_USER` +- *(optional)* `DIFY_IMAGE_PATH` - path to an image file for tests +- *(optional)* `PORT` - server port (default `18080`) +- *(optional)* `TARGET_ROOM_ID` - room ID the bot responds to +- *(optional)* `LINEDIFY_VERBOSE` - set to `true` to enable verbose logging + +## đŸŗ Docker + +Use the following commands to build and run the container image. + +```sh +docker build -t linedify . +docker run -p 8443:8443 \ + -e LINE_CHANNEL_ACCESS_TOKEN=YOUR_CHANNEL_ACCESS_TOKEN \ + -e LINE_CHANNEL_SECRET=YOUR_CHANNEL_SECRET \ + -e DIFY_API_KEY=DIFY_API_KEY \ + -e DIFY_BASE_URL=DIFY_BASE_URL \ + -e DIFY_USER=DIFY_USER \ + -e TARGET_ROOM_ID=YOUR_ROOM_ID \ + -e LINEDIFY_VERBOSE=true \ + -e PORT=8443 \ + linedify +``` + +The default listening port is `18080`. To change it, override the `PORT` environment variable. +For example, to listen on port 8443, specify as follows. + +```sh +PORT=8443 +``` + + ## đŸ•šī¸ Switching Types @@ -235,7 +284,7 @@ line_dify = LineDify( ## 🐝 Debug -Set `verbose=True` to see the request and response, both from/to LINE and from/to Dify. +Set `verbose=True` or environment variable `LINEDIFY_VERBOSE=true` to see the request and response, both from/to LINE and from/to Dify. ```python line_dify = LineDify( diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f845249 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,9 @@ +version: '3.8' +services: + linedify: + build: . + env_file: + - .env + ports: + - "${PORT:-18080}:${PORT:-18080}" + restart: unless-stopped diff --git a/examples/run.py b/examples/run.py new file mode 100644 index 0000000..d7e6075 --- /dev/null +++ b/examples/run.py @@ -0,0 +1,37 @@ +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request, BackgroundTasks +from linedify import LineDify +import os + +line_dify = LineDify( + line_channel_access_token=os.environ.get("LINE_CHANNEL_ACCESS_TOKEN", ""), + line_channel_secret=os.environ.get("LINE_CHANNEL_SECRET", ""), + dify_api_key=os.environ.get("DIFY_API_KEY", ""), + dify_base_url=os.environ.get("DIFY_BASE_URL", ""), + dify_user=os.environ.get("DIFY_USER", ""), +) + +# Room ID to accept messages from +TARGET_ROOM_ID = os.environ.get("TARGET_ROOM_ID") + +@line_dify.validate_event +async def validate_event(event): + if TARGET_ROOM_ID and event.source.type == "room" and event.source.room_id != TARGET_ROOM_ID: + return [] + +@asynccontextmanager +async def lifespan(app: FastAPI): + yield + await line_dify.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/linebot") +async def handle_request(request: Request, background_tasks: BackgroundTasks): + background_tasks.add_task( + line_dify.process_request, + request_body=(await request.body()).decode("utf-8"), + signature=request.headers.get("X-Line-Signature", "") + ) + return "ok" + diff --git a/linedify/dify.py b/linedify/dify.py index 0b317b0..82fbc5a 100644 --- a/linedify/dify.py +++ b/linedify/dify.py @@ -4,6 +4,7 @@ from typing import Dict, Tuple import aiohttp +from .utils import normalize_line_breaks logger = getLogger(__name__) logger.addHandler(NullHandler()) @@ -32,9 +33,10 @@ def __init__(self, *, api_key: str, base_url: str, user: str, type: DifyType = D self.conversation_ids = {} async def make_payloads(self, *, text: str, image_bytes: bytes = None, inputs: dict = None, user: str = None) -> Dict: + normalized_text = normalize_line_breaks(text) payloads = { "inputs": inputs or {}, - "query": text, + "query": normalized_text, "response_mode": "streaming" if self.type == DifyType.Agent else "blocking", "user": user or self.default_user, "auto_generate_name": False, @@ -87,22 +89,50 @@ async def process_agent_response(self, response: aiohttp.ClientResponse) -> Tupl if self.verbose: logger.debug(f"Chunk from Dify: {json.dumps(chunk, ensure_ascii=False)}") - event_type = chunk["event"] + event_type = chunk.get("event") - if event_type == "agent_message": - conversation_id = chunk["conversation_id"] - response_text += chunk["answer"] + # Text chunk. New spec uses "message", old spec uses "agent_message". + if event_type in ("agent_message", "message"): + if cid := chunk.get("conversation_id"): + conversation_id = cid + response_text += chunk.get("answer", "") + # Tool call (old spec) elif event_type == "agent_thought": if tool := chunk.get("tool"): response_data["tool"] = tool if tool_input := chunk.get("tool_input"): response_data["tool_input"] = tool_input - + + # File event in new spec + elif event_type == "message_file": + files = response_data.setdefault("message_files", []) + files.append({ + "id": chunk.get("id"), + "type": chunk.get("type"), + "url": chunk.get("url"), + "belongs_to": chunk.get("belongs_to"), + }) + if cid := chunk.get("conversation_id"): + conversation_id = cid + + # Message content replaced (new spec) + elif event_type == "message_replace": + response_text = chunk.get("answer", "") + if cid := chunk.get("conversation_id"): + conversation_id = cid + + # End of streaming elif event_type == "message_end": - if retriever_resources := chunk["metadata"].get("retriever_resources"): + metadata = chunk.get("metadata") or {} + if retriever_resources := metadata.get("retriever_resources"): response_data["retriever_resources"] = retriever_resources + if cid := chunk.get("conversation_id"): + conversation_id = cid + + # Ignore other event types (tts_message, workflow logs, etc.) + response_text = normalize_line_breaks(response_text) return conversation_id, response_text, response_data async def process_chatbot_response(self, response: aiohttp.ClientResponse) -> Tuple[str, str, Dict]: @@ -112,7 +142,7 @@ async def process_chatbot_response(self, response: aiohttp.ClientResponse) -> Tu logger.info(f"Response from Dify: {json.dumps(response_json, ensure_ascii=False)}") conversation_id = response_json["conversation_id"] - response_text = response_json["answer"] + response_text = normalize_line_breaks(response_json["answer"]) return conversation_id, response_text, {} async def process_textgenerator_response(self, response: aiohttp.ClientResponse) -> Tuple[str, str, Dict]: diff --git a/linedify/integration.py b/linedify/integration.py index 707ba6e..48f3402 100644 --- a/linedify/integration.py +++ b/linedify/integration.py @@ -1,7 +1,10 @@ import json from logging import getLogger, NullHandler from traceback import format_exc -from typing import Dict, List, Tuple, Union +import os +from typing import Dict, List, Tuple, Union, Optional + +from .utils import normalize_line_breaks from linebot.v3 import WebhookParser from linebot.v3.messaging import ( @@ -40,9 +43,16 @@ def __init__(self, *, dify_type: DifyType = DifyType.Agent, session_db_url: str = "sqlite:///sessions.db", session_timeout: float = 3600.0, - verbose: bool = False + verbose: Optional[bool] = None ) -> None: + if verbose is None: + env_verbose = os.getenv("LINEDIFY_VERBOSE") + if env_verbose is not None: + verbose = env_verbose.lower() in ("1", "true", "yes", "on") + else: + verbose = False + self.verbose = verbose # LINE @@ -173,12 +183,13 @@ async def handle_message_event(self, event: MessageEvent): request_text, image_bytes = await parse_message(event.message) formated_request_text = await self._format_request_text(request_text, image_bytes) + normalized_request_text = normalize_line_breaks(formated_request_text) conversation_session = await self.conversation_session_store.get_session(event.source.user_id) inputs = await self._make_inputs(conversation_session) conversation_id, text, data = await self.dify_agent.invoke( conversation_id=conversation_session.conversation_id, - text=formated_request_text, + text=normalized_request_text, image=image_bytes, inputs=inputs, user=conversation_session.user_id @@ -187,12 +198,13 @@ async def handle_message_event(self, event: MessageEvent): conversation_session.conversation_id = conversation_id await self.conversation_session_store.set_session(conversation_session) - response_messages = await self._to_reply_message(text, data, conversation_session) + normalized_response_text = normalize_line_breaks(text) + response_messages = await self._to_reply_message(normalized_response_text, data, conversation_session) if self.verbose: logger.info(f"Response to LINE: {', '.join([json.dumps(m.to_dict(), ensure_ascii=False) for m in response_messages])}") - await self._on_message_handling_end(conversation_session, request_text, text, data) + await self._on_message_handling_end(conversation_session, request_text, normalized_response_text, data) return response_messages @@ -228,13 +240,14 @@ async def validate_event_default(self, Event) -> Union[None, List[Message]]: return None async def format_request_text_default(self, request_text: str, image_bytes: bytes) -> str: - return request_text + return normalize_line_breaks(request_text) async def make_inputs_default(self, session: ConversationSession) -> Dict: return {} async def to_reply_message_default(self, text: str, data: dict, session: ConversationSession) -> List[Message]: - return [TextMessage(text=text)] + normalized_text = normalize_line_breaks(text) + return [TextMessage(text=normalized_text or "")] async def to_error_message_default(self, event: Event, ex: Exception, session: ConversationSession = None) -> List[Message]: return [TextMessage(text="Error đŸĨ˛")] diff --git a/linedify/utils.py b/linedify/utils.py new file mode 100644 index 0000000..2d7270b --- /dev/null +++ b/linedify/utils.py @@ -0,0 +1,9 @@ +from typing import Optional + + +def normalize_line_breaks(text: Optional[str]) -> Optional[str]: + """Normalize CRLF/CR line endings to LF while preserving None.""" + if text is None: + return None + # Replace CRLF first to avoid double replacement, then lone CR + return text.replace("\r\n", "\n").replace("\r", "\n") diff --git a/tests/test_dify_response.py b/tests/test_dify_response.py new file mode 100644 index 0000000..813e90f --- /dev/null +++ b/tests/test_dify_response.py @@ -0,0 +1,58 @@ +import pytest +import types +import sys +import asyncio +import importlib.util +from pathlib import Path + +# Provide dummy linebot modules so linedify can be imported without dependency. +sys.modules.setdefault("linebot", types.ModuleType("linebot")) +sys.modules.setdefault("linebot.v3", types.ModuleType("linebot.v3")) +sys.modules.setdefault("linebot.v3.messaging", types.ModuleType("linebot.v3.messaging")) +sys.modules.setdefault("linebot.v3.webhooks", types.ModuleType("linebot.v3.webhooks")) + +# Provide dummy aiohttp module so importing dify does not fail when aiohttp is not installed. +aiohttp_stub = types.ModuleType("aiohttp") +aiohttp_stub.ClientSession = object +aiohttp_stub.ClientResponse = object +aiohttp_stub.FormData = object +sys.modules.setdefault("aiohttp", aiohttp_stub) + +# Import linedify.dify without triggering linedify.__init__ +spec = importlib.util.spec_from_file_location( + "linedify.dify", str(Path(__file__).resolve().parents[1] / "linedify" / "dify.py") +) +dify = importlib.util.module_from_spec(spec) +spec.loader.exec_module(dify) +DifyAgent = dify.DifyAgent +DifyType = dify.DifyType + +class FakeContent: + def __init__(self, chunks): + self.chunks = [c.encode('utf-8') for c in chunks] + def __aiter__(self): + return self + async def __anext__(self): + if not self.chunks: + raise StopAsyncIteration + return self.chunks.pop(0) + +class FakeResponse: + def __init__(self, chunks): + self.content = FakeContent(chunks) + +def build_chunks(): + return [ + 'data: {"event": "message", "conversation_id": "cid1", "answer": "Hello"}\n\n', + 'data: {"event": "message_end", "conversation_id": "cid1", "metadata": {}}\n\n' + ] + +def test_process_agent_response_new_spec(): + agent = DifyAgent(api_key="x", base_url="http://example", user="u", type=DifyType.Agent) + resp = FakeResponse(build_chunks()) + conv_id, text, data = asyncio.get_event_loop().run_until_complete( + agent.process_agent_response(resp) + ) + assert conv_id == "cid1" + assert text == "Hello" + assert data == {} diff --git a/tests/test_integration.py b/tests/test_integration.py index c629a8f..8217c41 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -199,3 +199,19 @@ async def handle_message_event(event): reply_messages = await line_dify.process_event(to_message_event("hello")) assert reply_messages[0].text == "Custom error message" + + +@pytest.mark.asyncio +async def test_preserve_line_breaks(line_dify): + captured = {} + + async def fake_invoke(*, conversation_id, text=None, image=None, inputs=None, user=None, start_as_new=False): + captured["text"] = text + return ("cid123", "first line\r\nsecond line", {}) + + line_dify.dify_agent.invoke = fake_invoke + + reply_messages = await line_dify.process_event(to_message_event("first line\r\nsecond line")) + + assert captured["text"] == "first line\nsecond line" + assert reply_messages[0].text == "first line\nsecond line"