Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/pluggedinkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@
RagStorageStats,
ModelInfo,
AIMetadata,
# Jungian Intelligence types
IndividuationComponents,
IndividuationResponse,
IndividuationHistoryEntry,
ArchetypedPattern,
ArchetypeSearchResponse,
SynchronicityPattern,
DreamConsolidation,
)
from .services.agents import (
Agent,
Expand All @@ -56,6 +64,7 @@
AgentService,
AsyncAgentService,
)
from .services.jungian import JungianService, AsyncJungianService

__all__ = [
"PluggedInClient",
Expand Down Expand Up @@ -103,4 +112,14 @@
"AgentDetails",
"AgentService",
"AsyncAgentService",
# Jungian Intelligence types
"IndividuationComponents",
"IndividuationResponse",
"IndividuationHistoryEntry",
"ArchetypedPattern",
"ArchetypeSearchResponse",
"SynchronicityPattern",
"DreamConsolidation",
"JungianService",
"AsyncJungianService",
]
3 changes: 3 additions & 0 deletions src/pluggedinkit/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from .services.clipboard import ClipboardService, AsyncClipboardService
from .services.documents import DocumentService, AsyncDocumentService
from .services.jungian import JungianService, AsyncJungianService
from .services.rag import RagService, AsyncRagService
from .services.uploads import UploadService, AsyncUploadService
from .services.agents import AgentService, AsyncAgentService
Expand Down Expand Up @@ -100,6 +101,7 @@ def __init__(
# Initialize services
self.clipboard = ClipboardService(self)
self.documents = DocumentService(self)
self.jungian = JungianService(self)
self.rag = RagService(self)
self.uploads = UploadService(self)
self.agents = AgentService(self)
Expand Down Expand Up @@ -201,6 +203,7 @@ def __init__(
# Initialize async services
self.clipboard = AsyncClipboardService(self)
self.documents = AsyncDocumentService(self)
self.jungian = AsyncJungianService(self)
self.rag = AsyncRagService(self)
self.uploads = AsyncUploadService(self)
self.agents = AsyncAgentService(self)
Expand Down
3 changes: 3 additions & 0 deletions src/pluggedinkit/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .clipboard import AsyncClipboardService, ClearAllResult, ClipboardService
from .documents import AsyncDocumentService, DocumentService
from .jungian import AsyncJungianService, JungianService
from .rag import AsyncRagService, RagService
from .uploads import AsyncUploadService, UploadService

Expand All @@ -11,6 +12,8 @@
"ClearAllResult",
"DocumentService",
"AsyncDocumentService",
"JungianService",
"AsyncJungianService",
"RagService",
"AsyncRagService",
"UploadService",
Expand Down
301 changes: 301 additions & 0 deletions src/pluggedinkit/services/jungian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
"""Jungian Intelligence Layer service for Plugged.in SDK

Provides access to the Jungian-inspired intelligence layer including
archetype-aware pattern search, individuation scoring, synchronicity
detection, and dream-cycle consolidation history.
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ..exceptions import PluggedInError
from ..types import (
ArchetypedPattern,
ArchetypeSearchResponse,
DreamConsolidation,
IndividuationHistoryEntry,
IndividuationResponse,
SynchronicityPattern,
)

if TYPE_CHECKING:
from ..client import AsyncPluggedInClient, PluggedInClient


# -----------------------------------------------------------------------------
# Shared Helper Functions
# -----------------------------------------------------------------------------


def _parse_archetype_search(data: Dict[str, Any]) -> ArchetypeSearchResponse:
"""Parse the archetype search response into typed model."""
patterns_raw = data.get("patterns", [])
patterns = [ArchetypedPattern(**p) for p in patterns_raw]
return ArchetypeSearchResponse(patterns=patterns)


def _parse_individuation(data: Dict[str, Any]) -> IndividuationResponse:
"""Parse the individuation score response into typed model."""
return IndividuationResponse(**data)


def _parse_individuation_history(data: Any) -> List[IndividuationHistoryEntry]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting shared list-extraction and request-building helpers to reduce repeated parsing and wiring logic across the Jungian service functions.

You can trim complexity a bit in two focused areas without changing any behavior.

1. Centralize the “flexible shape” parsing

The three helpers _parse_individuation_history, _parse_synchronicity_patterns, and _parse_dream_history all repeat the same pattern: accept Any, handle list or dict with a couple of possible keys, and silently fall back to [].

You can keep the same accepted shapes but centralize the logic so each parser is simpler and less branched:

from typing import Callable, Type, TypeVar

T = TypeVar("T")

def _extract_list_payload(
    data: Any,
    *,
    default_key: str,
    alt_key: Optional[str] = None,
) -> List[Dict[str, Any]]:
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        if default_key in data and isinstance(data[default_key], list):
            return data[default_key]
        if alt_key and alt_key in data and isinstance(data[alt_key], list):
            return data[alt_key]
    # Preserve current behavior: unexpected shape -> empty list
    return []

Then the individual parsers become much narrower and easier to reason about:

def _parse_individuation_history(data: Any) -> List[IndividuationHistoryEntry]:
    entries = _extract_list_payload(data, default_key="history", alt_key="entries")
    return [IndividuationHistoryEntry(**entry) for entry in entries]

def _parse_synchronicity_patterns(data: Any) -> List[SynchronicityPattern]:
    patterns = _extract_list_payload(data, default_key="patterns")
    return [SynchronicityPattern(**p) for p in patterns]

def _parse_dream_history(data: Any) -> List[DreamConsolidation]:
    entries = _extract_list_payload(data, default_key="history", alt_key="consolidations")
    return [DreamConsolidation(**d) for d in entries]

If you want to make unexpected shapes more visible later, you can swap the return [] in _extract_list_payload for a PluggedInError without changing all call sites.


2. Share sync/async request + parse wiring

The sync and async services mostly differ in:

  • the client type, and
  • whether request is awaited.

You can keep the separate public classes/signatures but factor out the “request + parse” pattern into a small helper to cut repetition. For example, for the individuation endpoints:

# shared helper
def _build_individuation_history_params(days: int) -> Dict[str, str]:
    return {"history": "true", "days": str(days)}

Usage in sync:

def get_individuation_history(self, days: int = 30) -> List[IndividuationHistoryEntry]:
    response = self.client.request(
        "GET",
        "/api/memory/individuation",
        params=_build_individuation_history_params(days),
    )
    return _parse_individuation_history(response.json())

Usage in async:

async def get_individuation_history(self, days: int = 30) -> List[IndividuationHistoryEntry]:
    response = await self.client.request(
        "GET",
        "/api/memory/individuation",
        params=_build_individuation_history_params(days),
    )
    return _parse_individuation_history(response.json())

You can apply the same approach for:

  • _build_search_with_context_payload(query, tool_name, outcome)
  • constant paths like "/api/memory/individuation" / "/api/memory/sync/patterns" / "/api/memory/dream/history"

This keeps functionality identical while reducing duplicated literals and argument construction logic, and makes future changes (e.g., adding a param) less error-prone across sync/async variants.

"""Parse the individuation history response into typed list."""
if isinstance(data, list):
return [IndividuationHistoryEntry(**entry) for entry in data]
# If wrapped in an object, try the 'history' key
if isinstance(data, dict):
entries = data.get("history", data.get("entries", []))
return [IndividuationHistoryEntry(**entry) for entry in entries]
return []
Comment on lines +43 to +49

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation is not fully robust. If the API returns an unexpected data type (e.g., None or a string instead of a list for history or entries), the function will raise a TypeError. It's safer to validate that the extracted data is a list before iterating over it and that each item is a dictionary before unpacking.

    items_raw = []
    if isinstance(data, list):
        items_raw = data
    elif isinstance(data, dict):
        items_raw = data.get("history")
        if not isinstance(items_raw, list):
            items_raw = data.get("entries")

    if not isinstance(items_raw, list):
        return []

    return [IndividuationHistoryEntry(**entry) for entry in items_raw if isinstance(entry, dict)]



def _parse_synchronicity_patterns(data: Any) -> List[SynchronicityPattern]:
"""Parse synchronicity patterns response."""
if isinstance(data, list):
return [SynchronicityPattern(**p) for p in data]
if isinstance(data, dict):
patterns = data.get("patterns", [])
return [SynchronicityPattern(**p) for p in patterns]
return []
Comment on lines +54 to +59

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the other parsing functions in this file, this implementation is not robust against unexpected API response formats. If data.get('patterns') returns a non-list iterable or None, this function will raise a TypeError. The implementation should be updated to handle these cases gracefully.

Suggested change
if isinstance(data, list):
return [SynchronicityPattern(**p) for p in data]
if isinstance(data, dict):
patterns = data.get("patterns", [])
return [SynchronicityPattern(**p) for p in patterns]
return []
items_raw = []
if isinstance(data, list):
items_raw = data
elif isinstance(data, dict):
items_raw = data.get("patterns")
if not isinstance(items_raw, list):
return []
return [SynchronicityPattern(**p) for p in items_raw if isinstance(p, dict)]



def _parse_dream_history(data: Any) -> List[DreamConsolidation]:
"""Parse dream consolidation history response."""
if isinstance(data, list):
return [DreamConsolidation(**d) for d in data]
if isinstance(data, dict):
entries = data.get("history", data.get("consolidations", []))
return [DreamConsolidation(**d) for d in entries]
return []
Comment on lines +64 to +69

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This function has the same robustness issue as the other parsers. It can fail with a TypeError if the API response for history or consolidations is not a list. It's important to validate the data type before iteration to prevent runtime errors.

Suggested change
if isinstance(data, list):
return [DreamConsolidation(**d) for d in data]
if isinstance(data, dict):
entries = data.get("history", data.get("consolidations", []))
return [DreamConsolidation(**d) for d in entries]
return []
items_raw = []
if isinstance(data, list):
items_raw = data
elif isinstance(data, dict):
items_raw = data.get("history")
if not isinstance(items_raw, list):
items_raw = data.get("consolidations")
if not isinstance(items_raw, list):
return []
return [DreamConsolidation(**d) for d in items_raw if isinstance(d, dict)]



# -----------------------------------------------------------------------------
# Synchronous Jungian Service
# -----------------------------------------------------------------------------


class JungianService:
"""Synchronous Jungian Intelligence Layer service for Plugged.in.

Provides archetype-aware pattern search, individuation scoring,
synchronicity detection, and dream-cycle consolidation history.
"""

def __init__(self, client: "PluggedInClient"):
self.client = client

def search_with_context(
self,
query: str,
tool_name: Optional[str] = None,
outcome: Optional[str] = None,
) -> ArchetypeSearchResponse:
"""Search patterns with archetype context injection.

Sends a query to the archetype inject endpoint which returns
patterns enriched with Jungian archetype classifications.

Args:
query: Natural language search query
tool_name: Optional tool name for context
outcome: Optional outcome description for context

Returns:
ArchetypeSearchResponse with classified patterns

Raises:
PluggedInError: If the API request fails
"""
payload: Dict[str, Any] = {"query": query}
if tool_name is not None:
payload["tool_name"] = tool_name
if outcome is not None:
payload["outcome"] = outcome

response = self.client.request(
"POST", "/api/memory/archetype/inject", json=payload
)
return _parse_archetype_search(response.json())

def get_individuation_score(self) -> IndividuationResponse:
"""Get the current individuation score.

The individuation score measures the agent's psychological
development across four components: memory depth, learning
velocity, collective contribution, and self-awareness.

Returns:
IndividuationResponse with score, level, trend, and components

Raises:
PluggedInError: If the API request fails
"""
response = self.client.request("GET", "/api/memory/individuation")
return _parse_individuation(response.json())

def get_individuation_history(
self, days: int = 30
) -> List[IndividuationHistoryEntry]:
"""Get individuation score history over a time period.

Args:
days: Number of days of history to retrieve (default: 30)

Returns:
List of IndividuationHistoryEntry ordered by date

Raises:
PluggedInError: If the API request fails
"""
response = self.client.request(
"GET",
"/api/memory/individuation",
params={"history": "true", "days": str(days)},
)
return _parse_individuation_history(response.json())

def get_synchronicity_patterns(self) -> List[SynchronicityPattern]:
"""Get detected synchronicity patterns.

Synchronicity patterns are meaningful coincidences detected
across multiple profiles in the collective unconscious layer.

Returns:
List of SynchronicityPattern instances

Raises:
PluggedInError: If the API request fails
"""
response = self.client.request("GET", "/api/memory/sync/patterns")
return _parse_synchronicity_patterns(response.json())

def get_dream_history(self) -> List[DreamConsolidation]:
"""Get dream-cycle consolidation history.

Dream consolidations represent periodic memory compression
cycles that merge similar patterns and reduce token usage.

Returns:
List of DreamConsolidation records

Raises:
PluggedInError: If the API request fails
"""
response = self.client.request("GET", "/api/memory/dream/history")
return _parse_dream_history(response.json())


# -----------------------------------------------------------------------------
# Asynchronous Jungian Service
# -----------------------------------------------------------------------------


class AsyncJungianService:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The AsyncJungianService class is almost an exact copy of JungianService, with the only differences being async and await. This creates significant code duplication across all five methods, making future maintenance harder (e.g., adding a parameter requires changes in two places). While this pattern might be consistent with other services in the SDK, it's worth considering strategies to reduce this duplication for better maintainability. For example, a tool like unasync could generate the sync code from the async code, or parameter-building logic could be extracted into shared helper functions.

"""Asynchronous Jungian Intelligence Layer service for Plugged.in.

Provides archetype-aware pattern search, individuation scoring,
synchronicity detection, and dream-cycle consolidation history.
"""

def __init__(self, client: "AsyncPluggedInClient"):
self.client = client

async def search_with_context(
self,
query: str,
tool_name: Optional[str] = None,
outcome: Optional[str] = None,
) -> ArchetypeSearchResponse:
"""Search patterns with archetype context injection.

Sends a query to the archetype inject endpoint which returns
patterns enriched with Jungian archetype classifications.

Args:
query: Natural language search query
tool_name: Optional tool name for context
outcome: Optional outcome description for context

Returns:
ArchetypeSearchResponse with classified patterns

Raises:
PluggedInError: If the API request fails
"""
payload: Dict[str, Any] = {"query": query}
if tool_name is not None:
payload["tool_name"] = tool_name
if outcome is not None:
payload["outcome"] = outcome

response = await self.client.request(
"POST", "/api/memory/archetype/inject", json=payload
)
return _parse_archetype_search(response.json())

async def get_individuation_score(self) -> IndividuationResponse:
"""Get the current individuation score.

The individuation score measures the agent's psychological
development across four components: memory depth, learning
velocity, collective contribution, and self-awareness.

Returns:
IndividuationResponse with score, level, trend, and components

Raises:
PluggedInError: If the API request fails
"""
response = await self.client.request("GET", "/api/memory/individuation")
return _parse_individuation(response.json())

async def get_individuation_history(
self, days: int = 30
) -> List[IndividuationHistoryEntry]:
"""Get individuation score history over a time period.

Args:
days: Number of days of history to retrieve (default: 30)

Returns:
List of IndividuationHistoryEntry ordered by date

Raises:
PluggedInError: If the API request fails
"""
response = await self.client.request(
"GET",
"/api/memory/individuation",
params={"history": "true", "days": str(days)},
)
return _parse_individuation_history(response.json())

async def get_synchronicity_patterns(self) -> List[SynchronicityPattern]:
"""Get detected synchronicity patterns.

Synchronicity patterns are meaningful coincidences detected
across multiple profiles in the collective unconscious layer.

Returns:
List of SynchronicityPattern instances

Raises:
PluggedInError: If the API request fails
"""
response = await self.client.request("GET", "/api/memory/sync/patterns")
return _parse_synchronicity_patterns(response.json())

async def get_dream_history(self) -> List[DreamConsolidation]:
"""Get dream-cycle consolidation history.

Dream consolidations represent periodic memory compression
cycles that merge similar patterns and reduce token usage.

Returns:
List of DreamConsolidation records

Raises:
PluggedInError: If the API request fails
"""
response = await self.client.request("GET", "/api/memory/dream/history")
return _parse_dream_history(response.json())
Loading