From ad1bddde1913c3f78f9db013f9288f4711e522e2 Mon Sep 17 00:00:00 2001 From: redd1ng Date: Mon, 11 May 2026 16:41:10 +0200 Subject: [PATCH] Handle EncryptedPolicyBody for platform and remediation scripts. Files are now written to disk instead of stdout. This helps when a tenant has a large number of scripts and fixes the edge case where encoding errors threw an exception. Added a try except to apps download so it will continue to download the other apps in case an error occurs. --- device/windows.py | 128 ++++++++++++++++++++++++++++++++++------------ utils/utils.py | 18 ++++++- 2 files changed, 112 insertions(+), 34 deletions(-) diff --git a/device/windows.py b/device/windows.py index f0d18a1..aad1745 100644 --- a/device/windows.py +++ b/device/windows.py @@ -11,12 +11,13 @@ import xml.etree.ElementTree as ET from datetime import datetime, timedelta, timezone from device.device import Device -from utils.utils import prtauth, extract_pfx, save_encrypted_message_as_smime, decrypt_smime_file, aes_decrypt, renew_token +from utils.utils import prtauth, extract_pfx, save_encrypted_message_as_smime, decrypt_smime_file, aes_decrypt, renew_token, decrypt_encrypted_body, safe_filename from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import Encoding + class Windows(Device): def __init__(self, logger, os, device_name, deviceid, uid, tenant, prt, session_key, proxy): super().__init__(logger, os, device_name, deviceid, uid, tenant, prt, session_key, proxy) @@ -390,62 +391,123 @@ def download_apps(self, mdmpfx): certpath = 'pytune_mdm.crt' keypath = 'pytune_mdm.key' extract_pfx(mdmpfx, certpath, keypath) - ime = IME(self.device_name, certpath, keypath) - + + scripts_dir = os.path.join('IntuneScriptsAndApps', 'PlatformScripts') + apps_dir = os.path.join('IntuneScriptsAndApps', 'Apps') + os.makedirs(scripts_dir, exist_ok=True) + os.makedirs(apps_dir, exist_ok=True) + self.logger.info(f'downloading scripts...') policies = ime.request_policy() - if len(policies) == 0: - self.logger.error(f'available scripts not found') - else: - self.logger.alert(f'scripts found!') - i = 1 - for policy in policies: - self.logger.info(f'#{i} (policyid:{policy["PolicyId"]}):\n') - print(policy["PolicyBody"] + '\n') - i=i+1 + for i, policy in enumerate(policies, start=1): + policy_id = policy["PolicyId"] + policy_name = policy.get("PolicyName") or policy_id + self.logger.info(f'#{i} (Platform Script/Policy ID:{policy_id}, Name:{policy_name})') + + body = policy.get("PolicyBody") + encrypted = policy.get("EncryptedPolicyBody") + + raw = None + if body: + raw = base64.b64decode(body) + elif encrypted: + try: + plaintext = decrypt_encrypted_body(encrypted, certpath, keypath) + raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext + except Exception as e: + self.logger.error(f'decrypt failed for {policy_id}: {e}') + else: + self.logger.warn(f'no PolicyBody or EncryptedPolicyBody for {policy_id}') + + if raw is not None: + out_path = os.path.join(scripts_dir, f'{safe_filename(policy_name)}.ps1') + with open(out_path, 'wb') as f: + f.write(raw) + self.logger.success(f'saved script to {out_path}') self.logger.info(f'downloading win32apps...') apps = ime.get_selected_app() if len(apps) == 0: self.logger.error(f'available intunewin file not found') - for app in apps: self.logger.alert(f'found {app["Name"]}!') - content_info = ime.get_content_info(app) - upload_location = json.loads(content_info["ContentInfo"])["UploadLocation"] - decrypt_info = ime.decrypt_decryptinfo(content_info["DecryptInfo"]) - self.logger.info(f'downloading from {upload_location} ...') - ime.download_decrypt_intunewin(app["Name"], upload_location, decrypt_info["EncryptionKey"], decrypt_info["IV"]) - self.logger.success(f'successfully downloaded to {app["Name"]}.intunewin!') - - + try: + content_info = ime.get_content_info(app) + upload_location = json.loads(content_info["ContentInfo"])["UploadLocation"] + decrypt_info = ime.decrypt_decryptinfo(content_info["DecryptInfo"]) + self.logger.info(f'downloading from {upload_location} ...') + app_out_base = os.path.join(apps_dir, safe_filename(app["Name"])) + ime.download_decrypt_intunewin(app_out_base, upload_location, decrypt_info["EncryptionKey"], decrypt_info["IV"]) + self.logger.success(f'successfully downloaded to {app_out_base}.intunewin!') + except Exception as e: + self.logger.error(f'failed to download {app["Name"]}: {e}') + continue os.remove(certpath) os.remove(keypath) + def download_remediation_scripts(self, mdmpfx): certpath = 'pytune_mdm.crt' keypath = 'pytune_mdm.key' extract_pfx(mdmpfx, certpath, keypath) - ime = IME(self.device_name, certpath, keypath) - + + remediation_dir = os.path.join('IntuneScriptsAndApps', 'RemediationScripts') + os.makedirs(remediation_dir, exist_ok=True) + self.logger.info(f'downloading remediation scripts...') scripts = ime.get_remediation_scripts() if len(scripts) == 0: self.logger.error(f'available remediation scripts not found') else: self.logger.alert(f'remediation scripts found!') - i = 1 - for script in scripts: - self.logger.info(f'#{i} (Remediation/Policy ID:{script["PolicyId"]}):\n') - print("Detection Script Parameters:" + script["PolicyScriptParameters"]) - print("Detection Script:") - print(base64.b64decode(script["PolicyBody"]).decode('utf-8') + '\n') - print("Remediation Script Parameters:" + script["RemediationScriptParameters"]) - print("Remediation Script:") - print(base64.b64decode(script["RemediationScript"]).decode('utf-8') + '\n') - i=i+1 + for i, script in enumerate(scripts, start=1): + policy_id = script["PolicyId"] + policy_name = script.get("PolicyName") or policy_id + safe_name = safe_filename(policy_name) + self.logger.info(f'#{i} (Remediation/Policy ID:{policy_id}, Name:{policy_name})') + + body = script.get("PolicyBody") + encrypted = script.get("EncryptedPolicyBody") + + if body: + # Detection script + det_path = os.path.join(remediation_dir, f'{safe_name}.detect.ps1') + with open(det_path, 'wb') as f: + f.write(base64.b64decode(body)) + self.logger.success(f'saved detection script to {det_path}') + + det_params = script.get("PolicyScriptParameters", "") or "" + if det_params: + with open(det_path + '.params.txt', 'w', encoding='utf-8') as f: + f.write(det_params) + + # Remediation script + rem_body = script.get("RemediationScript") + if rem_body: + rem_path = os.path.join(remediation_dir, f'{safe_name}.remediate.ps1') + with open(rem_path, 'wb') as f: + f.write(base64.b64decode(rem_body)) + self.logger.success(f'saved remediation script to {rem_path}') + + rem_params = script.get("RemediationScriptParameters", "") or "" + if rem_params: + with open(rem_path + '.params.txt', 'w', encoding='utf-8') as f: + f.write(rem_params) + + elif encrypted: + try: + plaintext = decrypt_encrypted_body(encrypted, certpath, keypath) + raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext + out_path = os.path.join(remediation_dir, f'{safe_name}.ps1') + with open(out_path, 'wb') as f: + f.write(raw) + self.logger.success(f'saved to {out_path}') + except Exception as e: + self.logger.error(f'decrypt failed for {policy_id}: {e}') + else: + self.logger.warn(f'no PolicyBody or EncryptedPolicyBody for {policy_id}') os.remove(certpath) os.remove(keypath) diff --git a/utils/utils.py b/utils/utils.py index 4a859fa..f722e73 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -174,6 +174,17 @@ def extract_pfx(pfxpath, certpath, keypath): subprocess.run(f'openssl pkcs12 -in {pfxpath} -nodes -password pass:password -out {keypath} -nocerts -nodes', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) return +def decrypt_encrypted_body(encrypted_xml: str, cert_path: str, key_path: str) -> str: + m = re.search(r"<(?:\w+:)?EncryptedContent>([^<]+):"/\\|?*': + name = name.replace(ch, '_') + return name.strip().rstrip('.') \ No newline at end of file