Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
a0f721d
feat: nuid implementation
corruptmane Oct 24, 2024
6c1bb12
feat: use NATS's implementation of uniqud identifier (NUID)
corruptmane Oct 24, 2024
58dcef7
feat: nats const headers
corruptmane Oct 24, 2024
fa91fe8
feat: `NoRespondersError`
corruptmane Oct 24, 2024
af67ecf
feat: process case when there are no responders
corruptmane Oct 24, 2024
7d6fc75
feat: JetStream related exceptions
corruptmane Oct 24, 2024
f6a556a
feat: JetStream client implementation start
corruptmane Oct 24, 2024
5ab57f0
fix: use `lstrip` instead of `rstrip`
corruptmane Oct 24, 2024
f60c340
feat: request timeout added to config
corruptmane Oct 24, 2024
da206b9
feat: `JetStream` object creation from `NATSCore`
corruptmane Oct 24, 2024
b020b74
feat: `AccountInfo` JetStream API entity
corruptmane Oct 24, 2024
8a862e2
feat: `get_account_info` method
corruptmane Oct 24, 2024
b4544f7
fix: warning
corruptmane Oct 25, 2024
39e8558
feat: `StreamInfo` and its dependant objects
corruptmane Oct 25, 2024
86a2db0
feat: `get_stream_list` jetstream api method
corruptmane Oct 25, 2024
287d1f9
refactor: drop 3.9 and 3.10 support, fix used typing annotations
corruptmane Oct 27, 2024
c922836
refactor: unreachable check
corruptmane Oct 27, 2024
0664731
refactor: unnecessary argument
corruptmane Oct 27, 2024
fb047d4
feat: from/to nanoseconds converters
corruptmane Oct 27, 2024
d60bb32
refactor+feat: use utils for nanoseconds converters | create/update s…
corruptmane Oct 27, 2024
67229f5
feat: basic stream manipulation methods
corruptmane Oct 27, 2024
eaf69f1
refactor: delete prints
corruptmane Oct 29, 2024
f0b0590
refactor: move config to separate module
corruptmane Oct 30, 2024
5a36b01
fix: strip removes excessive symbols from inbox id
corruptmane Oct 30, 2024
5706948
refactor: ruff
corruptmane Oct 30, 2024
4a279a0
refactor: ruff
corruptmane Oct 30, 2024
e6f34a9
refactor: ruff
corruptmane Oct 30, 2024
109971f
refactor: ruff
corruptmane Oct 30, 2024
7e7fe0f
feat: `fromisoformat` helper func
corruptmane Oct 30, 2024
a4e9c9d
refactor: ruff
corruptmane Oct 30, 2024
c652b16
refactor: move config to separate module
corruptmane Oct 30, 2024
cd2f375
refactor: move config to separate module
corruptmane Oct 30, 2024
f7f4237
feat: `current_server_version` property + 503 status processing
corruptmane Oct 30, 2024
3a1d4bd
feat: consumer-related entities
corruptmane Oct 30, 2024
961940d
feat: `get_msg`, `create_or_update_consumer`, `get_consumer_list` jet…
corruptmane Oct 30, 2024
e60c0fd
feat: `get_consumer_names`
corruptmane Oct 30, 2024
ae7fdd5
feat: `get_consumer_info`
corruptmane Oct 30, 2024
ad5b04c
refactor: make `status` a public property
corruptmane Nov 4, 2024
bd8bf18
feat: process inline status in nats headers
corruptmane Nov 4, 2024
85273f3
feat: `new_unique_inbox` method
corruptmane Nov 4, 2024
ef0a42b
fix: consumer config parsing
corruptmane Nov 4, 2024
d94be26
feat: `JetStreamMsg`
corruptmane Nov 4, 2024
ccdda72
feat: export entities and client to module
corruptmane Nov 4, 2024
6bd9fcd
refactor: use proper generic protocol for callback
corruptmane Nov 4, 2024
3b2054d
feat: `PushSubscription` for JetStream
corruptmane Nov 4, 2024
9503d0d
feat: `delete_consumer` and `push_subscription` methods
corruptmane Nov 4, 2024
6f710c9
update: TODO list
corruptmane Nov 5, 2024
ebaa26f
refactor: pass all arguments
corruptmane Nov 5, 2024
365a90e
update: TODO resolved
corruptmane Nov 5, 2024
1b3520e
feat: `MessageAlreadyAckedError`
corruptmane Nov 5, 2024
86c3f59
feat+fix: `from_msg_headers` classmethod | match case on strings inst…
corruptmane Nov 5, 2024
acc54b4
feat: `ack`, `ack_sync`, `nak`, `term`, `in_progress` methods on jets…
corruptmane Nov 5, 2024
d9f0aa6
feat: status code mapping
corruptmane Nov 5, 2024
eeda1e5
feat: `is_empty` method`
corruptmane Nov 5, 2024
86b2cc2
feat: `PullSubscription`
corruptmane Nov 5, 2024
a17c0c1
feat: `pull_subscription` method
corruptmane Nov 5, 2024
0abe356
update: remove `pyrightconfig.json`
corruptmane Nov 6, 2024
a20d900
fix: save `pinger_task` and `flusher_task` instead of `ping_task` and…
corruptmane Nov 7, 2024
0fee8de
feat: `BadSubjectError` and subject validation
corruptmane Nov 7, 2024
8d06f02
feat: tests
corruptmane Nov 7, 2024
e3ac399
feat: more precise headers parsing
corruptmane Nov 8, 2024
2ffd879
update: TODO
corruptmane Nov 8, 2024
120e038
fix: read `MSG` operation correctly (exactly specified size of bytes …
corruptmane Nov 8, 2024
8c82d33
refactor: use futures instead of sleeps where it is usable | delete s…
corruptmane Nov 8, 2024
255c572
feat: `FakeStream` mock
corruptmane Nov 8, 2024
8500c35
feat: headers parser tests
corruptmane Nov 8, 2024
58cd4fa
feat: protocol parser tests
corruptmane Nov 8, 2024
27ba8a1
update: TODO
corruptmane Nov 8, 2024
c55b5e2
refactor: created `SubscriptionProto`
corruptmane Nov 10, 2024
31ecd76
refactor: remove imports
corruptmane Nov 10, 2024
245e5a6
feat+refactor: `new_unique_deliver_subject` method | move validation …
corruptmane Nov 10, 2024
419eb86
fix: codes are interpreted as integers
corruptmane Nov 10, 2024
ef80a16
fix: codes are interpreted as integers
corruptmane Nov 10, 2024
a0145b8
refactor: use `SubscriptionProto` instead of `Subscription` typehint
corruptmane Nov 10, 2024
c645941
refactor: inherit `Subscription` from `SubscriptionProto`
corruptmane Nov 10, 2024
843984b
update: add `PushSubscription` and `PullSubscription` to exported cla…
corruptmane Nov 10, 2024
4142dae
lint: mypy+ruff
corruptmane Nov 10, 2024
e23e78d
update: split to more points
corruptmane Nov 10, 2024
8d9d6d8
feat+fix+refactor: `PushSubscription` reworked and fixed, flow contro…
corruptmane Nov 10, 2024
6ddcd68
refactor: use `Self` instead of string typehints where possible
corruptmane Nov 10, 2024
bb49259
update: TODO
corruptmane Nov 10, 2024
010758c
refactor: move flow control processing logic to separate method
corruptmane Nov 11, 2024
505cdf8
feat+refactor: DIRECT get message
corruptmane Nov 11, 2024
8ed2042
feat+refactor: delete stream message | rewrite json serializers from …
corruptmane Nov 12, 2024
4f1f719
feat: KV management methods
corruptmane Nov 12, 2024
42ffe11
fix: inline headers without native headers
corruptmane Nov 22, 2024
e7dd3af
feat: more constant headers
corruptmane Nov 22, 2024
9eab3d6
feat+refactor: KV errors | more `native` exceptions to inherit from
corruptmane Nov 22, 2024
3347d2a
feat: `RawMsgGetter` protocol
corruptmane Nov 22, 2024
a46e6bd
feat+refactor: `PubAck` | remove `GetMsgDirectRequest` class and vali…
corruptmane Nov 22, 2024
a457fd8
feat: `OrderedPushSubscription`
corruptmane Nov 22, 2024
278d50c
feat: `KeyValueWatcher` and `KeyValue` implementations
corruptmane Nov 22, 2024
03ecf08
feat+refactor: `push_subscribe_bind`, `pull_subscribe_bind`, `ordered…
corruptmane Nov 22, 2024
34a86e6
update: `TODO`
corruptmane Nov 22, 2024
7da1d50
refactor+feat: separate `JSONSerializerProto` to `abc` module | expli…
corruptmane Nov 22, 2024
3830f01
refactor+fix: all datetime objects are now timezone-aware, `purge_str…
corruptmane Nov 22, 2024
8c2e16c
refactor: serialization
corruptmane Feb 15, 2025
f53bf84
lint: mypy+ruff
corruptmane Feb 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ venv/
.venv/

dist/

*.lock
test.py
pyrightconfig.json
18 changes: 17 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,21 @@
- [x] Proper logging
- [x] Proper limits handling
- [x] Try reconnecting on initial connect failure
- [ ] JetStream
1. - [x] Pub, Pull/Push subscription, Ack, Nak, etc.
2. - [x] Flow control for push consumers
3. - [x] DIRECT get message
4. - [x] Ordered push consumers
5. - [x] KV storage
6. - [ ] Object storage
- [ ] Tests with devcontainers
1. - [x] NATS Core
2. - [x] Protocol parser
3. - [x] Headers parser
4. - [ ] Different connections
5. - [ ] NATS JetStream (pull/push sub, kv/object storage)
- [ ] Auth (JWT, User/Pass, NKey)
- [ ] JetStream (Pub, Pull/Push subscription, Ack, etc.)
- [ ] Micro
- [ ] Cluster support (cluster endpoints discovery, update server info on `INFO` operation message, etc.)
- [x] Rework serialization of JS api
- [ ] Try to use mixins for subscriptions
2 changes: 1 addition & 1 deletion natsio/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.1"
__version__ = "0.3.0"
Empty file added natsio/_internal/__init__.py
Empty file.
Empty file.
28 changes: 28 additions & 0 deletions natsio/_internal/serialization/converters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from datetime import datetime, timedelta
from typing import Final

from natsio.utils.time import fromisoformat

from .types import Converter

NANOSECOND_POWER: Final[int] = 10**9


class TimedeltaNanosecondsConverter(Converter[timedelta, int]):
def to_wire(self, value: timedelta) -> int:
return int(value.total_seconds() * NANOSECOND_POWER)

def from_wire(self, value: int) -> timedelta:
return timedelta(microseconds=value / 1000)


class DatetimeIsoConverter(Converter[datetime, str]):
def to_wire(self, value: datetime) -> str:
return value.isoformat()

def from_wire(self, value: str) -> datetime:
return fromisoformat(value)


TIMEDELTA_NANO = TimedeltaNanosecondsConverter()
DATETIME_ISO = DatetimeIsoConverter()
143 changes: 143 additions & 0 deletions natsio/_internal/serialization/serializator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from collections.abc import Mapping as MappingABC
from dataclasses import fields, is_dataclass
from enum import Enum
from types import UnionType
from typing import (
Annotated,
Any,
Mapping,
Union,
cast,
get_args,
get_origin,
get_type_hints,
)

from .types import DT, Converter, DataclassInstance


def get_converter_from_annotation(
field_type: type,
) -> Converter[Any, Any] | None:
if get_origin(field_type) is Annotated:
args = get_args(field_type)
for arg in args[1:]:
if isinstance(arg, Converter):
return arg
return None


def get_base_type(field_type: type) -> type:
origin = get_origin(field_type)
args = get_args(field_type)
if origin is Annotated:
return cast(type, args[0])

if origin is not None:
if origin is UnionType or origin is Union:
for arg in args:
if arg is not type(None):
return get_base_type(arg)
return cast(type, origin)
return field_type


def handle_enum(value: Any, enum_type: type) -> Any:
if isinstance(value, Enum):
return value.value
if isinstance(value, str) and issubclass(enum_type, Enum):
try:
return enum_type(value)
except ValueError:
raise ValueError(f"Invalid value '{value}' for enum {enum_type.__name__}")
return value


def serialize_value(value: Any, field_type: type) -> Any:
if value is None:
return None

converter = get_converter_from_annotation(field_type)
if converter is not None:
return converter.to_wire(value)

base_type = get_base_type(field_type)
origin = get_origin(field_type)
args = get_args(field_type)

if isinstance(value, Enum):
return handle_enum(value, base_type)

if origin is list and args:
item_type = args[0] if args else type(Any)
return [serialize_value(val, item_type) for val in value]

if origin is MappingABC and len(args) == 2:
item_type = args[1] if args else type(Any)
return {k: serialize_value(v, item_type) for k, v in value.items()}

if is_dataclass(value.__class__):
return serialize_dataclass(value)

return value


def deserialize_value(value: Any, field_type: type) -> Any:
if value is None:
return None

converter = get_converter_from_annotation(field_type)
if converter is not None:
return converter.from_wire(value)

base_type = get_base_type(field_type)
origin = get_origin(field_type)
args = get_args(field_type)

if is_dataclass(base_type):
return deserialize_dataclass(value, base_type)

if origin is list and args:
item_type = args[0] if args else type(Any)
return [deserialize_value(val, item_type) for val in value]

if origin is MappingABC and len(args) == 2:
item_type = args[1] if args else type(Any)
return {k: deserialize_value(v, item_type) for k, v in value.items()}

if issubclass(base_type, Enum):
return handle_enum(value, base_type)

if isinstance(value, (str, int, float, bool)) and not isinstance(value, base_type):
try:
return base_type(value) # type: ignore[call-arg]
except (ValueError, TypeError):
pass

return value


def serialize_dataclass(obj: DataclassInstance) -> Mapping[str, Any]:
result = {}
type_hints = get_type_hints(obj.__class__, include_extras=True)

for field in fields(obj):
value = getattr(obj, field.name)
if value is not None or field.default is None or field.default_factory is None:
result[field.name] = serialize_value(value, type_hints[field.name])

return result


def deserialize_dataclass(data: Mapping[str, Any], cls: type[DT]) -> DT:
type_hints = get_type_hints(cls, include_extras=True)
kwargs = {}

for field in fields(cls):
if field.name in data:
kwargs[field.name] = deserialize_value(
data[field.name],
type_hints[field.name],
)

return cls(**kwargs)
18 changes: 18 additions & 0 deletions natsio/_internal/serialization/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import Field
from typing import Any, ClassVar, Protocol, TypeVar, runtime_checkable


class DataclassInstance(Protocol):
__dataclass_fields__: ClassVar[dict[str, Field[Any]]]


DT = TypeVar("DT", bound=DataclassInstance)

T = TypeVar("T")
V = TypeVar("V")


@runtime_checkable
class Converter(Protocol[T, V]):
def to_wire(self, value: T) -> V: ...
def from_wire(self, value: V) -> T: ...
22 changes: 10 additions & 12 deletions natsio/abc/connection.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
from ssl import SSLContext
from typing import TYPE_CHECKING, Optional, Protocol
from typing import Protocol, Self

from natsio.config import ServerInfo
from natsio.connection.status import ConnectionStatus
from natsio.protocol.operations.connect import Connect

from .client import ErrorCallback
from .dispatcher import DispatcherProto
from .json import JSONSerializerProto
from .protocol import ClientMessageProto

if TYPE_CHECKING:
from natsio.client.config import ServerInfo


class StreamProto(Protocol):
@property
Expand Down Expand Up @@ -67,7 +66,7 @@ def outstanding_pings(self) -> int:
raise NotImplementedError

@property
def server_info(self) -> "ServerInfo":
def server_info(self) -> ServerInfo:
raise NotImplementedError

@classmethod
Expand All @@ -85,15 +84,14 @@ async def connect(
force_flush_timeout: int,
error_callback: ErrorCallback,
timeout: float,
ssl: Optional[SSLContext],
ssl_hostname: Optional[str],
handshake_first: Optional[bool],
) -> "ConnectionProto":
json_serializer: JSONSerializerProto,
ssl: SSLContext | None,
ssl_hostname: str | None,
handshake_first: bool | None,
) -> Self:
raise NotImplementedError

async def send_command(
self, cmd: ClientMessageProto, force_flush: bool = False
) -> None:
async def send_command(self, cmd: ClientMessageProto, force_flush: bool = False) -> None:
raise NotImplementedError

async def flush(self) -> None:
Expand Down
6 changes: 3 additions & 3 deletions natsio/abc/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import asyncio
from typing import Protocol, Sequence

from natsio.abc.subscription import SubscriptionProto
from natsio.messages.core import CoreMsg
from natsio.protocol.operations.hmsg import HMsg
from natsio.protocol.operations.msg import Msg
from natsio.subscriptions.core import Subscription


class DispatcherProto(Protocol):
def add_subscription(self, sub: Subscription) -> None:
def add_subscription(self, sub: SubscriptionProto) -> None:
raise NotImplementedError

def all_subscriptions(self) -> Sequence[Subscription]:
def all_subscriptions(self) -> Sequence[SubscriptionProto]:
raise NotImplementedError

def remove_subscription(self, sid: str) -> None:
Expand Down
9 changes: 9 additions & 0 deletions natsio/abc/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Any, Protocol


class JSONSerializerProto(Protocol):
def load(self, obj: str | bytes) -> Any:
raise NotImplementedError

def dump(self, obj: Any) -> bytes:
raise NotImplementedError
48 changes: 48 additions & 0 deletions natsio/abc/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from enum import Enum
from typing import TypeVar, Protocol

from natsio.messages.core import CoreMsg
from natsio.messages.jetstream import JetStreamMsg


MsgType = TypeVar("MsgType", CoreMsg, JetStreamMsg, contravariant=True)


class Callback(Protocol[MsgType]):
async def __call__(self, msg: MsgType) -> None:
raise NotImplementedError


CoreCallback = Callback[CoreMsg]
JetStreamCallback = Callback[JetStreamMsg]


class SubscriptionStatus(Enum):
INITIALISING = "INITIALISING"
OPERATING = "OPERATING"
DRAINING = "DRAINING"
CLOSED = "CLOSED"


class SubscriptionProto(Protocol):
sid: str
subject: str
queue: str | None

@property
def is_ready_to_close(self) -> bool:
raise NotImplementedError

async def add_msg(self, msg: CoreMsg) -> None:
raise NotImplementedError

async def unsubscribe(self, max_msgs: int = 0) -> None:
raise NotImplementedError


__all__ = (
"CoreCallback",
"JetStreamCallback",
"SubscriptionStatus",
"SubscriptionProto",
)
6 changes: 5 additions & 1 deletion natsio/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from .core import NATSCore
from .config import ClientConfig
from .status import ClientStatus

__all__ = (
"NATSCore",
"ClientStatus",
)
Loading