Skip to content
Merged
33 changes: 1 addition & 32 deletions transformers/domains/url/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""
URL Domain Models - Unified Data Model (UDM).

This module defines the canonical schema for URLs, URL collections, and
categories within the URL domain.
This module defines the canonical schema for URLs, URL collections.

Design Principles:

Expand All @@ -21,21 +20,6 @@
from pydantic import Field


class Category(BaseModel):
"""
Represents a normalized category entity.

This includes a stable identifier and taxonomic classification.
"""

id: str = Field(..., description="Internal unique identifier for the category")
name: str = Field(..., description="Human-readable name of the category")
type: Literal["standard", "custom"] = Field(
...,
description="Distinguishes between system-standard and user-defined categories",
)


class Metadata(BaseModel):
"""
Extensible container for enrichment data.
Expand Down Expand Up @@ -70,30 +54,15 @@ class URL_UDM(BaseModel):
type: Literal["literal", "wildcard", "regex"] = Field(
..., description="The syntax type of the pattern"
)
action: Literal["allow", "block", "monitor"] = Field(
..., description="Standardized enforcement action"
)
status: Literal["enable", "disable"] = Field(
..., description="Operational status of the rule"
)
url_list_id: str = Field(
..., description="Unique ID for the parent URL list"
)
url_list_name: str = Field(
..., description="Human-readable name of the URL list"
)

categories: List[Category] = Field(
default_factory=list,
description="Merged array of standard and custom categories",
)

vendor: Optional[str] = Field(
None, description="Original vendor for traceability purposes"
)
metadata: Optional[Metadata] = Field(
None, description="Processing metadata and timestamps"
)
notes: Optional[str] = Field(
None, description="Optional justifications or comments"
)
151 changes: 59 additions & 92 deletions transformers/domains/url/vendors/fortinet.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
"""
Fortinet URL Domain Integration.

This module implements the transformer, mapper, and exporter for Fortinet,
converting between Fortinet-specific configurations and the Unified Data
Model (UDM).
This module implements the Transformer, Mapper, and Exporter for Fortinet,
converting between Fortinet-specific configurations and the Pydantic
Unified Data Model (UDM).
"""

from datetime import datetime
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

import jmespath

# Domain Model imports
from transformers.domains.url.models import URL_UDM
from transformers.domains.url.models import Category
from transformers.domains.url.models import Metadata
# Framework imports - Absolute paths
from transformers.framework.udm_transformers.action_mapper import ActionMapper
from transformers.framework.udm_transformers.category_mapper import \
CategoryMapper
from transformers.framework.udm_transformers.metadata_enricher import \
Expand All @@ -24,145 +28,107 @@
PatternNormalizer
from transformers.framework.udm_transformers.type_mapper import TypeMapper

FORTINET_ACTION_MAP = {
"allow": "allow",
"block": "block",
"monitor": "monitor",
"exempt": "allow",
}

FORTINET_CATEGORY_MAP = {
"3": "malware",
"4": "phishing",
"5": "gambling",
"default": "uncategorized",
}
# ---------------- FORTINET MAPPINGS ----------------

FORTINET_TYPE_MAP = {
"simple": "literal",
"wildcard": "wildcard",
"regex": "regex",
}

# ---------------- EXTRACTION LAYER ----------------


JMESPATH_FLATTEN_URLS = """
*.urls.*.{
pattern: url,
action: action,
status: status,
type: type,
url_id: url_id
}
"""


def flatten_fortinet_jmespath(raw_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Flatten nested Fortinet URL data into normalized records."""
def flatten_fortinet_jmespath(raw_data):
"""Flatten nested Fortinet dict into a list of record dictionaries."""
flat = []

for _, url_list in raw_data.items():
list_id = url_list["object_id"]
list_name = url_list["filter_name"]

for _, item in url_list["urls"].items():
flat.append(
{
"pattern": item["url"],
"action": item["action"],
"status": item["status"],
"type": item["type"],
"url_id": item["url_id"],
"list_id": list_id,
"list_name": list_name,
"category_id": "Uncategorized",
}
)
flat.append({
"pattern": item["url"],
"type": item["type"],
"url_id": item["url_id"],
"list_id": list_id,
"list_name": list_name,
})

return flat

# ---------------- MAPPER & EXPORTER ----------------

class FortinetMapper:
"""Map transformed dictionaries into URL_UDM instances."""
"""Handle semantic alignment and Pydantic UDM instantiation."""

def to_udm(self, item: Dict[str, Any]) -> URL_UDM:
"""Convert a transformed dictionary into a validated URL_UDM."""
cat_id = item.get("category_id", "uncategorized")
categories = [
Category(
id=cat_id,
name=cat_id.capitalize(),
type="standard",
)
]

"""Convert a transformed dictionary into a validated URL_UDM instance."""
# Construct the Metadata model
# MetadataEnricher provides the ISO timestamp string
meta = Metadata(
processed_at=datetime.fromisoformat(
item["metadata"]["processed_at"]
),
source=item["metadata"].get("source"),
processed_at=datetime.fromisoformat(item["metadata"]["processed_at"]),
)

return URL_UDM(
pattern=item["pattern"],
type=item["type"],
action=item["action"],
status="enable",
url_list_id=str(item["list_id"]),
url_list_name=item["list_name"],
categories=categories,
vendor=item["vendor"],
metadata=meta,
notes=item.get("notes"),
)


class FortinetExporter:
"""Export URL_UDM records into Fortinet format."""
"""Universal Model -> Fortinet Format."""

def transform(self, udm: URL_UDM) -> Dict[str, Any]:
"""Reconstruct Fortinet-specific pattern and type syntax."""
reverse_type_map = {
value: key for key, value in FORTINET_TYPE_MAP.items()
}
# Reverse mapping for Type
reverse_type_map = {v: k for k, v in FORTINET_TYPE_MAP.items()}

return {
"url": udm.pattern,
"type": reverse_type_map.get(udm.type, "simple"),
"type": reverse_type_map.get(udm.type, "simple")
}


def run_universal_to_fortinet_pipeline(
records: List[URL_UDM],
) -> List[Dict[str, Any]]:
"""Transform universal records into Fortinet export records."""
def run_universal_to_fortinet_pipeline(records: List[URL_UDM]) -> List[dict]:
"""Execute the pipeline to convert UDM records back to Fortinet dicts."""
output = []

for record in records:
output.append(
{
"pattern": record.pattern,
"type": record.type,
"action": record.action,
"list_id": record.url_list_id,
"list_name": record.url_list_name,
}
)
for r in records:
output.append({
"pattern": r.pattern,
"type": r.type,
"list_id": r.url_list_id,
"list_name": r.url_list_name
})

return output


def export_fortinet_json(records: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Convert flat Fortinet records into grouped Fortinet JSON."""
def export_fortinet_json(records: List[dict]) -> dict:
"""Group flat records into the Fortinet-specific JSON structure."""
grouped = {}
counters = {}

for record in records:
key = record["list_id"]
for r in records:
key = r["list_id"]

if key not in grouped:
grouped[key] = {
"object_id": record["list_id"],
"filter_name": record["list_name"],
"urls": {},
"object_id": r["list_id"],
"filter_name": r["list_name"],
"urls": {}
}
counters[key] = 0

Expand All @@ -171,41 +137,42 @@ def export_fortinet_json(records: List[Dict[str, Any]]) -> Dict[str, Any]:

grouped[key]["urls"][idx] = {
"url_id": str(counters[key]),
"url": record["pattern"],
"type": record["type"],
"action": record["action"],
"status": "enable",
"url": r["pattern"],
"type": r["type"],
}

return grouped

# ---------------- EXECUTION PIPELINE ----------------

def run_fortinet_to_universal_pipeline(
raw_data: Dict[str, Any],
) -> List[URL_UDM]:
"""Run the full Fortinet-to-universal transformation pipeline."""
def run_fortinet_to_universal_pipeline(raw_data: Dict[str, Any]) -> List[URL_UDM]:
"""Orchestrate deterministic flow from raw Fortinet data to UDM objects."""
# 1. Extraction
flat_data = flatten_fortinet_jmespath(raw_data)

# 2. Transformation Pipeline
steps = [
ActionMapper(FORTINET_ACTION_MAP),
PatternNormalizer(),
TypeMapper(FORTINET_TYPE_MAP),
CategoryMapper(FORTINET_CATEGORY_MAP),
MetadataEnricher("fortinet"),
MetadataEnricher("fortinet")
]

mapper = FortinetMapper()
udm_records = []

for record in flat_data:
# Apply each modular transformation unit
for step in steps:
record = step.transform(record)

# 3. Validation & Pydantic Conversion
udm_records.append(mapper.to_udm(record))

return udm_records

# ---------------- REGISTRATION ----------------

# This is what debugPythonScript.py is looking for
VENDOR_TO_UNIVERSAL_PIPELINES = {
"fortinet": run_fortinet_to_universal_pipeline,
"fortinet": run_fortinet_to_universal_pipeline
}
Loading