wfexs_backend.workflow_engines
Submodules
Package Contents
Classes
- WorkflowType: describes a workflow language: symbolic engine name, short name used in the WfExS-backend configuration files, textual name, implementing class, and the identifiers used for it in RO-Crate and GA4GH TRSv2.
- MaterializedWorkflowEngine: describes a fetched engine: engine instance, version, fingerprint, engine path, the LocalWorkflow, and the containers needed by the workflow and by the engine.
- StagedExecution: The description of the execution of a workflow, giving the relative directory of the output.
Data
API
- wfexs_backend.workflow_engines.WORKDIR_INPUTS_RELDIR = 'inputs'
- wfexs_backend.workflow_engines.WORKDIR_EXTRAPOLATED_INPUTS_RELDIR = 'extrapolated-inputs'
- wfexs_backend.workflow_engines.WORKDIR_INTERMEDIATE_RELDIR = 'intermediate'
- wfexs_backend.workflow_engines.WORKDIR_META_RELDIR = 'meta'
- wfexs_backend.workflow_engines.WORKDIR_STATS_RELDIR = 'stats'
- wfexs_backend.workflow_engines.WORKDIR_OUTPUTS_RELDIR = 'outputs'
- wfexs_backend.workflow_engines.WORKDIR_ENGINE_TWEAKS_RELDIR = 'engineTweaks'
- wfexs_backend.workflow_engines.WORKDIR_WORKFLOW_RELDIR = 'workflow'
- wfexs_backend.workflow_engines.WORKDIR_CONSOLIDATED_WORKFLOW_RELDIR = 'consolidated-workflow'
- wfexs_backend.workflow_engines.WORKDIR_CONTAINERS_RELDIR = 'containers'
- wfexs_backend.workflow_engines.WORKDIR_STDOUT_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_STDERR_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_WORKFLOW_META_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_STAGE_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_EXECUTE_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_EXPORT_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.WORKDIR_PASSPHRASE_FILE = 'cast(...)'
- wfexs_backend.workflow_engines.STATS_DAG_DOT_FILE = 'cast(...)'
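The `*_RELDIR` constants above are relative directory names, so they only become usable paths once joined to a working directory. The following is a minimal sketch of that composition, assuming each one is a direct child of the working directory. The string values are copied from the listing; the `sketch_workdir_layout` helper is hypothetical and not part of WfExS-backend, and the file-name constants rendered as `'cast(...)'` are left out because their values are not shown here.

```python
import pathlib
import tempfile
from typing import Dict

# Values copied from the listing above.
WORKDIR_RELDIRS = [
    "inputs",
    "extrapolated-inputs",
    "intermediate",
    "meta",
    "stats",
    "outputs",
    "engineTweaks",
    "workflow",
    "consolidated-workflow",
    "containers",
]


def sketch_workdir_layout(workdir: pathlib.Path) -> Dict[str, pathlib.Path]:
    """Hypothetical helper: resolve each relative directory against workdir."""
    return {reldir: workdir / reldir for reldir in WORKDIR_RELDIRS}


with tempfile.TemporaryDirectory() as tmp:
    layout = sketch_workdir_layout(pathlib.Path(tmp))
    for path in layout.values():
        path.mkdir(parents=True, exist_ok=True)
    # Names of the directories that now exist under the working directory.
    created = sorted(p.name for p in pathlib.Path(tmp).iterdir())
```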
- class wfexs_backend.workflow_engines.WorkflowType
Bases: typing.NamedTuple
engineName: symbolic name of the engine
shortname: short name used in the WfExS-backend configuration files for the workflow language
name: Textual representation of the workflow language
clazz: Class implementing the engine invocation
uriMatch: The URI patterns used in RO-Crate to identify the workflow type
uriTemplate: The URI template to be used when RO-Crate ComputerLanguage is generated
url: The URL used in RO-Crate to represent the workflow language
trs_descriptor: The string used in GA4GH TRSv2 specification to define this workflow type
rocrate_programming_language: Traditional internal id in RO-Crate implementations used for this workflow type (to be deprecated)
- clazz: Type[AbstractWorkflowEngineType] = None
- uriTemplate: wfexs_backend.common.URIType = None
- url: wfexs_backend.common.URIType = None
- trs_descriptor: wfexs_backend.common.TRS_Workflow_Descriptor = None
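The `uriMatch` field is described as the set of URI patterns that identify a workflow type in RO-Crate. As a rough illustration of how such a lookup could work, the sketch below uses a stand-in named tuple with a subset of the documented fields. It assumes `uriMatch` holds plain strings or compiled regular expressions, which this page does not confirm; `WorkflowTypeSketch`, `match_workflow_type` and the `example.org` URIs are all hypothetical.

```python
import re
from typing import NamedTuple, Optional, Pattern, Sequence, Union


class WorkflowTypeSketch(NamedTuple):
    """Stand-in with a subset of the documented WorkflowType fields."""

    engineName: str
    shortname: str
    name: str
    # Assumption: each entry is an exact URI or a compiled pattern.
    uriMatch: Sequence[Union[str, Pattern[str]]]
    trs_descriptor: str


def match_workflow_type(
    uri: str, known_types: Sequence[WorkflowTypeSketch]
) -> Optional[WorkflowTypeSketch]:
    """Return the first type whose uriMatch accepts the URI, else None."""
    for wf_type in known_types:
        for candidate in wf_type.uriMatch:
            if isinstance(candidate, str):
                if candidate == uri:
                    return wf_type
            elif candidate.match(uri):
                return wf_type
    return None


# Made-up workflow languages, only for illustration.
EXAMPLE_TYPES = [
    WorkflowTypeSketch(
        engineName="engine-a",
        shortname="a",
        name="Language A",
        uriMatch=["https://example.org/lang-a"],
        trs_descriptor="A",
    ),
    WorkflowTypeSketch(
        engineName="engine-b",
        shortname="b",
        name="Language B",
        uriMatch=[re.compile(r"^https://example\.org/lang-b/v[0-9.]+$")],
        trs_descriptor="B",
    ),
]

matched = match_workflow_type("https://example.org/lang-b/v1.2", EXAMPLE_TYPES)
```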
- class wfexs_backend.workflow_engines.MaterializedWorkflowEngine
Bases: typing.NamedTuple
instance: Instance of the workflow engine
version: Version of the engine to be used
fingerprint: Fingerprint of the engine to be used (it could be the version)
engine_path: Absolute path to the fetched engine
workflow: Instance of LocalWorkflow
containers_path: Where the containers are going to be available for offline-execute
containers: List of Container instances (needed by workflow)
operational_containers: List of Container instances (needed by engine)
- instance: AbstractWorkflowEngineType = None
- version: wfexs_backend.common.EngineVersion = None
- engine_path: pathlib.Path = None
- workflow: wfexs_backend.common.LocalWorkflow = None
- containers_path: pathlib.Path | None = None
- classmethod _mapping_fixes(orig: Mapping[str, Any], workdir: pathlib.Path | None) → Mapping[str, Any]
- class wfexs_backend.workflow_engines.StagedExecution
Bases: typing.NamedTuple
The description of the execution of a workflow, giving the relative directory of the output.
- exitVal: wfexs_backend.common.ExitVal = None
- augmentedInputs: Sequence[MaterializedInput] = None
- matCheckOutputs: Sequence[MaterializedOutput] = None
- outputsDir: pathlib.Path = None
- started: datetime.datetime = None
- ended: datetime.datetime = None
- environment: Sequence[MaterializedInput] = []
- outputMetaDir: pathlib.Path | None = None
- diagram: pathlib.Path | None = None
- logfile: Sequence[pathlib.Path] = []
- queued: datetime.datetime | None = None
- status: wfexs_backend.common.ExecutionStatus = None
- classmethod _mapping_fixes(orig: Mapping[str, Any], workdir: pathlib.Path | None) → Mapping[str, Any]
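The `queued`, `started` and `ended` timestamps make it possible to derive how long an execution waited and how long it ran. Here is a minimal sketch using a stand-in named tuple that keeps only those fields plus `exitVal`; it assumes `ExitVal` behaves like an `int`, and the `StagedExecutionSketch`, `run_duration` and `queue_wait` names are hypothetical.

```python
import datetime
from typing import NamedTuple, Optional


class StagedExecutionSketch(NamedTuple):
    """Stand-in with a subset of the documented StagedExecution fields."""

    exitVal: int  # assumption: ExitVal is integer-like
    started: datetime.datetime
    ended: datetime.datetime
    queued: Optional[datetime.datetime] = None


def run_duration(execution: StagedExecutionSketch) -> datetime.timedelta:
    """Wall-clock time between start and end of the execution."""
    return execution.ended - execution.started


def queue_wait(execution: StagedExecutionSketch) -> Optional[datetime.timedelta]:
    """Time spent queued, or None when no queue timestamp was recorded."""
    if execution.queued is None:
        return None
    return execution.started - execution.queued


utc = datetime.timezone.utc
example = StagedExecutionSketch(
    exitVal=0,
    queued=datetime.datetime(2024, 1, 1, 11, 58, tzinfo=utc),
    started=datetime.datetime(2024, 1, 1, 12, 0, tzinfo=utc),
    ended=datetime.datetime(2024, 1, 1, 12, 5, tzinfo=utc),
)
```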
- class wfexs_backend.workflow_engines.AbstractWorkflowEngineType
Bases: abc.ABC
- abstract classmethod MyWorkflowType() → wfexs_backend.workflow_engines.WorkflowType
- property workflowType: wfexs_backend.workflow_engines.WorkflowType
- abstract getConfiguredContainerType() → wfexs_backend.common.ContainerType
- property configuredContainerType: wfexs_backend.common.ContainerType
- abstract property engine_url: wfexs_backend.common.URIType
- abstract _get_engine_version_str(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) → wfexs_backend.workflow_engines.WorkflowEngineVersionStr
It must return a string in the form of “{symbolic engine name} {version}”
- abstract sideContainers() → Sequence[ContainerTaggedName]
- abstract materialize_containers(listOfContainerTags: Sequence[ContainerTaggedName], containersDir: pathlib.Path, offline: bool = False, force: bool = False, injectable_containers: Sequence[Container] = []) → Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]
- abstract deploy_containers(containers_list: Sequence[Container], containersDir: pathlib.Path | None = None, force: bool = False) → Sequence[Container]
- abstract property staged_containers_dir: pathlib.Path
- abstract materializeEngine(localWf: wfexs_backend.common.LocalWorkflow, engineVersion: EngineVersion | None = None, do_identify: bool = False) → MaterializedWorkflowEngine | None
- abstract identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) → Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None]
When this workflow engine recognizes the workflow type, this method should return the effective engine version needed to run the workflow, together with the LocalWorkflow. Otherwise it returns (None, None).
- abstract materializeWorkflow(matWorfklowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) → Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]
Method to ensure the workflow has been materialized. It returns a possibly updated materialized workflow engine, as well as the list of containers
For Nextflow it is usually a no-op, but for CWL it requires resolution
- abstract launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) → Iterator[StagedExecution]
- abstract classmethod FromStagedSetup(staged_setup: wfexs_backend.common.StagedSetup, container_factory_classes: Sequence[Type[ContainerFactory]] = [NoContainerFactory], progs_mapping: ProgsMapping | None = None, cache_dir: pathlib.Path | None = None, cache_workflow_dir: pathlib.Path | None = None, cache_workflow_inputs_dir: pathlib.Path | None = None, local_config: EngineLocalConfig | None = None, config_directory: pathlib.Path | None = None) → wfexs_backend.workflow_engines.AbstractWorkflowEngineType
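Two contracts in this abstract class lend themselves to a small illustration: `identifyWorkflow` returns a pair of `None` values when the engine does not recognize the workflow, and `_get_engine_version_str` produces a string of the form “{symbolic engine name} {version}”. The sketch below mimics both with stand-in classes. Everything in it is hypothetical: the class names, the snake_case method names, the engine names and versions, and above all the detection by file suffix, which only stands in for whatever inspection a real engine performs.

```python
import abc
from typing import Optional, Sequence, Tuple, Union

IdentifyResult = Union[Tuple[str, str], Tuple[None, None]]


class EngineSketch(abc.ABC):
    """Stand-in for the abstract engine type; not the WfExS-backend class."""

    def __init__(self, engine_name: str) -> None:
        self.engine_name = engine_name

    @abc.abstractmethod
    def identify_workflow(
        self, workflow_file: str, engine_ver: Optional[str] = None
    ) -> IdentifyResult:
        """Return (version, workflow) when recognized, else (None, None)."""

    def engine_version_str(self, version: str) -> str:
        # Documented form: "{symbolic engine name} {version}"
        return f"{self.engine_name} {version}"


class SuffixEngineSketch(EngineSketch):
    """Hypothetical engine that recognizes workflows by file suffix only."""

    def __init__(self, engine_name: str, suffix: str, default_version: str) -> None:
        super().__init__(engine_name)
        self.suffix = suffix
        self.default_version = default_version

    def identify_workflow(
        self, workflow_file: str, engine_ver: Optional[str] = None
    ) -> IdentifyResult:
        if not workflow_file.endswith(self.suffix):
            return None, None
        # An explicitly requested version wins over the engine's default.
        return engine_ver or self.default_version, workflow_file


def pick_engine(workflow_file: str, engines: Sequence[EngineSketch]) -> Optional[str]:
    """Ask each engine in turn; report the first one that recognizes the file."""
    for engine in engines:
        version, _ = engine.identify_workflow(workflow_file)
        if version is not None:
            return engine.engine_version_str(version)
    return None


ENGINES = [
    SuffixEngineSketch("engine-a", ".wfa", "1.0"),
    SuffixEngineSketch("engine-b", ".wfb", "2.3"),
]
```

A caller that tries several engines can rely on the `(None, None)` result to move on to the next candidate instead of catching an exception.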
- exception wfexs_backend.workflow_engines.WorkflowEngineException
Bases: wfexs_backend.common.AbstractWfExSException
Exceptions raised by instances of WorkflowEngine.
- exception wfexs_backend.workflow_engines.WorkflowEngineInstallException
Bases: wfexs_backend.workflow_engines.WorkflowEngineException
Exceptions raised by instances of WorkflowEngine when the engine could not be installed.
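Because the install exception derives from the general engine exception, a handler for the general one also catches install failures, so the more specific handler has to come first. This sketch reproduces only the documented inheritance chain with stand-in classes; the `*Sketch` names, `fetch_engine` and `describe_failure` are hypothetical, and deriving the root from `Exception` is an assumption.

```python
from typing import Set


class AbstractWfExSExceptionSketch(Exception):
    """Stand-in root; assumed to derive from Exception."""


class WorkflowEngineExceptionSketch(AbstractWfExSExceptionSketch):
    """Stand-in for errors raised by an engine."""


class WorkflowEngineInstallExceptionSketch(WorkflowEngineExceptionSketch):
    """Stand-in for errors raised when an engine cannot be installed."""


def fetch_engine(version: str, available: Set[str]) -> str:
    """Hypothetical installer: fails when the version is not available."""
    if version not in available:
        raise WorkflowEngineInstallExceptionSketch(
            f"engine version {version} could not be installed"
        )
    return version


def describe_failure(version: str, available: Set[str]) -> str:
    try:
        fetch_engine(version, available)
    except WorkflowEngineInstallExceptionSketch as exc:
        # Most specific handler first.
        return f"install problem: {exc}"
    except WorkflowEngineExceptionSketch as exc:
        return f"engine problem: {exc}"
    return "ok"
```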
- class wfexs_backend.workflow_engines.WorkflowEngine(container_factory_clazz: Type[ContainerFactory] = NoContainerFactory, cacheDir: pathlib.Path | None = None, engine_config: EngineLocalConfig | None = None, progs_mapping: ProgsMapping | None = None, engineTweaksDir: pathlib.Path | None = None, cacheWorkflowDir: pathlib.Path | None = None, cacheWorkflowInputsDir: pathlib.Path | None = None, workDir: pathlib.Path | None = None, outputsDir: pathlib.Path | None = None, outputMetaDir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, tempDir: pathlib.Path | None = None, stagedContainersDir: pathlib.Path | None = None, secure_exec: bool = False, allowOther: bool = False, config_directory: pathlib.Path | None = None, engine_mode: wfexs_backend.common.EngineMode = DEFAULT_ENGINE_MODE, writable_containers: bool = False)
Bases: wfexs_backend.workflow_engines.AbstractWorkflowEngineType
- ENGINE_NAME = 'abstract'
- classmethod FromStagedSetup(staged_setup: wfexs_backend.common.StagedSetup, container_factory_classes: Sequence[Type[ContainerFactory]] = [NoContainerFactory], progs_mapping: ProgsMapping | None = None, cache_dir: pathlib.Path | None = None, cache_workflow_dir: pathlib.Path | None = None, cache_workflow_inputs_dir: pathlib.Path | None = None, local_config: EngineLocalConfig | None = None, config_directory: pathlib.Path | None = None) → wfexs_backend.workflow_engines.AbstractWorkflowEngineType
Init method from staged setup instance
- Parameters:
staged_setup
cache_dir
cache_workflow_dir
cache_workflow_inputs_dir
local_config
config_directory
- getConfiguredContainerType() → wfexs_backend.common.ContainerType
- abstract classmethod SupportedContainerTypes() → Set[ContainerType]
- abstract classmethod SupportedSecureExecContainerTypes() → Set[ContainerType]
- classmethod SupportsContainerType(container_type: wfexs_backend.common.ContainerType) → bool
- classmethod SupportsContainerFactory(container_factory_clazz: Type[ContainerFactory]) → bool
- classmethod SupportsSecureExecContainerType(container_type: wfexs_backend.common.ContainerType) → bool
- classmethod SupportsSecureExecContainerFactory(container_factory_clazz: Type[ContainerFactory]) → bool
- abstract identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) → Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None]
When this workflow engine recognizes the workflow type, this method should return the effective engine version needed to run the workflow, together with the LocalWorkflow. Otherwise it returns (None, None).
- abstract materializeEngineVersion(engineVersion: wfexs_backend.common.EngineVersion) → Tuple[EngineVersion, pathlib.Path, Fingerprint]
Method to ensure the required engine version is materialized. It should raise an exception when the exact version is unavailable and no replacement could be fetched.
- static GetEngineVersion(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) → wfexs_backend.workflow_engines.WorkflowEngineVersionStr
It must return a string in the form of “{symbolic engine name} {version}”
- materializeEngine(localWf: wfexs_backend.common.LocalWorkflow, engineVersion: EngineVersion | None = None, do_identify: bool = False) → MaterializedWorkflowEngine | None
Method to ensure the required engine version is materialized. It should raise an exception when the exact version is unavailable and no replacement could be fetched.
- abstract materializeWorkflow(matWorfklowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) → Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]
Method to ensure the workflow has been materialized. It returns a possibly updated materialized workflow engine, as well as the list of containers
For Nextflow it is usually a no-op, but for CWL it requires resolution
- sideContainers() → Sequence[ContainerTaggedName]
Containers needed by the engine to work
- abstract simpleContainerFileName(imageUrl: wfexs_backend.common.URIType) → Sequence[RelPath]
This method must be implemented to tell which file names the workflow engine expects in its container cache directories when an image is locally materialized (currently only useful for Singularity)
- materialize_containers(listOfContainerTags: Sequence[ContainerTaggedName], containersDir: pathlib.Path | None = None, offline: bool = False, force: bool = False, injectable_containers: Sequence[Container] = []) → Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]
- deploy_containers(containers_list: Sequence[Container], containersDir: pathlib.Path | None = None, force: bool = False) → Sequence[Container]
- property staged_containers_dir: pathlib.Path
- create_job_directories() → Tuple[str, pathlib.Path, pathlib.Path, pathlib.Path]
- abstract launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) → Iterator[StagedExecution]
- classmethod ExecuteWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) → Iterator[StagedExecution]
- classmethod MaterializeWorkflowAndContainers(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, containersDir: pathlib.Path, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, injectable_containers: Sequence[Container] = [], injectable_operational_containers: Sequence[Container] = [], profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) → Tuple[MaterializedWorkflowEngine, ContainerEngineVersionStr, ContainerOperatingSystem, ProcessorArchitecture]
- GuessedCardinalityMapping = None
- GuessedOutputKindMapping: Mapping[str, ContentKind] = None
- identifyMaterializedOutputs(matInputs: Sequence[MaterializedInput], expectedOutputs: Sequence[ExpectedOutput], outputsDir: pathlib.Path, outputsMapping: Mapping[SymbolicOutputName, Any] | None = None) → Sequence[MaterializedOutput]
This method is used to identify outputs by either file glob descriptions or matching with a mapping
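The two identification strategies mentioned for `identifyMaterializedOutputs`, file glob descriptions and a name-to-value mapping, can be illustrated with a small stand-alone sketch. It is not the WfExS-backend implementation: `ExpectedOutputSketch`, its `glob` field, `identify_outputs` and the precedence of the glob over the mapping are all assumptions made for the example.

```python
import pathlib
import tempfile
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence


class ExpectedOutputSketch(NamedTuple):
    """Stand-in for an expected output: a symbolic name and an optional glob."""

    name: str
    glob: Optional[str] = None


def identify_outputs(
    expected: Sequence[ExpectedOutputSketch],
    outputs_dir: pathlib.Path,
    outputs_mapping: Optional[Mapping[str, Any]] = None,
) -> Dict[str, List[pathlib.Path]]:
    """Resolve each expected output by glob, else through the mapping."""
    found: Dict[str, List[pathlib.Path]] = {}
    for out in expected:
        if out.glob is not None:
            # Strategy 1: file glob relative to the outputs directory.
            found[out.name] = sorted(outputs_dir.glob(out.glob))
        elif outputs_mapping is not None and out.name in outputs_mapping:
            # Strategy 2: look the symbolic name up in the mapping.
            value = outputs_mapping[out.name]
            values = value if isinstance(value, list) else [value]
            found[out.name] = [outputs_dir / str(v) for v in values]
        else:
            found[out.name] = []
    return found


with tempfile.TemporaryDirectory() as tmp:
    outdir = pathlib.Path(tmp)
    (outdir / "a.txt").write_text("a")
    (outdir / "b.txt").write_text("b")
    (outdir / "report.html").write_text("<html></html>")
    result = identify_outputs(
        [ExpectedOutputSketch("texts", glob="*.txt"), ExpectedOutputSketch("report")],
        outdir,
        {"report": "report.html"},
    )
    text_names = [p.name for p in result["texts"]]
    report_names = [p.name for p in result["report"]]
```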