wfexs_backend.utils.rocrate
#
Module Contents#
Classes#
Functions#
Data#
API#
- class wfexs_backend.utils.rocrate.ReproducibilityLevel#
Bases:
enum.IntEnum
- Minimal = 'auto(...)'#
- Metadata = 'auto(...)'#
- Full = 'auto(...)'#
- class wfexs_backend.utils.rocrate.ContainerTypeMetadata#
Bases:
typing.NamedTuple
- class wfexs_backend.utils.rocrate.ROCratePayload#
Bases:
typing.NamedTuple
- path: pathlib.Path = None#
- wfexs_backend.utils.rocrate.ContainerTypeMetadataDetails: Final[Mapping[ContainerType, ContainerTypeMetadata]] = None#
- wfexs_backend.utils.rocrate.ApplicationCategory2ContainerType: Final[Mapping[str, ContainerType]] = None#
- wfexs_backend.utils.rocrate.WORKFLOW_RUN_BASE: Final[str] = 'https://w3id.org/ro/terms/workflow-run'#
- wfexs_backend.utils.rocrate.ContentWithURIsMIMEs = None#
- wfexs_backend.utils.rocrate.RevContentWithURIsMIMEs = 'dict(...)'#
- class wfexs_backend.utils.rocrate.ContainerImageAdditionalType(*args, **kwds)#
Bases:
enum.Enum
- Docker = None#
- Singularity = None#
- wfexs_backend.utils.rocrate.StrContainerAdditionalType2ContainerImageAdditionalType: Final[Mapping[str, ContainerImageAdditionalType]] = None#
- wfexs_backend.utils.rocrate.ContainerType2AdditionalType: Final[Mapping[ContainerType, ContainerImageAdditionalType]] = None#
- wfexs_backend.utils.rocrate.AdditionalType2ContainerType: Final[Mapping[ContainerImageAdditionalType, ContainerType]] = None#
- wfexs_backend.utils.rocrate.LEGACY_ROCRATE_JSONLD_FILENAME: Final[str] = 'ro-crate-metadata.jsonld'#
- wfexs_backend.utils.rocrate.ReadROCrateMetadata(workflowROCrateFilename: pathlib.Path, public_name: str) Tuple[Any, pathlib.Path | None] #
- class wfexs_backend.utils.rocrate.ROCrateToolbox(wfexs: wfexs_backend.wfexs_backend.WfExSBackend)#
Bases:
abc.ABC
- SPARQL_NS = None#
- identifyROCrate(jsonld: Mapping[str, Any], public_name: str) Tuple[rdflib.query.ResultRow | None, rdflib.graph.Graph] #
This method is used to identify where the input JSON is a JSON-LD related to RO-Crate.
The returned value is a tuple, where the first element is the result row giving the QName of the root dataset, and the different profiles being matched: RO-Crate, Workflow RO-Crate, WRROC process and WRROC workflow. The second element of the returned tuple is the rdflib RDF graph from the read JSON-LD, which should allow exploring it.
- _parseContainersFromWorkflow(g: rdflib.graph.Graph, main_entity: rdflib.term.Identifier, payload_dir: pathlib.Path | None = None) Tuple[ContainerType, Sequence[Container]] | None #
- _parseContainersFromExecution(g: rdflib.graph.Graph, execution: rdflib.term.Identifier, main_entity: rdflib.term.Identifier, payload_dir: pathlib.Path | None = None) Tuple[ContainerType, Sequence[Container]] | None #
- __parseContainersResults(qcontainersres: rdflib.query.Result, main_entity: rdflib.term.Identifier, payload_dir: pathlib.Path | None = None) Tuple[ContainerType, Sequence[Container]] | None #
- _parseOutputsFromExecution(g: rdflib.graph.Graph, execution: rdflib.term.Identifier, main_entity: rdflib.term.Identifier, public_name: str) wfexs_backend.workflow.OutputsBlock #
- _parseOutputsFromMainEntity(g: rdflib.graph.Graph, main_entity: rdflib.term.Identifier, public_name: str) wfexs_backend.workflow.OutputsBlock #
- __parseOutputsResults(qoutputsres: rdflib.query.Result, g: rdflib.graph.Graph, public_name: str) wfexs_backend.workflow.OutputsBlock #
- _parseInputsFromExecution(g: rdflib.graph.Graph, execution: rdflib.term.Identifier, main_entity: rdflib.term.Identifier, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[ParamsBlock, Sequence[MaterializedInput] | None] #
- _parseInputsFromMainEntity(g: rdflib.graph.Graph, main_entity: rdflib.term.Identifier, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[ParamsBlock, Sequence[MaterializedInput] | None] #
- __processPayloadInput(inputrow: rdflib.query.ResultRow, payload_dir: pathlib.Path, the_uri: str, licences: Sequence[str], input_type: str, kindobj: wfexs_backend.common.ContentKind, cached_inputs_hash: MutableMapping[str, MaterializedInput]) MutableMapping[str, MaterializedInput] #
- __parseInputsResults(qinputsres: rdflib.query.Result, g: rdflib.graph.Graph, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[ParamsBlock, Sequence[MaterializedInput] | None] #
- _parseEnvFromExecution(g: rdflib.graph.Graph, execution: rdflib.term.Identifier, main_entity: rdflib.term.Identifier, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[EnvironmentBlock, Sequence[MaterializedInput] | None] #
- _parseEnvFromMainEntity(g: rdflib.graph.Graph, main_entity: rdflib.term.Identifier, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[EnvironmentBlock, Sequence[MaterializedInput] | None] #
- __parseEnvResults(qenvres: rdflib.query.Result, g: rdflib.graph.Graph, default_licences: Sequence[str], public_name: str, payload_dir: pathlib.Path | None = None) Tuple[EnvironmentBlock, Sequence[MaterializedInput] | None] #
This method is (almost) identical to __parseInputsResults
- _getLicences(g: rdflib.graph.Graph, entity: rdflib.term.Identifier, public_name: str) Sequence[str] #
- __processPayloadEntity(the_entity: rdflib.term.Identifier, payload_dir: pathlib.Path, kindobj: wfexs_backend.common.ContentKind, entity_type: str, entity_name: str, the_file_size: rdflib.term.Node, the_file_sha256: rdflib.term.Node) ROCratePayload | None #
- __list_entity_parts(g: rdflib.graph.Graph, entity: rdflib.term.Identifier, public_name: str) rdflib.query.Result #
- __list_payload_entity_parts(g: rdflib.graph.Graph, entity: rdflib.term.Identifier, public_name: str, payload_dir: pathlib.Path) Sequence[str | ROCratePayload] #
- extractWorkflowMetadata(g: rdflib.graph.Graph, main_entity: rdflib.term.Identifier, default_repo: str | None, public_name: str, payload_dir: pathlib.Path | None = None) Tuple[RemoteRepo, WorkflowType, LocalWorkflow | None] #
- generateWorkflowMetaFromJSONLD(jsonld_obj: Mapping[str, Any], public_name: str, retrospective_first: bool = True, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False, payload_dir: pathlib.Path | None = None) Tuple[RemoteRepo, WorkflowType, ContainerType, ParamsBlock, Sequence[str] | None, EnvironmentBlock, OutputsBlock, LocalWorkflow | None, Sequence[Container], Sequence[MaterializedInput] | None, Sequence[MaterializedInput] | None] #