diff --git a/cterasdk/core/cloudfs.py b/cterasdk/core/cloudfs.py index e18b7080..de2a09a1 100644 --- a/cterasdk/core/cloudfs.py +++ b/cterasdk/core/cloudfs.py @@ -625,6 +625,57 @@ def add_folders(self, name, folder_finding_helpers): logger.error('Failed adding folders to zone.') raise CTERAException(f'Failed adding folders to zone: {name}') from error + def add_users(self, name, user_accounts): + """ + Add users to a zone + + :param str name: The name of the zone + :param list[cterasdk.core.types.UserAccount] user_accounts: List of user accounts to add + """ + self._modify_zone_users(name, user_accounts, add=True) + + def remove_users(self, name, user_accounts): + """ + Remove users from a zone + + :param str name: The name of the zone + :param list[cterasdk.core.types.UserAccount] user_accounts: List of user accounts to remove + """ + self._modify_zone_users(name, user_accounts, add=False) + + def _modify_zone_users(self, name, user_accounts, add=True): + """ + Internal method to add or remove users from a zone. + + :param str name: The name of the zone + :param list[cterasdk.core.types.UserAccount] user_accounts: List of user accounts + :param bool add: True to add users, False to remove users + """ + action = 'adding' if add else 'removing' + preposition = 'to' if add else 'from' + zone = self._core.cloudfs.zones.get(name) + info = self._zone_info(zone.zoneId) + description = info.description if hasattr(info, 'description') else None + param = self._zone_param(info.name, info.policyType, description, info.zoneId) + + param.delta.usersDelta = Object() + param.delta.usersDelta._classname = 'ZoneUserDelta' # pylint: disable=protected-access + param.delta.usersDelta.added = [] + param.delta.usersDelta.removed = [] + + target_list = param.delta.usersDelta.added if add else param.delta.usersDelta.removed + for user_account in user_accounts: + user = self._core.users.get(user_account, include=['uid']) + target_list.append(user.uid) + + logger.info('%s users %s zone. %s', action.capitalize(), preposition, {'zone': info.name}) + + try: + self._save(param) + except CTERAException as error: + logger.error('Failed %s users %s zone.', action, preposition) + raise CTERAException(f'Failed {action} users {preposition} zone: {name}') from error + def _zone_info(self, zid): logger.debug('Obtaining zone info. %s', {'id': zid}) response = self._core.api.execute('', 'getZoneBasicInfo', zid) diff --git a/tests/ut/core/admin/test_cloudfs_zones.py b/tests/ut/core/admin/test_cloudfs_zones.py index 8fc0f942..14b1625e 100644 --- a/tests/ut/core/admin/test_cloudfs_zones.py +++ b/tests/ut/core/admin/test_cloudfs_zones.py @@ -38,6 +38,8 @@ def setUp(self): self._zone_id = TestCoreZones._zone_id self._device_names = ['dev1', 'dev2', 'dev3'] self._device_ids = [100, 101, 102] + self._user_accounts = [UserAccount('alice'), UserAccount('bruce')] + self._user_uids = [200, 201] def test_get_zone_success(self): execute_response = self._get_zones_display_info_response() @@ -186,6 +188,102 @@ def test_add_devices_raise(self): self._assert_equal_objects(actual_param, expected_param) self.assertEqual(f"Failed adding devices: [{', '.join(self._device_names)}] to zone: {self._zone_name}", str(error.exception)) + def test_add_users_success(self): + self._init_global_admin() + zone = self._get_zones_display_info_response().objects.pop() + self._global_admin.api.execute = mock.MagicMock(side_effect=TestCoreZones._mock_execute) + self._global_admin.cloudfs.zones.get = mock.MagicMock(return_value=zone) + get_user_mock = self.patch_call("cterasdk.core.users.Users.get") + get_user_mock.side_effect = self._get_user_objects_side_effect + + cloudfs.Zones(self._global_admin).add_users(self._zone_name, self._user_accounts) + + self._global_admin.cloudfs.zones.get.assert_called_once_with(self._zone_name) + get_user_calls = [mock.call(user_account, include=['uid']) for user_account in self._user_accounts] + get_user_mock.assert_has_calls(get_user_calls) + self._global_admin.api.execute.assert_has_calls( + [ + mock.call('', 'getZoneBasicInfo', self._zone_id), + mock.call('', 'saveZone', mock.ANY) + ] + ) + expected_param = self._get_add_users_param() + actual_param = self._global_admin.api.execute.call_args[0][2] + self._assert_equal_objects(actual_param, expected_param) + + def test_add_users_raise(self): + self._init_global_admin() + zone = self._get_zones_display_info_response().objects.pop() + self._global_admin.api.execute = mock.MagicMock(side_effect=TestCoreZones._save_zone_side_effect) + self._global_admin.cloudfs.zones.get = mock.MagicMock(return_value=zone) + get_user_mock = self.patch_call("cterasdk.core.users.Users.get") + get_user_mock.side_effect = self._get_user_objects_side_effect + + with self.assertRaises(exceptions.CTERAException) as error: + cloudfs.Zones(self._global_admin).add_users(self._zone_name, self._user_accounts) + + self._global_admin.cloudfs.zones.get.assert_called_once_with(self._zone_name) + get_user_calls = [mock.call(user_account, include=['uid']) for user_account in self._user_accounts] + get_user_mock.assert_has_calls(get_user_calls) + self._global_admin.api.execute.assert_has_calls( + [ + mock.call('', 'getZoneBasicInfo', self._zone_id), + mock.call('', 'saveZone', mock.ANY) + ] + ) + expected_param = self._get_add_users_param() + actual_param = self._global_admin.api.execute.call_args[0][2] + self._assert_equal_objects(actual_param, expected_param) + self.assertEqual(f'Failed adding users to zone: {self._zone_name}', str(error.exception)) + + def test_remove_users_success(self): + self._init_global_admin() + zone = self._get_zones_display_info_response().objects.pop() + self._global_admin.api.execute = mock.MagicMock(side_effect=TestCoreZones._mock_execute) + self._global_admin.cloudfs.zones.get = mock.MagicMock(return_value=zone) + get_user_mock = self.patch_call("cterasdk.core.users.Users.get") + get_user_mock.side_effect = self._get_user_objects_side_effect + + cloudfs.Zones(self._global_admin).remove_users(self._zone_name, self._user_accounts) + + self._global_admin.cloudfs.zones.get.assert_called_once_with(self._zone_name) + get_user_calls = [mock.call(user_account, include=['uid']) for user_account in self._user_accounts] + get_user_mock.assert_has_calls(get_user_calls) + self._global_admin.api.execute.assert_has_calls( + [ + mock.call('', 'getZoneBasicInfo', self._zone_id), + mock.call('', 'saveZone', mock.ANY) + ] + ) + expected_param = self._get_remove_users_param() + actual_param = self._global_admin.api.execute.call_args[0][2] + self._assert_equal_objects(actual_param, expected_param) + + def test_remove_users_raise(self): + self._init_global_admin() + zone = self._get_zones_display_info_response().objects.pop() + self._global_admin.api.execute = mock.MagicMock(side_effect=TestCoreZones._save_zone_side_effect) + self._global_admin.cloudfs.zones.get = mock.MagicMock(return_value=zone) + get_user_mock = self.patch_call("cterasdk.core.users.Users.get") + get_user_mock.side_effect = self._get_user_objects_side_effect + + with self.assertRaises(exceptions.CTERAException) as error: + cloudfs.Zones(self._global_admin).remove_users(self._zone_name, self._user_accounts) + + self._global_admin.cloudfs.zones.get.assert_called_once_with(self._zone_name) + get_user_calls = [mock.call(user_account, include=['uid']) for user_account in self._user_accounts] + get_user_mock.assert_has_calls(get_user_calls) + self._global_admin.api.execute.assert_has_calls( + [ + mock.call('', 'getZoneBasicInfo', self._zone_id), + mock.call('', 'saveZone', mock.ANY) + ] + ) + expected_param = self._get_remove_users_param() + actual_param = self._global_admin.api.execute.call_args[0][2] + self._assert_equal_objects(actual_param, expected_param) + self.assertEqual(f'Failed removing users from zone: {self._zone_name}', str(error.exception)) + def _get_add_devices_param(self): param = self._get_zone_param(zone_id=self._zone_id) for device_id in self._device_ids: @@ -219,6 +317,34 @@ def _get_device_objects(self): devices.append(param) return devices + def _get_add_users_param(self): + param = self._get_zone_param(zone_id=self._zone_id) + param.delta.usersDelta = Object() + param.delta.usersDelta._classname = 'ZoneUserDelta' # pylint: disable=protected-access + param.delta.usersDelta.added = list(self._user_uids) + param.delta.usersDelta.removed = [] + return param + + def _get_remove_users_param(self): + param = self._get_zone_param(zone_id=self._zone_id) + param.delta.usersDelta = Object() + param.delta.usersDelta._classname = 'ZoneUserDelta' # pylint: disable=protected-access + param.delta.usersDelta.added = [] + param.delta.usersDelta.removed = list(self._user_uids) + return param + + def _get_user_objects_side_effect(self, user_account, include): # pylint: disable=unused-argument + user_index = next( + (i for i, ua in enumerate(self._user_accounts) if ua.name == user_account.name), + None + ) + if user_index is not None: + user = Object() + user.uid = self._user_uids[user_index] + user.name = user_account.name + return user + raise exceptions.ObjectNotFoundException(f'User not found: {user_account.name}') + @staticmethod def _find_cloud_folder(folder_name, folder_owner, include): # pylint: disable=unused-argument