Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
uv.toml
/resources/secrets/
/resources/data/
/resources/logs/
Expand All @@ -11,3 +12,11 @@
/wrench-code-library.iml
/WrenchCL.iml
/site/*
/.hypothesis
/.kiro
/legacy_logging_reference
/benchmark_report.html
/benchmark_report_generator.py
/benchmark_results.json
logging_v2_demo.py
test_logging_visual.py
9 changes: 6 additions & 3 deletions WrenchCL/Connect/AwsClientHub.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def reload_config(self, env_path: Optional[str] = None, **kwargs):
def config(self) -> _ConfigurationManager:
"""Loaded configuration object."""
self._initialize()
assert self.__config is not None
return self.__config

@property
Expand Down Expand Up @@ -126,9 +127,11 @@ def _init_rds_client(self):
"""Initialize the database client, applying PGHOST/PGPORT override or setting up an SSH tunnel if configured."""
try:
if self.config and isinstance(self.config, _ConfigurationManager):
db_port = self.config.pgport_override or self.config.db_port
assert db_port is not None
config = {
"PGHOST": self.config.pghost_override or self.config.db_host,
"PGPORT": int(self.config.pgport_override or self.config.db_port),
"PGPORT": int(db_port),
"PGDATABASE": self.config.db_name,
"PGUSER": self.config.db_user,
"PGPASSWORD": self.config.db_pass,
Expand All @@ -138,7 +141,7 @@ def _init_rds_client(self):
self.config.ssh_user,
self.config.pem_path or self.config.ssh_password,
]):
config["SSH_TUNNEL"] = {
config["SSH_TUNNEL"] = { # type: ignore
"SSH_SERVER": self.config.ssh_server,
"SSH_PORT": self.config.ssh_port,
"SSH_USER": self.config.ssh_user,
Expand Down Expand Up @@ -181,7 +184,7 @@ def _rds_handle_configuration(self, config: dict) -> "psycopg2.extensions.connec
password=config["PGPASSWORD"],
)

def get_secret(self, secret_id: str = None) -> Union[dict, str, None]:
def get_secret(self, secret_id: Optional[str] = None) -> Union[dict, str, None]:
"""
Retrieve a secret by ARN or default from config.

Expand Down
20 changes: 12 additions & 8 deletions WrenchCL/Connect/ProcessingTracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal, Optional

from Connect import AwsClientHub
from Decorators import SingletonClass
from typing_extensions import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal, Optional

from .. import logger
from ..Types.TTLSet import TTLSet
from ..Connect import AwsClientHub
from ..Decorators import SingletonClass

# from ..Types.TTLSet import TTLSet
class TTLSet(set): # type: ignore[override]
"""Stub for TTLSet — accepts ttl kwarg, behaves as a plain set until TTLSet is implemented."""

def __init__(self, *args: object, ttl: int = 0, **kwargs: object) -> None:
super().__init__(*args, **kwargs)

if TYPE_CHECKING:
from mypy_boto3_rds import RDSClient
Expand Down Expand Up @@ -160,8 +164,8 @@ class ProcessingTracker:
_lock = threading.RLock()

_running_job_ids: dict[str, ProcessingEvent] = {}
_finished_job_ids = TTLSet(ttl=600)
_failed_job_ids = TTLSet(ttl=600)
_finished_job_ids = TTLSet(ttl=600) # type: ignore
Comment thread
Wrench-Service-Bot marked this conversation as resolved.
_failed_job_ids = TTLSet(ttl=600) # type: ignore

def __init__(
self, service_name: str, processor_name: str, sql_client: Optional["RDSClient"] = None
Expand Down
21 changes: 11 additions & 10 deletions WrenchCL/Connect/RdsServiceGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,30 @@
# Author: Willem van der Schans.
# Licensed under the MIT License (https://opensource.org/license/mit).

import importlib.util
import json
import math
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
from uuid import UUID

if TYPE_CHECKING:
import pandas as pd
from mypy_boto3_rds.client import RDSClient
elif importlib.util.find_spec("pandas") is not None:
import pandas as pd # type: ignore[assignment]
else:
from .._Internal._MockPandas import _MockPandas as pd # type: ignore[assignment]

import psycopg2
import psycopg2.extensions
import psycopg2.extras
from psycopg2.pool import ThreadedConnectionPool

from .. import logger
from .._Internal._MockPandas import _MockPandas
from ..Decorators.SingletonClass import SingletonClass
from .AwsClientHub import AwsClientHub

try:
import pandas as pd
except ImportError:
pd = _MockPandas()
DataFrame = pd.DataFrame


@SingletonClass
class RdsServiceGateway:
Expand Down Expand Up @@ -57,7 +56,7 @@ def __init__(

if self.multithreaded:
# Initialize a threaded connection pool using the URI
self.pool: Optional[psycopg2.pool] = ThreadedConnectionPool(
self.pool: Optional[ThreadedConnectionPool] = ThreadedConnectionPool(
minconn=min_pool_size, maxconn=max_pool_size, dsn=self.db_uri
)
else:
Expand All @@ -76,6 +75,7 @@ def get_connection(self) -> Union["psycopg2.extensions.connection", "RDSClient"]
:rtype: psycopg2.extensions.connection
"""
if self.multithreaded:
assert self.pool is not None
return self.pool.getconn()
return self.connection

Expand All @@ -86,6 +86,7 @@ def release_connection(self, conn: "psycopg2.extensions.connection"):

"""
if self.multithreaded:
assert self.pool is not None
self.pool.putconn(conn)

def get_data(
Expand Down Expand Up @@ -175,7 +176,7 @@ def update_database(

try:
# Convert payload into a tuple if it's a single value or list
payload = self.convert_payload(payload)
payload = self.convert_payload(payload) # type: ignore
logger._internal.log_internal(f"Converted payload: {payload}")

if isinstance(payload, tuple):
Expand Down Expand Up @@ -301,7 +302,7 @@ def convert_payload(self, payload: Tuple[Any, ...]) -> pd.DataFrame | tuple[Any,
:return: A tuple with converted values.
:rtype: Tuple[Any, ...]
"""
if isinstance(payload, DataFrame):
if isinstance(payload, pd.DataFrame):
return self._convert_dataframe_types(payload)
else:
return tuple(self._convert_value(val) for val in payload)
Expand Down
8 changes: 4 additions & 4 deletions WrenchCL/Connect/S3ServiceGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, config: Optional["Config"] = None):
logger._internal.log_internal("S3ServiceGateway initialized with S3 client.")

@staticmethod
def _get_mime_extension(mime_type: str) -> str:
def _get_mime_extension(mime_type: str) -> Optional[str]:
"""Get the file extension for a given MIME type."""
return mimetypes.guess_extension(mime_type)

Expand Down Expand Up @@ -112,9 +112,9 @@ def upload_file(
if not self.test_mode:
self.s3_client.upload_fileobj(file_obj, bucket_name, object_key)
elif hasattr(file, "read") and callable(file.read):
if file.seek(0, 2) == 0: # Move to the end of the file and check the position
if file.seek(0, 2) == 0: # type: ignore
raise ValueError("The file-like object is empty.")
file.seek(0) # Move back to the beginning of the file
file.seek(0) # type: ignore
logger.info(
f"Uploading file-like object to bucket: {bucket_name} as object: {object_key}"
)
Expand Down Expand Up @@ -274,7 +274,7 @@ def check_object_existence(self, bucket_name: str, object_key: str) -> bool:
raise

@Retryable()
def list_objects(self, bucket_name: str, prefix: str = None) -> list:
def list_objects(self, bucket_name: str, prefix: Optional[str] = None) -> list:
"""
Lists objects in an S3 bucket, optionally filtered by a prefix.

Expand Down
8 changes: 4 additions & 4 deletions WrenchCL/Connect/_Internal/_boto_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def _get_s3_client(profile: str, region: str, config: Optional[Config] = None) -
return client

except ImportError:
_get_boto3_session = None
_fetch_secret_from_secretsmanager = None
_get_s3_client = None
Config = None
_get_boto3_session = None # type: ignore
_fetch_secret_from_secretsmanager = None # type: ignore
_get_s3_client = None # type: ignore
Config = None # type: ignore
73 changes: 32 additions & 41 deletions WrenchCL/Connect/__init__.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,38 @@
"""AWS service integrations - requires 'aws' extra."""

try:
# Test all required AWS dependencies first
import boto3
import botocore
import paramiko
import psycopg2
from sshtunnel import SSHTunnelForwarder

assert SSHTunnelForwarder
assert psycopg2
assert paramiko
assert boto3
assert botocore
# Now try to import our classes (which may have additional dependencies)
from .AwsClientHub import AwsClientHub
from .Lambda import handle_lambda_response
from .RdsServiceGateway import RdsServiceGateway
from .S3ServiceGateway import S3ServiceGateway

except ImportError as e:
# Create a more specific error message based on what failed
error_details = str(e)
messages = []
# Map common errors to specific packages
if "boto3" in error_details:
messages.append("boto3 and related AWS packages")
elif "psycopg2" in error_details:
messages.append("psycopg2-binary (PostgreSQL adapter)")
elif "paramiko" in error_details:
messages.append("paramiko (SSH client)")
elif "sshtunnel" in error_details:
messages.append("sshtunnel (SSH tunneling)")
elif "botocore" in error_details:
messages.append("botocore and related AWS type stubs")
else:
messages.append("AWS-related dependencies")
missing_pkg = "\n -".join(messages)
import importlib.util
import sys


def _dep_available(name: str) -> bool:
if name in sys.modules:
return True
try:
return importlib.util.find_spec(name) is not None
except (ValueError, ModuleNotFoundError):
return False


_aws_deps = {
"boto3": "boto3",
"botocore": "botocore",
"paramiko": "paramiko",
"psycopg2": "psycopg2-binary",
"sshtunnel": "sshtunnel",
}
_missing = [pkg for mod, pkg in _aws_deps.items() if not _dep_available(mod)]

if _missing:
missing_str = "\n -".join(_missing)
raise ImportError(
f"AWS functionality requires additional dependencies.\n"
f"Missing Packages:\n -{missing_pkg}\n"
f"Install with: pip install 'WrenchCL[aws]'\n"
f"Original error: {error_details}"
) from e
f"Missing Packages:\n -{missing_str}\n"
f"Install with: pip install 'WrenchCL[aws]'"
) from None

from .AwsClientHub import AwsClientHub
from .Lambda import handle_lambda_response
from .RdsServiceGateway import RdsServiceGateway
from .S3ServiceGateway import S3ServiceGateway

__all__ = ["AwsClientHub", "RdsServiceGateway", "S3ServiceGateway", "handle_lambda_response"]
3 changes: 2 additions & 1 deletion WrenchCL/Decorators/Deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
# Licensed under the MIT License (https://opensource.org/license/mit).
import warnings
from functools import wraps
from typing import Optional

__depr_tracker__ = set()


def Deprecated(message: str = None):
def Deprecated(message: Optional[str] = None):
"""
Wraps a function with a decorator that warns the user the function is Deprecated. It also allows
an optional custom message to be displayed when the function is used.
Expand Down
4 changes: 2 additions & 2 deletions WrenchCL/Decorators/Retryable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
try:
from botocore.exceptions import BotoCoreError, ClientError
except ImportError:
ClientError = Exception # Fallback to base Exception
BotoCoreError = Exception
ClientError = Exception # type: ignore
BotoCoreError = Exception # type: ignore


def Retryable(_func=None, *, max_retries=2, retry_on_exceptions=None, delay=2, verbose=False):
Expand Down
12 changes: 8 additions & 4 deletions WrenchCL/Decorators/SingletonClass.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# Copyright (c) 2024-2025.
# Author: Willem van der Schans.
# Licensed under the MIT License (https://opensource.org/license/mit).
from typing import Type, TypeVar, cast

from ..Exceptions._internal import _SingletonViolationException

_T = TypeVar("_T")


def SingletonClass(cls: type) -> type:
def SingletonClass(cls: Type[_T]) -> Type[_T]: # type: ignore[shadowed-type-variable,unsupported-base]
"""
Enforces singleton behavior by wrapping the class in a custom subclass.

Expand All @@ -18,10 +22,10 @@ def SingletonClass(cls: type) -> type:
if "__cls_instance" in cls.__dict__:
raise _SingletonViolationException(cls)

class SingletonWrapper(cls):
class SingletonWrapper(cls): # type: ignore
__cls_instance = None

def __new__(cls_, *args, **kwargs):
def __new__(cls_, *args, **kwargs): # type: ignore[shadowed-type-variable]
if cls_.__cls_instance is None:
cls_.__cls_instance = super(SingletonWrapper, cls_).__new__(cls_)
return cls_.__cls_instance
Expand All @@ -34,4 +38,4 @@ def __init__(self, *args, **kwargs):
SingletonWrapper.__name__ = cls.__name__
SingletonWrapper.__qualname__ = cls.__qualname__
SingletonWrapper.__doc__ = cls.__doc__
return SingletonWrapper
return cast(Type[_T], SingletonWrapper)
Loading
Loading