Learn how to create custom plugins for OpenContext.
Custom plugins allow you to integrate OpenContext with your own APIs, databases, or data sources. Plugins are added to the custom_plugins/ directory and automatically discovered by the Plugin Manager.
- Copy the template:
  cp custom_plugins/template/plugin_template.py custom_plugins/my_plugin/plugin.py
- Edit custom_plugins/my_plugin/plugin.py:
  - Replace MyCustomPlugin with your class name
  - Set plugin_name to your plugin name
  - Implement all TODO sections
- Add configuration to config.yaml:
  plugins:
    my_plugin:
      enabled: true
      api_url: "https://api.example.com"
      api_key: "${MY_API_KEY}"
- Deploy:
  opencontext deploy --env staging
All plugins must:
- Inherit from MCPPlugin (or DataPlugin for data sources)
- Set class attributes: plugin_name, plugin_type, plugin_version
- Implement all required methods
- Be placed in custom_plugins/your_plugin_name/plugin.py
Initialize plugin with configuration from config.yaml.
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
self.api_url = config.get("api_url")
self.api_key = config.get("api_key")
Set up connections, test connectivity, validate configuration.
async def initialize(self) -> bool:
self.client = httpx.AsyncClient(base_url=self.api_url)
response = await self.client.get("/health")
response.raise_for_status()
self._initialized = True
return True
Clean up resources.
async def shutdown(self) -> None:
if self.client:
await self.client.aclose()
self._initialized = False
Return the list of tools your plugin provides.
def get_tools(self) -> List[ToolDefinition]:
return [
ToolDefinition(
name="search",
description="Search for items",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string"},
},
"required": ["query"],
},
),
]
Important: Tool names should NOT include the plugin prefix. The Plugin Manager adds it automatically using double underscores (e.g., my_plugin__search).
Execute a tool by name.
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
if tool_name == "search":
query = arguments.get("query")
results = await self._search(query)
return ToolResult(
content=[{"type": "text", "text": self._format_results(results)}],
success=True,
)
else:
return ToolResult(
content=[],
success=False,
error_message=f"Unknown tool: {tool_name}",
)
Check if the plugin is healthy.
async def health_check(self) -> bool:
try:
response = await self.client.get("/health")
return response.status_code == 200
except:
return False
If your plugin provides data operations, inherit from DataPlugin instead of MCPPlugin directly.
DataPlugin extends MCPPlugin, so a DataPlugin subclass must implement all 8 abstract methods — the 5 from MCPPlugin plus the 3 defined on DataPlugin itself. Omitting any of these will raise a TypeError at startup.
These are the same methods documented in the Required Methods section above. DataPlugin does not override or relax any of them:
| Method | Signature |
|---|---|
| initialize | async def initialize(self) -> bool |
| shutdown | async def shutdown(self) -> None |
| get_tools | def get_tools(self) -> List[ToolDefinition] |
| execute_tool | async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult |
| health_check | async def health_check(self) -> bool |
DataPlugin itself adds these 3 methods:
| Method | Signature |
|---|---|
| search_datasets | async def search_datasets(self, query: str, limit: int = 20) -> List[Dict[str, Any]] |
| get_dataset | async def get_dataset(self, dataset_id: str) -> Dict[str, Any] |
| query_data | async def query_data(self, resource_id: str, filters: Optional[Dict[str, Any]] = None, limit: int = 100) -> List[Dict[str, Any]] |
from typing import Any, Dict, List, Optional
from core.interfaces import DataPlugin, PluginType, ToolDefinition, ToolResult
class MyDataPlugin(DataPlugin):
plugin_name = "my_data"
plugin_type = PluginType.OPEN_DATA
plugin_version = "1.0.0"
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
# --- 5 required methods from MCPPlugin ---
async def initialize(self) -> bool:
# Create clients, validate config, set self._initialized = True
self._initialized = True
return True
async def shutdown(self) -> None:
# Close clients, release resources
self._initialized = False
def get_tools(self) -> List[ToolDefinition]:
# Return ToolDefinition objects for each tool this plugin exposes
return []
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
# Dispatch to the correct tool implementation
return ToolResult(content=[], success=False, error_message=f"Unknown tool: {tool_name}")
async def health_check(self) -> bool:
return self._initialized
# --- 3 required methods from DataPlugin ---
async def search_datasets(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
# Return a list of dataset metadata dicts matching the query
pass
async def get_dataset(self, dataset_id: str) -> Dict[str, Any]:
# Return full metadata for a single dataset
pass
async def query_data(
self,
resource_id: str,
filters: Optional[Dict[str, Any]] = None,
limit: int = 100,
) -> List[Dict[str, Any]]:
# Return records from the specified resource
pass
See custom_plugins/template/plugin_template.py for the canonical, fully-annotated starting point (it inherits from MCPPlugin directly, which is fine for non-data plugins).
Always handle errors gracefully:
try:
result = await self._call_api()
return ToolResult(content=[...], success=True)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return ToolResult(
content=[],
success=False,
error_message=f"Operation failed: {str(e)}",
)
Use structured logging:
import logging
logger = logging.getLogger(__name__)
logger.info("Plugin initialized")
logger.error("Error occurred", exc_info=True)
Validate configuration in initialize():
async def initialize(self) -> bool:
if not self.api_url:
raise ValueError("api_url is required")
# ...
Format results for clarity:
def _format_results(self, data: List[Dict]) -> str:
lines = [f"Found {len(data)} results:\n"]
for item in data:
lines.append(f"- {item['name']}: {item['description']}")
return "\n".join(lines)
from core.interfaces import MCPPlugin, PluginType, ToolDefinition, ToolResult
import httpx
class MyAPIPlugin(MCPPlugin):
plugin_name = "my_api"
plugin_type = PluginType.CUSTOM_API
plugin_version = "1.0.0"
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
self.api_url = config["api_url"]
self.client = None
async def initialize(self) -> bool:
self.client = httpx.AsyncClient(base_url=self.api_url)
self._initialized = True
return True
async def shutdown(self) -> None:
if self.client:
await self.client.aclose()
self._initialized = False
def get_tools(self) -> List[ToolDefinition]:
return [
ToolDefinition(
name="get_item",
description="Get an item by ID",
input_schema={
"type": "object",
"properties": {
"item_id": {"type": "string"},
},
"required": ["item_id"],
},
),
]
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
    """Execute one of this plugin's tools and report the outcome.

    Args:
        tool_name: Un-prefixed tool name; only "get_item" is supported.
        arguments: Tool arguments; "get_item" requires "item_id".

    Returns:
        A ToolResult with ``success=True`` and the item text on success,
        or ``success=False`` with an ``error_message`` for an unknown
        tool, a missing argument, or a failed request.
    """
    if tool_name != "get_item":
        return ToolResult(
            content=[],
            success=False,
            error_message=f"Unknown tool: {tool_name}",
        )

    item_id = arguments.get("item_id")
    if not item_id:
        return ToolResult(
            content=[],
            success=False,
            error_message="Missing required argument: item_id",
        )

    try:
        response = await self.client.get(f"/items/{item_id}")
        # Turn 4xx/5xx responses into errors instead of parsing an error body.
        response.raise_for_status()
        data = response.json()
        text = f"Item: {data['name']}"
    except Exception as e:
        # Tool boundary: report the failure to the caller rather than
        # letting a transport, JSON, or missing-key error escape.
        return ToolResult(
            content=[],
            success=False,
            error_message=f"Operation failed: {str(e)}",
        )

    return ToolResult(
        content=[{"type": "text", "text": text}],
        success=True,
    )
async def health_check(self) -> bool:
try:
response = await self.client.get("/health")
return response.status_code == 200
except:
return False
Test your plugin locally before deploying:
# test_my_plugin.py
import asyncio
from custom_plugins.my_plugin.plugin import MyAPIPlugin
async def test():
    """Smoke-test MyAPIPlugin: list its tools and run "get_item" once.

    The plugin is always shut down afterwards so the HTTP client opened
    by initialize() is closed even when a call raises.
    """
    plugin = MyAPIPlugin({"api_url": "https://api.example.com"})
    try:
        await plugin.initialize()
        tools = plugin.get_tools()
        print(f"Tools: {[t.name for t in tools]}")
        result = await plugin.execute_tool("get_item", {"item_id": "123"})
        print(f"Result: {result.success}")
    finally:
        # shutdown() tolerates an unset client, so it is safe to call
        # even if initialize() failed.
        await plugin.shutdown()
asyncio.run(test())
For complex plugins, create a Pydantic schema:
# custom_plugins/my_plugin/config_schema.py
from pydantic import BaseModel
class MyPluginConfig(BaseModel):
enabled: bool = False
api_url: str
api_key: Optional[str] = None
timeout: int = 120
Use in plugin:
from custom_plugins.my_plugin.config_schema import MyPluginConfig
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
self.plugin_config = MyPluginConfig(**config)
- Plugin Template
- CKAN Plugin - Example implementation
- Core Interfaces - API reference