diff --git a/.env b/.env index 5769eb6..081fedd 100644 --- a/.env +++ b/.env @@ -11,7 +11,7 @@ JWT_SECRET_KEY= JWT_ALGORITHM=HS256 JWT_EXPIRE_MINUTES=240 CORS_ALLOWED_HOSTS="http://localhost:8081,http://localhost:8002" -TOTP_SECRET= +TOTP_SECRET=12345678901234567890 TOTP_DIGEST=sha1 TOTP_RETURN_DIGITS=8 TOTP_TIME_STEP=30 \ No newline at end of file diff --git a/src/routers/admin/security_router.py b/src/routers/admin/security_router.py index 23ac4a3..9bc1f25 100644 --- a/src/routers/admin/security_router.py +++ b/src/routers/admin/security_router.py @@ -15,8 +15,8 @@ @router.get("/2fa-now") @inject -async def get_2f_code(totp: totp_dependency, email: Optional[str] = None): - totp_code = await totp.now(email) +async def get_2f_code(totp: totp_dependency, secret: Optional[str] = None): + totp_code = await totp.now(secret) if totp_code: return Response(content=totp_code, media_type="plain/text") else: @@ -24,8 +24,8 @@ async def get_2f_code(totp: totp_dependency, email: Optional[str] = None): @router.get("/2fa-at") @inject -async def get_2f_code_at(time: int, totp: totp_dependency, email: Optional[str] = None): - otp_code = await totp.at(time, email) +async def get_2f_code_at(time: int, totp: totp_dependency, secret: Optional[str] = None): + otp_code = await totp.at(time, secret) if otp_code: return Response(content=otp_code, media_type="plain/text") else: @@ -33,8 +33,8 @@ async def get_2f_code_at(time: int, totp: totp_dependency, email: Optional[str] @router.get("/2fa-verify") @inject -async def get_2f_code_verify(code: str, totp: totp_dependency, email: Optional[str] = None): - result = await totp.verify(code, email) +async def get_2f_code_verify(code: str, totp: totp_dependency, secret: Optional[str] = None): + result = await totp.verify(code, secret) if result: return Response(status_code=200) else: diff --git a/src/routers/admin/users_router.py b/src/routers/admin/users_router.py index b8738e1..1bf5b1e 100644 --- a/src/routers/admin/users_router.py +++ b/src/routers/admin/users_router.py @@ -8,7 +8,7 @@ from src.models.address_model import Address from src.services.user_service import change_password, insert_address from src.dependencies import get_db -from src.middlewares.auth_jwt import JWTCustom +from src.middlewares.auth_roles_jwt import JWTCustom from src.dependency_injection.containers import Container import src.services.user_service as uSvc diff --git a/src/routers/users_router.py b/src/routers/users_router.py index bfeb532..05dc4b0 100644 --- a/src/routers/users_router.py +++ b/src/routers/users_router.py @@ -61,7 +61,7 @@ async def update_password(db: db_dependency, password: str, email: Annotated[str @router.get("/user/2fa") @inject async def get_2f_code(email: Annotated[str, Depends(oauth2_scheme)], totp: totp_dependency): - totp_code = await totp.now(email) + totp_code = await totp.now(email, is_value_ascii = True) if totp_code: return Response(content=totp_code, media_type="plain/text") else: @@ -70,7 +70,7 @@ async def get_2f_code(email: Annotated[str, Depends(oauth2_scheme)], totp: totp_ @router.get("/user/2fa-verify") @inject async def get_2f_code_verify(code: str, email: Annotated[str, Depends(oauth2_scheme)], totp: totp_dependency): - if await totp.verify(code, email): + if await totp.verify(code, email, is_value_ascii = True): return Response(status_code=200) else: return Response(status_code=401) diff --git a/src/services/jwt_service.py b/src/services/jwt_service.py index 62b0757..bcecba4 100644 --- a/src/services/jwt_service.py +++ b/src/services/jwt_service.py @@ -23,7 +23,7 @@ async def create_token(data: dict, expire_time: timedelta = timedelta(minutes=in data[item][x] = data[item][x] #await crypto.encrypt_text(data[item][x]) else: data[item] = await crypto.encrypt_text(data[item]) - + expire = datetime.now(timezone.utc) + expire_time data.update({ "exp": expire }) encode_jwt = jwt.encode(data, str(os.environ["JWT_SECRET_KEY"]), algorithm= os.environ["JWT_ALGORITHM"]) @@ -36,8 +36,7 @@ async def create_token(data: dict, expire_time: timedelta = timedelta(minutes=in async def verify_token(token: str, crypto = crypto_service, log = log_service): try: payload = await verify(token) - email = await crypto.decrypt_text(payload.get("sub")) - return email + return await crypto.decrypt_text(payload.get("sub")) except Exception as e: log.logger.error(e) raise e @@ -54,7 +53,7 @@ async def verify_token_and_roles(token: str, required_roles: Optional[list[str]] return email else: raise HTTPException(status_code=401, detail="Insufficient permissions") - + except Exception as e: log.logger.error(e) raise e @@ -68,11 +67,10 @@ async def verify_token_from_requests(request: Request): payload = await verify(request_token) email = await crypto_service.decrypt_text(payload.get("sub")) return email - except Exception as e: #log.logger.error(e) raise e - + @inject async def verify(request_token: str, log = log_service): try: @@ -89,13 +87,10 @@ async def verify(request_token: str, log = log_service): raise e @inject -async def get_email(token, crypto = crypto_service, log = log_service): +async def get_email(token: str, crypto = crypto_service, log = log_service): try: payload = jwt.decode(token, str(os.environ["JWT_SECRET_KEY"]), os.environ["JWT_ALGORITHM"]) - email = await crypto.decrypt_text(payload.get("sub")) - if email is None: - return None + return await crypto.decrypt_text(payload.get("sub")) except Exception as e: log.logger.error(e) - else: - return email \ No newline at end of file + raise e \ No newline at end of file diff --git a/src/services/otp_service.py b/src/services/otp_service.py index 111c4e6..7a4b277 100644 --- a/src/services/otp_service.py +++ b/src/services/otp_service.py @@ -1,26 +1,27 @@ -import hmac, struct +import hmac, struct, base64 class OTP(): - DIGITS_POWER = [1,10,100,1000,10000,100000,1000000,10000000,100000000] - def __init__(self) -> None: pass def get_hmac_sha(self, secret: bytes, moving_factor: bytes, digest: str): return hmac.new(secret, moving_factor, digest) - def to_bytes(self, value: str): - bytes_object = value.encode('ascii') - hex_string = bytes_object.hex() - return bytes(bytearray.fromhex(hex_string)) + def to_bytes(self, value: str, is_value_ascii: bool = False, is_value_hex: bool = False): + if is_value_ascii: + return value.encode('ascii') + elif is_value_hex: + return bytes(bytearray.fromhex(value)) + else: + return base64.b32decode(value, casefold=True) - def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, digest: str): + def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, digest: str, is_secret_ascii: bool = False, is_secret_hex: bool = False): otp_code = "" try: msg = struct.pack('>Q', moving_factor) - k = self.to_bytes(secret) - hasher = self.get_hmac_sha(k, msg, digest) + key = self.to_bytes(secret, is_secret_ascii, is_secret_hex) + hasher = self.get_hmac_sha(key, msg, digest) #hasher_hex = hasher.hexdigest() #hmac_hash = bytes.fromhex(hasher_hex) hmac_hash = bytearray(hasher.digest()) @@ -31,7 +32,7 @@ def generate_OPT(self, secret: str, moving_factor: int, return_digits: int, dige (hmac_hash[offset + 2] & 0xFF) << 8 | (hmac_hash[offset + 3] & 0xFF) ) - otp_code = str(bin_code % self.DIGITS_POWER[return_digits]) + otp_code = str(bin_code)[-return_digits :] while (otp_code.__len__() < return_digits): otp_code = "0" + otp_code diff --git a/src/services/totp_service.py b/src/services/totp_service.py index 3eda6b6..817e52e 100644 --- a/src/services/totp_service.py +++ b/src/services/totp_service.py @@ -14,26 +14,28 @@ def __init__(self, secret: str, digest: str, time_step: int, return_digits: int, self.log = log super().__init__() - async def at(self, for_time: Union[int, datetime], secret: Optional[str] = None, time_window: int = 0) -> str: + async def at(self, for_time: Union[int, datetime], secret: Optional[str] = None, time_window: int = 0, is_value_ascii: bool = False, is_value_hex: bool = False) -> str: otp = "" try: - secret = self.secret if secret is None else secret + secret = secret if secret else self.secret if isinstance(for_time, int): for_time = self.to_unix_time(datetime.fromtimestamp(for_time)) - otp = self.generate_OPT(secret, self.to_unix_time(for_time), self.return_digits, self.digest) + otp = self.generate_OPT(secret, self.to_unix_time(for_time), self.return_digits, self.digest, is_value_ascii, is_value_hex) except Exception as e: self.log.logger.error(e) return otp - async def now(self, secret: Optional[str] = None) -> str: - return self.generate_OPT(self.secret if secret is None else secret, self.to_unix_time(datetime.now()), self.return_digits, self.digest) + async def now(self, secret: Optional[str] = None, is_value_ascii: bool = False, is_value_hex: bool = False) -> str: + print(f'ascii: {is_value_ascii}') + print(f'secret: {secret}') + return self.generate_OPT(secret if secret else self.secret, self.to_unix_time(datetime.now()), self.return_digits, self.digest, is_value_ascii, is_value_hex) - async def verify(self, otp_to_validate: str, secret: Optional[str] = None, for_time: Optional[datetime] = None, time_window: int = 0) -> bool: + async def verify(self, otp_to_validate: str, secret: Optional[str] = None, for_time: Optional[datetime] = None, time_window: int = 0, is_value_ascii: bool = False, is_value_hex: bool = False) -> bool: result = False try: secret = self.secret if secret is None else secret for_time = datetime.now() if for_time is None else for_time - otp = await self.at(for_time, secret, time_window) + otp = await self.at(for_time, secret, time_window, is_value_ascii, is_value_hex) if otp_to_validate == otp: result = True except Exception as e: