Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions packages/linkml/src/linkml/generators/jsonldcontextgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,22 @@ class ContextGenerator(Generator):
fix_multivalue_containers: bool | None = False
exclude_imports: bool = False
"""If True, elements from imported schemas won't be included in the generated context"""
exclude_external_imports: bool = False
"""If True, elements from URL-based external vocabulary imports are excluded.

Local file imports and linkml standard imports are kept. This is useful
when extending an external ontology (e.g. W3C Verifiable Credentials)
whose terms are ``@protected`` in their own JSON-LD context — redefining
them locally would violate JSON-LD 1.1 §4.1.11.

This flag is effective regardless of the ``mergeimports`` setting:
even with ``mergeimports=False``, external vocabulary elements can
leak into the context via the schema map.
"""
_local_classes: set | None = field(default=None, repr=False)
_local_slots: set | None = field(default=None, repr=False)
_external_classes: set | None = field(default=None, repr=False)
_external_slots: set | None = field(default=None, repr=False)

# Framing (opt-in via CLI flag)
emit_frame: bool = False
Expand All @@ -69,16 +83,39 @@ def __post_init__(self) -> None:
super().__post_init__()
if self.namespaces is None:
raise TypeError("Schema text must be supplied to context generator. Preparsed schema will not work")
if self.exclude_imports:
if self.exclude_imports or self.exclude_external_imports:
if self.schemaview:
sv = self.schemaview
else:
source = self.schema.source_file or self.schema
if isinstance(source, str) and self.base_dir and not Path(source).is_absolute():
source = str(Path(self.base_dir) / source)
sv = SchemaView(source, importmap=self.importmap, base_dir=self.base_dir)
self._local_classes = set(sv.all_classes(imports=False).keys())
self._local_slots = set(sv.all_slots(imports=False).keys())
if self.exclude_imports:
self._local_classes = set(sv.all_classes(imports=False).keys())
self._local_slots = set(sv.all_slots(imports=False).keys())
if self.exclude_external_imports:
self._external_classes, self._external_slots = self._collect_external_elements(sv)

@staticmethod
def _collect_external_elements(sv: SchemaView) -> tuple[set[str], set[str]]:
    """Identify classes and slots from URL-based external vocabulary imports.

    Walks the SchemaView ``schema_map`` (populated by ``imports_closure``)
    and collects element names from schemas whose import key starts with
    ``http://`` or ``https://``. Local file imports and ``linkml:``
    standard imports are left untouched.

    :param sv: schema view whose ``schema_map`` is inspected.
    :return: a ``(class_names, slot_names)`` pair of sets holding the names
        declared by the URL-keyed schemas; both sets are empty when no such
        schema is present.
    """
    # Called only for its side effect: schema_map must be filled in before
    # it is iterated below.
    sv.imports_closure()

    url_prefixes = ("http://", "https://")
    external_classes: set[str] = set()
    external_slots: set[str] = set()
    for schema_key, schema_def in sv.schema_map.items():
        # The entry keyed by the root schema's own name is never external.
        if schema_key == sv.schema.name:
            continue
        # str.startswith accepts a tuple, so one call covers both schemes.
        if schema_key.startswith(url_prefixes):
            external_classes.update(schema_def.classes.keys())
            external_slots.update(schema_def.slots.keys())
    return external_classes, external_slots

def visit_schema(self, base: str | Namespace | None = None, output: str | None = None, **_):
# Add any explicitly declared prefixes
Expand Down Expand Up @@ -194,6 +231,8 @@ def end_schema(
def visit_class(self, cls: ClassDefinition) -> bool:
if self.exclude_imports and cls.name not in self._local_classes:
return False
if self.exclude_external_imports and cls.name in self._external_classes:
return False

class_def = {}
cn = camelcase(cls.name)
Expand Down Expand Up @@ -246,6 +285,8 @@ def _literal_coercion_for_ranges(self, ranges: list[str]) -> tuple[bool, str | N
def visit_slot(self, aliased_slot_name: str, slot: SlotDefinition) -> None:
if self.exclude_imports and slot.name not in self._local_slots:
return
if self.exclude_external_imports and slot.name in self._external_slots:
return

if slot.identifier:
slot_def = "@id"
Expand Down Expand Up @@ -390,6 +431,13 @@ def serialize(
help="Use --exclude-imports to exclude imported elements from the generated JSON-LD context. This is useful when "
"extending an ontology whose terms already have context definitions in their own JSON-LD context file.",
)
@click.option(
"--exclude-external-imports/--no-exclude-external-imports",
default=False,
show_default=True,
help="Exclude elements from URL-based external vocabulary imports while keeping local file imports. "
"Useful when extending ontologies (e.g. W3C VC v2) whose terms are @protected in their own JSON-LD context.",
)
@click.version_option(__version__, "-V", "--version")
def cli(yamlfile, emit_frame, embed_context_in_frame, output, **args):
"""Generate jsonld @context definition from LinkML model"""
Expand Down
71 changes: 52 additions & 19 deletions packages/linkml/src/linkml/generators/pydanticgen/pydanticgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ def make_valid_python_identifier(name: str) -> str:
return identifier


def _is_valid_python_name(name: str) -> bool:
    """Return True when ``name`` can be used as a Python identifier.

    The string must pass ``str.isidentifier`` and must not be a reserved
    keyword according to ``keyword.iskeyword``.
    """
    if not name.isidentifier():
        return False
    return not keyword.iskeyword(name)


@dataclass
class PydanticGenerator(OOCodeGenerator, LifecycleMixin):
"""
Expand Down Expand Up @@ -475,9 +480,10 @@ def generate_class(self, cls: ClassDefinition) -> ClassResult:
if cls.union_of:
return self._generate_union_class(cls)

class_python_name = self._get_class_python_name(cls.name)
pyclass = PydanticClass(
name=camelcase(cls.name),
bases=self.class_bases.get(camelcase(cls.name), PydanticBaseModel.default_name),
name=class_python_name,
bases=self.class_bases.get(class_python_name, PydanticBaseModel.default_name),
description=cls.description.replace('"', '\\"') if cls.description is not None else None,
)

Expand Down Expand Up @@ -537,14 +543,14 @@ def _generate_union_class(self, cls: ClassDefinition) -> ClassResult:
)

# Get the union types with string quotes to handle forward references
union_types = [f'"{camelcase(union_cls)}"' for union_cls in cls.union_of]
union_types = [f'"{self._get_class_python_name(union_cls)}"' for union_cls in cls.union_of]
union_type_str = f"Union[{', '.join(union_types)}]"

# Create a type alias instead of a class
# Sanitize description for single-line comment (replace newlines with spaces)
description = cls.description.replace("\n", " ").strip() if cls.description else None
pyclass = PydanticClass(
name=camelcase(cls.name),
name=self._get_class_python_name(cls.name),
bases=[], # Empty list for type aliases
description=description,
is_type_alias=True,
Expand Down Expand Up @@ -581,7 +587,7 @@ def generate_slot(self, slot: SlotDefinition, cls: ClassDefinition) -> SlotResul
del slot_args["alias"]

slot_args["description"] = slot.description.replace('"', '\\"') if slot.description is not None else None
predef = self.predefined_slot_values.get(camelcase(cls.name), {}).get(slot.name, None)
predef = self.predefined_slot_values.get(self._get_class_python_name(cls.name), {}).get(slot.name, None)
if predef is not None:
slot_args["predefined"] = str(predef)

Expand Down Expand Up @@ -658,21 +664,19 @@ def predefined_slot_values(self) -> dict[str, dict[str, str]]:
ifabsent_processor = PydanticIfAbsentProcessor(sv)
slot_values = defaultdict(dict)
for class_def in sv.all_classes().values():
class_python_name = self._get_class_python_name(class_def.name)
for slot_name in sv.class_slots(class_def.name):
slot = sv.induced_slot(slot_name, class_def.name)
if slot.designates_type:
target_value = get_type_designator_value(sv, slot, class_def)
slot_values[camelcase(class_def.name)][slot.name] = f'"{target_value}"'
slot_values[class_python_name][slot.name] = f'"{target_value}"'
if slot.multivalued:
slot_values[camelcase(class_def.name)][slot.name] = (
"[" + slot_values[camelcase(class_def.name)][slot.name] + "]"
slot_values[class_python_name][slot.name] = (
"[" + slot_values[class_python_name][slot.name] + "]"
)
slot_values[camelcase(class_def.name)][slot.name] = slot_values[camelcase(class_def.name)][
slot.name
]
elif slot.ifabsent is not None:
value = ifabsent_processor.process_slot(slot, class_def)
slot_values[camelcase(class_def.name)][slot.name] = value
slot_values[class_python_name][slot.name] = value

self._predefined_slot_values = slot_values

Expand All @@ -690,19 +694,46 @@ def class_bases(self) -> dict[str, list[str]]:
for class_def in sv.all_classes().values():
class_parents = []
if class_def.is_a:
class_parents.append(camelcase(class_def.is_a))
class_parents.append(self._get_class_python_name(class_def.is_a))
if self.gen_mixin_inheritance and class_def.mixins:
class_parents.extend([camelcase(mixin) for mixin in class_def.mixins])
class_parents.extend([self._get_class_python_name(mixin) for mixin in class_def.mixins])
if len(class_parents) > 0:
# Use the sorted list of classes to order the parent classes, but reversed to match MRO needs
class_parents.sort(
key=lambda x: self.sorted_class_names.index(x) if x in self.sorted_class_names else -1
)
class_parents.reverse()
parents[camelcase(class_def.name)] = class_parents
parents[self._get_class_python_name(class_def.name)] = class_parents
self._class_bases = parents
return self._class_bases

def _get_class_python_name(self, class_name: str) -> str:
    """
    Resolve the Python class name to emit for a LinkML class.

    The camel-cased class name is preferred. When that is not a usable
    Python identifier, the camel-cased class alias is used instead, provided
    the class declares one.

    :param class_name: name of the LinkML class to resolve.
    :return: a valid, non-keyword Python identifier.
    :raises ValueError: if the alias exists but is not a valid identifier,
        or if the name is invalid and no alias is available.
    """
    candidate = camelcase(class_name)
    if _is_valid_python_name(candidate):
        return candidate

    definition = self.schemaview.get_class(class_name)
    if not (definition and definition.alias):
        # Nothing to fall back to: the class is unknown or has no alias.
        raise ValueError(
            f"Class name '{class_name}' (Python: '{candidate}') is not a valid Python identifier. "
            "Consider providing a class alias that is a valid Python identifier."
        )

    aliased = camelcase(definition.alias)
    if not _is_valid_python_name(aliased):
        raise ValueError(
            f"Class '{class_name}' has alias '{definition.alias}' but "
            f"'{aliased}' is not a valid Python identifier"
        )
    return aliased

def get_mixin_identifier_range(self, mixin) -> str:
sv = self.schemaview
id_ranges = list(
Expand Down Expand Up @@ -738,9 +769,10 @@ def get_class_slot_range(self, slot_range: str, inlined: bool, inlined_as_list:
len([x for x in sv.class_induced_slots(slot_range) if x.designates_type]) > 0
and len(sv.class_descendants(slot_range)) > 1
):
return "Union[" + ",".join([camelcase(c) for c in sv.class_descendants(slot_range)]) + "]"
descendants = [self._get_class_python_name(c) for c in sv.class_descendants(slot_range)]
return "Union[" + ",".join(descendants) + "]"
else:
return f"{camelcase(slot_range)}"
return f"{self._get_class_python_name(slot_range)}"

# For the more difficult cases, set string as the default and attempt to improve it
range_cls_identifier_slot_range = "str"
Expand Down Expand Up @@ -1064,7 +1096,8 @@ def _get_element_import(self, class_name: ElementName) -> Import:
schema_name = self.schemaview.element_by_schema_map()[class_name]
schema = [s for s in self.schemaview.schema_map.values() if s.name == schema_name][0]
module = self.generate_module_import(schema, self.split_context)
return Import(module=module, objects=[ObjectImport(name=camelcase(class_name))], is_schema=True)
python_name = self._get_class_python_name(class_name)
return Import(module=module, objects=[ObjectImport(name=python_name)], is_schema=True)

def render(self) -> PydanticModule:
"""
Expand Down Expand Up @@ -1107,7 +1140,7 @@ def render(self) -> PydanticModule:
# just swap in typing.Any instead down below
source_classes = [c for c in source_classes if c.class_uri != "linkml:Any"]
source_classes = self.before_generate_classes(source_classes, sv)
self.sorted_class_names = [camelcase(c.name) for c in source_classes]
self.sorted_class_names = [self._get_class_python_name(c.name) for c in source_classes]
for cls in source_classes:
cls = self.before_generate_class(cls, sv)
result = self.generate_class(cls)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Jinja-style template for a generated module of SQLAlchemy declarative
# classes written with DeclarativeBase / Mapped / mapped_column.
#
# Names the template reads from its rendering context:
#   classes    - iterable of class objects; each exposes name, is_a, mixins,
#                description, alias and an ``attributes`` mapping.
#   backrefs   - mapping indexed by class name that yields mapping objects
#                with mapping_type, source_slot, target_class, target_slot
#                and join_class.
#   classname  - callable turning a class name into the emitted class name.
#   python_type - callable turning an ``sql_type`` annotation value into the
#                type placed inside ``Mapped[...]``.
#
# Slot annotations consulted per attribute: sql_type, primary_key,
# foreign_key, autoincrement, required and original_slot.
#
# Classes with a parent (is_a) or mixins get ``__mapper_args__`` with
# ``concrete`` set to True.
sqlalchemy_declarative_2x_template_str = """\
from __future__ import annotations

from datetime import date, datetime, time
from decimal import Decimal

from sqlalchemy import (
Boolean,
Date,
DateTime,
Enum,
Float,
ForeignKey,
Integer,
Numeric,
Text,
Time,
)
from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
pass


metadata = Base.metadata
{% for c in classes %}


class {{ classname(c.name) }}({% if c.is_a %}{{ classname(c.is_a) }}{% else %}Base{% endif %}):
\"\"\"
{% if c.description %}
{{ c.description }}
{% else %}
{{ c.alias }}
{% endif %}
\"\"\"

__tablename__ = "{{ c.name }}"

{% for s in c.attributes.values() %}
{% set pytype = python_type(s.annotations['sql_type'].value) %}
{% if 'primary_key' in s.annotations %}
{{ s.alias }}: Mapped[{{ pytype }}] = mapped_column({{ s.annotations['sql_type'].value }}
{%- if 'foreign_key' in s.annotations %}, ForeignKey("{{ s.annotations['foreign_key'].value }}"){% endif -%}
, primary_key=True
{%- if 'autoincrement' in s.annotations %}, autoincrement=True{% endif -%}
)
{% elif 'required' in s.annotations %}
{{ s.alias }}: Mapped[{{ pytype }}] = mapped_column({{ s.annotations['sql_type'].value }}
{%- if 'foreign_key' in s.annotations %}, ForeignKey("{{ s.annotations['foreign_key'].value }}"){% endif -%}
)
{% else %}
{{ s.alias }}: Mapped[{{ pytype }} | None] = mapped_column({{ s.annotations['sql_type'].value }}
{%- if 'foreign_key' in s.annotations %}, ForeignKey("{{ s.annotations['foreign_key'].value }}"){% endif -%}
)
{% endif %}
{% if 'foreign_key' in s.annotations and 'original_slot' in s.annotations %}
{{ s.annotations['original_slot'].value }}: Mapped[{{ classname(s.range) }} | None] = relationship(foreign_keys=[{{ s.alias }}])
{% endif %}
{% endfor %}
{% for mapping in backrefs[c.name] %}
{% if mapping.mapping_type == "ManyToMany" %}

# ManyToMany
{{ mapping.source_slot }}: Mapped[list[{{ classname(mapping.target_class) }}]] = relationship(secondary="{{ mapping.join_class }}")
{% elif mapping.mapping_type == "MultivaluedScalar" %}

{{ mapping.source_slot }}_rel: Mapped[list[{{ classname(mapping.join_class) }}]] = relationship()
{{ mapping.source_slot }}: AssociationProxy[list[str]] = association_proxy(
"{{ mapping.source_slot }}_rel",
"{{ mapping.target_slot }}",
creator=lambda x_: {{ classname(mapping.join_class) }}({{ mapping.target_slot }}=x_),
)
{% else %}

# One-To-Many: {{ mapping }}
{{ mapping.source_slot }}: Mapped[list[{{ classname(mapping.target_class) }}]] = relationship(foreign_keys="[{{ mapping.target_class }}.{{ mapping.target_slot }}]")
{% endif %}
{% endfor %}

def __repr__(self):
return f"{{ c.name }}(
{%- for s in c.attributes.values() -%}
{{ s.alias }}={self.{{ s.alias }}},
{%- endfor -%}
)"
{% if c.is_a or c.mixins %}

__mapper_args__ = {"concrete": True}
{% endif %}
{% endfor %}
"""
Loading
Loading