Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 72 additions & 114 deletions transformers/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,67 @@ def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
return item


class NetskopePatternNormalizer(BaseTransformer):
"""Normalize Netskope patterns and convert wildcards to regex formats."""

def wildcard_to_regex(self, pattern: str) -> str:
"""
Convert a Netskope wildcard pattern into a regex expression.

Args:
pattern: Input wildcard pattern.

Returns:
Regex-safe version of the pattern.
"""
if pattern.startswith("*."):
domain = pattern[2:]
domain = domain.replace(".", r"\.")
return rf"^([^.]+\.)*{domain}$"
else:
escaped = pattern.replace(".", r"\.")
return rf"^{escaped}$"

def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""
Transform Netskope patterns into the correct regex or literal format.

Args:
item: Input dictionary.

Returns:
Updated dictionary with Netskope-normalized pattern.
"""
raw = item.get("pattern", "")
utype = item.get("type", "literal")

# literal/exact
if utype in ("literal", "exact"):
final = raw
item["netskope_type"] = "exact"

# wildcard
elif utype == "wildcard":
final = self.wildcard_to_regex(raw)
item["netskope_type"] = "regex"

# regex
elif utype == "regex":
final = raw
item["netskope_type"] = "regex"

# substring → literal
else:
final = raw
item["netskope_type"] = "exact"

# escape ONCE for JSON
final = final.replace("\\", "\\\\")
item["pattern"] = final

return item


class TypeMapper(BaseTransformer):
"""Map vendor pattern types to universal types (or reverse)."""

Expand Down Expand Up @@ -222,120 +283,17 @@ def apply_transformers(

# ---------------- VENDOR MAPPINGS ----------------


FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"}
FORTINET_CATEGORY_MAP = {
"3": "malware",
"4": "phishing",
"5": "gambling",
"default": "uncategorized",
}
FORTINET_TYPE_MAP = {
"simple": "literal",
"wildcard": "wildcard",
"regex": "regex",
"substring": "substring",
}
FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"}
FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"}

# ---------------- NETSKOPE ----------------

NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"}
NETSKOPE_CATEGORY_MAP = {
"malware": "malware",
"phishing": "phishing",
"gambling": "gambling",
"uncategorized": "uncategorized",
}
NETSKOPE_TYPE_MAP = {
"exact": "literal",
"wildcard": "wildcard",
"regex": "regex",
"substring": "substring",
}

ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"}
ZSCALER_CATEGORY_MAP = {
"malware": "malware",
"phishing": "phishing",
"gambling": "gambling",
"uncategorized": "uncategorized",
}
ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"}

PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"}
PRISMA_CATEGORY_MAP = {
"malware": "malware",
"phishing": "phishing",
"gambling": "gambling",
"uncategorized": "uncategorized",
}
PRISMA_TYPE_MAP = {
"simple": "literal",
"wildcard": "wildcard",
"regex": "regex",
"substring": "substring",
}


# ---------------- PIPELINE DEFINITIONS ----------------


VENDOR_TO_UNIVERSAL_PIPELINES = {
"fortinet": [
ActionMapper(FORTINET_ACTION_MAP),
PatternNormalizer(),
TypeMapper(FORTINET_TYPE_MAP),
CategoryMapper(FORTINET_CATEGORY_MAP),
MetadataEnricher("fortinet"),
],
"netskope": [
ActionMapper(NETSKOPE_ACTION_MAP),
PatternNormalizer(),
TypeMapper(NETSKOPE_TYPE_MAP),
CategoryMapper(NETSKOPE_CATEGORY_MAP),
MetadataEnricher("netskope"),
],
"zscaler": [
ActionMapper(ZSCALER_ACTION_MAP),
PatternNormalizer(),
TypeMapper(ZSCALER_TYPE_MAP),
CategoryMapper(ZSCALER_CATEGORY_MAP),
MetadataEnricher("zscaler"),
],
"prisma": [
ActionMapper(PRISMA_ACTION_MAP),
PatternNormalizer(),
TypeMapper(PRISMA_TYPE_MAP),
CategoryMapper(PRISMA_CATEGORY_MAP),
MetadataEnricher("prisma"),
],
}

UNIVERSAL_TO_VENDOR_PIPELINES = {
"fortinet": [
ActionMapper({v: k for k, v in FORTINET_ACTION_MAP.items()}),
PatternNormalizer(),
TypeMapper({v: k for k, v in FORTINET_TYPE_MAP.items()}),
CategoryMapper({v: k for k, v in FORTINET_CATEGORY_MAP.items()}),
MetadataEnricher("fortinet"),
],
"netskope": [
ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}),
PatternNormalizer(),
TypeMapper({v: k for k, v in NETSKOPE_TYPE_MAP.items()}),
CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}),
MetadataEnricher("netskope"),
],
"zscaler": [
ActionMapper({v: k for k, v in ZSCALER_ACTION_MAP.items()}),
PatternNormalizer(),
TypeMapper({v: k for k, v in ZSCALER_TYPE_MAP.items()}),
CategoryMapper({v: k for k, v in ZSCALER_CATEGORY_MAP.items()}),
MetadataEnricher("zscaler"),
],
"prisma": [
ActionMapper({v: k for k, v in PRISMA_ACTION_MAP.items()}),
PatternNormalizer(),
TypeMapper({v: k for k, v in PRISMA_TYPE_MAP.items()}),
CategoryMapper({v: k for k, v in PRISMA_CATEGORY_MAP.items()}),
MetadataEnricher("prisma"),
],
}
NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"}
NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {"exact": "literal", "regex": "regex"}
UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {"literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact"}

# ---------------- ZSCALER ----------------

ZSCALER_ACTION_MAP = {"block": "B
Loading