Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 78 additions & 53 deletions src/rat_king_parser/config_parser/rat_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import OrderedDict
from logging import getLogger
from os.path import isfile
from re import DOTALL, compile, search
Expand Down Expand Up @@ -77,11 +76,13 @@ def __init__(
}
self.remap_config = remap_config
self.preserve_obfuscated_keys = preserve_obfuscated_keys
self._dnpp: DotNetPEPayload | None = None
self._decryptor: ConfigDecryptor | None = None
try:
if data is None and not isfile(file_path):
raise ConfigParserException("File not found")
# Filled in _decrypt_and_decode_config()
self._incompatible_decryptors: list[int] = []
self._incompatible_decryptors: list[Any] = []
try:
self._dnpp = DotNetPEPayload(file_path, yara_rule, data)
except Exception as e:
Expand All @@ -90,7 +91,6 @@ def __init__(
self.report["yara_possible_family"] = self._dnpp.yara_match

# Assigned in _decrypt_and_decode_config()
self._decryptor: ConfigDecryptor = None
self.report["config"] = self._get_config()
key_hex = "None"
if self._decryptor is not None and self._decryptor.key is not None:
Expand Down Expand Up @@ -128,38 +128,7 @@ def _decrypt_and_decode_config(
# Translate config value RVAs to string values
for k in item_data:
item_data[k] = self._dnpp.user_string_from_rva(item_data[k])

# Attempt to decrypt encrypted values
for decryptor in SUPPORTED_DECRYPTORS:
if decryptor in self._incompatible_decryptors:
continue

if self._decryptor is None:
# Try to instantiate the selected decryptor
# Add to incompatible list and move on upon failure
try:
self._decryptor = decryptor(self._dnpp)
except IncompatibleDecryptorException as ide:
logger.debug(
f"Decryptor incompatible {decryptor} : {ide}"
)
self._incompatible_decryptors.append(decryptor)
continue
try:
# Try to decrypt the encrypted strings
# Continue to next compatible decryptor on failure
item_data = self._decryptor.decrypt_encrypted_strings(
item_data
)
break
except Exception as e:
logger.debug(
f"Decryption failed with decryptor {decryptor} : {e}"
)
self._decryptor = None

if self._decryptor is None:
raise ConfigParserException("All decryptors failed")
item_data = self._attempt_decryption(item_data)

elif isinstance(item, config_item.ByteArrayConfigItem):
for k in item_data:
Expand All @@ -169,32 +138,88 @@ def _decrypt_and_decode_config(
).hex()

decoded_config.update(item_data)

# UrlHost is a marker of a special case until this can be standardized
if len(decoded_config) < min_config_len and "UrlHost" not in decoded_config:
raise ConfigParserException(
f"Minimum threshold of config items not met: {len(decoded_config)}/{min_config_len}"
)
if self.remap_config:
sorted_decoded_config = OrderedDict()
normalized_fields = []
for k in sorted(config_fields_map.keys()):
key_name = config_fields_map[k]
value = decoded_config[key_name]
key_normalized, value = check_key_n_value(key_name, value)
if key_normalized != key_name:
normalized_fields.append(key_name)
sorted_decoded_config[key_normalized] = value
# Ensure config items added by decryptors dynamically are preserved
sorted_decoded_config.update(
{
key: decoded_config[key]
for key in decoded_config
if key not in sorted_decoded_config and key not in normalized_fields
}
)
return sorted_decoded_config
return self._remap_config(decoded_config, config_fields_map)
return decoded_config

def _attempt_decryption(self, item_data: dict[str, Any]) -> dict[str, Any]:
# Attempt to decrypt encrypted values
for decryptor in SUPPORTED_DECRYPTORS:
if decryptor in self._incompatible_decryptors:
continue

if self._decryptor is None:
# Try to instantiate the selected decryptor
# Add to incompatible list and move on upon failure
try:
self._decryptor = decryptor(self._dnpp)
except IncompatibleDecryptorException as ide:
logger.debug(f"Decryptor incompatible {decryptor} : {ide}")
self._incompatible_decryptors.append(decryptor)
continue
try:
# Try to decrypt the encrypted strings
# Continue to next compatible decryptor on failure
return self._decryptor.decrypt_encrypted_strings(item_data)
except Exception as e:
logger.debug(f"Decryption failed with decryptor {decryptor} : {e}")
self._decryptor = None

if self._decryptor is None:
raise ConfigParserException("All decryptors failed")
return item_data

def _remap_config(
self, decoded_config: dict[str, Any], config_fields_map: dict[int, str]
) -> dict[str, Any]:
remapped_config = {}
normalized_fields = []
for k in sorted(config_fields_map.keys()):
key_name = config_fields_map[k]
value = decoded_config[key_name]

# Run your normalization (e.g. converting HostsFE -> Hosts)
key_normalized, value = check_key_n_value(key_name, value)

if key_normalized != key_name:
normalized_fields.append(key_name)

# --- LOGIC TO APPEND INSTEAD OF OVERWRITE ---
if key_normalized in remapped_config:
existing_val = remapped_config[key_normalized]

# Case 1: Values are Strings (e.g. "1.2.3.4:80")
if isinstance(existing_val, str) and value:
# Append with a comma separator
new_val = ",".join(map(str, value)) if isinstance(value, list) else value
remapped_config[key_normalized] = f"{existing_val},{new_val}"

# Case 2: Values are Lists (e.g. ["1.2.3.4:80"])
elif isinstance(existing_val, list):
# If the new value is also a list, extend; otherwise append
if isinstance(value, list):
remapped_config[key_normalized] = existing_val + value
else:
remapped_config[key_normalized].append(value)
else:
# Key does not exist yet, create it
remapped_config[key_normalized] = value
# Ensure config items added by decryptors dynamically are preserved
remapped_config.update(
{
key: decoded_config[key]
for key in decoded_config
if key not in remapped_config and key not in normalized_fields
}
)
return remapped_config

# Searches for the RAT configuration section, using the VerifyHash() marker
# or brute-force, returning the decrypted config on success
def _get_config(self) -> dict[str, Any]:
Expand Down
14 changes: 8 additions & 6 deletions src/rat_king_parser/config_parser/utils/config_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@
from typing import Any

normalized_keys = {
"Hosts": ("HOSTS", "Hosts", "ServerIp", "hardcodedhosts", "PasteUrl"),
"Hosts": ("HOSTS", "Hosts", "HostsFE", "ServerIp", "hardcodedhosts", "PasteUrl"),
"Ports": ("Port", "Ports", "ServerPort"),
"Mutex": ("MTX", "MUTEX", "Mutex", "mutex_string"),
"Version": ("VERSION", "Version"),
"Key": ("Key", "key", "EncryptionKey", "ENCRYPTIONKEY"),
"Group": ("Group", "Groub", "GroupTag", "TAG"),
}

_normalized_keys_map = {
alias: k for k, aliases in normalized_keys.items() for alias in aliases
}


# Normalizes config keys/values for easier mapping
def check_key_n_value(key: str, value: Any) -> tuple[str, Any]:
key = key.replace("_", "")
for k, v in normalized_keys.items():
if key in v:
key = k
break
key_clean = key.replace("_", "")
if key_clean in _normalized_keys_map:
key = _normalized_keys_map[key_clean]

if key in ("Hosts", "Ports") and isinstance(value, str):
if value not in ("null", "false"):
Expand Down
Loading