Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions device/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from device.device import Device
from utils.utils import prtauth, extract_pfx, save_encrypted_message_as_smime, decrypt_smime_file, aes_decrypt, renew_token
from utils.utils import prtauth, extract_pfx, save_encrypted_message_as_smime, decrypt_smime_file, aes_decrypt, renew_token, decrypt_encrypted_body, safe_filename
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding



class Windows(Device):
def __init__(self, logger, os, device_name, deviceid, uid, tenant, prt, session_key, proxy):
super().__init__(logger, os, device_name, deviceid, uid, tenant, prt, session_key, proxy)
Expand Down Expand Up @@ -390,62 +391,123 @@ def download_apps(self, mdmpfx):
certpath = 'pytune_mdm.crt'
keypath = 'pytune_mdm.key'
extract_pfx(mdmpfx, certpath, keypath)

ime = IME(self.device_name, certpath, keypath)


scripts_dir = os.path.join('IntuneScriptsAndApps', 'PlatformScripts')
apps_dir = os.path.join('IntuneScriptsAndApps', 'Apps')
os.makedirs(scripts_dir, exist_ok=True)
os.makedirs(apps_dir, exist_ok=True)

self.logger.info(f'downloading scripts...')
policies = ime.request_policy()
if len(policies) == 0:
self.logger.error(f'available scripts not found')
else:
self.logger.alert(f'scripts found!')
i = 1
for policy in policies:
self.logger.info(f'#{i} (policyid:{policy["PolicyId"]}):\n')
print(policy["PolicyBody"] + '\n')
i=i+1
for i, policy in enumerate(policies, start=1):
policy_id = policy["PolicyId"]
policy_name = policy.get("PolicyName") or policy_id
self.logger.info(f'#{i} (Platform Script/Policy ID:{policy_id}, Name:{policy_name})')

body = policy.get("PolicyBody")
encrypted = policy.get("EncryptedPolicyBody")

raw = None
if body:
raw = base64.b64decode(body)
elif encrypted:
try:
plaintext = decrypt_encrypted_body(encrypted, certpath, keypath)
raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext
except Exception as e:
self.logger.error(f'decrypt failed for {policy_id}: {e}')
else:
self.logger.warn(f'no PolicyBody or EncryptedPolicyBody for {policy_id}')

if raw is not None:
out_path = os.path.join(scripts_dir, f'{safe_filename(policy_name)}.ps1')
with open(out_path, 'wb') as f:
f.write(raw)
self.logger.success(f'saved script to {out_path}')

self.logger.info(f'downloading win32apps...')
apps = ime.get_selected_app()
if len(apps) == 0:
self.logger.error(f'available intunewin file not found')

for app in apps:
self.logger.alert(f'found {app["Name"]}!')
content_info = ime.get_content_info(app)
upload_location = json.loads(content_info["ContentInfo"])["UploadLocation"]
decrypt_info = ime.decrypt_decryptinfo(content_info["DecryptInfo"])
self.logger.info(f'downloading from {upload_location} ...')
ime.download_decrypt_intunewin(app["Name"], upload_location, decrypt_info["EncryptionKey"], decrypt_info["IV"])
self.logger.success(f'successfully downloaded to {app["Name"]}.intunewin!')


try:
content_info = ime.get_content_info(app)
upload_location = json.loads(content_info["ContentInfo"])["UploadLocation"]
decrypt_info = ime.decrypt_decryptinfo(content_info["DecryptInfo"])
self.logger.info(f'downloading from {upload_location} ...')
app_out_base = os.path.join(apps_dir, safe_filename(app["Name"]))
ime.download_decrypt_intunewin(app_out_base, upload_location, decrypt_info["EncryptionKey"], decrypt_info["IV"])
self.logger.success(f'successfully downloaded to {app_out_base}.intunewin!')
except Exception as e:
self.logger.error(f'failed to download {app["Name"]}: {e}')
continue
os.remove(certpath)
os.remove(keypath)


def download_remediation_scripts(self, mdmpfx):
certpath = 'pytune_mdm.crt'
keypath = 'pytune_mdm.key'
extract_pfx(mdmpfx, certpath, keypath)

ime = IME(self.device_name, certpath, keypath)


remediation_dir = os.path.join('IntuneScriptsAndApps', 'RemediationScripts')
os.makedirs(remediation_dir, exist_ok=True)

self.logger.info(f'downloading remediation scripts...')
scripts = ime.get_remediation_scripts()
if len(scripts) == 0:
self.logger.error(f'available remediation scripts not found')
else:
self.logger.alert(f'remediation scripts found!')
i = 1
for script in scripts:
self.logger.info(f'#{i} (Remediation/Policy ID:{script["PolicyId"]}):\n')
print("Detection Script Parameters:" + script["PolicyScriptParameters"])
print("Detection Script:")
print(base64.b64decode(script["PolicyBody"]).decode('utf-8') + '\n')
print("Remediation Script Parameters:" + script["RemediationScriptParameters"])
print("Remediation Script:")
print(base64.b64decode(script["RemediationScript"]).decode('utf-8') + '\n')
i=i+1
for i, script in enumerate(scripts, start=1):
policy_id = script["PolicyId"]
policy_name = script.get("PolicyName") or policy_id
safe_name = safe_filename(policy_name)
self.logger.info(f'#{i} (Remediation/Policy ID:{policy_id}, Name:{policy_name})')

body = script.get("PolicyBody")
encrypted = script.get("EncryptedPolicyBody")

if body:
# Detection script
det_path = os.path.join(remediation_dir, f'{safe_name}.detect.ps1')
with open(det_path, 'wb') as f:
f.write(base64.b64decode(body))
self.logger.success(f'saved detection script to {det_path}')

det_params = script.get("PolicyScriptParameters", "") or ""
if det_params:
with open(det_path + '.params.txt', 'w', encoding='utf-8') as f:
f.write(det_params)

# Remediation script
rem_body = script.get("RemediationScript")
if rem_body:
rem_path = os.path.join(remediation_dir, f'{safe_name}.remediate.ps1')
with open(rem_path, 'wb') as f:
f.write(base64.b64decode(rem_body))
self.logger.success(f'saved remediation script to {rem_path}')

rem_params = script.get("RemediationScriptParameters", "") or ""
if rem_params:
with open(rem_path + '.params.txt', 'w', encoding='utf-8') as f:
f.write(rem_params)

elif encrypted:
try:
plaintext = decrypt_encrypted_body(encrypted, certpath, keypath)
raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext
out_path = os.path.join(remediation_dir, f'{safe_name}.ps1')
with open(out_path, 'wb') as f:
f.write(raw)
self.logger.success(f'saved to {out_path}')
except Exception as e:
self.logger.error(f'decrypt failed for {policy_id}: {e}')
else:
self.logger.warn(f'no PolicyBody or EncryptedPolicyBody for {policy_id}')

os.remove(certpath)
os.remove(keypath)
Expand Down
18 changes: 17 additions & 1 deletion utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ def extract_pfx(pfxpath, certpath, keypath):
subprocess.run(f'openssl pkcs12 -in {pfxpath} -nodes -password pass:password -out {keypath} -nocerts -nodes', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
return

def decrypt_encrypted_body(encrypted_xml: str, cert_path: str, key_path: str) -> str:
m = re.search(r"<(?:\w+:)?EncryptedContent>([^<]+)</", encrypted_xml)
der = base64.b64decode(re.sub(r"\s+", "", m.group(1)))

result = subprocess.run(
["openssl", "cms", "-decrypt", "-inform", "DER",
"-recip", cert_path, "-inkey", key_path],
input=der, capture_output=True, check=True,
)
return result.stdout.decode("utf-8", errors="replace")

def get_str_and_next(blob, start):
str_size = (struct.unpack('<I', blob[start:start+0x4])[0]) * 2
str = blob[start+0xc:start+0xc+str_size].decode('utf-16le')
Expand Down Expand Up @@ -207,4 +218,9 @@ def aes_decrypt(key, iv, content):
cipher = Cipher(algorithms.AES(base64.b64decode(key)), modes.CBC(base64.b64decode(iv)), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(content) + decryptor.finalize()
return decrypted_data
return decrypted_data

def safe_filename(name):
for ch in '<>:"/\\|?*':
name = name.replace(ch, '_')
return name.strip().rstrip('.')