Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:

- name: Install application
run: |
pip install src/core src/authentication src/build src/log src/permissions src/settings src/web
pip install src/core "src/authentication[oidc]" src/build src/log src/permissions src/settings src/web

- name: Run tests
run: pytest --import-mode=importlib
Expand Down Expand Up @@ -88,6 +88,14 @@ jobs:
python3-PyYAML \
python3-tomli

# el8 and EPEL8 don't ship Authlib, it is only available in more recent
# versions. Skip installation on this distribution (along with requests,
# soft dependency of Authlib).
# OIDC tests are automatically skipped when Authlib is not installed.
- name: Install OIDC dependencies
if: ${{ matrix.envs.container != 'rockylinux/rockylinux:8' }}
run: dnf -y install python3-authlib python3-requests

# pip and setuptools on Rocky Linux 8 do not fully support PEP 517 builds from
# pyproject.toml; enable PEP517_SETUP_WRAPPER on this distribution only.
- name: Install application
Expand Down Expand Up @@ -136,11 +144,13 @@ jobs:
export DEBIAN_FRONTEND=noninteractive
apt update
apt install -y \
python3-authlib \
python3-flask \
python3-jwt \
python3-ldap \
python3-pip \
python3-pytest \
python3-requests \
python3-tomli \
python3-yaml \
python3-venv
Expand Down Expand Up @@ -191,6 +201,14 @@ jobs:
python3-PyYAML \
python3-tomli

# openSUSE Leap 15 doesn't ship Authlib, it is only available in more
# recent versions. Skip installation on this distribution (along with
# requests, soft dependency of Authlib).
# OIDC tests are automatically skipped when Authlib is not installed.
- name: Install OIDC dependencies
if: ${{ matrix.envs.container != 'opensuse/leap:15' }}
run: zypper -n in python3-Authlib python3-requests

# Leap 15 ships Python 3.6; wheel is required for pip builds.
- name: Install python3-wheel (Leap 15)
if: ${{ matrix.envs.container == 'opensuse/leap:15' }}
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to
`rfl.build.testing.params` as alternative to parameterized library (#75).
- settings: Support optional section-level documentation via `_doc` in settings
definition (#80).
- auth: Add `OIDCAuthentifier` for OpenID Connect Authorization Code flow with
authlib (#76).

## [1.7.0] - 2026-05-08

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ authors = [

[project.optional-dependencies]
tests = [
"Authlib",
"Flask",
"PyJWT",
"pytest",
"python-ldap",
"PyYAML",
"requests",
]

[tool.pytest.ini_options]
Expand Down
3 changes: 3 additions & 0 deletions src/authentication/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ classifiers = [
]
readme = "README.md"

[project.optional-dependencies]
oidc = ["Authlib", "Flask", "requests"]

[project.urls]
"Homepage" = "https://github.com/rackslab/RFL"
"Bug Tracker" = "https://github.com/rackslab/RFL/issues"
Expand Down
4 changes: 4 additions & 0 deletions src/authentication/rfl/authentication/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class LDAPAuthenticationError(AuthenticationError):
pass


class OIDCAuthenticationError(AuthenticationError):
pass


class JWTError(AuthenticationError):
pass

Expand Down
163 changes: 163 additions & 0 deletions src/authentication/rfl/authentication/oidc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Copyright (c) 2026 Rackslab
#
# This file is part of RFL.
#
# SPDX-License-Identifier: LGPL-3.0-or-later

from typing import Any, Dict, List, Mapping, Optional, Union
from pathlib import Path
import logging

try:
from authlib.integrations.base_client import OAuthError
from authlib.integrations.flask_client import OAuth
except ImportError as err:
raise ImportError("Authlib is required for RFL OIDC Authentication") from err

from .errors import OIDCAuthenticationError
from .user import AuthenticatedUser

logger = logging.getLogger(__name__)


class OIDCClient:
"""OpenID Connect client for Flask apps using Authlib's Flask integration."""

_OIDC_CLIENT_NAME = "rfl_oidc"

def __init__(
self,
app,
*,
issuer: str,
client_id: str,
redirect_uri: str,
client_secret: Optional[str] = None,
scope: str = "openid profile email",
subject_claim: str = "sub",
fullname_claim: str = "name",
groups_claim: Optional[str] = "groups",
restricted_groups: Optional[List[str]] = None,
verify_ssl: bool = True,
cacert: Optional[Path] = None,
pkce: Optional[str] = None,
fetch_token=None,
update_token=None,
cache=None,
):
self.issuer = issuer.rstrip("/")
self.client_id = client_id
secret = client_secret if client_secret else None
self.client_secret = secret
self.redirect_uri = redirect_uri
self.scope = scope
self.subject_claim = subject_claim
self.fullname_claim = fullname_claim
self.groups_claim = groups_claim
self.restricted_groups = restricted_groups

if secret is None and pkce is None:
raise OIDCAuthenticationError("PKCE is required for public OIDC clients")

client_kwargs: Dict[str, Any] = {
"scope": scope,
"verify": self._verify(verify_ssl, cacert),
}
if pkce is not None:
client_kwargs["code_challenge_method"] = pkce

metadata_url = f"{self.issuer}/.well-known/openid-configuration"
oauth = OAuth(
app,
cache=cache,
fetch_token=fetch_token,
update_token=update_token,
)
self._client = oauth.register(
self._OIDC_CLIENT_NAME,
client_id=client_id,
client_secret=secret,
server_metadata_url=metadata_url,
client_kwargs=client_kwargs,
)
client_type = "public" if secret is None else "confidential"
logger.debug(
"Initialized %s OIDC client for issuer %s (client_id=%s, redirect_uri=%s)",
client_type,
self.issuer,
self.client_id,
self.redirect_uri,
)

@staticmethod
def _verify(verify_ssl: bool, cacert: Optional[Path]) -> Union[bool, str]:
if cacert is not None:
return str(cacert)
return verify_ssl

def redirect(self, redirect_uri=None, **kwargs):
"""Create HTTP redirect to the OIDC authorization endpoint."""
try:
return self._client.authorize_redirect(
redirect_uri=redirect_uri or self.redirect_uri,
**kwargs,
)
except OAuthError as err:
raise OIDCAuthenticationError(
f"OIDC authorization redirect failed: {err}"
) from err

def authenticate(self, **kwargs) -> AuthenticatedUser:
"""Complete the authorization code flow and return an AuthenticatedUser."""
try:
token = self._client.authorize_access_token(**kwargs)
except OAuthError as err:
raise OIDCAuthenticationError(f"OIDC authorization failed: {err}") from err
user = self._user_from_token(token)
if not self._allowed_groups(user.groups):
raise OIDCAuthenticationError(
f"User {user.login} is not member of restricted groups"
)
logger.info("OIDC authentication completed for user %s", user.login)
return user

def _user_from_token(self, token: Mapping[str, Any]) -> AuthenticatedUser:
userinfo = token.get("userinfo")
if userinfo is None:
raise OIDCAuthenticationError(
"OIDC token response does not contain validated userinfo"
)
return self._claims_to_user(userinfo)

def _normalize_groups(self, value: Any) -> List[str]:
if value is None:
return []
if isinstance(value, list):
return [str(group) for group in value]
return [str(value)]

def _claims_to_user(self, claims: Dict[str, Any]) -> AuthenticatedUser:
login = claims.get(self.subject_claim)
if not login:
raise OIDCAuthenticationError(
f"OIDC claims do not contain subject claim {self.subject_claim}"
)

fullname = claims.get(self.fullname_claim)
groups = []
if self.groups_claim is not None:
groups = self._normalize_groups(claims.get(self.groups_claim))

return AuthenticatedUser(
login=str(login),
fullname=str(fullname) if fullname is not None else None,
groups=groups,
)

def _allowed_groups(self, groups: List[str]) -> bool:
"""Return False if restricted groups are set and none of the groups match."""
return not (
self.restricted_groups is not None
and len(self.restricted_groups)
and not any(group in self.restricted_groups for group in groups)
)
Loading
Loading