From e4c486525e0339e35af311e6077ce60a96fd224b Mon Sep 17 00:00:00 2001 From: Jakub Filak Date: Tue, 24 Mar 2026 19:19:14 +0100 Subject: [PATCH 1/3] abap: print system information Print system information retrieved from /sap/bc/adt/system/information and /sap/bc/adt/core/http/systeminformation. Renaming "systemID" to "SID" because that's what everybody would expect. --- sap/adt/system.py | 95 +++++++++++++++++ sap/cli/abap.py | 19 ++++ test/unit/fixtures_adt_system.py | 128 ++++++++++++++++++++++ test/unit/test_sap_adt_system.py | 178 +++++++++++++++++++++++++++++++ test/unit/test_sap_cli_abap.py | 91 +++++++++++++++- 5 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 sap/adt/system.py create mode 100644 test/unit/fixtures_adt_system.py create mode 100644 test/unit/test_sap_adt_system.py diff --git a/sap/adt/system.py b/sap/adt/system.py new file mode 100644 index 0000000..2e3ceeb --- /dev/null +++ b/sap/adt/system.py @@ -0,0 +1,95 @@ +"""ADT System Information wrappers""" + +from xml.etree import ElementTree + +from sap.adt.core import Connection + + +XMLNS_ATOM = '{http://www.w3.org/2005/Atom}' + +JSON_KEY_MAPPING = { + 'systemID': 'SID', +} + + +# pylint: disable=too-few-public-methods +class SystemInfoEntry: + """A single entry from the system information feed""" + + def __init__(self, identity, title): + self.identity = identity + self.title = title + + +class SystemInformation: + """Parsed system information from ADT""" + + def __init__(self, entries): + self._entries = {entry.identity: entry for entry in entries} + + @property + def entries(self): + """Returns a list of all system information entries""" + return list(self._entries.values()) + + def get(self, identity): + """Returns the title for the given identity or None if not found""" + entry = self._entries.get(identity) + return entry.title if entry else None + + def __iter__(self): + return iter(self._entries.values()) + + +def _fetch_xml_entries(connection): + """Fetch entries from the Atom feed endpoint /sap/bc/adt/system/information""" + + resp = connection.execute( + 'GET', + 'system/information', + accept='application/atom+xml;type=feed', + ) + + root = ElementTree.fromstring(resp.text) + + entries = [] + for entry_elem in root.findall(f'{XMLNS_ATOM}entry'): + identity = entry_elem.find(f'{XMLNS_ATOM}id').text + title = entry_elem.find(f'{XMLNS_ATOM}title').text + entries.append(SystemInfoEntry(identity, title)) + + return entries + + +def _fetch_json_entries(connection): + """Fetch entries from the JSON endpoint /sap/bc/adt/core/http/systeminformation""" + + resp = connection.execute( + 'GET', + 'core/http/systeminformation', + accept='application/vnd.sap.adt.core.http.systeminformation.v1+json' + ) + + data = resp.json() + + return [SystemInfoEntry(JSON_KEY_MAPPING.get(key, key), value) for key, value in data.items()] + + +def get_information(connection: Connection) -> SystemInformation: + """Fetch system information from ADT endpoints + + Sends GET requests to /sap/bc/adt/system/information and + /sap/bc/adt/core/http/systeminformation, merges the results + into a single SystemInformation object. Entries from the XML + endpoint take precedence over entries from the JSON endpoint + if the same key exists in both. + """ + + xml_entries = _fetch_xml_entries(connection) + json_entries = _fetch_json_entries(connection) + + merged = {entry.identity: entry for entry in json_entries} + for entry in xml_entries: + merged[entry.identity] = entry + + return SystemInformation(list(merged.values())) diff --git a/sap/cli/abap.py b/sap/cli/abap.py index 102091a..12862ea 100644 --- a/sap/cli/abap.py +++ b/sap/cli/abap.py @@ -3,6 +3,7 @@ import sys import sap.cli.core +import sap.adt.system import sap.platform.abap.run @@ -36,3 +37,21 @@ def run(connection, args): ) console.printout(result) + + +@CommandGroup.argument('--key', type=str, default=None, help='Print only the value for the given key') +@CommandGroup.command() +def systeminfo(connection, args): + """Prints system information""" + + console = args.console_factory() + + info = sap.adt.system.get_information(connection) + + if args.key: + value = info.get(args.key) + if value is not None: + console.printout(value) + else: + for entry in info: + console.printout(f'{entry.identity}: {entry.title}') diff --git a/test/unit/fixtures_adt_system.py b/test/unit/fixtures_adt_system.py new file mode 100644 index 0000000..e105ed2 --- /dev/null +++ b/test/unit/fixtures_adt_system.py @@ -0,0 +1,128 @@ +"""System Information ADT fixtures""" + +from mock import Response + +SYSTEM_INFORMATION_XML = ''' + + + SAP SE + +System Information +2026-03-23T13:43:39Z + + ApplicationServerName + C50_ddci + + + DBLibrary + SQLDBC 2.27.024.1772569942 + + + DBName + C50/02 + + + DBRelease + 2.00.089.01.1769502981 + + + DBSchema + SAPHANADB + + + DBServer + saphost + + + DBSystem + HDB + + + IPAddress + 172.27.4.5 + + + KernelCompilationDate + Linux GNU SLES-15 x86_64 cc10.3.0 use-pr260304 Mar 09 2026 11:05:09 + + + KernelKind + opt + + + KernelPatchLevel + 0 + + + KernelRelease + 920 + + + MachineType + x86_64 + + + NodeName + saphost + + + NotAuthorizedDB + false + + + NotAuthorizedHost + false + + + NotAuthorizedKernel + false + + + NotAuthorizedSystem + false + + + NotAuthorizedUser + false + + + OSName + Linux + + + OSVersion + 6.4.0-150600.23.60-default + + + SAPSystemID + 390 + + + SAPSystemNumber + 000000000000000001 + + + UnicodeSystem + True + +''' + +RESPONSE_SYSTEM_INFORMATION = Response( + text=SYSTEM_INFORMATION_XML, + status_code=200, + headers={'Content-Type': 'application/atom+xml;type=feed'} +) + +JSON_SYSTEM_INFORMATION = { + 'systemID': 'C50', + 'userName': 'DEVELOPER', + 'userFullName': '', + 'client': '100', + 'language': 'EN', +} + +RESPONSE_JSON_SYSTEM_INFORMATION = Response( + status_code=200, + json=JSON_SYSTEM_INFORMATION, + headers={'Content-Type': 'application/vnd.sap.adt.core.http.systeminformation.v1+json'} +) diff --git a/test/unit/test_sap_adt_system.py b/test/unit/test_sap_adt_system.py new file mode 100644 index 0000000..8b0f6bd --- /dev/null +++ b/test/unit/test_sap_adt_system.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +import unittest + +import sap.adt.system + +from mock import ( + Connection, + Response, + Request, +) +from fixtures_adt_system import ( + RESPONSE_SYSTEM_INFORMATION, + RESPONSE_JSON_SYSTEM_INFORMATION, + SYSTEM_INFORMATION_XML, + JSON_SYSTEM_INFORMATION, +) + + +def make_connection(): + """Create a Connection with both XML and JSON responses""" + return Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + + +class TestGetInformation(unittest.TestCase): + + def test_get_information_sends_two_requests(self): + """Test that get_information sends GET requests to both endpoints""" + connection = make_connection() + sap.adt.system.get_information(connection) + + self.assertEqual(len(connection.execs), 2) + + def test_get_information_xml_request(self): + """Test that the first request goes to system/information""" + connection = make_connection() + sap.adt.system.get_information(connection) + + self.assertEqual(connection.execs[0].method, 'GET') + self.assertEqual(connection.execs[0].adt_uri, '/sap/bc/adt/system/information') + + def test_get_information_json_request(self): + """Test that the second request goes to core/http/systeminformation with JSON accept""" + connection = make_connection() + sap.adt.system.get_information(connection) + + self.assertEqual(connection.execs[1].method, 'GET') + self.assertEqual(connection.execs[1].adt_uri, '/sap/bc/adt/core/http/systeminformation') + self.assertEqual(connection.execs[1].headers['Accept'], 'application/vnd.sap.adt.core.http.systeminformation.v1+json') + + def test_get_information_parses_all_entries(self): + """Test that entries from both endpoints are merged""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + # 24 from XML + 5 from JSON, but no overlap in the sample data + self.assertEqual(len(result.entries), 29) + + def test_get_information_xml_entry_values(self): + """Test that XML entry values are correctly parsed""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + self.assertEqual(result.get('ApplicationServerName'), 'C50_ddci') + self.assertEqual(result.get('DBLibrary'), 'SQLDBC 2.27.024.1772569942') + self.assertEqual(result.get('DBName'), 'C50/02') + self.assertEqual(result.get('DBRelease'), '2.00.089.01.1769502981') + self.assertEqual(result.get('DBSchema'), 'SAPHANADB') + self.assertEqual(result.get('DBServer'), 'saphost') + self.assertEqual(result.get('DBSystem'), 'HDB') + self.assertEqual(result.get('IPAddress'), '172.27.4.5') + self.assertEqual(result.get('KernelCompilationDate'), + 'Linux GNU SLES-15 x86_64 cc10.3.0 use-pr260304 Mar 09 2026 11:05:09') + self.assertEqual(result.get('KernelKind'), 'opt') + self.assertEqual(result.get('KernelPatchLevel'), '0') + self.assertEqual(result.get('KernelRelease'), '920') + self.assertEqual(result.get('MachineType'), 'x86_64') + self.assertEqual(result.get('NodeName'), 'saphost') + self.assertEqual(result.get('OSName'), 'Linux') + self.assertEqual(result.get('OSVersion'), '6.4.0-150600.23.60-default') + self.assertEqual(result.get('SAPSystemID'), '390') + self.assertEqual(result.get('SAPSystemNumber'), '000000000000000001') + self.assertEqual(result.get('UnicodeSystem'), 'True') + + def test_get_information_json_entry_values(self): + """Test that JSON entry values are correctly parsed""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + self.assertEqual(result.get('SID'), 'C50') + self.assertEqual(result.get('userName'), 'DEVELOPER') + self.assertEqual(result.get('userFullName'), '') + self.assertEqual(result.get('client'), '100') + self.assertEqual(result.get('language'), 'EN') + + def test_get_information_not_authorized_flags(self): + """Test that NotAuthorized flags are correctly parsed""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + self.assertEqual(result.get('NotAuthorizedDB'), 'false') + self.assertEqual(result.get('NotAuthorizedHost'), 'false') + self.assertEqual(result.get('NotAuthorizedKernel'), 'false') + self.assertEqual(result.get('NotAuthorizedSystem'), 'false') + self.assertEqual(result.get('NotAuthorizedUser'), 'false') + + def test_get_information_unknown_entry(self): + """Test that get() returns None for unknown entries""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + self.assertIsNone(result.get('NonExistentEntry')) + + def test_get_information_iterable(self): + """Test that SystemInformation is iterable over merged entries""" + connection = make_connection() + result = sap.adt.system.get_information(connection) + + entries = list(result) + self.assertEqual(len(entries), 29) + + def test_get_information_xml_takes_precedence(self): + """Test that XML entries override JSON entries with the same key""" + xml_response = Response( + text=''' + + + SID + XML_VALUE + +''', + status_code=200, + headers={'Content-Type': 'application/atom+xml;type=feed'}, + ) + + json_response = Response( + status_code=200, + json={'systemID': 'JSON_VALUE', 'extra': 'ONLY_IN_JSON'}, + headers={'Content-Type': 'application/vnd.sap.adt.core.http.systeminformation.v1+json'}, + ) + + connection = Connection([xml_response, json_response]) + result = sap.adt.system.get_information(connection) + + self.assertEqual(result.get('SID'), 'XML_VALUE') + self.assertEqual(result.get('extra'), 'ONLY_IN_JSON') + self.assertEqual(len(result.entries), 2) + + +class TestSystemInfoEntry(unittest.TestCase): + + def test_entry_attributes(self): + """Test SystemInfoEntry stores identity and title""" + entry = sap.adt.system.SystemInfoEntry('TestKey', 'TestValue') + self.assertEqual(entry.identity, 'TestKey') + self.assertEqual(entry.title, 'TestValue') + + +class TestSystemInformation(unittest.TestCase): + + def test_empty_information(self): + """Test SystemInformation with no entries""" + info = sap.adt.system.SystemInformation([]) + self.assertEqual(len(info.entries), 0) + self.assertIsNone(info.get('anything')) + + def test_entries_property(self): + """Test entries property returns list of entries""" + entries = [ + sap.adt.system.SystemInfoEntry('Key1', 'Value1'), + sap.adt.system.SystemInfoEntry('Key2', 'Value2'), + ] + info = sap.adt.system.SystemInformation(entries) + self.assertEqual(len(info.entries), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_sap_cli_abap.py b/test/unit/test_sap_cli_abap.py index eca4d15..a655736 100644 --- a/test/unit/test_sap_cli_abap.py +++ b/test/unit/test_sap_cli_abap.py @@ -8,7 +8,12 @@ import sap.cli.abap import sap.platform.abap.run -from mock import BufferConsole +from mock import BufferConsole, Connection + +from fixtures_adt_system import ( + RESPONSE_SYSTEM_INFORMATION, + RESPONSE_JSON_SYSTEM_INFORMATION, +) parser = ArgumentParser() sap.cli.abap.CommandGroup().install_parser(parser) @@ -117,5 +122,89 @@ def test_run_prints_output(self): self.assertEqual(console.capout, expected_output + '\n') +class TestAbapSystemInfo(unittest.TestCase): + + def test_systeminfo_prints_all_entries(self): + """Test that systeminfo prints key: value lines for all entries""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + output = console.capout + lines = output.strip().split('\n') + + # 24 XML entries + 5 JSON entries + self.assertEqual(len(lines), 29) + + def test_systeminfo_output_format(self): + """Test that systeminfo prints entries in key: value format""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + output = console.capout + self.assertIn('ApplicationServerName: C50_ddci\n', output) + self.assertIn('DBName: C50/02\n', output) + self.assertIn('OSName: Linux\n', output) + self.assertIn('SID: C50\n', output) + self.assertIn('userName: DEVELOPER\n', output) + self.assertIn('client: 100\n', output) + self.assertIn('language: EN\n', output) + + def test_systeminfo_sends_requests(self): + """Test that systeminfo sends GET requests to both endpoints""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + self.assertEqual(len(connection.execs), 2) + self.assertEqual(connection.execs[0].method, 'GET') + self.assertEqual(connection.execs[0].adt_uri, '/sap/bc/adt/system/information') + self.assertEqual(connection.execs[1].method, 'GET') + self.assertEqual(connection.execs[1].adt_uri, '/sap/bc/adt/core/http/systeminformation') + + def test_systeminfo_key_prints_value_only(self): + """Test that --key prints only the matching value""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo', '--key', 'OSName']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + self.assertEqual(console.capout, 'Linux\n') + + def test_systeminfo_key_json_entry(self): + """Test that --key works for entries from the JSON endpoint""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo', '--key', 'userName']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + self.assertEqual(console.capout, 'DEVELOPER\n') + + def test_systeminfo_key_not_found(self): + """Test that --key with unknown key prints nothing""" + connection = Connection([RESPONSE_SYSTEM_INFORMATION, RESPONSE_JSON_SYSTEM_INFORMATION]) + args = parse_args(['systeminfo', '--key', 'NonExistent']) + console, factory = make_console_factory() + args.console_factory = factory + + args.execute(connection, args) + + self.assertEqual(console.capout, '') + + if __name__ == '__main__': unittest.main() From 3fb175351bd6b4cf51513de8c348fb326cdcf4ce Mon Sep 17 00:00:00 2001 From: Jakub Filak Date: Tue, 24 Mar 2026 19:49:16 +0100 Subject: [PATCH 2/3] gcts: filter user credentials by endpoint For easy checks that user has valid credentials configured for the given endpoint. --- doc/commands/gcts.md | 8 +- sap/cli/gcts.py | 30 +++++ test/unit/test_sap_cli_gcts.py | 194 +++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 1 deletion(-) diff --git a/doc/commands/gcts.md b/doc/commands/gcts.md index e533846..2ad3c4a 100644 --- a/doc/commands/gcts.md +++ b/doc/commands/gcts.md @@ -170,11 +170,17 @@ sapcli gcts config [-l|--list] [--unset] PACKAGE [NAME] [VALUE] Get credentials of the logged in user ```bash -sapcli gcts user get-credentials [-f|--format] {HUMAN|JSON} +sapcli gcts user get-credentials [-f|--format] {HUMAN|JSON} [-e|--endpoint ENDPOINT] ``` **Parameters:** - `--format`: The format of the command's output +- `--endpoint`: Filter credentials by HTTP API endpoint. When specified, + only credentials matching the given endpoint are returned. The endpoint + is canonicalized before matching (trailing slashes are stripped and + comparison is case-insensitive). The command returns a non-zero exit code + if no credentials are found for the endpoint or if all matching + credentials have an invalid state. ## user set-credentials diff --git a/sap/cli/gcts.py b/sap/cli/gcts.py index fbe28be..0cc5d14 100644 --- a/sap/cli/gcts.py +++ b/sap/cli/gcts.py @@ -77,6 +77,7 @@ def __init__(self): super().__init__('user') +@UserCommandGroup.argument('-e', '--endpoint', default=None) @UserCommandGroup.argument('-f', '--format', choices=['HUMAN', 'JSON'], default='HUMAN') @UserCommandGroup.command('get-credentials') def get_user_credentials(connection, args): @@ -84,6 +85,10 @@ def get_user_credentials(connection, args): user_credentials = sap.rest.gcts.simple.get_user_credentials(connection) console = args.console_factory() + + if args.endpoint is not None: + user_credentials = _filter_credentials_by_endpoint(user_credentials, args.endpoint) + if args.format == 'JSON': console.printout(user_credentials) else: @@ -98,6 +103,31 @@ def get_user_credentials(connection, args): sap.cli.helpers.TableWriter(user_credentials, columns).printout(console) +def _canonicalize_endpoint(endpoint): + """Canonicalize endpoint URL for consistent matching""" + + return endpoint.strip().rstrip('/').lower() + + +def _filter_credentials_by_endpoint(user_credentials, endpoint): + """Filter credentials by endpoint and validate state""" + + canonical_endpoint = _canonicalize_endpoint(endpoint) + + matching = [cred for cred in user_credentials + if _canonicalize_endpoint(cred['endpoint']) == canonical_endpoint] + + if not matching: + raise SAPCliError(f'No credentials found for endpoint: {endpoint}') + + valid = [cred for cred in matching if cred.get('state', '') != 'false'] + + if not valid: + raise SAPCliError(f'Credentials for endpoint {endpoint} are not valid: {matching[0]["state"]}') + + return valid + + @UserCommandGroup.argument('-t', '--token') @UserCommandGroup.argument('-a', '--api-url') @UserCommandGroup.command('set-credentials') diff --git a/test/unit/test_sap_cli_gcts.py b/test/unit/test_sap_cli_gcts.py index 7c564ec..b4d0148 100644 --- a/test/unit/test_sap_cli_gcts.py +++ b/test/unit/test_sap_cli_gcts.py @@ -2471,6 +2471,200 @@ def test_get_user_credentials_request_error(self, fake_dumper): fake_dumper.assert_called_once_with(self.console, messages) +class TestgCTSUserGetCredentialsEndpoint(PatcherTestCase, ConsoleOutputTestCase): + + def setUp(self): + super().setUp() + ConsoleOutputTestCase.setUp(self) + + assert self.console is not None + + self.patch_console(console=self.console) + self.fake_connection = None + self.api_url = 'https://api.github.com/v3/' + self.fake_get_credentials = self.patch('sap.rest.gcts.simple.get_user_credentials') + + def get_credentials_cmd(self, *args, **kwargs): + return parse_args('user', 'get-credentials', *args, **kwargs) + + def test_endpoint_filter_valid_state_json(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url, '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout=f"[{{'endpoint': '{self.api_url}', 'type': 'token', 'state': 'user123'}}]\n" + ) + + def test_endpoint_filter_valid_state_human(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url, '-f', 'HUMAN') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout=( + "Endpoint | Type | State \n" + "--------------------------------------------\n" + f"{self.api_url} | token | user123\n" + ) + ) + + def test_endpoint_filter_not_found(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', 'https://other.example.com/') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertEqual(exit_code, 1) + self.assertConsoleContents( + self.console, + stderr='No credentials found for endpoint: https://other.example.com/\n' + ) + + def test_endpoint_filter_invalid_state(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'false'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url) + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertEqual(exit_code, 1) + self.assertConsoleContents( + self.console, + stderr=f'Credentials for endpoint {self.api_url} are not valid: false\n' + ) + + def test_endpoint_canonicalization_trailing_slash(self): + self.fake_get_credentials.return_value = [ + {'endpoint': 'https://api.github.com/v3/', 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', 'https://api.github.com/v3', '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout="[{'endpoint': 'https://api.github.com/v3/', 'type': 'token', 'state': 'user123'}]\n" + ) + + def test_endpoint_canonicalization_case_insensitive(self): + self.fake_get_credentials.return_value = [ + {'endpoint': 'https://API.GitHub.com/v3/', 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', 'https://api.github.com/v3/', '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout="[{'endpoint': 'https://API.GitHub.com/v3/', 'type': 'token', 'state': 'user123'}]\n" + ) + + def test_endpoint_canonicalization_whitespace(self): + self.fake_get_credentials.return_value = [ + {'endpoint': 'https://api.github.com/v3/', 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-e', ' https://api.github.com/v3/ ', '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout="[{'endpoint': 'https://api.github.com/v3/', 'type': 'token', 'state': 'user123'}]\n" + ) + + def test_endpoint_filter_multiple_credentials(self): + self.fake_get_credentials.return_value = [ + {'endpoint': 'https://api.github.com/v3/', 'type': 'token', 'state': 'user123'}, + {'endpoint': 'https://other.example.com/', 'type': 'token', 'state': 'user456'} + ] + + the_cmd = self.get_credentials_cmd('-e', 'https://other.example.com/', '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout="[{'endpoint': 'https://other.example.com/', 'type': 'token', 'state': 'user456'}]\n" + ) + + def test_endpoint_filter_multiple_matching_returns_all_valid(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'user123'}, + {'endpoint': self.api_url, 'type': 'basic', 'state': 'user456'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url, '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout=f"[{{'endpoint': '{self.api_url}', 'type': 'token', 'state': 'user123'}}, " + f"{{'endpoint': '{self.api_url}', 'type': 'basic', 'state': 'user456'}}]\n" + ) + + def test_endpoint_filter_multiple_matching_skips_invalid(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'false'}, + {'endpoint': self.api_url, 'type': 'basic', 'state': 'user456'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url, '-f', 'JSON') + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertIsNone(exit_code) + self.assertConsoleContents( + self.console, + stdout=f"[{{'endpoint': '{self.api_url}', 'type': 'basic', 'state': 'user456'}}]\n" + ) + + def test_endpoint_filter_all_matching_invalid(self): + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'false'}, + {'endpoint': self.api_url, 'type': 'basic', 'state': 'false'} + ] + + the_cmd = self.get_credentials_cmd('-e', self.api_url) + exit_code = the_cmd.execute(self.fake_connection, the_cmd) + + self.assertEqual(exit_code, 1) + self.assertConsoleContents( + self.console, + stderr=f'Credentials for endpoint {self.api_url} are not valid: false\n' + ) + + def test_no_endpoint_lists_all(self): + """When --endpoint is not given, all credentials are listed (existing behavior)""" + self.fake_get_credentials.return_value = [ + {'endpoint': self.api_url, 'type': 'token', 'state': 'user123'} + ] + + the_cmd = self.get_credentials_cmd('-f', 'JSON') + the_cmd.execute(self.fake_connection, the_cmd) + + self.assertConsoleContents( + self.console, + stdout=f"[{{'endpoint': '{self.api_url}', 'type': 'token', 'state': 'user123'}}]\n" + ) + + class TestgCTSUserSetCredentials(PatcherTestCase, ConsoleOutputTestCase): def setUp(self): From 087652d0c1614b440fd33ef7a3a453e5bdb1299e Mon Sep 17 00:00:00 2001 From: Jakub Filak Date: Thu, 26 Mar 2026 11:36:52 +0100 Subject: [PATCH 3/3] cli: detect invalid commands and print help Not all execution paths have handler e.g. "sapcli" and without this patch, the process dies with a stacktrace for missing member and that is not a nice user experience. --- sap/cli/_entry.py | 5 ++ test/unit/test_sap_cli__entry.py | 118 ++++++++++++++++++++++--------- 2 files changed, 91 insertions(+), 32 deletions(-) diff --git a/sap/cli/_entry.py b/sap/cli/_entry.py index 341503f..c9c4ad7 100644 --- a/sap/cli/_entry.py +++ b/sap/cli/_entry.py @@ -111,6 +111,11 @@ def parse_command_line(argv): log.setLevel(loglevel) logging.debug('Logging level: %i', loglevel) + if not hasattr(args, 'execute'): + report_args_error_and_exit( + arg_parser, + 'No command specified - please consult the help and specify a command to execute') + sap.cli.resolve_default_connection_values(args) if not args.ashost and not args.mshost: diff --git a/test/unit/test_sap_cli__entry.py b/test/unit/test_sap_cli__entry.py index a904ca6..6e3545f 100644 --- a/test/unit/test_sap_cli__entry.py +++ b/test/unit/test_sap_cli__entry.py @@ -15,6 +15,9 @@ 'fantomas', '--password', 'Down1oad', '--snc_lib', 'somelib.dylib' ] +MOCK_COMMAND_NAME = 'mockcommand' +MOCK_SUBCOMMAND_NAME = 'run' + def remove_cmd_param_from_list(params_list, param_name): index = params_list.index(param_name) @@ -22,35 +25,66 @@ def remove_cmd_param_from_list(params_list, param_name): del params_list[index] +def make_mock_command(): + """Create a mock command that registers a subcommand with execute default.""" + + mock_execute = Mock(return_value=0) + fake_cmd = Mock() + fake_cmd.name = MOCK_COMMAND_NAME + fake_cmd.description = 'Mock command for testing' + + def side_install_parser(subparser): + command_args = subparser.add_subparsers() + get_args = command_args.add_parser(MOCK_SUBCOMMAND_NAME) + get_args.set_defaults(execute=mock_execute) + + fake_cmd.install_parser.side_effect = side_install_parser + return fake_cmd + + +def get_tested_parameters(): + """Return ALL_PARAMETERS with the mock command and subcommand appended.""" + + return ALL_PARAMETERS + [MOCK_COMMAND_NAME, MOCK_SUBCOMMAND_NAME] + + class TestParseCommandLine(unittest.TestCase): + def setUp(self): + self._get_commands_patcher = patch('sap.cli.get_commands') + fake_commands = self._get_commands_patcher.start() + self._fake_cmd = make_mock_command() + fake_commands.return_value = [(Mock(), self._fake_cmd)] + + def tearDown(self): + self._get_commands_patcher.stop() + def test_args_sanity(self): - params = ALL_PARAMETERS.copy() + params = get_tested_parameters() args = entry.parse_command_line(params) - self.assertEqual( - vars(args), { - 'ashost': 'fixtures', - 'sysnr': '69', - 'client': '975', - 'ssl': False, - 'port': 3579, - 'user': 'fantomas', - 'password': 'Down1oad', - 'verify': False, - 'verbose_count': 0, - 'group': None, - 'mshost': None, - 'msserv': None, - 'snc_myname': None, - 'snc_partnername': None, - 'snc_qop': None, - 'snc_lib': 'somelib.dylib', - 'sysid': None, - }) + parsed = vars(args) + self.assertEqual(parsed['ashost'], 'fixtures') + self.assertEqual(parsed['sysnr'], '69') + self.assertEqual(parsed['client'], '975') + self.assertFalse(parsed['ssl']) + self.assertEqual(parsed['port'], 3579) + self.assertEqual(parsed['user'], 'fantomas') + self.assertEqual(parsed['password'], 'Down1oad') + self.assertFalse(parsed['verify']) + self.assertEqual(parsed['verbose_count'], 0) + self.assertIsNone(parsed['group']) + self.assertIsNone(parsed['mshost']) + self.assertIsNone(parsed['msserv']) + self.assertIsNone(parsed['snc_myname']) + self.assertIsNone(parsed['snc_partnername']) + self.assertIsNone(parsed['snc_qop']) + self.assertEqual(parsed['snc_lib'], 'somelib.dylib') + self.assertIsNone(parsed['sysid']) + self.assertTrue(callable(parsed['execute'])) def test_args_no_ashost(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--ashost') with patch('sys.stderr', new_callable=StringIO) as fake_output, \ @@ -62,7 +96,7 @@ def test_args_no_ashost(self): 'No SAP Application Server Host name provided: use the option --ashost or the environment variable SAP_ASHOST')) def test_args_default_sysnr(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--sysnr') args = entry.parse_command_line(test_params) @@ -70,7 +104,7 @@ def test_args_default_sysnr(self): self.assertEqual(args.sysnr, '00') def test_args_no_client(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--client') with patch('sys.stderr', new_callable=StringIO) as fake_output, \ @@ -82,7 +116,7 @@ def test_args_no_client(self): 'No SAP Client provided: use the option --client or the environment variable SAP_CLIENT') def test_args_no_port(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--port') args = entry.parse_command_line(test_params) @@ -90,7 +124,7 @@ def test_args_no_port(self): self.assertEqual(args.port, 443) def test_args_default_no_ssl(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() test_params.remove('--no-ssl') args = entry.parse_command_line(test_params) @@ -98,7 +132,7 @@ def test_args_default_no_ssl(self): self.assertTrue(args.ssl) def test_args_ask_user(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--user') with patch('builtins.input', lambda pfx: 'fantomas'): @@ -107,7 +141,7 @@ def test_args_ask_user(self): self.assertEqual(args.user, 'fantomas') def test_args_ask_password(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--password') with patch('getpass.getpass', lambda: 'Down1oad'): @@ -116,7 +150,7 @@ def test_args_ask_password(self): self.assertEqual(args.password, 'Down1oad') def test_args_ask_user_and_password(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--password') remove_cmd_param_from_list(test_params, '--user') @@ -128,7 +162,7 @@ def test_args_ask_user_and_password(self): self.assertEqual(args.password, 'Down1oad') def test_args_env(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() remove_cmd_param_from_list(test_params, '--ashost') remove_cmd_param_from_list(test_params, '--sysnr') remove_cmd_param_from_list(test_params, '--password') @@ -167,7 +201,7 @@ def test_args_env(self): self.assertTrue(args.verify) def test_args_env_no_ssl_variants(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() test_params.remove('--no-ssl') for variant in ('n', 'N', 'No', 'no', 'NO', 'false', 'FALSE', 'False', 'Off', 'off'): @@ -191,7 +225,7 @@ def test_args_env_no_ssl_variants(self): self.assertTrue(args.ssl, msg=variant) def test_args_env_skip_ssl_validation_variants(self): - test_params = ALL_PARAMETERS.copy() + test_params = get_tested_parameters() test_params.remove('--skip-ssl-validation') for variant in ('n', 'N', 'No', 'no', 'NO', 'false', 'FALSE', 'False', 'Off', 'off'): @@ -215,11 +249,30 @@ def test_args_env_skip_ssl_validation_variants(self): self.assertTrue(args.verify, msg=variant) +class TestParseCommandLineNoCommand(unittest.TestCase): + + @patch('sap.cli.get_commands') + def test_no_command_specified(self, fake_commands): + fake_cmd = make_mock_command() + fake_commands.return_value = [(Mock(), fake_cmd)] + + test_params = ALL_PARAMETERS.copy() + + with patch('sys.stderr', new_callable=StringIO) as fake_output, \ + self.assertRaises(SystemExit) as exit_cm: + entry.parse_command_line(test_params) + + self.assertEqual(str(exit_cm.exception), '3') + self.assertTrue(fake_output.getvalue().startswith( + 'No command specified - please consult the help and specify a command to execute')) + + class TestParseCommandLineWithCorrnr(unittest.TestCase): def configure_mock(self, fake_commands): fake_cmd = Mock() fake_cmd.name = 'pytest' + fake_cmd.description = 'Mock pytest command' fake_cmd.install_parser = Mock() def side_install_parser(subparser): @@ -227,6 +280,7 @@ def side_install_parser(subparser): get_args = command_args.add_parser('command') get_args.add_argument('--corrnr') + get_args.set_defaults(execute=Mock()) fake_cmd.install_parser.side_effect = side_install_parser