Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libzapi/application/services/ticketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from libzapi.application.services.ticketing.group_memberships_service import (
GroupMembershipsService,
)
from libzapi.application.services.ticketing.incremental_exports_service import (
IncrementalExportsService,
)
from libzapi.application.services.ticketing.job_statuses_service import JobStatusesService
from libzapi.application.services.ticketing.locales_service import LocalesService
from libzapi.application.services.ticketing.macro_service import MacroService
Expand Down Expand Up @@ -82,6 +85,9 @@ def __init__(
self.email_notifications = EmailNotificationService(api.EmailNotificationApiClient(http))
self.groups = GroupsService(api.GroupApiClient(http))
self.group_memberships = GroupMembershipsService(api.GroupMembershipApiClient(http))
self.incremental_exports = IncrementalExportsService(
api.IncrementalExportApiClient(http)
)
self.job_statuses = JobStatusesService(api.JobStatusApiClient(http))
self.locales = LocalesService(api.LocaleApiClient(http))
self.macros = MacroService(api.MacroApiClient(http))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from typing import Iterable

from libzapi.domain.models.ticketing.organization import Organization
from libzapi.domain.models.ticketing.ticket import Ticket
from libzapi.domain.models.ticketing.user import User
from libzapi.infrastructure.api_clients.ticketing.incremental_export_api_client import (
IncrementalExportApiClient,
)


class IncrementalExportsService:
"""High-level service for Zendesk Ticketing Incremental Exports."""

def __init__(self, client: IncrementalExportApiClient) -> None:
self._client = client

def tickets(self, start_time: int) -> Iterable[Ticket]:
return self._client.tickets(start_time=start_time)

def tickets_cursor(self, start_time: int) -> Iterable[Ticket]:
return self._client.tickets_cursor(start_time=start_time)

def ticket_events(self, start_time: int) -> Iterable[dict]:
return self._client.ticket_events(start_time=start_time)

def users(self, start_time: int) -> Iterable[User]:
return self._client.users(start_time=start_time)

def users_cursor(self, start_time: int) -> Iterable[User]:
return self._client.users_cursor(start_time=start_time)

def organizations(self, start_time: int) -> Iterable[Organization]:
return self._client.organizations(start_time=start_time)

def sample(self, resource: str, start_time: int) -> dict:
return self._client.sample(resource=resource, start_time=start_time)
4 changes: 4 additions & 0 deletions libzapi/infrastructure/api_clients/ticketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from libzapi.infrastructure.api_clients.ticketing.group_membership_api_client import (
GroupMembershipApiClient,
)
from libzapi.infrastructure.api_clients.ticketing.incremental_export_api_client import (
IncrementalExportApiClient,
)
from libzapi.infrastructure.api_clients.ticketing.job_status_api_client import JobStatusApiClient
from libzapi.infrastructure.api_clients.ticketing.locale_api_client import LocaleApiClient
from libzapi.infrastructure.api_clients.ticketing.macro_api_client import MacroApiClient
Expand Down Expand Up @@ -64,6 +67,7 @@
"EmailNotificationApiClient",
"GroupApiClient",
"GroupMembershipApiClient",
"IncrementalExportApiClient",
"JobStatusApiClient",
"LocaleApiClient",
"MacroApiClient",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from __future__ import annotations

from typing import Iterator

from libzapi.domain.models.ticketing.organization import Organization
from libzapi.domain.models.ticketing.ticket import Ticket
from libzapi.domain.models.ticketing.user import User
from libzapi.infrastructure.http.client import HttpClient
from libzapi.infrastructure.http.pagination import yield_items
from libzapi.infrastructure.serialization.parse import to_domain

_BASE = "/api/v2/incremental"


class IncrementalExportApiClient:
"""HTTP adapter for Zendesk Ticketing Incremental Export endpoints.

Time-based endpoints follow ``next_page`` links, cursor-based endpoints
follow ``after_url`` + ``end_of_stream`` semantics.
"""

def __init__(self, http: HttpClient) -> None:
self._http = http

# -----------------------------------------------------------------
# Time-based exports
# -----------------------------------------------------------------

def tickets(self, start_time: int) -> Iterator[Ticket]:
for obj in yield_items(
get_json=self._http.get,
first_path=f"{_BASE}/tickets?start_time={int(start_time)}",
base_url=self._http.base_url,
items_key="tickets",
):
yield to_domain(data=obj, cls=Ticket)

def ticket_events(self, start_time: int) -> Iterator[dict]:
for obj in yield_items(
get_json=self._http.get,
first_path=f"{_BASE}/ticket_events?start_time={int(start_time)}",
base_url=self._http.base_url,
items_key="ticket_events",
):
yield obj

def users(self, start_time: int) -> Iterator[User]:
for obj in yield_items(
get_json=self._http.get,
first_path=f"{_BASE}/users?start_time={int(start_time)}",
base_url=self._http.base_url,
items_key="users",
):
yield to_domain(data=obj, cls=User)

def organizations(self, start_time: int) -> Iterator[Organization]:
for obj in yield_items(
get_json=self._http.get,
first_path=f"{_BASE}/organizations?start_time={int(start_time)}",
base_url=self._http.base_url,
items_key="organizations",
):
yield to_domain(data=obj, cls=Organization)

# -----------------------------------------------------------------
# Cursor-based exports
# -----------------------------------------------------------------

def tickets_cursor(self, start_time: int) -> Iterator[Ticket]:
yield from self._iter_cursor(
first_path=f"{_BASE}/tickets/cursor?start_time={int(start_time)}",
items_key="tickets",
cls=Ticket,
)

def users_cursor(self, start_time: int) -> Iterator[User]:
yield from self._iter_cursor(
first_path=f"{_BASE}/users/cursor?start_time={int(start_time)}",
items_key="users",
cls=User,
)

def _iter_cursor(self, first_path: str, items_key: str, cls):
path: str | None = first_path
while path:
page = self._http.get(path)
for obj in page.get(items_key, []) or []:
yield to_domain(data=obj, cls=cls)
if page.get("end_of_stream"):
return
after_url = page.get("after_url")
if not after_url:
return
path = (
after_url.replace(self._http.base_url, "")
if isinstance(after_url, str) and after_url.startswith("https://")
else after_url
)

# -----------------------------------------------------------------
# Sample endpoint (one page, no pagination)
# -----------------------------------------------------------------

def sample(self, resource: str, start_time: int) -> dict:
allowed = {"tickets", "users", "organizations", "ticket_events"}
if resource not in allowed:
raise ValueError(f"Unknown incremental sample resource: {resource}")
return self._http.get(
f"{_BASE}/{resource}/sample?start_time={int(start_time)}"
)
54 changes: 54 additions & 0 deletions tests/integration/ticketing/test_incremental_exports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import itertools

from libzapi import Ticketing

_EPOCH = 0


def test_tickets(ticketing: Ticketing):
items = list(
itertools.islice(ticketing.incremental_exports.tickets(start_time=_EPOCH), 10)
)
assert isinstance(items, list)


def test_tickets_cursor(ticketing: Ticketing):
items = list(
itertools.islice(
ticketing.incremental_exports.tickets_cursor(start_time=_EPOCH), 10
)
)
assert isinstance(items, list)


def test_ticket_events(ticketing: Ticketing):
items = list(
itertools.islice(
ticketing.incremental_exports.ticket_events(start_time=_EPOCH), 10
)
)
assert isinstance(items, list)


def test_users(ticketing: Ticketing):
items = list(
itertools.islice(ticketing.incremental_exports.users(start_time=_EPOCH), 10)
)
assert isinstance(items, list)


def test_organizations(ticketing: Ticketing):
items = list(
itertools.islice(
ticketing.incremental_exports.organizations(start_time=_EPOCH), 10
)
)
assert isinstance(items, list)


def test_sample(ticketing: Ticketing):
payload = ticketing.incremental_exports.sample(
resource="tickets", start_time=_EPOCH
)
assert isinstance(payload, dict)
assert "tickets" in payload or "count" in payload
Loading
Loading