Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/rat_king_parser/config_parser/rat_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ def _attempt_decryption(self, item_data: dict[str, Any]) -> dict[str, Any]:
logger.debug(f"Decryption failed with decryptor {decryptor} : {e}")
self._decryptor = None

if self._decryptor is None:
raise ConfigParserException("All decryptors failed")
return item_data
# If we exit the loop without returning, no decryptor succeeded
raise ConfigParserException("All decryptors failed")

def _remap_config(
self, decoded_config: dict[str, Any], config_fields_map: dict[int, str]
Expand Down
15 changes: 11 additions & 4 deletions src/rat_king_parser/config_parser/utils/config_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,23 @@
"Group": ("Group", "Groub", "GroupTag", "TAG"),
}

# Strip underscores from aliases at build time so the lookup is consistent
# with the underscore-stripped input key (e.g. so "mutex_string" actually
# resolves to "Mutex")
_normalized_keys_map = {
alias: k for k, aliases in normalized_keys.items() for alias in aliases
alias.replace("_", ""): k
for k, aliases in normalized_keys.items()
for alias in aliases
}
Comment thread
jeFF0Falltrades marked this conversation as resolved.


# Normalizes config keys/values for easier mapping
def check_key_n_value(key: str, value: Any) -> tuple[str, Any]:
key_clean = key.replace("_", "")
if key_clean in _normalized_keys_map:
key = _normalized_keys_map[key_clean]
# Always strip underscores from the returned key to preserve the prior
# behavior of this function for unrecognized keys
key = key.replace("_", "")
if key in _normalized_keys_map:
key = _normalized_keys_map[key]
Comment thread
jeFF0Falltrades marked this conversation as resolved.

if key in ("Hosts", "Ports") and isinstance(value, str):
if value not in ("null", "false"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ def _derive_aes_passphrase_candidates(self, key_val: str) -> list[bytes]:

# Decrypts encrypted config values with the provided cipher data
def decrypt_encrypted_strings(
self, encrypted_strings: dict[str, str]) -> dict[str, str]:
self, encrypted_strings: dict[str, str]
) -> dict[str, str]:
logger.debug("Decrypting encrypted strings...")
if self._key_candidates is None:
self._key_candidates = self._get_aes_key_candidates(encrypted_strings)

decrypted_config_strings = {}
attempted_decryptions = 0
successfully_decrypted_count = 0
successful_key = None

Expand All @@ -140,9 +142,11 @@ def decrypt_encrypted_strings(
decrypted_config_strings[k] = v
continue

# Otherwise, extract the IV from the 16 bytes after the HMAC
# (first 32 bytes) and the ciphertext from the rest of the data
# after the IV, and run the decryption
# Past this point we are actually attempting a decryption
attempted_decryptions += 1
# Extract the IV from the 16 bytes after the HMAC (first 32 bytes)
# and the ciphertext from the rest of the data after the IV, and
# run the decryption
iv, ciphertext = decoded_val[32:48], decoded_val[48:]
result, last_exc = None, None

Expand Down Expand Up @@ -179,7 +183,12 @@ def decrypt_encrypted_strings(
logger.debug(f"Key: {k}, Value: {result}")
decrypted_config_strings[k] = result

if successfully_decrypted_count == 0:
# Only raise if we actually tried to decrypt at least one string and
# none succeeded. Template/builder samples have 0 attempts (all values
# are placeholders that fail the b64/length filter above), and should
# fall through cleanly so this AES decryptor remains the active one
# and its salt is still reported.
if attempted_decryptions > 0 and successfully_decrypted_count == 0:
raise ConfigParserException(
"No strings could be decrypted with the available keys"
)
Expand All @@ -193,7 +202,8 @@ def decrypt_encrypted_strings(

# Extracts AES key candidates from the payload
def _get_aes_key_candidates(
self, encrypted_strings: dict[str, str]) -> list[bytes]:
self, encrypted_strings: dict[str, str]
) -> list[bytes]:
logger.debug("Extracting AES key candidates...")
keys = []

Expand Down Expand Up @@ -284,7 +294,9 @@ def _get_aes_metadata(self) -> None:
if not self._metadata_candidates:
raise ConfigParserException("Could not identify AES metadata")

# Extraction of common metadata
# Key size, block size, and algo are properties of the embedded AES
# class definition, not of any single salt/iter candidate, so they are
# extracted once and shared across all metadata candidates
self._key_size, self._block_size, self._aes_algo = (
self._get_aes_key_and_block_size_and_algo()
)
Expand Down
Loading