"""
Retrieve JSON schemas for validating dicts representing a ``pyproject.toml`` file.
"""
from __future__ import annotations
import json
import logging
import typing
from collections.abc import Iterator, Mapping, Sequence
from enum import Enum
from functools import partial, reduce
from types import MappingProxyType, ModuleType
from typing import (
Callable,
TypeVar,
)
import fastjsonschema as FJS
from . import _resources, errors, formats
from .error_reporting import detailed_errors
from .extra_validations import EXTRA_VALIDATIONS
from .types import FormatValidationFn, Schema, ValidationFn
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

if typing.TYPE_CHECKING:  # pragma: no cover
    # Needed only by annotations (which are lazy thanks to the ``__future__`` import
    # at the top of the file), so the import is restricted to type checkers.
    from .plugins import PluginProtocol

# Only ``Validator`` is exported by a star-import of this module.
__all__ = ["Validator"]

# The module relies on its import metadata to locate the parent package; the
# assertions also narrow the optional types for static checkers.
assert __spec__ is not None
assert __spec__.parent is not None
# Parent package name: default location of the schema files read by ``load``.
_PARENT = __spec__.parent

# Lets ``Validator.__call__`` declare that it returns the same mapping type it receives.
T = TypeVar("T", bound=Mapping)
# Single-member enum used as the default value of ``Validator``'s ``plugins`` argument;
# when that sentinel is received the plugins come from ``list_from_entry_points()``.
AllPlugins = Enum("AllPlugins", "ALL_PLUGINS")  #: :meta private:
ALL_PLUGINS = AllPlugins.ALL_PLUGINS

# Schema names passed to ``load`` by ``SchemaRegistry`` (``load`` appends the extension).
TOP_LEVEL_SCHEMA = "pyproject_toml"
PROJECT_TABLE_SCHEMA = "project_metadata"
def _get_public_functions(module: ModuleType) -> Mapping[str, FormatValidationFn]:
    """Collect the public callables of ``module``, keyed by their dashed name.

    Every value in the module namespace that is callable and whose ``__name__``
    does not start with an underscore is included.  The key is the object's own
    ``__name__`` (not the attribute name it is bound to) with every ``_``
    replaced by ``-``.

    Callables that carry no string ``__name__`` (for instance
    :class:`functools.partial` objects) are skipped, because no key can be
    derived for them.

    :param module: module whose namespace is scanned.
    :return: a new dict mapping dashed names to the callables found, in the
        order they appear in the module namespace.
    """
    public: dict[str, FormatValidationFn] = {}
    for obj in vars(module).values():
        if not callable(obj):
            continue
        # Not every callable has ``__name__``; an unguarded lookup would raise
        # ``AttributeError`` and abort the whole scan.
        name = getattr(obj, "__name__", None)
        if not isinstance(name, str) or name.startswith("_"):
            continue
        public[name.replace("_", "-")] = obj
    return public
# Read-only view of the default format validators: the public callables found in the
# ``formats`` module, keyed by their dashed name.
FORMAT_FUNCTIONS = MappingProxyType(_get_public_functions(formats))
def load(name: str, package: str = _PARENT, ext: str = ".schema.json") -> Schema:
    """Read a JSON Schema file stored as a package resource and parse it.

    The resource ``{name}{ext}`` is read from ``package`` (by default the parent
    package of this module) and its JSON content is wrapped in ``Schema``.
    Callers are expected to treat the returned dict-like object as immutable.

    :meta private: (low level detail)
    """
    resource = f"{name}{ext}"
    raw_json = _resources.read_text(package, resource)
    parsed = json.loads(raw_json)
    return Schema(parsed)
def load_builtin_plugin(name: str) -> Schema:
    """Load the schema called ``name`` from the ``plugins`` subpackage.

    :meta private: (low level detail)
    """
    plugins_package = f"{_PARENT}.plugins"
    return load(name, plugins_package)
class SchemaRegistry(Mapping[str, Schema]):
    """Repository of parsed JSON Schemas used for validating a ``pyproject.toml``.

    During instantiation the schemas equivalent to PEP 517, PEP 518 and PEP 621
    are combined with the schemas for the ``tool`` subtables provided by the
    plugins.

    Since this object works as a mapping between each schema ``$id`` and the
    schema itself, all schemas provided by plugins **MUST** have a top level
    ``$id``.

    :meta private: (low level detail)
    """

    def __init__(self, plugins: Sequence[PluginProtocol] = ()):
        """Load the built-in schemas and register the ones given by ``plugins``."""
        # Each value is: (which part of the TOML, who defines, schema).
        # Must exist before the first ``_ensure_compatibility`` call below.
        self._schemas: dict[str, tuple[str, str, Schema]] = {}

        root = typing.cast("dict", load(TOP_LEVEL_SCHEMA))  # Make it mutable
        self._spec_version: str = root["$schema"]
        root_properties = root["properties"]
        tool_table = root_properties["tool"].setdefault("properties", {})

        # Add PEP 621
        pep621_schema = load(PROJECT_TABLE_SCHEMA)
        self._ensure_compatibility(PROJECT_TABLE_SCHEMA, pep621_schema)
        sid = pep621_schema["$id"]
        # NOTE(review): the reference is stored on the schema root, not under
        # ``properties`` -- confirm this is intended.
        root["project"] = {"$ref": sid}
        defined_by = f"{__name__} - project metadata"
        self._schemas = {sid: ("project", defined_by, pep621_schema)}

        # Add tools using Plugins
        for plugin in plugins:
            if not plugin.tool:
                # Not bound to a ``tool`` subtable: stored under the plugin id.
                _logger.info(f"{plugin} defines extra schema {plugin.id}")
                self._schemas[plugin.id] = (plugin.id, plugin.id, plugin.schema)
                continue

            if plugin.tool in tool_table:
                _logger.warning(f"{plugin} overwrites `tool.{plugin.tool}` schema")
                # Replacing a known ``tool`` entry may reuse the plugin's own ``$id``
                overwritable: str | None = plugin.schema.get("$id")
            else:
                _logger.info(f"{plugin} defines `tool.{plugin.tool}` schema")
                overwritable = None

            checked = self._ensure_compatibility(
                plugin.tool, plugin.schema, overwritable
            )
            sid = checked["$id"]
            target = sid if not plugin.fragment else f"{sid}#{plugin.fragment}"
            tool_table[plugin.tool] = {"$ref": target}
            self._schemas[sid] = (f"tool.{plugin.tool}", plugin.id, plugin.schema)

        self._main_id: str = root["$id"]
        top_schema = Schema(root)
        defined_by = f"{__name__} - build metadata"
        self._schemas[self._main_id] = ("<$ROOT>", defined_by, top_schema)

    @property
    def spec_version(self) -> str:
        """Version of the JSON Schema spec in use (``$schema`` of the top level schema)"""
        return self._spec_version

    @property
    def main(self) -> str:
        """``$id`` of the top level schema for validating a ``pyproject.toml`` file"""
        return self._main_id

    def _ensure_compatibility(
        self,
        reference: str,
        schema: Schema,
        allow_overwrite: str | None = None,
    ) -> Schema:
        """Check that ``schema`` can be added to the registry and return the schema to use.

        :param reference: name used to identify the schema in messages and errors.
        :param schema: candidate schema.
        :param allow_overwrite: ``$id`` that may already be registered without
            being treated as a duplicate.
        :return: ``schema`` itself, or the previously registered schema when an
            equal one is already stored under the same ``$id``.
        :raises errors.SchemaMissingId: ``$id`` is absent or empty.
        :raises errors.SchemaWithDuplicatedId: a different schema is already
            registered under the same ``$id``.
        :raises errors.InvalidSchemaVersion: ``$schema`` is given and differs from
            the spec version of the registry.
        """
        sid = schema.get("$id")
        if not sid:
            raise errors.SchemaMissingId(reference or "<extra>")

        if sid in self._schemas and sid != allow_overwrite:
            registered = self._schemas[sid][-1]
            if dict(registered) == dict(schema):
                _logger.warning(
                    f"Duplicate schema {sid!r} for `tool.{reference}` ignored "
                    "(same schema already registered)"
                )
                return registered
            raise errors.SchemaWithDuplicatedId(sid)

        declared = schema.get("$schema")
        if not declared:
            return schema
        # Support schemas with missing trailing # (incorrect, but required before 0.15)
        if declared.rstrip("#") == self.spec_version.rstrip("#"):
            return schema
        raise errors.InvalidSchemaVersion(reference or sid, declared, self.spec_version)

    def __getitem__(self, key: str) -> Schema:
        """Return the schema stored under ``key`` (without the bookkeeping fields)"""
        entry = self._schemas[key]
        return entry[-1]

    def __iter__(self) -> Iterator[str]:
        """Iterate over the keys of the registered schemas"""
        return iter(self._schemas)

    def __len__(self) -> int:
        """Number of registered schemas"""
        return len(self._schemas)
class RefHandler(Mapping[str, Callable[[str], Schema]]):
    """:mod:`fastjsonschema` allows passing a dict-like object to load external
    schemas referenced via ``$ref``. Such objects map the URI scheme (e.g. ``http``,
    ``https``, ``ftp``) into a function that receives the schema URI and returns the
    schema (as parsed JSON) (otherwise :mod:`urllib` is used and the URI is assumed
    to be a valid URL).
    This class will ensure all the URIs are loaded from the local registry.

    :meta private: (low level detail)
    """

    def __init__(self, registry: Mapping[str, Schema]):
        """Keep ``registry`` as the only source used to resolve references."""
        self._registry = registry
        # Schemes reported by iteration/len; grows as new ones are queried.
        self._uri_schemas = ["http", "https"]

    def __contains__(self, key: object) -> bool:
        """Report any string as a supported scheme, remembering the unseen ones"""
        if not isinstance(key, str):
            return False
        if key not in self._uri_schemas:
            self._uri_schemas.append(key)
        return True

    def __iter__(self) -> Iterator[str]:
        """Iterate over the schemes known so far"""
        return iter(self._uri_schemas)

    def __len__(self) -> int:
        """Number of schemes known so far"""
        return len(self._uri_schemas)

    def __getitem__(self, key: str) -> Callable[[str], Schema]:
        """All the references should be retrieved from the registry"""
        # ``key`` is deliberately ignored: the same lookup serves every scheme.
        lookup = self._registry.__getitem__
        return lookup
class Validator:
    """Callable object that checks a parsed ``pyproject.toml`` against the
    registered JSON Schemas and then runs the extra validations.

    :param plugins: plugins whose schemas are added to the registry. With the
        default ``ALL_PLUGINS`` sentinel, the plugins are obtained from
        ``list_from_entry_points()`` (``plugins`` subpackage).
    :param format_validators: mapping between JSON Schema ``format`` names and the
        functions that validate them.
    :param extra_validations: functions executed, in order, after the JSON Schema
        check; each one receives the value returned by the previous one.
    :param extra_plugins: plugins appended after the ones given in ``plugins``.
    """

    _plugins: Sequence[PluginProtocol]

    def __init__(
        self,
        plugins: Sequence[PluginProtocol] | AllPlugins = ALL_PLUGINS,
        format_validators: Mapping[str, FormatValidationFn] = FORMAT_FUNCTIONS,
        extra_validations: Sequence[ValidationFn] = EXTRA_VALIDATIONS,
        *,
        extra_plugins: Sequence[PluginProtocol] = (),
    ):
        # Lazily filled caches (generated source code / compiled validation function)
        self._code_cache: str | None = None
        self._cache: ValidationFn | None = None
        self._schema: Schema | None = None

        # Let's make the following options readonly
        self._format_validators = MappingProxyType(format_validators)
        self._extra_validations = tuple(extra_validations)

        if plugins is ALL_PLUGINS:
            # Imported here rather than at module level (module level only imports
            # ``.plugins`` for type checking).
            from .plugins import list_from_entry_points

            plugins = list_from_entry_points()

        self._plugins = (*plugins, *extra_plugins)

        self._schema_registry = SchemaRegistry(self._plugins)
        self.handlers = RefHandler(self._schema_registry)

    @property
    def registry(self) -> SchemaRegistry:
        """Registry holding every schema known to this validator"""
        return self._schema_registry

    @property
    def schema(self) -> Schema:
        """Top level ``pyproject.toml`` JSON Schema"""
        # A bare ``$ref`` to the main schema; it is resolved through ``handlers``
        return Schema({"$ref": self._schema_registry.main})

    @property
    def extra_validations(self) -> Sequence[ValidationFn]:
        """List of extra validation functions that run after the JSON Schema check"""
        return self._extra_validations

    @property
    def formats(self) -> Mapping[str, FormatValidationFn]:
        """Mapping between JSON Schema formats and functions that validate them"""
        return self._format_validators

    @property
    def generated_code(self) -> str:
        """Source code produced by ``fastjsonschema.compile_to_code`` for
        :attr:`schema`; generated on first access and cached afterwards.
        """
        if self._code_cache is None:
            fmts = dict(self.formats)
            self._code_cache = FJS.compile_to_code(
                self.schema, self.handlers, fmts, use_default=False
            )

        return self._code_cache

    def __getitem__(self, schema_id: str) -> Schema:
        """Retrieve a schema from registry"""
        return self._schema_registry[schema_id]

    def __call__(self, pyproject: T) -> T:
        """Checks a parsed ``pyproject.toml`` file (given as :obj:`typing.Mapping`)
        and raises an exception when it is not valid.

        :param pyproject: parsed contents of the file.
        :return: the value obtained after passing ``pyproject`` through each of the
            :attr:`extra_validations`, in order.
        """
        if self._cache is None:
            # Compile once; later calls reuse the cached validation function
            compiled = FJS.compile(
                self.schema, self.handlers, dict(self.formats), use_default=False
            )
            fn = partial(compiled, custom_formats=self._format_validators)
            self._cache = typing.cast("ValidationFn", fn)

        with detailed_errors():
            self._cache(pyproject)

        return reduce(lambda acc, fn: fn(acc), self.extra_validations, pyproject)