Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Native async support for PEP 249 interface (replace to_thread wrapper) #95
- Improve public API surface and top-level exports #94
- Fix PEP 249 compliance: cursor.description and result types #91
- Add structured logging across the codebase #90

## [v0.2.0](https://github.com/Mapepire-IBMi/mapepire-python/releases/tag/v0.2.0) - 2024-11-26
- replace `websocket-client` with `websockets`
Expand Down
16 changes: 10 additions & 6 deletions mapepire_python/async_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def get_channel(self, db2_server: DaemonServer) -> ClientConnection:
return await socket.connect()

async def send(self, content: str) -> Dict[str, Any]:
logger.debug(f"sending data: {content}")
logger.debug("sending data: %s", content)
req = json.loads(content)
if self.socket is None:
raise RuntimeError("Socket is not connected")
Expand All @@ -98,14 +98,15 @@ def get_status(self) -> JobStatus:

def get_running_count(self) -> int:
count = len(self._pending)
logger.debug(f"--- running count {self.unique_id}: {count}, status: {self.get_status()}")
logger.debug("--- running count %s: %d, status: %s", self.unique_id, count, self.get_status())
return count

async def connect( # type: ignore[override]
self, db2_server: Optional[Union[DaemonServer, Dict[str, Any], Path]] = None, **kwargs
) -> Any:
db2_server = self._parse_connection_input(db2_server, **kwargs)

logger.info("Connecting to %s:%s", db2_server.host, db2_server.port)
self.socket = await self.get_channel(db2_server)
self._message_task = asyncio.create_task(self.message_handler())

Expand All @@ -127,12 +128,14 @@ async def connect( # type: ignore[override]

if result.success:
self.status = JobStatus.Ready
self.id = result.job
logger.info("Connection established: job_id=%s", self.id)
else:
self.status = JobStatus.NotStarted
logger.error("Connection failed: %s", result.error or "unknown error")
await self.close()
raise Exception(result.error or "Failed to connect to server")

self.id = result.job
self._is_tracing_channel_data = False

return result
Expand All @@ -159,18 +162,18 @@ async def message_handler(self):
if self.socket is None:
raise RuntimeError("Socket is not connected")
async for message in self.socket:
logger.debug(f"Received raw message: {message}")
logger.debug("Received raw message: %s", message)
try:
response = json.loads(message)
except json.JSONDecodeError as e:
logger.warning(f"Discarding malformed message: {e}")
logger.warning("Discarding malformed message: %s", e)
continue
req_id = response.get("id")
future = self._pending.get(req_id) if req_id else None
if future is not None and not future.done():
future.set_result(response)
else:
logger.debug(f"No pending request for response id: {req_id}")
logger.debug("No pending request for response id: %s", req_id)
except websockets.exceptions.ConnectionClosedError:
await self.dispose()

Expand Down Expand Up @@ -207,6 +210,7 @@ async def query_and_run( # type: ignore[override]
raise RuntimeError(f"Failed to run query: {e}") from e

async def close(self) -> None: # type: ignore[override]
logger.info("Closing job %s", self.id)
await self.dispose()
self.status = JobStatus.Ended

7 changes: 7 additions & 0 deletions mapepire_python/asyncio/connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union

Expand All @@ -8,6 +9,8 @@
from ..data_types import DaemonServer, JobStatus
from .cursor import AsyncCursor

logger = logging.getLogger(__name__)


class AsyncConnection(aiopep249.AsyncCursorExecuteMixin, aiopep249.AsyncConnection):
"""
Expand Down Expand Up @@ -47,13 +50,15 @@ def __init__(self, database: Union[DaemonServer, dict, Path], opts: Optional[Dic

async def _ensure_connected(self) -> None:
if self._job.status == JobStatus.NotStarted:
logger.debug("Opening async connection")
await self._job.connect(self._database, **self._kwargs)

async def cursor(self) -> AsyncCursor:
await self._ensure_connected()
return AsyncCursor(self, self._job)

async def close(self) -> None:
logger.debug("Closing async connection")
await self._job.close()

async def execute(
Expand Down Expand Up @@ -94,8 +99,10 @@ def _raise_if_closed(self) -> None:

async def commit(self) -> None:
self._raise_if_closed()
logger.debug("Committing transaction")
await self.execute("COMMIT")

async def rollback(self) -> None:
self._raise_if_closed()
logger.debug("Rolling back transaction")
await self.execute("ROLLBACK")
5 changes: 5 additions & 0 deletions mapepire_python/asyncio/cursor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import weakref
from typing import TYPE_CHECKING, List, Optional, Sequence, Type, Union

Expand All @@ -18,6 +19,8 @@
from ..data_types import QueryOptions
from ..pool.pool_query import PoolQuery

logger = logging.getLogger(__name__)


class AsyncCursor(
aiopep249.CursorConnectionMixin,
Expand Down Expand Up @@ -76,6 +79,7 @@ def setoutputsize(self, size: int, column: Optional[int] = None) -> None:
async def execute(
self, operation: SQLQuery, parameters: Optional[QueryParameters] = None
) -> "AsyncCursor":
logger.debug("Executing statement")
opts = QueryOptions(parameters=parameters)
self._query = self._job.query(operation, opts=opts)
result = await self._query.run()
Expand Down Expand Up @@ -131,6 +135,7 @@ async def fetchall(self) -> ResultSet:
return rows

async def close(self) -> None:
logger.debug("Closing cursor")
if self._query is not None:
await self._query.close()
self._query = None
Expand Down
11 changes: 11 additions & 0 deletions mapepire_python/authentication/kerberosTokenProvider.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import base64
import logging
import os
import platform
from typing import Final, Optional

logger = logging.getLogger(__name__)

sspi = None
gssapi = None
PLATFORM = platform.system()
Expand Down Expand Up @@ -45,6 +48,7 @@ def __init__(
raise ValueError(f"Missing required parameters: {', '.join(missing)}")

def get_token(self) -> str:
logger.debug("Requesting Kerberos token for host=%s", self.host)
return self._refresh_token()

def _refresh_token(self) -> str:
Expand All @@ -58,17 +62,21 @@ def _format_token(self, token: bytes) -> str:
return TOKEN_PREFIX + token_b64

def _refresh_token_windows(self) -> str:
logger.debug("Generating Kerberos token via Windows SSPI for host=%s", self.host)
target = f"krbsvr400/{self.host}"
client = sspi.ClientAuth("Kerberos", targetspn=target)

err, out_buffer = client.authorize(None)
if err != 0:
logger.error("Windows SSPI Kerberos authentication failed: error_code=%s", hex(err))
raise RuntimeError(f"Windows SSPI error when attempting Kerberos login: {hex(err)}")

token = out_buffer[0].Buffer
logger.debug("Kerberos token generated successfully via Windows SSPI")
return self._format_token(token)

def _refresh_token_unix(self) -> str:
logger.debug("Generating Kerberos token via GSSAPI for host=%s realm=%s", self.host, self.realm)
os.environ["KRB5_CONFIG"] = self.krb5_path
if self.ticket_cache:
os.environ["KRB5CCNAME"] = self.ticket_cache
Expand Down Expand Up @@ -96,9 +104,12 @@ def _refresh_token_unix(self) -> str:
)
except gssapi.exceptions.GSSError as e:
if "No credentials were supplied" in str(e) or "Unavailable" in str(e):
logger.error("Kerberos authentication failed: no valid TGT in credential cache")
raise RuntimeError("No valid TGT found in credential cache.")
logger.error("Kerberos GSSAPI authentication failed for host=%s", self.host)
raise RuntimeError(
f"Kerberos token generation error when attempting Kerberos login: {str(e)}"
)

logger.debug("Kerberos token generated successfully via GSSAPI")
return self._format_token(token)
15 changes: 14 additions & 1 deletion mapepire_python/client/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import json
import logging
from enum import Enum
from typing import Any, Dict, Generic, Mapping, Optional, Sequence, TypeVar, Union

Expand All @@ -21,6 +22,8 @@

T = TypeVar("T")

logger = logging.getLogger(__name__)


class QueryState(Enum):
NOT_YET_RUN = (1,)
Expand Down Expand Up @@ -68,6 +71,7 @@ def prepare_sql_execute(self):
if self.state == QueryState.RUN_DONE:
raise Exception("Statement has already been fully run")

logger.debug("Preparing and executing query")
query_result = QueryResult.from_dict( # type: ignore
self._execute_query(
PrepareSqlExecuteRequest(
Expand All @@ -84,9 +88,11 @@ def prepare_sql_execute(self):
error_list = {k: v for k, v in {"error": query_result.error, "sql_state": query_result.sql_state, "sql_rc": query_result.sql_rc}.items() if v is not None}
if not error_list:
error_list["error"] = "failed to run query for unknown reason"
logger.error("Query preparation failed: %s", error_list)
raise RuntimeError(error_list)

self._correlation_id = query_result.id
logger.debug("Query prepared: correlation_id=%s done=%s", self._correlation_id, query_result.is_done)

return query_result

Expand All @@ -103,6 +109,8 @@ def run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]:
elif self.state == QueryState.RUN_DONE:
raise Exception("Statement has already been fully run")

logger.debug("Executing query: rows_to_fetch=%s", rows_to_fetch)

if self.is_cl_command:
request = ClRequest(
id=self.job._get_unique_id("clcommand"),
Expand Down Expand Up @@ -135,9 +143,11 @@ def run(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]:
error_list = {k: v for k, v in {"error": query_result.error, "sql_state": query_result.sql_state, "sql_rc": query_result.sql_rc}.items() if v is not None}
if not error_list:
error_list["error"] = "failed to run query for unknown reason"
logger.error("Query execution failed: %s", error_list)
raise RuntimeError(error_list)

self._correlation_id = query_result.id
logger.debug("Query executed: correlation_id=%s done=%s", self._correlation_id, query_result.is_done)

return query_result

Expand All @@ -154,6 +164,7 @@ def fetch_more(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]:
raise Exception("Statement has already been fully run")

assert self._correlation_id is not None
logger.debug("Fetching more rows: rows=%s correlation_id=%s", rows_to_fetch, self._correlation_id)
self._rows_to_fetch = rows_to_fetch
query_result = SqlMoreResponse.from_dict( # type: ignore
self._execute_query(
Expand All @@ -170,14 +181,16 @@ def fetch_more(self, rows_to_fetch: Optional[int] = None) -> Dict[str, Any]:

if not query_result.success:
self.state = QueryState.ERROR
logger.error("Fetch failed: %s", query_result.error)
raise RuntimeError(query_result.error or "Failed to run Query (unknown error)")

return query_result

@handle_ws_errors
def close(self):
if not self.job._socket:
if self.job._socket is None:
raise Exception("SQL Job not connected")
logger.debug("Closing query: correlation_id=%s", self._correlation_id)
if self._correlation_id and self.state is not QueryState.RUN_DONE:
self.state = QueryState.RUN_DONE
return SqlCloseResponse.from_dict( # type: ignore
Expand Down
10 changes: 9 additions & 1 deletion mapepire_python/client/sql_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Union

Expand All @@ -19,6 +20,8 @@

__all__ = ["SQLJob"]

logger = logging.getLogger(__name__)


class SQLJob(BaseJob):
def __init__(
Expand Down Expand Up @@ -106,6 +109,7 @@ def connect(
"""
db2_server = self._parse_connection_input(db2_server, **kwargs)

logger.info("Connecting to %s:%s", db2_server.host, db2_server.port)
self._socket = self._get_channel(db2_server)

props = ";".join(
Expand All @@ -124,16 +128,19 @@ def connect(
try:
result = ConnectionResult.from_dict(json.loads(self._socket.recv())) # type: ignore
except Exception as e:
logger.error("Failed to parse connect response: %s", e)
raise Exception(f"Failed to parse connect response: {e}")

if result.success:
self._status = JobStatus.Ready
self.id = result.job
logger.info("Connection established: job_id=%s", self.id)
else:
self._status = JobStatus.NotStarted
logger.error("Connection failed: %s", result.error or "unknown error")
self.close()
raise Exception(result.error or "Failed to connect to server")

self.id = result.job
self._is_tracing_channeldata = False

return result
Expand Down Expand Up @@ -196,6 +203,7 @@ def query_and_run(
raise RuntimeError(f"Failed to run query: {e}")

def close(self) -> None:
logger.info("Closing job %s", self.id)
self._status = JobStatus.Ended
if self._socket:
self._socket.close()
5 changes: 5 additions & 0 deletions mapepire_python/core/connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union

Expand All @@ -12,6 +13,8 @@

__all__ = ["Connection"]

logger = logging.getLogger(__name__)

COMMIT = "COMMIT"
ROLLBACK = "ROLLBACK"

Expand Down Expand Up @@ -45,6 +48,7 @@ class Connection(pep249.CursorExecuteMixin, pep249.ConcreteErrorMixin, pep249.Co
def __init__(self, database: Union[DaemonServer, dict, Path], opts: Optional[Dict[str, Any]] = None, **kwargs) -> None:
super().__init__()
self._closed = False
logger.info("Opening connection")
self.job = SQLJob(creds=database, options=opts, **kwargs)
self.job.connect(database, **kwargs)

Expand All @@ -60,6 +64,7 @@ def close(self) -> None:
if self._closed:
return

logger.info("Closing connection: job_id=%s", self.job.id)
self.job.close()
self._closed = True

Expand Down
2 changes: 2 additions & 0 deletions mapepire_python/core/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def execute(
parameters: Optional[QueryParameters] = None,
**kwargs: Any,
) -> "Cursor":
logger.debug("Executing statement")
opts = kwargs.get("opts", None)
if opts:
query = Query(self.job, operation, opts)
Expand Down Expand Up @@ -212,6 +213,7 @@ def nextset(self) -> Optional[bool]:
def close(self) -> None:
if self._closed:
return
logger.debug("Closing cursor")
if self.query:
for q in self.query_q:
q.close()
Expand Down
Loading
Loading