diff --git a/transformers/__init__.py b/transformers/__init__.py new file mode 100644 index 00000000..93236b69 --- /dev/null +++ b/transformers/__init__.py @@ -0,0 +1,5 @@ +""" +Transformers package initialization. + +This package provides core functionality and public APIs for the transformers library. +""" diff --git a/transformers/core/base.py b/transformers/core/base.py deleted file mode 100644 index a3031dcd..00000000 --- a/transformers/core/base.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Base transformer definitions. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/core/enums/enums.py b/transformers/core/enums/enums.py deleted file mode 100644 index 79b9faba..00000000 --- a/transformers/core/enums/enums.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -enums base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/core/models/models.py b/transformers/core/models/models.py deleted file mode 100644 index 365a7584..00000000 --- a/transformers/core/models/models.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -models base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/domains/__init__.py b/transformers/domains/__init__.py new file mode 100644 index 00000000..04ff6ebc --- /dev/null +++ b/transformers/domains/__init__.py @@ -0,0 +1,7 @@ +""" +Transformers package initialization. 
"""
URL Domain Models - Unified Data Model (UDM).

This module defines the canonical schema for URLs, URL collections, and
categories within the URL domain.

Design Principles:

- Domain-Level Logic: Operates purely on domain concepts.
- Vendor-Agnostic: No vendor-specific logic is contained here.
- Strong Typing: Enforces RFC-compliant formatting and normalization.
"""

from datetime import datetime
from typing import Any
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field


class Category(BaseModel):
    """
    Represents a normalized category entity.

    This includes a stable identifier and taxonomic classification.
    """

    # Internal unique identifier for the category.
    id: str = Field(..., description="Internal unique identifier for the category")
    # Human-readable name of the category.
    name: str = Field(..., description="Human-readable name of the category")
    # Distinguishes system-standard from user-defined categories.
    type: Literal["standard", "custom"] = Field(
        ...,
        description="Distinguishes between system-standard and user-defined categories",
    )


class Metadata(BaseModel):
    """
    Extensible container for enrichment data.

    This includes timestamps, source information, and optional metadata fields.
    """

    # When the record was processed by the pipeline.
    processed_at: datetime = Field(
        ..., description="Timestamp of when the record was processed"
    )
    # Origin system of the data (e.g. vendor name); optional.
    source: Optional[str] = Field(
        None, description="The origin system of the data"
    )
    # Typed as Dict[str, Any] (was bare `dict`) so the schema documents
    # the expected string-keyed mapping shape.
    additional_info: Optional[Dict[str, Any]] = Field(
        None, description="Placeholder for custom metadata expansion"
    )


class URL_UDM(BaseModel):
    """
    Unified Data Model for URL entities.

    This model serves as the source of truth for processing, independent
    of any external vendor system.
    """

    # Allow population both by field name and by alias.
    model_config = ConfigDict(populate_by_name=True)

    pattern: str = Field(
        ..., description="The URL pattern (literal, wildcard, or regex)"
    )
    type: Literal["literal", "wildcard", "regex"] = Field(
        ..., description="The syntax type of the pattern"
    )
    action: Literal["allow", "block", "monitor"] = Field(
        ..., description="Standardized enforcement action"
    )
    status: Literal["enable", "disable"] = Field(
        ..., description="Operational status of the rule"
    )
    url_list_id: str = Field(
        ..., description="Unique ID for the parent URL list"
    )
    url_list_name: str = Field(
        ..., description="Human-readable name of the URL list"
    )

    # Merged array of standard and custom categories; empty by default.
    categories: List[Category] = Field(
        default_factory=list,
        description="Merged array of standard and custom categories",
    )

    vendor: Optional[str] = Field(
        None, description="Original vendor for traceability purposes"
    )
    metadata: Optional[Metadata] = Field(
        None, description="Processing metadata and timestamps"
    )
    notes: Optional[str] = Field(
        None, description="Optional justifications or comments"
    )
+""" + +from datetime import datetime +from typing import Any +from typing import List +from typing import Optional + +import jmespath -Context: - Part of the use case within the Unified Policy Transformation Framework. +from transformers.domains.url.models import URL_UDM +from transformers.domains.url.models import Category +from transformers.domains.url.models import Metadata +from transformers.framework.udm_transformers.category_mapper import \ + CategoryMapper +from transformers.framework.udm_transformers.metadata_enricher import \ + MetadataEnricher +from transformers.framework.udm_transformers.pattern_normalizer import \ + PatternNormalizer +from transformers.framework.udm_transformers.type_mapper import TypeMapper -Responsibilities: - - - - - - +FORTINET_ACTION_MAP = { + "allow": "allow", + "block": "block", + "monitor": "monitor", + "exempt": "allow", +} -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. +FORTINET_CATEGORY_MAP = { + "3": "malware", + "4": "phishing", + "5": "gambling", + "default": "uncategorized", +} -Author: - +FORTINET_TYPE_MAP = { + "simple": "literal", + "wildcard": "wildcard", + "regex": "regex", +} -Created: - +JMESPATH_FLATTEN_URLS = """ +*.urls.*.{ + pattern: url, + action: action, + status: status, + type: type, + url_id: url_id +} """ + + +def flatten_fortinet_jmespath(raw_data: Dict[str, Any]) -> List[Dict[str, Any]]: + """Flatten nested Fortinet URL data into normalized records.""" + flat = [] + + for _, url_list in raw_data.items(): + list_id = url_list["object_id"] + list_name = url_list["filter_name"] + + for _, item in url_list["urls"].items(): + flat.append( + { + "pattern": item["url"], + "action": item["action"], + "status": item["status"], + "type": item["type"], + "url_id": item["url_id"], + "list_id": list_id, + "list_name": list_name, + "category_id": "Uncategorized", + } + ) + + return flat + + +class FortinetMapper: + """Map transformed dictionaries into URL_UDM instances.""" + + 
def to_udm(self, item: Dict[str, Any]) -> URL_UDM: + """Convert a transformed dictionary into a validated URL_UDM.""" + cat_id = item.get("category_id", "uncategorized") + categories = [ + Category( + id=cat_id, + name=cat_id.capitalize(), + type="standard", + ) + ] + + meta = Metadata( + processed_at=datetime.fromisoformat( + item["metadata"]["processed_at"] + ), + source=item["metadata"].get("source"), + ) + + return URL_UDM( + pattern=item["pattern"], + type=item["type"], + action=item["action"], + status="enable", + url_list_id=str(item["list_id"]), + url_list_name=item["list_name"], + categories=categories, + vendor=item["vendor"], + metadata=meta, + notes=item.get("notes"), + ) + + +class FortinetExporter: + """Export URL_UDM records into Fortinet format.""" + + def transform(self, udm: URL_UDM) -> Dict[str, Any]: + """Reconstruct Fortinet-specific pattern and type syntax.""" + reverse_type_map = { + value: key for key, value in FORTINET_TYPE_MAP.items() + } + + return { + "url": udm.pattern, + "type": reverse_type_map.get(udm.type, "simple"), + } + + +def run_universal_to_fortinet_pipeline( + records: List[URL_UDM], +) -> List[Dict[str, Any]]: + """Transform universal records into Fortinet export records.""" + output = [] + + for record in records: + output.append( + { + "pattern": record.pattern, + "type": record.type, + "action": record.action, + "list_id": record.url_list_id, + "list_name": record.url_list_name, + } + ) + + return output + + +def export_fortinet_json(records: List[Dict[str, Any]]) -> Dict[str, Any]: + """Convert flat Fortinet records into grouped Fortinet JSON.""" + grouped = {} + counters = {} + + for record in records: + key = record["list_id"] + + if key not in grouped: + grouped[key] = { + "object_id": record["list_id"], + "filter_name": record["list_name"], + "urls": {}, + } + counters[key] = 0 + + idx = str(counters[key]) + counters[key] += 1 + + grouped[key]["urls"][idx] = { + "url_id": str(counters[key]), + "url": 
record["pattern"], + "type": record["type"], + "action": record["action"], + "status": "enable", + } + + return grouped + + +def run_fortinet_to_universal_pipeline( + raw_data: Dict[str, Any], +) -> List[URL_UDM]: + """Run the full Fortinet-to-universal transformation pipeline.""" + flat_data = flatten_fortinet_jmespath(raw_data) + + steps = [ + ActionMapper(FORTINET_ACTION_MAP), + PatternNormalizer(), + TypeMapper(FORTINET_TYPE_MAP), + CategoryMapper(FORTINET_CATEGORY_MAP), + MetadataEnricher("fortinet"), + ] + + mapper = FortinetMapper() + udm_records = [] + + for record in flat_data: + for step in steps: + record = step.transform(record) + + udm_records.append(mapper.to_udm(record)) + + return udm_records + + +VENDOR_TO_UNIVERSAL_PIPELINES = { + "fortinet": run_fortinet_to_universal_pipeline, +} diff --git a/transformers/domains/url/vendors/netskope.py b/transformers/domains/url/vendors/netskope.py index 5e54d718..7a7a67ee 100644 --- a/transformers/domains/url/vendors/netskope.py +++ b/transformers/domains/url/vendors/netskope.py @@ -1,24 +1,213 @@ """ -netskope transformers. +Netskope URL Domain Integration. -Purpose: - +This module implements the transformer, mapper, and exporter for Netskope, +converting between Netskope-specific configurations and the Pydantic +Unified Data Model (UDM). +""" + +from datetime import datetime +from typing import Any +from typing import List +from typing import Optional + +import jmespath -Context: - Part of the use case within the Unified Policy Transformation Framework. 
"""
Netskope URL Domain Integration.

This module implements the transformer, mapper, and exporter for Netskope,
converting between Netskope-specific configurations and the Pydantic
Unified Data Model (UDM).
"""

import re
from datetime import datetime
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

import jmespath

from transformers.domains.url.models import Category
from transformers.domains.url.models import Metadata
from transformers.domains.url.models import URL_UDM
# ActionMapper and BaseTransformer were used below but never imported
# (NameError at import/runtime).
from transformers.framework.udm_transformers.action_mapper import \
    ActionMapper
from transformers.framework.udm_transformers.base_transformer import \
    BaseTransformer
from transformers.framework.udm_transformers.category_mapper import \
    CategoryMapper
from transformers.framework.udm_transformers.metadata_enricher import \
    MetadataEnricher
from transformers.framework.udm_transformers.pattern_normalizer import \
    PatternNormalizer
from transformers.framework.udm_transformers.type_mapper import TypeMapper

# Netskope action -> universal action vocabulary.
# NOTE(review): "block" -> "deny" is outside URL_UDM's allowed actions
# ("allow"/"block"/"monitor"); currently latent because the flattener
# hard-codes action="allow". Confirm the intended mapping direction.
NETSKOPE_ACTION_MAP = {
    "block": "deny",
    "allow": "allow",
    "monitor": "allow",
}

# Netskope category names -> universal category names (identity today).
NETSKOPE_CATEGORY_MAP = {
    "malware": "malware",
    "phishing": "phishing",
    "gambling": "gambling",
    "uncategorized": "uncategorized",
}

# Netskope pattern-type names -> universal type names.
NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {
    "exact": "literal",
    "regex": "regex",
}

# JMESPath projection: drop deleted lists, surface name/id/type/urls.
JMESPATH_NETSKOPE = """
values(@)[?modify_type!='Deleted'].{
    list_name: name,
    list_id: object_id,
    type: data_type,
    urls: values(data_urls)
}
"""


def flatten_netskope_jmespath(url_lists: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten Netskope hierarchical data using JMESPath extraction.

    Args:
        url_lists: Raw Netskope URL-list structure.

    Returns:
        Flat per-URL records; entries without a ``url`` value are skipped.
    """
    extracted = jmespath.search(JMESPATH_NETSKOPE, url_lists) or []
    flat = []

    for lst in extracted:
        for entry in lst.get("urls", []):
            url = entry.get("url")
            if not url:
                continue

            flat.append(
                {
                    "pattern": url,
                    # Netskope lists carry no per-URL action; default allow.
                    "action": "allow",
                    "category_id": "Uncategorized",
                    "list_name": lst["list_name"],
                    "list_id": str(lst["list_id"]),
                    "type": lst["type"],
                }
            )

    return flat


class NetskopePatternNormalizer(BaseTransformer):
    """Normalize Netskope URL patterns for universal compatibility."""

    def wildcard_to_regex(self, pattern: str) -> str:
        """Convert a ``*.domain`` wildcard pattern to regex format.

        Args:
            pattern: Candidate wildcard pattern.

        Returns:
            An anchored regex for ``*.``-prefixed patterns; any other
            pattern is returned unchanged.
        """
        if pattern.startswith("*."):
            domain = pattern[2:].replace(".", r"\.")
            return rf"^([^.]+\.)*{domain}$"
        return pattern

    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize pattern into Netskope-compatible format.

        Args:
            item: Record carrying ``pattern`` and a universal ``type``.

        Returns:
            The record with ``pattern`` rewritten where needed and a
            ``netskope_type`` of ``"exact"`` or ``"regex"`` attached.
        """
        raw_pattern = item.get("pattern", "")
        universal_type = item.get("type", "literal")

        if universal_type in ("literal", "exact"):
            item["pattern"] = raw_pattern
            item["netskope_type"] = "exact"
        elif universal_type in ("wildcard", "regex"):
            item["pattern"] = self.wildcard_to_regex(raw_pattern)
            item["netskope_type"] = "regex"
        else:
            # Unknown types pass through as exact matches.
            item["pattern"] = raw_pattern
            item["netskope_type"] = "exact"

        return item


class NetskopePatternDenormalizer(BaseTransformer):
    """Convert Netskope patterns back to universal format."""

    def regex_to_wildcard(self, pattern: str) -> Optional[str]:
        """Attempt to convert a regex back to a ``*.domain`` wildcard.

        Recognizes the shape produced by
        :meth:`NetskopePatternNormalizer.wildcard_to_regex`.

        Args:
            pattern: A regex pattern string.

        Returns:
            The wildcard form, or ``None`` if the regex does not match
            the known wildcard shape.
        """
        wildcard_regex = r"^\^\(\[\^\.\]\+\\\.\)\*(.+)\\\.([a-zA-Z0-9\-]+)\$$"
        match = re.match(wildcard_regex, pattern)
        if match:
            # Bug fix: group(1) still contains regex-escaped dots for
            # multi-label domains (e.g. "sub\.example"); unescape them so
            # the wildcard is "*.sub.example.com", not "*.sub\.example.com".
            prefix = match.group(1).replace(r"\.", ".")
            domain = f"{prefix}.{match.group(2)}"
            return f"*.{domain}"
        return None

    def is_regex(self, pattern: str) -> bool:
        """Check if a pattern contains regex metacharacter syntax."""
        regex_markers = ("^", "$", "(", ")", "[", "]", "+", "?", "|", "{", "}")
        return any(marker in pattern for marker in regex_markers)

    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Denormalize a Netskope pattern into universal format.

        Args:
            item: Record carrying a Netskope ``pattern``.

        Returns:
            The record with universal ``type`` set ("wildcard", "regex" or
            "exact"), ``pattern`` rewritten, and ``netskope_type`` removed.
        """
        # Collapse doubled backslashes from JSON-escaped regexes.
        pattern = item.get("pattern", "").replace("\\\\", "\\")

        if pattern.startswith("*.") and pattern.count("*") == 1:
            item["type"] = "wildcard"
        elif "*" in pattern:
            item["type"] = "regex"
        elif self.is_regex(pattern):
            wildcard = self.regex_to_wildcard(pattern)
            if wildcard:
                item["type"] = "wildcard"
                pattern = wildcard
            else:
                item["type"] = "regex"
        else:
            item["type"] = "exact"

        item["pattern"] = pattern
        item.pop("netskope_type", None)
        return item


class NetskopeMapper:
    """Handles semantic mapping into the Unified Data Model."""

    def to_udm(self, item: Dict[str, Any]) -> URL_UDM:
        """Convert a transformed dictionary into a URL_UDM instance.

        Args:
            item: A flattened, pipeline-transformed Netskope record.

        Returns:
            A validated ``URL_UDM`` instance with source "netskope".
        """
        cat_id = item.get("category_id", "uncategorized")
        categories = [
            Category(id=cat_id, name=cat_id.capitalize(), type="standard")
        ]

        meta = Metadata(
            processed_at=datetime.fromisoformat(
                item["metadata"]["processed_at"]
            ),
            source="netskope",
        )

        return URL_UDM(
            pattern=item["pattern"],
            type=item["type"],
            action=item["action"],
            status="enable",
            url_list_id=item["list_id"],
            url_list_name=item["list_name"],
            categories=categories,
            vendor=item["vendor"],
            metadata=meta,
            notes=item.get("notes"),
        )


class NetskopeExporter:
    """Convert UDM objects into Netskope format."""

    def transform(self, udm: URL_UDM) -> Dict[str, Any]:
        """Convert a UDM object into a Netskope-compatible structure.

        Args:
            udm: A universal URL record.

        Returns:
            A dictionary with ``pattern``, ``type`` and ``action`` fields.
        """
        return {
            "pattern": udm.pattern,
            "type": udm.type,
            "action": udm.action,
        }


def run_netskope_to_universal_pipeline(
    raw_data: Dict[str, Any],
) -> List[URL_UDM]:
    """Run the full Netskope ingestion pipeline into UDM objects.

    Args:
        raw_data: Raw Netskope URL-list configuration.

    Returns:
        Validated ``URL_UDM`` records.
    """
    flat_data = flatten_netskope_jmespath(raw_data)

    # Ordered transformation steps applied to each flattened record.
    steps = [
        ActionMapper(NETSKOPE_ACTION_MAP),
        TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP),
        NetskopePatternDenormalizer(),
        CategoryMapper(NETSKOPE_CATEGORY_MAP),
        MetadataEnricher("netskope"),
    ]

    mapper = NetskopeMapper()
    udm_records = []

    for record in flat_data:
        for step in steps:
            record = step.transform(record)
        udm_records.append(mapper.to_udm(record))

    return udm_records
+""" diff --git a/transformers/framework/context.py b/transformers/framework/context.py deleted file mode 100644 index fd6081f3..00000000 --- a/transformers/framework/context.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -context base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/framework/pipeline.py b/transformers/framework/pipeline.py deleted file mode 100644 index e6137c79..00000000 --- a/transformers/framework/pipeline.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -pipeline base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/pipelines.py b/transformers/framework/pipelines.py similarity index 88% rename from transformers/pipelines.py rename to transformers/framework/pipelines.py index 1ec2647d..46927eee 100644 --- a/transformers/pipelines.py +++ b/transformers/framework/pipelines.py @@ -8,13 +8,14 @@ from typing import Dict from typing import List -from transformers.base_transformer import BaseTransformer +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer def apply_transformers( items: List[Dict[str, Any]], transformers: List[BaseTransformer] -) -> List[Dict[str, Any]]: +) -> List[Dict[str, Any]]: """Apply a sequence of transformers to a list of items. Each item in the input list is processed sequentially by all @@ -28,7 +29,7 @@ def apply_transformers( Returns: A list of transformed dictionaries. 
- """ + """ result: List[Dict[str, Any]] = [] for item in items: diff --git a/transformers/framework/transformers/generic_transformers.py b/transformers/framework/transformers/generic_transformers.py deleted file mode 100644 index 93962265..00000000 --- a/transformers/framework/transformers/generic_transformers.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -generic_transformers base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/framework/udm_transformers/__init__.py b/transformers/framework/udm_transformers/__init__.py new file mode 100644 index 00000000..93236b69 --- /dev/null +++ b/transformers/framework/udm_transformers/__init__.py @@ -0,0 +1,5 @@ +""" +Transformers package initialization. + +This package provides core functionality and public APIs for the transformers library. 
+""" diff --git a/transformers/action_mapper.py b/transformers/framework/udm_transformers/action_mapper.py similarity index 88% rename from transformers/action_mapper.py rename to transformers/framework/udm_transformers/action_mapper.py index fc6cb9bc..279629a8 100644 --- a/transformers/action_mapper.py +++ b/transformers/framework/udm_transformers/action_mapper.py @@ -7,7 +7,10 @@ from typing import Any from typing import Dict -from .base_transformer import BaseTransformer +# Change from: from .base_transformer import BaseTransformer +# To the absolute framework path: +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer class ActionMapper(BaseTransformer): diff --git a/transformers/base_transformer.py b/transformers/framework/udm_transformers/base_transformer.py similarity index 58% rename from transformers/base_transformer.py rename to transformers/framework/udm_transformers/base_transformer.py index 04bd40b4..161d545f 100644 --- a/transformers/base_transformer.py +++ b/transformers/framework/udm_transformers/base_transformer.py @@ -1,7 +1,9 @@ -"""Base transformer definition. +""" +Base Transformer Definition - Framework Layer. -This module defines the abstract base class used by all transformers -in the transformation pipeline. +This module defines the abstract base class (ABC) used by all transformers +within the generic transformation engine. It ensures a consistent interface +for reusable transformation components. """ from abc import ABC @@ -11,16 +13,18 @@ class BaseTransformer(ABC): - """Define the interface for all transformers. + """ + Define the interface for all transformers. All concrete transformers must implement the ``transform`` method, which takes a single dictionary item and returns a transformed - dictionary. + dictionary. This maintains a domain-agnostic execution model. """ @abstractmethod def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """Transform a single dictionary item. 
+ """ + Transform a single dictionary item. Args: item: A dictionary representing a single configuration or diff --git a/transformers/category_mapper.py b/transformers/framework/udm_transformers/category_mapper.py similarity index 83% rename from transformers/category_mapper.py rename to transformers/framework/udm_transformers/category_mapper.py index 3010f14b..a1d99c07 100644 --- a/transformers/category_mapper.py +++ b/transformers/framework/udm_transformers/category_mapper.py @@ -8,10 +8,12 @@ from typing import Any from typing import Dict -from .base_transformer import BaseTransformer +# Restructured to use the absolute path within the Framework layer +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer -class CategoryMapper(BaseTransformer): +class CategoryMapper(BaseTransformer): """Map category identifiers between vendor and universal models. This transformer replaces the ``category_id`` field of an item using @@ -19,7 +21,7 @@ class CategoryMapper(BaseTransformer): mapping, it is left unchanged. """ - def __init__(self, category_map: Dict[str, str]) -> None: + def __init__(self, category_map: Dict[str, str]) -> None: """Initialize the CategoryMapper. Args: @@ -28,7 +30,7 @@ def __init__(self, category_map: Dict[str, str]) -> None: """ self.category_map = category_map - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """Transform an item's category identifier using the category map. 
Args: diff --git a/transformers/metadata_enricher.py b/transformers/framework/udm_transformers/metadata_enricher.py similarity index 79% rename from transformers/metadata_enricher.py rename to transformers/framework/udm_transformers/metadata_enricher.py index 95669776..39e4d10a 100644 --- a/transformers/metadata_enricher.py +++ b/transformers/framework/udm_transformers/metadata_enricher.py @@ -8,7 +8,9 @@ from typing import Any from typing import Dict -from .base_transformer import BaseTransformer +# Restructured to use the absolute path within the Framework layer +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer class MetadataEnricher(BaseTransformer): @@ -17,16 +19,16 @@ class MetadataEnricher(BaseTransformer): This transformer adds a ``vendor`` field and a ``metadata`` dictionary containing a ``processed_at`` timestamp to each item. """ - - def __init__(self, vendor: str) -> None: + + def __init__(self, vendor: str) -> None: """Initialize the MetadataEnricher. Args: vendor: The vendor name to attach to each item. - """ + """ self.vendor = vendor - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """Add vendor and metadata information to an item. Args: @@ -35,11 +37,12 @@ def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: Returns: The transformed dictionary containing the ``vendor`` field and a ``metadata.processed_at`` timestamp. 
- """ + """ item["vendor"] = self.vendor if "metadata" not in item: item["metadata"] = {} + # Standardizing to the UDM requirement for processed_at timestamps item["metadata"]["processed_at"] = datetime.utcnow().isoformat() return item diff --git a/transformers/pattern_normalizer.py b/transformers/framework/udm_transformers/pattern_normalizer.py similarity index 74% rename from transformers/pattern_normalizer.py rename to transformers/framework/udm_transformers/pattern_normalizer.py index f88212d0..7a85b25c 100644 --- a/transformers/pattern_normalizer.py +++ b/transformers/framework/udm_transformers/pattern_normalizer.py @@ -1,23 +1,26 @@ """Pattern normalization transformer. This module defines a generic pattern normalizer that ensures each item -has a ``pattern`` field. Currently, this transformer acts as a pass-through. +has a ``pattern`` field. Currently, this transformer acts as a pass-through +but serves as a hook for formal pattern representations. """ from typing import Any from typing import Dict -from .base_transformer import BaseTransformer +# Restructured to use the absolute path within the Framework layer +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer -class PatternNormalizer(BaseTransformer): +class PatternNormalizer(BaseTransformer): """Normalize or enforce the presence of a pattern field in items. This transformer guarantees that each dictionary item contains a ``pattern`` key. If the key is missing, it is initialized to an empty string. """ - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """Ensure the item has a pattern field. Args: @@ -25,6 +28,6 @@ def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: Returns: The same dictionary with a ``pattern`` key ensured. 
- """ + """ item["pattern"] = item.get("pattern", "") return item diff --git a/transformers/type_mapper.py b/transformers/framework/udm_transformers/type_mapper.py similarity index 81% rename from transformers/type_mapper.py rename to transformers/framework/udm_transformers/type_mapper.py index 79a6afd8..d058d595 100644 --- a/transformers/type_mapper.py +++ b/transformers/framework/udm_transformers/type_mapper.py @@ -8,27 +8,29 @@ from typing import Any from typing import Dict -from .base_transformer import BaseTransformer +# Restructured to use the absolute path within the Framework layer +from transformers.framework.udm_transformers.base_transformer import \ + BaseTransformer -class TypeMapper(BaseTransformer): +class TypeMapper(BaseTransformer): """Map type values between vendor and universal models. This transformer replaces the ``type`` field of an item using a predefined mapping dictionary. If the type is not found in the mapping, it is left unchanged. """ - - def __init__(self, type_map: Dict[str, str]) -> None: + + def __init__(self, type_map: Dict[str, str]) -> None: """Initialize the TypeMapper. Args: type_map: A dictionary mapping source type values to destination type values. - """ + """ self.type_map = type_map - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: + def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: """Transform an item's type field using the type mapping. Args: @@ -38,7 +40,7 @@ def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: Returns: The transformed item with its ``type`` field mapped according to the configured type map. - """ + """ item_type = item.get("type") if item_type in self.type_map: item["type"] = self.type_map[item_type] diff --git a/transformers/shared/exceptions.py b/transformers/shared/exceptions.py deleted file mode 100644 index 3d282c87..00000000 --- a/transformers/shared/exceptions.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -exceptions base class. 
- -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/shared/logging.py b/transformers/shared/logging.py deleted file mode 100644 index 40d44e83..00000000 --- a/transformers/shared/logging.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -logging base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/shared/utils.py b/transformers/shared/utils.py deleted file mode 100644 index 31d119f9..00000000 --- a/transformers/shared/utils.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -utils base class. - -Purpose: - - -Context: - Part of the use case within the Unified Policy Transformation Framework. - -Responsibilities: - - - - - - - -Notes: - - Auto-generated placeholder module. - - Extend implementation as needed. - -Author: - - -Created: - -""" diff --git a/transformers/transformers.py b/transformers/transformers.py deleted file mode 100644 index 4f55395c..00000000 --- a/transformers/transformers.py +++ /dev/null @@ -1,371 +0,0 @@ -""" -Single-file SDK for bidirectional URL-filter migration transformers. 
- -This module provides: -- Universal intermediate URL-filter model -- Vendor-specific transformers for: Fortinet, Netskope, Zscaler, Prisma Access -- Modular basic transformers (action, pattern, type, category, metadata) -- Transformer pipelines (vendor → universal and universal → vendor) -- CLI and unit-test support -- Built-in sample mappings for immediate use -""" - -from dataclasses import dataclass -from dataclasses import field -from typing import Any -from typing import Dict -from typing import List - -# ---------------- UNIVERSAL DATA MODEL ---------------- - - -@dataclass -class UniversalURLFilter: - """Universal URL Filter Model.""" - - pattern: str - action: str - category: str - list_name: str = "" - list_id: str = "" - type: str = "literal" - vendor: str = "" - metadata: Dict[str, Any] = field(default_factory=dict) - - -# ---------------- BASIC TRANSFORMERS ---------------- - - -class BaseTransformer: - """Abstract base class for all transformers.""" - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Transform a single item. - - Args: - item: Input dictionary from vendor configuration. - - Returns: - Transformed dictionary. - - Raises: - NotImplementedError: If the subclass does not override this method. - """ - raise NotImplementedError - - -class ActionMapper(BaseTransformer): - """Map vendor actions to universal actions (or reverse).""" - - def __init__(self, action_map: Dict[str, str]): - """ - Initialize the ActionMapper. - - Args: - action_map: Mapping from vendor action → universal action. - """ - self.action_map = action_map - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Map the action field using the configured mapping. - - Args: - item: Input dictionary. - - Returns: - Dictionary with updated action. 
- """ - item["action"] = self.action_map.get(item.get("action"), item.get("action")) - return item - - -class PatternNormalizer(BaseTransformer): - """Normalize or pass through patterns unchanged.""" - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Normalize the pattern field. - - Args: - item: Input dictionary. - - Returns: - Updated dictionary with normalized pattern. - """ - item["pattern"] = item.get("pattern", "") - return item - - -class NetskopePatternNormalizer(BaseTransformer): - """Normalize Netskope patterns and convert wildcards to regex formats.""" - - def wildcard_to_regex(self, pattern: str) -> str: - """ - Convert a Netskope wildcard pattern into a regex expression. - - Args: - pattern: Input wildcard pattern. - - Returns: - Regex-safe version of the pattern. - """ - if pattern.startswith("*."): - domain = pattern[2:] - domain = domain.replace(".", r"\.") - return rf"^([^.]+\.)*{domain}$" - else: - escaped = pattern.replace(".", r"\.") - return rf"^{escaped}$" - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Transform Netskope patterns into the correct regex or literal format. - - Args: - item: Input dictionary. - - Returns: - Updated dictionary with Netskope-normalized pattern. 
- """ - raw = item.get("pattern", "") - utype = item.get("type", "literal") - - # literal/exact - if utype in ("literal", "exact"): - final = raw - item["netskope_type"] = "exact" - - # wildcard - elif utype == "wildcard": - final = self.wildcard_to_regex(raw) - item["netskope_type"] = "regex" - - # regex - elif utype == "regex": - final = raw - item["netskope_type"] = "regex" - - # substring → literal - else: - final = raw - item["netskope_type"] = "exact" - - # escape ONCE for JSON - final = final.replace("\\", "\\\\") - item["pattern"] = final - - return item - - -class TypeMapper(BaseTransformer): - """Map vendor pattern types to universal types (or reverse).""" - - def __init__(self, type_map: Dict[str, str]): - """ - Initialize the TypeMapper. - - Args: - type_map: Mapping from vendor type → universal type. - """ - self.type_map = type_map - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Map the pattern type field. - - Vendor types may be inconsistent (uppercase/lowercase), - so normalization rules apply before mapping. - - Args: - item: Input dictionary. - - Returns: - Updated dictionary with normalized pattern type. - """ - vendor_type = item.get("type", "simple") - - vendor_type_normalized = ( - vendor_type.upper() - if vendor_type in ["STRING", "WILDCARD", "REGEX"] - else vendor_type.lower() - ) - - item["type"] = self.type_map.get(vendor_type_normalized, "literal") - return item - - -class CategoryMapper(BaseTransformer): - """Map vendor category IDs or names to universal names.""" - - def __init__(self, category_map: Dict[str, str]): - """ - Initialize the CategoryMapper. - - Args: - category_map: Mapping from vendor category → universal. - """ - self.category_map = category_map - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Map the category field. - - Args: - item: Input dictionary. - - Returns: - Updated dictionary with normalized category. 
- """ - vendor_cat = str(item.get("category_id", "default")) - item["category"] = self.category_map.get(vendor_cat, "uncategorized") - return item - - -class MetadataEnricher(BaseTransformer): - """Enrich items with vendor name and embedded metadata fields.""" - - def __init__(self, vendor: str, extra_fields: List[str] | None = None): - """ - Initialize the MetadataEnricher. - - Args: - vendor: Vendor identifier. - extra_fields: Optional list of fields to copy to metadata. - """ - self.vendor = vendor - self.extra_fields = extra_fields or [] - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """ - Add vendor metadata fields to the item. - - Args: - item: Input dictionary. - - Returns: - Updated dictionary enriched with metadata. - """ - item["vendor"] = self.vendor - item.setdefault("metadata", {}) - - for field_name in self.extra_fields: - if field_name in item: - item["metadata"][field_name] = item[field_name] - - return item - - -# ---------------- PIPELINE EXECUTOR ---------------- - - -def apply_transformers( - items: List[Dict[str, Any]], transformers: List[BaseTransformer] -) -> List[Dict[str, Any]]: - """ - Apply a list of transformers sequentially to a list of items. - - Args: - items: List of vendor configuration dictionaries. - transformers: Ordered list of transformer instances. - - Returns: - List of transformed dictionaries. 
- """ - result = [] - for item in items: - for transformer in transformers: - item = transformer.transform(item) - result.append(item) - return result - - -# ---------------- VENDOR MAPPINGS ---------------- - -FORTINET_ACTION_MAP = {"block": "block", "allow": "allow", "monitor": "monitor"} -FORTINET_CATEGORY_MAP = {"3": "malware", "4": "phishing", "5": "gambling", "default": "uncategorized"} -FORTINET_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} - -# ---------------- NETSKOPE ---------------- - -NETSKOPE_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "monitor"} -NETSKOPE_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} -NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {"exact": "literal", "regex": "regex"} -UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {"literal": "exact", "regex": "regex", "wildcard": "regex", "substring": "regex"} - -# ---------------- ZSCALER ---------------- - -ZSCALER_ACTION_MAP = {"block": "BLOCK", "allow": "ALLOW", "monitor": "MONITOR"} -ZSCALER_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} -ZSCALER_TYPE_MAP = {"STRING": "literal", "WILDCARD": "wildcard", "REGEX": "regex"} - -# ---------------- PALO ALTO ---------------- - -PRISMA_ACTION_MAP = {"block": "deny", "allow": "allow", "monitor": "alert"} -PRISMA_CATEGORY_MAP = {"malware": "malware", "phishing": "phishing", "gambling": "gambling", "uncategorized": "uncategorized"} -PRISMA_TYPE_MAP = {"simple": "literal", "wildcard": "wildcard", "regex": "regex", "substring": "substring"} - -# ---------------- PIPELINE DEFINITIONS ---------------- - -VENDOR_TO_UNIVERSAL_PIPELINES = { - "fortinet": [ - ActionMapper(FORTINET_ACTION_MAP), - PatternNormalizer(), - TypeMapper(FORTINET_TYPE_MAP), - CategoryMapper(FORTINET_CATEGORY_MAP), - MetadataEnricher("fortinet"), - ], - "netskope": [ - 
ActionMapper(NETSKOPE_ACTION_MAP), - PatternNormalizer(), - TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP), - CategoryMapper(NETSKOPE_CATEGORY_MAP), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper(ZSCALER_ACTION_MAP), - PatternNormalizer(), - TypeMapper(ZSCALER_TYPE_MAP), - CategoryMapper(ZSCALER_CATEGORY_MAP), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper(PRISMA_ACTION_MAP), - PatternNormalizer(), - TypeMapper(PRISMA_TYPE_MAP), - CategoryMapper(PRISMA_CATEGORY_MAP), - MetadataEnricher("prisma"), - ], -} - -UNIVERSAL_TO_VENDOR_PIPELINES = { - "fortinet": [ - ActionMapper({v: k for k, v in FORTINET_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in FORTINET_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in FORTINET_CATEGORY_MAP.items()}), - MetadataEnricher("fortinet"), - ], - "netskope": [ - ActionMapper({v: k for k, v in NETSKOPE_ACTION_MAP.items()}), - NetskopePatternNormalizer(), - TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP), - CategoryMapper({v: k for k, v in NETSKOPE_CATEGORY_MAP.items()}), - MetadataEnricher("netskope"), - ], - "zscaler": [ - ActionMapper({v: k for k, v in ZSCALER_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in ZSCALER_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in ZSCALER_CATEGORY_MAP.items()}), - MetadataEnricher("zscaler"), - ], - "prisma": [ - ActionMapper({v: k for k, v in PRISMA_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({v: k for k, v in PRISMA_TYPE_MAP.items()}), - CategoryMapper({v: k for k, v in PRISMA_CATEGORY_MAP.items()}), - MetadataEnricher("prisma"), - ], -} diff --git a/transformers/vendors/fortinet_transformer.py b/transformers/vendors/fortinet_transformer.py deleted file mode 100644 index cd17d34c..00000000 --- a/transformers/vendors/fortinet_transformer.py +++ /dev/null @@ -1,82 +0,0 @@ -"""Fortinet transformation pipelines. 
- -This module defines action, category, and type mappings for Fortinet -URL filtering configurations, as well as transformation pipelines -to convert between Fortinet-specific and universal data models. -""" - -import jmespath - -from transformers.action_mapper import ActionMapper -from transformers.base_transformer import BaseTransformer -from transformers.category_mapper import CategoryMapper -from transformers.metadata_enricher import MetadataEnricher -from transformers.pattern_normalizer import PatternNormalizer -from transformers.type_mapper import TypeMapper - -# ---------------- FORTINET MAPPINGS ---------------- - -FORTINET_ACTION_MAP = { - "block": "block", - "allow": "allow", - "monitor": "monitor", -} - -FORTINET_CATEGORY_MAP = { - "3": "malware", - "4": "phishing", - "5": "gambling", - "default": "uncategorized", -} - -FORTINET_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} - - -# ---------------- FORTINET PIPELINES ---------------- - -VENDOR_TO_UNIVERSAL_PIPELINES = [ - ActionMapper(FORTINET_ACTION_MAP), - PatternNormalizer(), - TypeMapper(FORTINET_TYPE_MAP), - CategoryMapper(FORTINET_CATEGORY_MAP), - MetadataEnricher("fortinet"), -] - -UNIVERSAL_TO_VENDOR_PIPELINES = [ - ActionMapper({value: key for key, value in FORTINET_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({value: key for key, value in FORTINET_TYPE_MAP.items()}), - CategoryMapper({value: key for key, value in FORTINET_CATEGORY_MAP.items()}), - MetadataEnricher("fortinet"), -] - -JMESPATH_FLATTEN_URLS = """ -*[?modify_type!='Deleted'].*.data_urls.*.{ - pattern: url, - action: `allow`, - category_id: 'Uncategorized', - list_name: @.name, - list_id: @.object_id, - type: @.data_type -} -""" - -def flatten_fortinet_jmespath(url_lists: dict) -> list[dict]: - """ - Flatten Fortinet JSON using JMESPath. - - Args: - url_lists: Nested Fortinet JSON URL list. 
- - Returns: - Flat list of URL entries suitable for transformer input. - """ - result = jmespath.search(JMESPATH_FLATTEN_URLS, url_lists) - # jmespath returns a nested list, flatten if needed - flat_result = [item for sublist in result for item in sublist] if result else [] - return flat_result diff --git a/transformers/vendors/netskope_transformer.py b/transformers/vendors/netskope_transformer.py deleted file mode 100644 index ba3088e3..00000000 --- a/transformers/vendors/netskope_transformer.py +++ /dev/null @@ -1,236 +0,0 @@ -"""Netskope transformation pipelines and pattern normalization. - -This module defines transformers and mappings required to convert -Netskope URL list configurations to and from the universal data model. -""" -import re -from typing import Any -from typing import Dict -from typing import List -from typing import Optional - -import jmespath - -from transformers.action_mapper import ActionMapper -from transformers.base_transformer import BaseTransformer -from transformers.category_mapper import CategoryMapper -from transformers.metadata_enricher import MetadataEnricher -from transformers.pattern_normalizer import PatternNormalizer -from transformers.type_mapper import TypeMapper - -JMESPATH_NETSKOPE = """ -values(@)[?modify_type!='Deleted'].{ - list_name: name, - list_id: object_id, - type: data_type, - urls: values(data_urls) -} -""" - -def flatten_netskope_jmespath(url_lists: dict) -> list[dict]: - """Flatten the structure using jmespath.""" - extracted = jmespath.search(JMESPATH_NETSKOPE, url_lists) or [] - flat = [] - - for lst in extracted: - for entry in lst.get("urls", []): - url = entry.get("url") - if not url: - continue - flat.append({ - "pattern": url, - "action": "allow", - "category_id": "Uncategorized", - "list_name": lst["list_name"], - "list_id": str(lst["list_id"]), - "type": lst["type"] - }) - return flat - -class NetskopePatternNormalizer(BaseTransformer): - """Normalize Netskope URL patterns for vendor compatibility. 
- - This transformer converts universal URL patterns into Netskope- - compatible formats: - - - ``literal`` / ``exact`` patterns are preserved. - - ``wildcard`` patterns are converted into regex. - - ``regex`` patterns are passed through unchanged. - """ - - def wildcard_to_regex(self, pattern: str) -> str: - r"""Convert a wildcard domain pattern to a regex. - - Example: - ``*.example.com`` → ``^([^.]+\.)*example\.com$`` - - Args: - pattern: A wildcard URL pattern. - - Returns: - A regex representation of the wildcard pattern. - """ - if pattern.startswith("*."): - domain = pattern[2:].replace(".", r"\.") - return rf"^([^.]+\.)*{domain}$" - - return pattern - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """Transform a URL item into a Netskope-compatible pattern. - - Args: - item: A universal URL dictionary. - - Returns: - The transformed dictionary with Netskope pattern semantics. - """ - raw_pattern = item.get("pattern", "") - universal_type = item.get("type", "literal") - - if universal_type in ("literal", "exact"): - item["pattern"] = raw_pattern - item["netskope_type"] = "exact" - - elif universal_type in ("wildcard", "regex"): - item["pattern"] = self.wildcard_to_regex(raw_pattern) - item["netskope_type"] = "regex" - - else: - item["pattern"] = raw_pattern - item["netskope_type"] = "exact" - - return item - - def transform_list( - self, - items: List[Dict[str, Any]], - ) -> List[Dict[str, Any]]: - """Transform a list of URL items. - - Args: - items: A list of universal URL dictionaries. - - Returns: - A list of Netskope-compatible URL dictionaries. - """ - return [self.transform(item) for item in items] - - -class NetskopePatternDenormalizer(BaseTransformer): - """Convert Netskope patterns back to universal model patterns.""" - - def regex_to_wildcard(self, pattern: str) -> Optional[str]: - r"""Convert a Netskope regex pattern back to wildcard format. 
- - Example: - ``^([^.]+\\.)*example\\.com$`` → ``*.example.com`` - - Args: - pattern: A Netskope regex pattern. - - Returns: - A wildcard pattern if conversion is possible, otherwise ``None``. - """ - wildcard_regex = ( - r"^\^\(\[\^\.\]\+\\\.\)\*(.+)\\\.([a-zA-Z0-9\-]+)\$$" - ) - match = re.match(wildcard_regex, pattern) - - if match: - domain = f"{match.group(1)}.{match.group(2)}" - return f"*.{domain}" - - return None - - def is_regex(self, pattern: str) -> bool: - """Determine whether a pattern contains regex syntax. - - Args: - pattern: A URL pattern string. - - Returns: - ``True`` if the pattern appears to be a regex, otherwise ``False``. - """ - regex_markers = ("^", "$", "(", ")", "[", "]", "+", "?", "|", "{", "}") - return any(marker in pattern for marker in regex_markers) - - def transform(self, item: Dict[str, Any]) -> Dict[str, Any]: - """Transform a Netskope URL item into a universal-compatible form. - - Args: - item: A Netskope URL dictionary. - - Returns: - A universal URL dictionary. 
- """ - pattern = item.get("pattern", "").replace("\\\\", "\\") - - if pattern.startswith("*.") and pattern.count("*") == 1: - item["type"] = "wildcard" - - elif "*" in pattern: - item["type"] = "regex" - - elif self.is_regex(pattern): - wildcard = self.regex_to_wildcard(pattern) - if wildcard: - item["type"] = "wildcard" - pattern = wildcard - else: - item["type"] = "regex" - - else: - item["type"] = "exact" - - item["pattern"] = pattern - item.pop("netskope_type", None) - - return item - - -# ---------------- NETSKOPE MAPPINGS ---------------- - -NETSKOPE_ACTION_MAP = { - "block": "deny", - "allow": "allow", - "monitor": "allow", -} - -NETSKOPE_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} - -NETSKOPE_TO_UNIVERSAL_TYPE_MAP = { - "exact": "literal", - "regex": "regex", -} - -UNIVERSAL_TO_NETSKOPE_TYPE_MAP = { - "literal": "exact", - "regex": "regex", - "wildcard": "regex", - "substring": "regex", -} - - -# ---------------- NETSKOPE PIPELINES ---------------- - -VENDOR_TO_UNIVERSAL_PIPELINES = [ - ActionMapper(NETSKOPE_ACTION_MAP), - TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP), - NetskopePatternDenormalizer(), - CategoryMapper(NETSKOPE_CATEGORY_MAP), - MetadataEnricher("netskope"), -] - -UNIVERSAL_TO_VENDOR_PIPELINES = [ - ActionMapper({value: key for key, value in NETSKOPE_ACTION_MAP.items()}), - TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP), - NetskopePatternNormalizer(), - CategoryMapper({value: key for key, value in NETSKOPE_CATEGORY_MAP.items()}), - MetadataEnricher("netskope"), -] diff --git a/transformers/vendors/prisma_transformer.py b/transformers/vendors/prisma_transformer.py deleted file mode 100644 index 87ec8120..00000000 --- a/transformers/vendors/prisma_transformer.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Prisma transformation pipelines. 
- -This module defines action, category, and type mappings for Prisma -URL filtering configurations, along with transformation pipelines -to convert between Prisma-specific and universal data models. -""" - -from transformers.action_mapper import ActionMapper -from transformers.base_transformer import BaseTransformer -from transformers.category_mapper import CategoryMapper -from transformers.metadata_enricher import MetadataEnricher -from transformers.pattern_normalizer import PatternNormalizer -from transformers.type_mapper import TypeMapper - -# ---------------- PRISMA MAPPINGS ---------------- - -PRISMA_ACTION_MAP = { - "block": "deny", - "allow": "allow", - "monitor": "alert", -} - -PRISMA_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} - -PRISMA_TYPE_MAP = { - "simple": "literal", - "wildcard": "wildcard", - "regex": "regex", - "substring": "substring", -} - - -# ---------------- PRISMA PIPELINES ---------------- - -VENDOR_TO_UNIVERSAL_PIPELINES = [ - ActionMapper(PRISMA_ACTION_MAP), - PatternNormalizer(), - TypeMapper(PRISMA_TYPE_MAP), - CategoryMapper(PRISMA_CATEGORY_MAP), - MetadataEnricher("prisma"), -] - -UNIVERSAL_TO_VENDOR_PIPELINES = [ - ActionMapper({value: key for key, value in PRISMA_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({value: key for key, value in PRISMA_TYPE_MAP.items()}), - CategoryMapper({value: key for key, value in PRISMA_CATEGORY_MAP.items()}), - MetadataEnricher("prisma"), -] diff --git a/transformers/vendors/zscaler_transformer.py b/transformers/vendors/zscaler_transformer.py deleted file mode 100644 index 600cb85f..00000000 --- a/transformers/vendors/zscaler_transformer.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Zscaler transformation pipelines. 
- -This module defines action, category, and type mappings for Zscaler -URL filtering configurations, along with transformation pipelines -to convert between Zscaler-specific and universal data models. -""" - -from transformers.action_mapper import ActionMapper -from transformers.base_transformer import BaseTransformer -from transformers.category_mapper import CategoryMapper -from transformers.metadata_enricher import MetadataEnricher -from transformers.pattern_normalizer import PatternNormalizer -from transformers.type_mapper import TypeMapper - -# ---------------- ZSCALER MAPPINGS ---------------- - -ZSCALER_ACTION_MAP = { - "block": "BLOCK", - "allow": "ALLOW", - "monitor": "MONITOR", -} - -ZSCALER_CATEGORY_MAP = { - "malware": "malware", - "phishing": "phishing", - "gambling": "gambling", - "uncategorized": "uncategorized", -} - -ZSCALER_TYPE_MAP = { - "STRING": "literal", - "WILDCARD": "wildcard", - "REGEX": "regex", -} - - -# ---------------- ZSCALER PIPELINES ---------------- - -VENDOR_TO_UNIVERSAL_PIPELINES = [ - ActionMapper(ZSCALER_ACTION_MAP), - PatternNormalizer(), - TypeMapper(ZSCALER_TYPE_MAP), - CategoryMapper(ZSCALER_CATEGORY_MAP), - MetadataEnricher("zscaler"), -] - -UNIVERSAL_TO_VENDOR_PIPELINES = [ - ActionMapper({value: key for key, value in ZSCALER_ACTION_MAP.items()}), - PatternNormalizer(), - TypeMapper({value: key for key, value in ZSCALER_TYPE_MAP.items()}), - CategoryMapper({value: key for key, value in ZSCALER_CATEGORY_MAP.items()}), - MetadataEnricher("zscaler"), -] -