Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent-framework/prometheus_swarm/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .nonce_request import NonceRequest

__all__ = ['NonceRequest']
105 changes: 105 additions & 0 deletions agent-framework/prometheus_swarm/nonce_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from typing import Dict, Any, Optional
import uuid
import time
from datetime import datetime, timedelta

class NonceRequest:
"""
A class to manage cryptographic nonce requests with expiration and validation.

Nonces are one-time tokens used to prevent replay attacks and ensure request uniqueness.
"""

def __init__(self,
duration: int = 300, # Default 5 minutes
allow_reuse: bool = False):
"""
Initialize a NonceRequest manager.

Args:
duration (int): How long a nonce remains valid in seconds. Defaults to 300 (5 minutes).
allow_reuse (bool): Whether nonces can be reused. Defaults to False.
"""
self._nonces: Dict[str, Dict[str, Any]] = {}
self._duration = duration
self._allow_reuse = allow_reuse

def generate_nonce(self, context: Optional[Dict[str, Any]] = None) -> str:
"""
Generate a new unique nonce.

Args:
context (dict, optional): Additional context to associate with the nonce.

Returns:
str: A unique nonce token
"""
nonce = str(uuid.uuid4())
expiration = datetime.now() + timedelta(seconds=self._duration)

self._nonces[nonce] = {
'created_at': datetime.now(),
'expires_at': expiration,
'context': context or {},
'used': False
}

return nonce

def validate_nonce(self, nonce: str) -> bool:
"""
Validate a given nonce.

Args:
nonce (str): The nonce to validate

Returns:
bool: Whether the nonce is valid and usable
"""
if nonce not in self._nonces:
return False

nonce_info = self._nonces[nonce]

# Check expiration
if datetime.now() > nonce_info['expires_at']:
return False

# Check reuse
if not self._allow_reuse and nonce_info['used']:
return False

# Mark as used
nonce_info['used'] = True

return True

def get_nonce_context(self, nonce: str) -> Optional[Dict[str, Any]]:
"""
Retrieve the context associated with a nonce.

Args:
nonce (str): The nonce to retrieve context for

Returns:
dict or None: The context associated with the nonce, or None if not found
"""
return self._nonces.get(nonce, {}).get('context', None)

def clear_expired_nonces(self) -> int:
"""
Remove expired nonces from the internal storage.

Returns:
int: Number of nonces removed
"""
current_time = datetime.now()
expired_nonces = [
nonce for nonce, info in self._nonces.items()
if current_time > info['expires_at']
]

for nonce in expired_nonces:
del self._nonces[nonce]

return len(expired_nonces)
64 changes: 64 additions & 0 deletions agent-framework/prometheus_swarm/tests/test_nonce_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest
import time
from datetime import datetime, timedelta
from prometheus_swarm.nonce_request import NonceRequest

def test_generate_nonce():
"""Test that generate_nonce produces a unique nonce."""
nonce_manager = NonceRequest()
nonce1 = nonce_manager.generate_nonce()
nonce2 = nonce_manager.generate_nonce()

assert nonce1 != nonce2
assert len(nonce1) > 0

def test_nonce_validation():
"""Test basic nonce validation."""
nonce_manager = NonceRequest(duration=5) # 5 seconds
nonce = nonce_manager.generate_nonce()

assert nonce_manager.validate_nonce(nonce) is True
assert nonce_manager.validate_nonce(nonce) is False # No reuse by default

def test_nonce_expiration():
"""Test nonce expiration."""
nonce_manager = NonceRequest(duration=1) # 1 second
nonce = nonce_manager.generate_nonce()

time.sleep(2) # Wait beyond expiration
assert nonce_manager.validate_nonce(nonce) is False

def test_nonce_context():
"""Test attaching and retrieving nonce context."""
nonce_manager = NonceRequest()
context = {"user_id": 123, "action": "login"}
nonce = nonce_manager.generate_nonce(context)

retrieved_context = nonce_manager.get_nonce_context(nonce)
assert retrieved_context == context

def test_nonce_reuse_allowed():
"""Test nonce when reuse is explicitly allowed."""
nonce_manager = NonceRequest(duration=5, allow_reuse=True)
nonce = nonce_manager.generate_nonce()

assert nonce_manager.validate_nonce(nonce) is True
assert nonce_manager.validate_nonce(nonce) is True

def test_clear_expired_nonces():
"""Test clearing expired nonces."""
nonce_manager = NonceRequest(duration=1)

# Generate a few nonces
nonce1 = nonce_manager.generate_nonce()
nonce2 = nonce_manager.generate_nonce()

time.sleep(2) # Wait beyond expiration

cleared_count = nonce_manager.clear_expired_nonces()
assert cleared_count == 2

def test_invalid_nonce():
"""Test validation of an invalid/non-existent nonce."""
nonce_manager = NonceRequest()
assert nonce_manager.validate_nonce("invalid_nonce") is False