class NetskopePatternNormalizer(BaseTransformer):
    """Normalize universal patterns into Netskope's ``exact`` / ``regex`` forms.

    Wildcard patterns are translated into anchored regular expressions; every
    other pattern type is passed through unchanged. The Netskope-side type is
    recorded on the item under ``"netskope_type"``.
    """

    # Characters that need a backslash to match literally in a regex.
    # "*" is absent because it is translated to ".*" rather than escaped, and
    # "-" is absent because it is only special inside a character class, so
    # hostnames such as "my-site.com" keep their original spelling.
    _REGEX_ESCAPES = frozenset(".^$+?{}[]\\|()")

    # Universal pattern type -> Netskope type. Anything not listed here
    # (e.g. "substring") falls back to "exact".
    _NETSKOPE_TYPES = {
        "literal": "exact",
        "exact": "exact",
        "wildcard": "regex",
        "regex": "regex",
    }

    def wildcard_to_regex(self, pattern: str) -> str:
        """
        Convert a wildcard pattern into an anchored regex expression.

        A leading ``*.`` becomes ``([^.]+\\.)*``, i.e. zero or more dot-separated
        labels, so ``*.example.com`` matches ``example.com`` as well as any of
        its subdomains. Any other ``*`` becomes ``.*``. All remaining regex
        metacharacters are backslash-escaped so they match literally.

        Args:
            pattern: Input wildcard pattern.

        Returns:
            Regex string anchored with ``^`` and ``$``.
        """
        if pattern.startswith("*."):
            prefix = r"([^.]+\.)*"
            remainder = pattern[2:]
        else:
            prefix = ""
            remainder = pattern

        pieces = []
        for char in remainder:
            if char == "*":
                # A bare "*" is not a valid regex atom; expand it glob-style.
                pieces.append(".*")
            elif char in self._REGEX_ESCAPES:
                pieces.append("\\" + char)
            else:
                pieces.append(char)

        return f"^{prefix}{''.join(pieces)}$"

    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """
        Rewrite ``item["pattern"]`` for Netskope and set ``item["netskope_type"]``.

        The item is modified in place and also returned. ``item["type"]`` is
        only read here, never changed.

        Args:
            item: Input dictionary. ``"pattern"`` defaults to an empty string
                and ``"type"`` defaults to ``"literal"`` when missing.

        Returns:
            The same dictionary with the Netskope-normalized pattern.
        """
        raw = item.get("pattern")
        if raw is None:
            # Key present but explicitly null: treat it like a missing pattern
            # instead of failing on ``None.replace`` below.
            raw = ""
        utype = item.get("type", "literal")

        # Only wildcards need translating; literal, regex and unknown types
        # (e.g. substring) keep their text as-is.
        final = self.wildcard_to_regex(raw) if utype == "wildcard" else raw
        item["netskope_type"] = self._NETSKOPE_TYPES.get(utype, "exact")

        # Double every backslash once so the pattern survives being embedded
        # in JSON text.
        # NOTE(review): if the downstream writer already escapes backslashes
        # (e.g. json.dumps), this would double-escape - confirm with the caller.
        item["pattern"] = final.replace("\\", "\\\\")

        return item


# ---------------- VENDOR MAPPINGS ----------------
FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"}
FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"}
FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"}

# ---------------- NETSKOPE ----------------
NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"}
NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"}
# The two directions are separate maps because the mapping is not invertible:
# both "wildcard" and "regex" become Netskope "regex", and both "literal" and
# "substring" become Netskope "exact". The Netskope vendor -> universal
# pipeline passes the first map to TypeMapper; the universal -> Netskope
# pipeline runs NetskopePatternNormalizer and then TypeMapper with the second.
NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {"exact": "literal", "regex": "regex"}
UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {"literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact"}

# ---------------- ZSCALER ----------------
ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"}
ZSCALER_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"}
ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"}

# ---------------- PALO ALTO ----------------
PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"}
PRISMA_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"}
PRISMA_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"}