Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ JWT_SECRET_KEY=
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=240
CORS_ALLOWED_HOSTS="http://localhost:8081,http://localhost:8002"
TOTP_SECRET=
TOTP_SECRET=12345678901234567890
TOTP_DIGEST=sha1
TOTP_RETURN_DIGITS=8
TOTP_TIME_STEP=30
12 changes: 6 additions & 6 deletions src/routers/admin/security_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@

@router.get("/2fa-now")
@inject
async def get_2f_code(totp: totp_dependency, email: Optional[str] = None):
totp_code = await totp.now(email)
async def get_2f_code(totp: totp_dependency, secret: Optional[str] = None):
totp_code = await totp.now(secret)
if totp_code:
return Response(content=totp_code, media_type="plain/text")
else:
return Response(status_code=404)

@router.get("/2fa-at")
@inject
async def get_2f_code_at(time: int, totp: totp_dependency, email: Optional[str] = None):
otp_code = await totp.at(time, email)
async def get_2f_code_at(time: int, totp: totp_dependency, secret: Optional[str] = None):
otp_code = await totp.at(time, secret)
if otp_code:
return Response(content=otp_code, media_type="plain/text")
else:
return Response(status_code=404)

@router.get("/2fa-verify")
@inject
async def get_2f_code_verify(code: str, totp: totp_dependency, email: Optional[str] = None):
result = await totp.verify(code, email)
async def get_2f_code_verify(code: str, totp: totp_dependency, secret: Optional[str] = None):
result = await totp.verify(code, secret)
if result:
return Response(status_code=200)
else:
Expand Down
2 changes: 1 addition & 1 deletion src/routers/admin/users_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from src.models.address_model import Address
from src.services.user_service import change_password, insert_address
from src.dependencies import get_db
from src.middlewares.auth_jwt import JWTCustom
from src.middlewares.auth_roles_jwt import JWTCustom
from src.dependency_injection.containers import Container
import src.services.user_service as uSvc

Expand Down
4 changes: 2 additions & 2 deletions src/routers/users_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def update_password(db: db_dependency, password: str, email: Annotated[str
@router.get("/user/2fa")
@inject
async def get_2f_code(email: Annotated[str, Depends(oauth2_scheme)], totp: totp_dependency):
totp_code = await totp.now(email)
totp_code = await totp.now(email, is_value_ascii = True)
if totp_code:
return Response(content=totp_code, media_type="plain/text")
else:
Expand All @@ -70,7 +70,7 @@ async def get_2f_code(email: Annotated[str, Depends(oauth2_scheme)], totp: totp_
@router.get("/user/2fa-verify")
@inject
async def get_2f_code_verify(code: str, email: Annotated[str, Depends(oauth2_scheme)], totp: totp_dependency):
if await totp.verify(code, email):
if await totp.verify(code, email, is_value_ascii = True):
return Response(status_code=200)
else:
return Response(status_code=401)
Expand Down
19 changes: 7 additions & 12 deletions src/services/jwt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def create_token(data: dict, expire_time: timedelta = timedelta(minutes=in
data[item][x] = data[item][x] #await crypto.encrypt_text(data[item][x])
else:
data[item] = await crypto.encrypt_text(data[item])

expire = datetime.now(timezone.utc) + expire_time
data.update({ "exp": expire })
encode_jwt = jwt.encode(data, str(os.environ["JWT_SECRET_KEY"]), algorithm= os.environ["JWT_ALGORITHM"])
Expand All @@ -36,8 +36,7 @@ async def create_token(data: dict, expire_time: timedelta = timedelta(minutes=in
async def verify_token(token: str, crypto = crypto_service, log = log_service):
try:
payload = await verify(token)
email = await crypto.decrypt_text(payload.get("sub"))
return email
return await crypto.decrypt_text(payload.get("sub"))
except Exception as e:
log.logger.error(e)
raise e
Expand All @@ -54,7 +53,7 @@ async def verify_token_and_roles(token: str, required_roles: Optional[list[str]]
return email
else:
raise HTTPException(status_code=401, detail="Insufficient permissions")

except Exception as e:
log.logger.error(e)
raise e
Expand All @@ -68,11 +67,10 @@ async def verify_token_from_requests(request: Request):
payload = await verify(request_token)
email = await crypto_service.decrypt_text(payload.get("sub"))
return email

except Exception as e:
#log.logger.error(e)
raise e

@inject
async def verify(request_token: str, log = log_service):
try:
Expand All @@ -89,13 +87,10 @@ async def verify(request_token: str, log = log_service):
raise e

@inject
async def get_email(token, crypto = crypto_service, log = log_service):
async def get_email(token: str, crypto = crypto_service, log = log_service):
try:
payload = jwt.decode(token, str(os.environ["JWT_SECRET_KEY"]), os.environ["JWT_ALGORITHM"])
email = await crypto.decrypt_text(payload.get("sub"))
if email is None:
return None
return await crypto.decrypt_text(payload.get("sub"))
except Exception as e:
log.logger.error(e)
else:
return email
raise e
23 changes: 12 additions & 11 deletions src/services/otp_service.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import hmac, struct
import hmac, struct, base64


class OTP():
DIGITS_POWER = [1,10,100,1000,10000,100000,1000000,10000000,100000000]

def __init__(self) -> None:
pass

def get_hmac_sha(self, secret: bytes, moving_factor: bytes, digest: str):
return hmac.new(secret, moving_factor, digest)

def to_bytes(self, value: str):
bytes_object = value.encode('ascii')
hex_string = bytes_object.hex()
return bytes(bytearray.fromhex(hex_string))
def to_bytes(self, value: str, is_value_ascii: bool = False, is_value_hex: bool = False):
if is_value_ascii:
return value.encode('ascii')
elif is_value_hex:
return bytes(bytearray.fromhex(value))
else:
return base64.b32decode(value, casefold=True)

def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, digest: str):
def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, digest: str, is_secret_ascii: bool = False, is_secret_hex: bool = False):
otp_code = ""
try:
msg = struct.pack('>Q', moving_factor)
k = self.to_bytes(secret)
hasher = self.get_hmac_sha(k, msg, digest)
key = self.to_bytes(secret, is_secret_ascii, is_secret_hex)
hasher = self.get_hmac_sha(key, msg, digest)
#hasher_hex = hasher.hexdigest()
#hmac_hash = bytes.fromhex(hasher_hex)
hmac_hash = bytearray(hasher.digest())
Expand All @@ -31,7 +32,7 @@ def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, dige
(hmac_hash[offset + 2] & 0xFF) << 8 |
(hmac_hash[offset + 3] & 0xFF)
)
otp_code = str(bin_code % self.DIGITS_POWER[return_digits])
otp_code = str(bin_code)[-return_digits :]

while (otp_code.__len__() < return_digits):
otp_code = "0" + otp_code
Expand Down
16 changes: 9 additions & 7 deletions src/services/totp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@ def __init__(self, secret: str, digest: str, time_step: int, return_digits: int,
self.log = log
super().__init__()

async def at(self, for_time: Union[int, datetime], secret: Optional[str] = None, time_window: int = 0) -> str:
async def at(self, for_time: Union[int, datetime], secret: Optional[str] = None, time_window: int = 0, is_value_ascii: bool = False, is_value_hex: bool = False) -> str:
otp = ""
try:
secret = self.secret if secret is None else secret
secret = secret if secret else self.secret
if isinstance(for_time, int):
for_time = self.to_unix_time(datetime.fromtimestamp(for_time))
otp = self.generate_OPT(secret, self.to_unix_time(for_time), self.return_digits, self.digest)
otp = self.generate_OPT(secret, self.to_unix_time(for_time), self.return_digits, self.digest, is_value_ascii, is_value_hex)
except Exception as e:
self.log.logger.error(e)
return otp

async def now(self, secret: Optional[str] = None) -> str:
return self.generate_OPT(self.secret if secret is None else secret, self.to_unix_time(datetime.now()), self.return_digits, self.digest)
async def now(self, secret: Optional[str] = None, is_value_ascii: bool = False, is_value_hex: bool = False) -> str:
print(f'ascii: {is_value_ascii}')
print(f'secret: {secret}')
return self.generate_OPT(secret if secret else self.secret, self.to_unix_time(datetime.now()), self.return_digits, self.digest, is_value_ascii, is_value_hex)

async def verify(self, otp_to_validate: str, secret: Optional[str] = None, for_time: Optional[datetime] = None, time_window: int = 0) -> bool:
async def verify(self, otp_to_validate: str, secret: Optional[str] = None, for_time: Optional[datetime] = None, time_window: int = 0, is_value_ascii: bool = False, is_value_hex: bool = False) -> bool:
result = False
try:
secret = self.secret if secret is None else secret
for_time = datetime.now() if for_time is None else for_time
otp = await self.at(for_time, secret, time_window)
otp = await self.at(for_time, secret, time_window, is_value_ascii, is_value_hex)
if otp_to_validate == otp:
result = True
except Exception as e:
Expand Down