Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions pocs/concur/concur_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,15 @@ def _authenticate_with_pkce(self) -> str:

# Exchange authorization code for tokens
logger.info("Exchanging authorization code for tokens")
response_data = self._exchange_code_for_tokens(auth_code, code_verifier)
data = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"client_secret": self.client_secret,
"code_verifier": code_verifier,
}
response_data = self._token_request(data)

# Process and save tokens
return self._process_token_response(response_data)
Expand Down Expand Up @@ -330,31 +338,6 @@ def _token_request(self, data: Dict[str, str]) -> Dict[str, Any]:

return response.json()

def _exchange_code_for_tokens(self, code: str, code_verifier: str) -> Dict[str, Any]:
"""
Exchange authorization code for access and refresh tokens.

Args:
code: Authorization code from OAuth callback
code_verifier: PKCE code verifier

Returns:
Token response data

Raises:
requests.HTTPError: If token exchange fails
"""
logger.info(f"Exchanging authorization code at {self.token_url}")
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"client_secret": self.client_secret,
"code_verifier": code_verifier,
}
return self._token_request(data)

def _refresh_access_token(self) -> str:
"""
Refresh access token using refresh token.
Expand All @@ -363,6 +346,7 @@ def _refresh_access_token(self) -> str:
New access token

Raises:
ValueError: If no refresh token available
requests.HTTPError: If token refresh fails
"""
if not self.refresh_token:
Expand All @@ -375,7 +359,8 @@ def _refresh_access_token(self) -> str:
"client_id": self.client_id,
"client_secret": self.client_secret,
}
return self._process_token_response(self._token_request(data))
response_data = self._token_request(data)
return self._process_token_response(response_data)

def _process_token_response(self, response_data: Dict[str, Any]) -> str:
"""
Expand Down