From 24c24f436b6d2e361b29480e6e201fc6380bd302 Mon Sep 17 00:00:00 2001 From: Blatzy Date: Wed, 3 Dec 2025 16:07:33 +0100 Subject: [PATCH 1/4] inital commit for new module check-add-computer --- nxc/modules/check-add-computer.py | 319 ++++++++++++++++++++++++++++++ 1 file changed, 319 insertions(+) create mode 100644 nxc/modules/check-add-computer.py diff --git a/nxc/modules/check-add-computer.py b/nxc/modules/check-add-computer.py new file mode 100644 index 0000000000..b62036839e --- /dev/null +++ b/nxc/modules/check-add-computer.py @@ -0,0 +1,319 @@ +import ssl +from io import BytesIO +import ldap3 +from impacket.smbconnection import SMBConnection +from nxc.helpers.misc import CATEGORY +from nxc.protocols.smb.samrfunc import LSAQuery + + +class NXCModule: + """ + Module to check which users/groups can add workstations to domain + Author : @Blatzy github.com/Blatzy + """ + + name = "check-add-computer" + description = "Checks the 'Add workstations to domain' policy from Default Domain Controllers Policy" + supported_protocols = ["smb"] + category = CATEGORY.ENUMERATION + + def options(self, context, module_options): + """ + Check the SeMachineAccountPrivilege in the Default Domain Controllers Policy and MachineAccountQuota via LDAP. + Displays which users/groups can add workstations to the domain. + Usage: nxc smb $DC-IP -u 'username' -p 'password' -M check-add-computer + """ + pass + + def on_login(self, context, connection): + self.context = context + self.connection = connection + + # Check if SYSVOL share exists (DC verification) + if not self.check_sysvol_exists(): + self.context.log.fail("SYSVOL share not found - This may not be a Domain Controller") + return + + self.context.log.debug("SYSVOL share found - Confirmed Domain Controller") + + # Initialize LSA for SID resolution + try: + self.lsa_query = LSAQuery( + username=connection.username, + password=connection.password, + domain=connection.domain, + remote_name=connection.hostname, + remote_host=connection.host, + lmhash=connection.lmhash, + nthash=connection.nthash, + kerberos=connection.kerberos, + kdcHost=connection.kdcHost, + aesKey=connection.aesKey, + logger=context.log + ) + except Exception as e: + self.context.log.fail(f"Failed to initialize LSA connection: {e}") + self.lsa_query = None + + # Try static path first (most reliable) + # Use targetDomain (DC's domain) not domain (user's auth domain) for trust scenarios + dc_domain = connection.targetDomain + dc_policy_guid = "{6AC1786C-016F-11D2-945F-00C04fB984F9}" + dc_policy_path = f"{dc_domain}\\Policies\\{dc_policy_guid}\\MACHINE\\Microsoft\\Windows NT\\SecEdit\\GptTmpl.inf" + + self.context.log.info("Trying static path for Default Domain Controllers Policy...") + self.context.log.debug(f"Static path: {dc_policy_path}") + + # Test if static path works + try: + buf = BytesIO() + connection.conn.getFile("SYSVOL", dc_policy_path, buf.write) + if buf.getvalue(): + self.context.log.highlight("Found Default Domain Controllers Policy via static path") + else: + dc_policy_path = None + except Exception as e: + self.context.log.debug(f"Static path failed: {e}") + dc_policy_path = None + + # If static path fails, try spider + if not dc_policy_path: + self.context.log.info("Static path failed, searching with spider...") + try: + paths = connection.spider("SYSVOL", pattern=["GptTmpl.inf"]) + self.context.log.debug(f"Spider found {len(paths) if paths else 0} GptTmpl.inf files") + + if paths: + for path in paths: + self.context.log.debug(f" - {path}") + # Look for Default Domain Controllers Policy GUID + if "6AC1786C-016F-11D2-945F-00C04fB984F9" in path.upper(): + dc_policy_path = path + self.context.log.success(f"Found Default Domain Controllers Policy: {path}") + break + else: + self.context.log.fail("No GptTmpl.inf files found in SYSVOL") + except Exception as e: + self.context.log.fail(f"Failed to search SYSVOL: {e}") + + if not dc_policy_path: + self.context.log.fail("Default Domain Controllers Policy not found") + return + + # Get the policy file content + policy_content = self.get_policy_file(dc_policy_path) + if not policy_content: + self.context.log.fail("Could not retrieve Default Domain Controllers Policy") + return + + # Parse and display SeMachineAccountPrivilege + self.parse_machine_account_privilege(policy_content) + + def check_sysvol_exists(self): + """Check if SYSVOL share exists on the target""" + try: + shares = self.connection.conn.listShares() + for share in shares: + if share['shi1_netname'].rstrip('\x00').upper() == 'SYSVOL': + return True + return False + except Exception as e: + self.context.log.debug(f"Error checking for SYSVOL: {e}") + return False + + def get_policy_file(self, policy_path): + """Retrieve GptTmpl.inf content from given path""" + self.context.log.info(f"Reading policy file...") + self.context.log.debug(f"Policy path: {policy_path}") + + try: + # Use getFile with BytesIO like gpp_privileges.py + buf = BytesIO() + self.connection.conn.getFile("SYSVOL", policy_path, buf.write) + + content = buf.getvalue() + + if not content: + self.context.log.fail("File is empty or could not be read") + return None + + self.context.log.debug(f"Read {len(content)} bytes from policy file") + + # Try different encodings + for encoding in ['utf-16-le', 'utf-16', 'latin-1', 'utf-8']: + try: + decoded = content.decode(encoding, errors='ignore') + if decoded and len(decoded) > 0: + self.context.log.debug(f"Successfully decoded with {encoding}") + return decoded + except: + continue + + self.context.log.fail("Could not decode policy file with any known encoding") + return None + + except Exception as e: + self.context.log.fail(f"Error reading policy file: {e}") + self.context.log.debug(f"Full error details: {type(e).__name__}: {str(e)}") + return None + + def parse_machine_account_privilege(self, content): + """Parse GptTmpl.inf to find SeMachineAccountPrivilege""" + self.context.log.info("Parsing security policy...") + + # Find the [Privilege Rights] section + in_privilege_section = False + machine_account_line = None + + for line in content.split('\n'): + line = line.strip() + + if line.upper() == "[PRIVILEGE RIGHTS]": + in_privilege_section = True + continue + + if in_privilege_section: + # Check if we've moved to another section + if line.startswith('['): + break + + # Look for SeMachineAccountPrivilege + if line.startswith('SeMachineAccountPrivilege'): + machine_account_line = line + break + + if not machine_account_line: + self.context.log.info("SeMachineAccountPrivilege not found in policy") + self.context.log.highlight("=" * 60) + self.context.log.highlight("Default configuration applies:") + self.context.log.highlight(" - Authenticated Users can join computers to the domain") + self.context.log.highlight("") + + # Query MachineAccountQuota + maq_value = self.get_machine_account_quota() + if maq_value is not None: + if maq_value == 0: + self.context.log.highlight(f"ms-DS-MachineAccountQuota: {maq_value} (You cannot add any machines)") + else: + self.context.log.highlight(f"ms-DS-MachineAccountQuota: {maq_value} ") + else: + self.context.log.info("Default: ms-DS-MachineAccountQuota (default: 10 machines per user)") + + self.context.log.highlight("=" * 60) + return + + # Parse the line: SeMachineAccountPrivilege = *S-1-5-32-544,*S-1-5-21-...-512 + parts = machine_account_line.split('=', 1) + if len(parts) != 2: + self.context.log.fail("Could not parse SeMachineAccountPrivilege line") + return + + sids = parts[1].strip() + + if not sids: + self.context.log.info("No users/groups explicitly assigned (using default)") + return + + # Split by comma and process each SID + sid_list = [s.strip().lstrip('*') for s in sids.split(',') if s.strip()] + + if not sid_list: + self.context.log.info("No SIDs found in policy") + return + + # Resolve all SIDs at once using LSA + resolved_names = self.resolve_sids(sid_list) + + # Display results + self.context.log.highlight("Users/Groups that can add computers to the domain:") + self.context.log.highlight("=" * 60) + + for sid, name in zip(sid_list, resolved_names): + if name and name != "": + self.context.log.highlight(f" - {name} ({sid})") + else: + self.context.log.highlight(f" - UNKNOWN ({sid})") + + self.context.log.highlight("=" * 60) + + def resolve_sids(self, sid_list): + """Resolve a list of SIDs to friendly names using LSA""" + if not self.lsa_query: + self.context.log.debug("LSA not available, cannot resolve SIDs") + return ["UNKNOWN"] * len(sid_list) + + try: + # Use LSAQuery to resolve all SIDs at once + resolved_names = self.lsa_query.lookup_sids(sid_list) + return resolved_names + except Exception as e: + self.context.log.debug(f"Error resolving SIDs via LSA: {e}") + # Fallback to returning UNKNOWN for all SIDs + return [""] * len(sid_list) + + def get_machine_account_quota(self): + """Query ms-DS-MachineAccountQuota via LDAP""" + try: + # Build LDAP domain DN + ldap_domain = f"dc={self.connection.targetDomain.replace('.', ',dc=')}" + + self.context.log.debug(f"Querying MachineAccountQuota via LDAP on {self.connection.host}") + + # Connect to LDAP (try LDAPS first, fallback to LDAP) + ldap_server = None + ldap_conn = None + + # Try LDAPS (port 636) + try: + tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2, ciphers="ALL:@SECLEVEL=0") + ldap_server = ldap3.Server(self.connection.host, use_ssl=True, port=636, get_info=ldap3.ALL, tls=tls) + ldap_conn = ldap3.Connection( + ldap_server, + user=f"{self.connection.username}@{self.connection.domain}", + password=self.connection.password + ) + if not ldap_conn.bind(): + ldap_conn = None + else: + self.context.log.debug("Connected via LDAPS") + except Exception as e: + self.context.log.debug(f"LDAPS connection failed: {e}") + + # Fallback to LDAP (port 389) + if not ldap_conn: + try: + ldap_server = ldap3.Server(self.connection.host, port=389, get_info=ldap3.ALL) + ldap_conn = ldap3.Connection( + ldap_server, + user=f"{self.connection.username}@{self.connection.domain}", + password=self.connection.password + ) + if not ldap_conn.bind(): + self.context.log.debug("LDAP connection failed") + return None + else: + self.context.log.debug("Connected via LDAP") + except Exception as e: + self.context.log.debug(f"LDAP connection failed: {e}") + return None + + # Search for ms-DS-MachineAccountQuota + ldap_conn.search( + search_base=ldap_domain, + search_filter="(objectClass=*)", + search_scope=ldap3.BASE, + attributes=["ms-DS-MachineAccountQuota"] + ) + + if ldap_conn.entries: + maq = ldap_conn.entries[0]["ms-DS-MachineAccountQuota"].value + self.context.log.debug(f"MachineAccountQuota retrieved: {maq}") + ldap_conn.unbind() + return int(maq) if maq is not None else None + + ldap_conn.unbind() + return None + + except Exception as e: + self.context.log.debug(f"Error querying MachineAccountQuota: {e}") + return None From de46f2c6bb776461cb703b8a0c3981fff4df34d3 Mon Sep 17 00:00:00 2001 From: Blatzy Date: Wed, 3 Dec 2025 16:17:27 +0100 Subject: [PATCH 2/4] first fix with ruff --- nxc/modules/check-add-computer.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/nxc/modules/check-add-computer.py b/nxc/modules/check-add-computer.py index b62036839e..5c15b779b6 100644 --- a/nxc/modules/check-add-computer.py +++ b/nxc/modules/check-add-computer.py @@ -1,7 +1,6 @@ import ssl from io import BytesIO import ldap3 -from impacket.smbconnection import SMBConnection from nxc.helpers.misc import CATEGORY from nxc.protocols.smb.samrfunc import LSAQuery @@ -23,7 +22,6 @@ def options(self, context, module_options): Displays which users/groups can add workstations to the domain. Usage: nxc smb $DC-IP -u 'username' -p 'password' -M check-add-computer """ - pass def on_login(self, context, connection): self.context = context @@ -114,7 +112,7 @@ def check_sysvol_exists(self): try: shares = self.connection.conn.listShares() for share in shares: - if share['shi1_netname'].rstrip('\x00').upper() == 'SYSVOL': + if share["shi1_netname"].rstrip("\x00").upper() == "SYSVOL": return True return False except Exception as e: @@ -123,7 +121,7 @@ def check_sysvol_exists(self): def get_policy_file(self, policy_path): """Retrieve GptTmpl.inf content from given path""" - self.context.log.info(f"Reading policy file...") + self.context.log.info("Reading policy file...") self.context.log.debug(f"Policy path: {policy_path}") try: @@ -140,9 +138,9 @@ def get_policy_file(self, policy_path): self.context.log.debug(f"Read {len(content)} bytes from policy file") # Try different encodings - for encoding in ['utf-16-le', 'utf-16', 'latin-1', 'utf-8']: + for encoding in ["utf-16-le", "utf-16", "latin-1", "utf-8"]: try: - decoded = content.decode(encoding, errors='ignore') + decoded = content.decode(encoding, errors="ignore") if decoded and len(decoded) > 0: self.context.log.debug(f"Successfully decoded with {encoding}") return decoded @@ -154,7 +152,7 @@ def get_policy_file(self, policy_path): except Exception as e: self.context.log.fail(f"Error reading policy file: {e}") - self.context.log.debug(f"Full error details: {type(e).__name__}: {str(e)}") + self.context.log.debug(f"Full error details: {type(e).__name__}: {e!s}") return None def parse_machine_account_privilege(self, content): @@ -165,7 +163,7 @@ def parse_machine_account_privilege(self, content): in_privilege_section = False machine_account_line = None - for line in content.split('\n'): + for line in content.split("\n"): line = line.strip() if line.upper() == "[PRIVILEGE RIGHTS]": @@ -174,11 +172,11 @@ def parse_machine_account_privilege(self, content): if in_privilege_section: # Check if we've moved to another section - if line.startswith('['): + if line.startswith("["): break # Look for SeMachineAccountPrivilege - if line.startswith('SeMachineAccountPrivilege'): + if line.startswith("SeMachineAccountPrivilege"): machine_account_line = line break @@ -203,7 +201,7 @@ def parse_machine_account_privilege(self, content): return # Parse the line: SeMachineAccountPrivilege = *S-1-5-32-544,*S-1-5-21-...-512 - parts = machine_account_line.split('=', 1) + parts = machine_account_line.split("=", 1) if len(parts) != 2: self.context.log.fail("Could not parse SeMachineAccountPrivilege line") return @@ -215,7 +213,7 @@ def parse_machine_account_privilege(self, content): return # Split by comma and process each SID - sid_list = [s.strip().lstrip('*') for s in sids.split(',') if s.strip()] + sid_list = [s.strip().lstrip("*") for s in sids.split(",") if s.strip()] if not sid_list: self.context.log.info("No SIDs found in policy") @@ -228,7 +226,7 @@ def parse_machine_account_privilege(self, content): self.context.log.highlight("Users/Groups that can add computers to the domain:") self.context.log.highlight("=" * 60) - for sid, name in zip(sid_list, resolved_names): + for sid, name in zip(sid_list, resolved_names, strict=False): if name and name != "": self.context.log.highlight(f" - {name} ({sid})") else: From b7bd3a3b40f5008f76b47e667fd2ad9c3f6b39f6 Mon Sep 17 00:00:00 2001 From: Blatzy Date: Wed, 3 Dec 2025 16:23:30 +0100 Subject: [PATCH 3/4] add chech-add-computer to e2e_commands --- tests/e2e_commands.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e_commands.txt b/tests/e2e_commands.txt index c5793dcdbe..0a4806a0d7 100644 --- a/tests/e2e_commands.txt +++ b/tests/e2e_commands.txt @@ -71,6 +71,7 @@ netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M add-comp netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M add-computer -o NAME="BADPC" PASSWORD="Password2" CHANGEPW=True netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M add-computer -o NAME="BADPC" DELETE=True netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M bitlocker +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M check-add-computer netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M dpapi_hash netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M dpapi_hash -o OUTPUTFILE=hashes.txt netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD KERBEROS -M drop-library-ms -o SERVER=127.0.0.1 NAME=test From 729b18cf776718e62c9c071f109776bf12dd50f7 Mon Sep 17 00:00:00 2001 From: Blatzy Date: Wed, 3 Dec 2025 18:06:26 +0100 Subject: [PATCH 4/4] Focusing on parsing GPO and reference to check with nxc ldap module --- nxc/modules/check-add-computer.py | 84 ++----------------------------- 1 file changed, 3 insertions(+), 81 deletions(-) diff --git a/nxc/modules/check-add-computer.py b/nxc/modules/check-add-computer.py index 5c15b779b6..ca2077e982 100644 --- a/nxc/modules/check-add-computer.py +++ b/nxc/modules/check-add-computer.py @@ -1,6 +1,4 @@ -import ssl from io import BytesIO -import ldap3 from nxc.helpers.misc import CATEGORY from nxc.protocols.smb.samrfunc import LSAQuery @@ -18,7 +16,7 @@ class NXCModule: def options(self, context, module_options): """ - Check the SeMachineAccountPrivilege in the Default Domain Controllers Policy and MachineAccountQuota via LDAP. + Check the SeMachineAccountPrivilege in the Default Domain Controllers Policy. Displays which users/groups can add workstations to the domain. Usage: nxc smb $DC-IP -u 'username' -p 'password' -M check-add-computer """ @@ -67,7 +65,9 @@ def on_login(self, context, connection): buf = BytesIO() connection.conn.getFile("SYSVOL", dc_policy_path, buf.write) if buf.getvalue(): + self.context.log.highlight("") self.context.log.highlight("Found Default Domain Controllers Policy via static path") + self.context.log.highlight("(don't forget to check MAQ : nxc ldap <...> -M MAQ)") else: dc_policy_path = None except Exception as e: @@ -185,18 +185,6 @@ def parse_machine_account_privilege(self, content): self.context.log.highlight("=" * 60) self.context.log.highlight("Default configuration applies:") self.context.log.highlight(" - Authenticated Users can join computers to the domain") - self.context.log.highlight("") - - # Query MachineAccountQuota - maq_value = self.get_machine_account_quota() - if maq_value is not None: - if maq_value == 0: - self.context.log.highlight(f"ms-DS-MachineAccountQuota: {maq_value} (You cannot add any machines)") - else: - self.context.log.highlight(f"ms-DS-MachineAccountQuota: {maq_value} ") - else: - self.context.log.info("Default: ms-DS-MachineAccountQuota (default: 10 machines per user)") - self.context.log.highlight("=" * 60) return @@ -249,69 +237,3 @@ def resolve_sids(self, sid_list): # Fallback to returning UNKNOWN for all SIDs return [""] * len(sid_list) - def get_machine_account_quota(self): - """Query ms-DS-MachineAccountQuota via LDAP""" - try: - # Build LDAP domain DN - ldap_domain = f"dc={self.connection.targetDomain.replace('.', ',dc=')}" - - self.context.log.debug(f"Querying MachineAccountQuota via LDAP on {self.connection.host}") - - # Connect to LDAP (try LDAPS first, fallback to LDAP) - ldap_server = None - ldap_conn = None - - # Try LDAPS (port 636) - try: - tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2, ciphers="ALL:@SECLEVEL=0") - ldap_server = ldap3.Server(self.connection.host, use_ssl=True, port=636, get_info=ldap3.ALL, tls=tls) - ldap_conn = ldap3.Connection( - ldap_server, - user=f"{self.connection.username}@{self.connection.domain}", - password=self.connection.password - ) - if not ldap_conn.bind(): - ldap_conn = None - else: - self.context.log.debug("Connected via LDAPS") - except Exception as e: - self.context.log.debug(f"LDAPS connection failed: {e}") - - # Fallback to LDAP (port 389) - if not ldap_conn: - try: - ldap_server = ldap3.Server(self.connection.host, port=389, get_info=ldap3.ALL) - ldap_conn = ldap3.Connection( - ldap_server, - user=f"{self.connection.username}@{self.connection.domain}", - password=self.connection.password - ) - if not ldap_conn.bind(): - self.context.log.debug("LDAP connection failed") - return None - else: - self.context.log.debug("Connected via LDAP") - except Exception as e: - self.context.log.debug(f"LDAP connection failed: {e}") - return None - - # Search for ms-DS-MachineAccountQuota - ldap_conn.search( - search_base=ldap_domain, - search_filter="(objectClass=*)", - search_scope=ldap3.BASE, - attributes=["ms-DS-MachineAccountQuota"] - ) - - if ldap_conn.entries: - maq = ldap_conn.entries[0]["ms-DS-MachineAccountQuota"].value - self.context.log.debug(f"MachineAccountQuota retrieved: {maq}") - ldap_conn.unbind() - return int(maq) if maq is not None else None - - ldap_conn.unbind() - return None - - except Exception as e: - self.context.log.debug(f"Error querying MachineAccountQuota: {e}") - return None