diff --git a/transformers/transformers.py b/transformers/transformers.py index 2914f535..d3d63948 100644 --- a/transformers/transformers.py +++ b/transformers/transformers.py @@ -98,6 +98,67 @@ def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: return item +class NetskopePatternNormalizer(BaseTransformer): + """Normalize Netskope patterns and convert wildcards to regex formats.""" + + def wildcard_to_regex(self, pattern: str) -> str: + """ + Convert a Netskope wildcard pattern into a regex expression. + + Args: + pattern: Input wildcard pattern. + + Returns: + Regex-safe version of the pattern. + """ + if pattern.startswith("*."): + domain = pattern[2:] + domain = domain.replace(".", r"\.") + return rf"^([^.]+\.)*{domain}$" + else: + escaped = pattern.replace(".", r"\.") + return rf"^{escaped}$" + + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform Netskope patterns into the correct regex or literal format. + + Args: + item: Input dictionary. + + Returns: + Updated dictionary with Netskope-normalized pattern. + """ + raw = item.get("pattern", "") + utype = item.get("type", "literal") + + # literal/exact + if utype in ("literal", "exact"): + final = raw + item["netskope_type"] = "exact" + + # wildcard + elif utype == "wildcard": + final = self.wildcard_to_regex(raw) + item["netskope_type"] = "regex" + + # regex + elif utype == "regex": + final = raw + item["netskope_type"] = "regex" + + # substring → literal + else: + final = raw + item["netskope_type"] = "exact" + + # escape ONCE for JSON + final = final.replace("\\", "\\\\") + item["pattern"] = final + + return item + + class TypeMapper(BaseTransformer): """Map vendor pattern types to universal types (or reverse).""" @@ -222,120 +283,17 @@ def apply_transformers( # ---------------- VENDOR MAPPINGS ---------------- - FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"} -FORTINET_CATEGORY_MAP = { - "3": "malware", - "4": "phishing", - "5": "gambling", - "default": "uncategorized", -} -FORTINET_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} +FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"} +FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} + +# ---------------- NETSKOPE ---------------- NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"} -NETSKOPE_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} -NETSKOPE_TYPE_MAP = { - "exact": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} - -ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"} -ZSCALER_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} -ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"} - -PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"} -PRISMA_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} -PRISMA_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} - - -# ---------------- PIPELINE DEFINITIONS ---------------- - - -VENDOR_TO_UNIVERSAL_PIPELINES = { - "fortinet": [ - ActionMapper(FORTINET_ACTION_MAP), - PatternNormalizer(), - TypeMapper(FORTINET_TYPE_MAP), - CategoryMapper(FORTINET_CATEGORY_MAP), - MetadataEnricher("fortinet"), - ], - "netskope": [ - ActionMapper(NETSKOPE_ACTION_MAP), - PatternNormalizer(), - TypeMapper(NETSKOPE_TYPE_MAP), - CategoryMapper(NETSKOPE_CATEGORY_MAP), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper(ZSCALER_ACTION_MAP), - PatternNormalizer(), - TypeMapper(ZSCALER_TYPE_MAP), - CategoryMapper(ZSCALER_CATEGORY_MAP), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper(PRISMA_ACTION_MAP), - PatternNormalizer(), - TypeMapper(PRISMA_TYPE_MAP), - CategoryMapper(PRISMA_CATEGORY_MAP), - MetadataEnricher("prisma"), - ], -} - -UNIVERSAL_TO_VENDOR_PIPELINES = { - "fortinet": [ - ActionMapper({v: k for k, v in FORTINET_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in FORTINET_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in FORTINET_CATEGORY_MAP.items()}), - MetadataEnricher("fortinet"), - ], - "netskope": [ - ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in NETSKOPE_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper({v: k for k, v in ZSCALER_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in ZSCALER_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in ZSCALER_CATEGORY_MAP.items()}), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper({v: k for k, v in PRISMA_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in PRISMA_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in PRISMA_CATEGORY_MAP.items()}), - MetadataEnricher("prisma"), - ], -} +NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} +NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {"exact": "literal", "regex": "regex"} +UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {"literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact"} + +# ---------------- ZSCALER ---------------- + +ZSCALER_ACTION_MAP = {"block": "B