From 85377a1521e16b27cdfeeb769f11cba4725fa090 Mon Sep 17 00:00:00 2001 From: pierrerondel Date: Fri, 12 Dec 2025 10:45:20 +0100 Subject: [PATCH 1/3] Update transformers.py modify the mapping for netskope and add an URL pattern transform for netskope --- transformers/transformers.py | 126 +++++++++++++++++------------------ 1 file changed, 60 insertions(+), 66 deletions(-) diff --git a/transformers/transformers.py b/transformers/transformers.py index 2914f535..1143cdbd 100644 --- a/transformers/transformers.py +++ b/transformers/transformers.py @@ -1,6 +1,5 @@ """ Single-file SDK for bidirectional URL-filter migration transformers. - This module provides: - Universal intermediate URL-filter model - Vendor-specific transformers for: Fortinet, Netskope, Zscaler, Prisma Access @@ -42,13 +41,10 @@ class BaseTransformer: def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Transform a single item. - Args: item: Input dictionary from vendor configuration. - Returns: Transformed dictionary. - Raises: NotImplementedError: If the subclass does not override this method. """ @@ -61,7 +57,6 @@ class ActionMapper(BaseTransformer): def __init__(self, action_map: Dict[str, str]): """ Initialize the ActionMapper. - Args: action_map: Mapping from vendor action → universal action. """ @@ -70,10 +65,8 @@ def __init__(self, action_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the action field using the configured mapping. - Args: item: Input dictionary. - Returns: Dictionary with updated action. """ @@ -87,16 +80,56 @@ class PatternNormalizer(BaseTransformer): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Normalize the pattern field. - Args: item: Input dictionary. - Returns: Updated dictionary with normalized pattern. """ item["pattern"] = item.get("pattern", "") return item +class NetskopePatternNormalizer(BaseTransformer): + """Normalize patterns for Netskope""" + + def wildcard_to_regex(self, pattern: str) -> str: + if pattern.startswith("*."): + domain = pattern[2:] + domain = domain.replace(".", r"\.") + return rf"^([^.]+\.)*{domain}$" + else: + escaped = pattern.replace(".", r"\.") + return rf"^{escaped}$" + + def transform(self, item): + raw = item.get("pattern", "") + utype = item.get("type", "literal") + + # literal/exact + if utype in ("literal", "exact"): + final = raw + item["netskope_type"] = "exact" + + # wildcard + elif utype == "wildcard": + final = self.wildcard_to_regex(raw) + item["netskope_type"] = "regex" + + # regex + elif utype == "regex": + final = raw + item["netskope_type"] = "regex" + + # substring → literal + else: + final = raw + item["netskope_type"] = "exact" + + # escape ONCE for JSON + final = final.replace("\\", "\\\\") + item["pattern"] = final + + return item + class TypeMapper(BaseTransformer): """Map vendor pattern types to universal types (or reverse).""" @@ -104,7 +137,6 @@ class TypeMapper(BaseTransformer): def __init__(self, type_map: Dict[str, str]): """ Initialize the TypeMapper. - Args: type_map: Mapping from vendor type → universal type. """ @@ -113,13 +145,10 @@ def __init__(self, type_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the pattern type field. - Vendor types may be inconsistent (uppercase/lowercase), so normalization rules apply before mapping. - Args: item: Input dictionary. - Returns: Updated dictionary with normalized pattern type. """ @@ -141,7 +170,6 @@ class CategoryMapper(BaseTransformer): def __init__(self, category_map: Dict[str, str]): """ Initialize the CategoryMapper. - Args: category_map: Mapping from vendor category → universal. """ @@ -150,10 +178,8 @@ def __init__(self, category_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the category field. - Args: item: Input dictionary. - Returns: Updated dictionary with normalized category. """ @@ -168,7 +194,6 @@ class MetadataEnricher(BaseTransformer): def __init__(self, vendor: str, extra_fields: List[str] | None = None): """ Initialize the MetadataEnricher. - Args: vendor: Vendor identifier. extra_fields: Optional list of fields to copy to metadata. @@ -179,10 +204,8 @@ def __init__(self, vendor: str, extra_fields: List[str] | None = None): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Add vendor metadata fields to the item. - Args: item: Input dictionary. - Returns: Updated dictionary enriched with metadata. """ @@ -204,11 +227,9 @@ def apply_transformers( ) -> List[Dict[str, Any]]: """ Apply a list of transformers sequentially to a list of items. - Args: items: List of vendor configuration dictionaries. transformers: Ordered list of transformer instances. - Returns: List of transformed dictionaries. """ @@ -224,55 +245,28 @@ def apply_transformers( FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"} -FORTINET_CATEGORY_MAP = { - "3": "malware", - "4": "phishing", - "5": "gambling", - "default": "uncategorized", -} -FORTINET_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} +FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"} +FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} + +# ---------------- NETSKOPE ---------------- NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"} -NETSKOPE_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} -NETSKOPE_TYPE_MAP = { - "exact": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} +NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} +NETSKOPE_TO_UNIVERSAL_TYPE_MAP = { "exact": "literal", "regex": "regex" } +UNIVERSAL_TO_NETSKOPE_TYPE_MAP = { "literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact" } + + +# ---------------- ZSCALER ---------------- ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"} -ZSCALER_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} +ZSCALER_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized" } ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"} +# ---------------- PALO ALTO ---------------- + PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"} -PRISMA_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} -PRISMA_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} +PRISMA_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} +PRISMA_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} # ---------------- PIPELINE DEFINITIONS ---------------- @@ -289,7 +283,7 @@ def apply_transformers( "netskope": [ ActionMapper(NETSKOPE_ACTION_MAP), PatternNormalizer(), - TypeMapper(NETSKOPE_TYPE_MAP), + TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP), CategoryMapper(NETSKOPE_CATEGORY_MAP), MetadataEnricher("netskope"), ], @@ -319,8 +313,8 @@ def apply_transformers( ], "netskope": [ ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in NETSKOPE_TYPE_MAP.items()}), + NetskopePatternNormalizer(), + TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP), CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}), MetadataEnricher("netskope"), ], From 7a38a500efbd656a50a72cfc8af32fc6fef85752 Mon Sep 17 00:00:00 2001 From: pierrerondel Date: Fri, 12 Dec 2025 10:53:29 +0100 Subject: [PATCH 2/3] Update transformers.py --- transformers/transformers.py | 130 +++++++++++++---------------------- 1 file changed, 47 insertions(+), 83 deletions(-) diff --git a/transformers/transformers.py b/transformers/transformers.py index 1143cdbd..d3d63948 100644 --- a/transformers/transformers.py +++ b/transformers/transformers.py @@ -1,5 +1,6 @@ """ Single-file SDK for bidirectional URL-filter migration transformers. + This module provides: - Universal intermediate URL-filter model - Vendor-specific transformers for: Fortinet, Netskope, Zscaler, Prisma Access @@ -41,10 +42,13 @@ class BaseTransformer: def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Transform a single item. + Args: item: Input dictionary from vendor configuration. + Returns: Transformed dictionary. + Raises: NotImplementedError: If the subclass does not override this method. """ @@ -57,6 +61,7 @@ class ActionMapper(BaseTransformer): def __init__(self, action_map: Dict[str, str]): """ Initialize the ActionMapper. + Args: action_map: Mapping from vendor action → universal action. """ @@ -65,8 +70,10 @@ def __init__(self, action_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the action field using the configured mapping. + Args: item: Input dictionary. + Returns: Dictionary with updated action. """ @@ -80,18 +87,30 @@ class PatternNormalizer(BaseTransformer): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Normalize the pattern field. + Args: item: Input dictionary. + Returns: Updated dictionary with normalized pattern. """ item["pattern"] = item.get("pattern", "") return item + class NetskopePatternNormalizer(BaseTransformer): - """Normalize patterns for Netskope""" - + """Normalize Netskope patterns and convert wildcards to regex formats.""" + def wildcard_to_regex(self, pattern: str) -> str: + """ + Convert a Netskope wildcard pattern into a regex expression. + + Args: + pattern: Input wildcard pattern. + + Returns: + Regex-safe version of the pattern. + """ if pattern.startswith("*."): domain = pattern[2:] domain = domain.replace(".", r"\.") @@ -100,7 +119,16 @@ def wildcard_to_regex(self, pattern: str) -> str: escaped = pattern.replace(".", r"\.") return rf"^{escaped}$" - def transform(self, item): + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform Netskope patterns into the correct regex or literal format. + + Args: + item: Input dictionary. + + Returns: + Updated dictionary with Netskope-normalized pattern. + """ raw = item.get("pattern", "") utype = item.get("type", "literal") @@ -129,7 +157,7 @@ def transform(self, item): item["pattern"] = final return item - + class TypeMapper(BaseTransformer): """Map vendor pattern types to universal types (or reverse).""" @@ -137,6 +165,7 @@ class TypeMapper(BaseTransformer): def __init__(self, type_map: Dict[str, str]): """ Initialize the TypeMapper. + Args: type_map: Mapping from vendor type → universal type. """ @@ -145,10 +174,13 @@ def __init__(self, type_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the pattern type field. + Vendor types may be inconsistent (uppercase/lowercase), so normalization rules apply before mapping. + Args: item: Input dictionary. + Returns: Updated dictionary with normalized pattern type. """ @@ -170,6 +202,7 @@ class CategoryMapper(BaseTransformer): def __init__(self, category_map: Dict[str, str]): """ Initialize the CategoryMapper. + Args: category_map: Mapping from vendor category → universal. """ @@ -178,8 +211,10 @@ def __init__(self, category_map: Dict[str, str]): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Map the category field. + Args: item: Input dictionary. + Returns: Updated dictionary with normalized category. """ @@ -194,6 +229,7 @@ class MetadataEnricher(BaseTransformer): def __init__(self, vendor: str, extra_fields: List[str] | None = None): """ Initialize the MetadataEnricher. + Args: vendor: Vendor identifier. extra_fields: Optional list of fields to copy to metadata. @@ -204,8 +240,10 @@ def __init__(self, vendor: str, extra_fields: List[str] | None = None): def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Add vendor metadata fields to the item. + Args: item: Input dictionary. + Returns: Updated dictionary enriched with metadata. """ @@ -227,9 +265,11 @@ def apply_transformers( ) -> List[Dict[str, Any]]: """ Apply a list of transformers sequentially to a list of items. + Args: items: List of vendor configuration dictionaries. transformers: Ordered list of transformer instances. + Returns: List of transformed dictionaries. """ @@ -243,7 +283,6 @@ def apply_transformers( # ---------------- VENDOR MAPPINGS ---------------- - FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"} FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"} FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} @@ -252,84 +291,9 @@ def apply_transformers( NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"} NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} -NETSKOPE_TO_UNIVERSAL_TYPE_MAP = { "exact": "literal", "regex": "regex" } -UNIVERSAL_TO_NETSKOPE_TYPE_MAP = { "literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact" } - +NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {"exact": "literal", "regex": "regex"} +UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {"literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "exact"} # ---------------- ZSCALER ---------------- -ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"} -ZSCALER_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized" } -ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"} - -# ---------------- PALO ALTO ---------------- - -PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"} -PRISMA_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} -PRISMA_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} - - -# ---------------- PIPELINE DEFINITIONS ---------------- - - -VENDOR_TO_UNIVERSAL_PIPELINES = { - "fortinet": [ - ActionMapper(FORTINET_ACTION_MAP), - PatternNormalizer(), - TypeMapper(FORTINET_TYPE_MAP), - CategoryMapper(FORTINET_CATEGORY_MAP), - MetadataEnricher("fortinet"), - ], - "netskope": [ - ActionMapper(NETSKOPE_ACTION_MAP), - PatternNormalizer(), - TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP), - CategoryMapper(NETSKOPE_CATEGORY_MAP), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper(ZSCALER_ACTION_MAP), - PatternNormalizer(), - TypeMapper(ZSCALER_TYPE_MAP), - CategoryMapper(ZSCALER_CATEGORY_MAP), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper(PRISMA_ACTION_MAP), - PatternNormalizer(), - TypeMapper(PRISMA_TYPE_MAP), - CategoryMapper(PRISMA_CATEGORY_MAP), - MetadataEnricher("prisma"), - ], -} - -UNIVERSAL_TO_VENDOR_PIPELINES = { - "fortinet": [ - ActionMapper({v: k for k, v in FORTINET_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in FORTINET_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in FORTINET_CATEGORY_MAP.items()}), - MetadataEnricher("fortinet"), - ], - "netskope": [ - ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}), - NetskopePatternNormalizer(), - TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP), - CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper({v: k for k, v in ZSCALER_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in ZSCALER_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in ZSCALER_CATEGORY_MAP.items()}), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper({v: k for k, v in PRISMA_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in PRISMA_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in PRISMA_CATEGORY_MAP.items()}), - MetadataEnricher("prisma"), - ], -} +ZSCALER_ACTION_MAP = {"block": "B From 86bfffa575cbf001035b34f15e1970fd8a3fcf83 Mon Sep 17 00:00:00 2001 From: pierrerondel Date: Fri, 12 Dec 2025 10:56:49 +0100 Subject: [PATCH 3/3] Update transformers.py --- transformers/transformers.py | 74 +++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/transformers/transformers.py b/transformers/transformers.py index d3d63948..1ac15a7b 100644 --- a/transformers/transformers.py +++ b/transformers/transformers.py @@ -296,4 +296,76 @@ def apply_transformers( # ---------------- ZSCALER ---------------- -ZSCALER_ACTION_MAP = {"block": "B +ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"} +ZSCALER_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} +ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"} + +# ---------------- PALO ALTO ---------------- + +PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"} +PRISMA_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} +PRISMA_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} + +# ---------------- PIPELINE DEFINITIONS ---------------- + +VENDOR_TO_UNIVERSAL_PIPELINES = { + "fortinet": [ + ActionMapper(FORTINET_ACTION_MAP), + PatternNormalizer(), + TypeMapper(FORTINET_TYPE_MAP), + CategoryMapper(FORTINET_CATEGORY_MAP), + MetadataEnricher("fortinet"), + ], + "netskope": [ + ActionMapper(NETSKOPE_ACTION_MAP), + PatternNormalizer(), + TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP), + CategoryMapper(NETSKOPE_CATEGORY_MAP), + MetadataEnricher("netskope"), + ], + "zscaler": [ + ActionMapper(ZSCALER_ACTION_MAP), + PatternNormalizer(), + TypeMapper(ZSCALER_TYPE_MAP), + CategoryMapper(ZSCALER_CATEGORY_MAP), + MetadataEnricher("zscaler"), + ], + "prisma": [ + ActionMapper(PRISMA_ACTION_MAP), + PatternNormalizer(), + TypeMapper(PRISMA_TYPE_MAP), + CategoryMapper(PRISMA_CATEGORY_MAP), + MetadataEnricher("prisma"), + ], +} + +UNIVERSAL_TO_VENDOR_PIPELINES = { + "fortinet": [ + ActionMapper({v: k for k, v in FORTINET_ACTION_MAP.items()}), + PatternNormalizer(), + TypeMapper({v: k for k, v in FORTINET_TYPE_MAP.items()}), + CategoryMapper({v: k for k, v in FORTINET_CATEGORY_MAP.items()}), + MetadataEnricher("fortinet"), + ], + "netskope": [ + ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}), + NetskopePatternNormalizer(), + TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP), + CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}), + MetadataEnricher("netskope"), + ], + "zscaler": [ + ActionMapper({v: k for k, v in ZSCALER_ACTION_MAP.items()}), + PatternNormalizer(), + TypeMapper({v: k for k, v in ZSCALER_TYPE_MAP.items()}), + CategoryMapper({v: k for k, v in ZSCALER_CATEGORY_MAP.items()}), + MetadataEnricher("zscaler"), + ], + "prisma": [ + ActionMapper({v: k for k, v in PRISMA_ACTION_MAP.items()}), + PatternNormalizer(), + TypeMapper({v: k for k, v in PRISMA_TYPE_MAP.items()}), + CategoryMapper({v: k for k, v in PRISMA_CATEGORY_MAP.items()}), + MetadataEnricher("prisma"), + ], +}