wfexs_backend.workflow_engines

Contents

wfexs_backend.workflow_engines#

Submodules#

Package Contents#

Classes#

WorkflowType

engineName: symbolic name of the engine; shortname: short name used in the WfExS-backend configuration files for the workflow language; name: textual representation of the workflow language; clazz: class implementing the engine invocation; uriMatch: the URI patterns used in RO-Crate to identify the workflow type; uriTemplate: the URI template to be used when RO-Crate ComputerLanguage is generated; url: the URL used in RO-Crate to represent the workflow language; trs_descriptor: the string used in GA4GH TRSv2 specification to define this workflow type; rocrate_programming_language: traditional internal id in RO-Crate implementations used for this workflow type (to be deprecated).

MaterializedWorkflowEngine

instance: instance of the workflow engine; version: version of the engine to be used; fingerprint: fingerprint of the engine to be used (it could be the version); engine_path: absolute path to the fetched engine; workflow: instance of LocalWorkflow; containers_path: where the containers are going to be available for offline-execute; containers: list of Container instances (needed by workflow); operational_containers: list of Container instances (needed by engine).

StagedExecution

The description of the execution of a workflow, giving the relative directory of the output

AbstractWorkflowEngineType

WorkflowEngine

Data#

API#

wfexs_backend.workflow_engines.WORKDIR_INPUTS_RELDIR = 'inputs'#
wfexs_backend.workflow_engines.WORKDIR_EXTRAPOLATED_INPUTS_RELDIR = 'extrapolated-inputs'#
wfexs_backend.workflow_engines.WORKDIR_INTERMEDIATE_RELDIR = 'intermediate'#
wfexs_backend.workflow_engines.WORKDIR_META_RELDIR = 'meta'#
wfexs_backend.workflow_engines.WORKDIR_STATS_RELDIR = 'stats'#
wfexs_backend.workflow_engines.WORKDIR_OUTPUTS_RELDIR = 'outputs'#
wfexs_backend.workflow_engines.WORKDIR_ENGINE_TWEAKS_RELDIR = 'engineTweaks'#
wfexs_backend.workflow_engines.WORKDIR_WORKFLOW_RELDIR = 'workflow'#
wfexs_backend.workflow_engines.WORKDIR_CONSOLIDATED_WORKFLOW_RELDIR = 'consolidated-workflow'#
wfexs_backend.workflow_engines.WORKDIR_CONTAINERS_RELDIR = 'containers'#
wfexs_backend.workflow_engines.WORKDIR_STDOUT_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_STDERR_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_WORKFLOW_META_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_STAGE_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_EXECUTE_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_MARSHALLED_EXPORT_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.WORKDIR_PASSPHRASE_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.STATS_DAG_DOT_FILE = 'cast(...)'#
wfexs_backend.workflow_engines.DEFAULT_PRIORITY: Final[int] = 0#
class wfexs_backend.workflow_engines.WorkflowType#

Bases: typing.NamedTuple

  • engineName: symbolic name of the engine
  • shortname: short name used in the WfExS-backend configuration files for the workflow language
  • name: Textual representation of the workflow language
  • clazz: Class implementing the engine invocation
  • uriMatch: The URI patterns used in RO-Crate to identify the workflow type
  • uriTemplate: The URI template to be used when RO-Crate ComputerLanguage is generated
  • url: The URL used in RO-Crate to represent the workflow language
  • trs_descriptor: The string used in GA4GH TRSv2 specification to define this workflow type
  • rocrate_programming_language: Traditional internal id in RO-Crate implementations used for this workflow type (to be deprecated)

engineName: str = None#
shortname: str = None#
name: str = None#
clazz: Type[AbstractWorkflowEngineType] = None#
uriMatch: Sequence[Pattern[str] | URIType] = None#
uriTemplate: wfexs_backend.common.URIType = None#
url: wfexs_backend.common.URIType = None#
trs_descriptor: wfexs_backend.common.TRS_Workflow_Descriptor = None#
rocrate_programming_language: str = None#
priority: int = None#
enabled: bool = True#
classmethod _value_fixes() Mapping[str, str | None]#
property has_explicit_outputs: bool#
class wfexs_backend.workflow_engines.MaterializedWorkflowEngine#

Bases: typing.NamedTuple

  • instance: Instance of the workflow engine
  • version: Version of the engine to be used
  • fingerprint: Fingerprint of the engine to be used (it could be the version)
  • engine_path: Absolute path to the fetched engine
  • workflow: Instance of LocalWorkflow
  • containers_path: Where the containers are going to be available for offline-execute
  • containers: List of Container instances (needed by workflow)
  • operational_containers: List of Container instances (needed by engine)

instance: AbstractWorkflowEngineType = None#
version: wfexs_backend.common.EngineVersion = None#
fingerprint: Fingerprint | str = None#
engine_path: pathlib.Path = None#
workflow: wfexs_backend.common.LocalWorkflow = None#
containers_path: pathlib.Path | None = None#
containers: Sequence[Container] | None = None#
operational_containers: Sequence[Container] | None = None#
classmethod _mapping_fixes(orig: Mapping[str, Any], workdir: pathlib.Path | None) Mapping[str, Any]#
class wfexs_backend.workflow_engines.StagedExecution#

Bases: typing.NamedTuple

The description of the execution of a workflow, giving the relative directory of the output

exitVal: wfexs_backend.common.ExitVal = None#
augmentedInputs: Sequence[MaterializedInput] = None#
matCheckOutputs: Sequence[MaterializedOutput] = None#
outputsDir: pathlib.Path = None#
started: datetime.datetime = None#
ended: datetime.datetime = None#
environment: Sequence[MaterializedInput] = []#
outputMetaDir: pathlib.Path | None = None#
diagram: pathlib.Path | None = None#
logfile: Sequence[pathlib.Path] = []#
profiles: Sequence[str] | None = None#
queued: datetime.datetime | None = None#
status: wfexs_backend.common.ExecutionStatus = None#
job_id: str | None = None#
classmethod _mapping_fixes(orig: Mapping[str, Any], workdir: pathlib.Path | None) Mapping[str, Any]#
class wfexs_backend.workflow_engines.AbstractWorkflowEngineType#

Bases: abc.ABC

abstract classmethod MyWorkflowType() wfexs_backend.workflow_engines.WorkflowType#
abstract classmethod HasExplicitOutputs() bool#
property workflowType: wfexs_backend.workflow_engines.WorkflowType#
abstract getConfiguredContainerType() wfexs_backend.common.ContainerType#
property configuredContainerType: wfexs_backend.common.ContainerType#
abstract property engine_url: wfexs_backend.common.URIType#
abstract _get_engine_version_str(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) wfexs_backend.workflow_engines.WorkflowEngineVersionStr#

It must return a string in the form of “{symbolic engine name} {version}”

abstract sideContainers() Sequence[ContainerTaggedName]#
abstract materialize_containers(listOfContainerTags: Sequence[ContainerTaggedName], containersDir: pathlib.Path, offline: bool = False, force: bool = False, injectable_containers: Sequence[Container] = []) Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]#
abstract deploy_containers(containers_list: Sequence[Container], containersDir: pathlib.Path | None = None, force: bool = False) Sequence[Container]#
abstract property staged_containers_dir: pathlib.Path#
abstract materializeEngine(localWf: wfexs_backend.common.LocalWorkflow, engineVersion: EngineVersion | None = None, do_identify: bool = False) MaterializedWorkflowEngine | None#
abstract identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None]#

This method should return the effective engine version needed to run it when this workflow engine recognizes the workflow type

abstract materializeWorkflow(matWorfklowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]#

Method to ensure the workflow has been materialized. It returns a possibly updated materialized workflow engine, as well as the list of containers

For Nextflow it is usually a no-op, but for CWL it requires resolution

abstract launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) Iterator[StagedExecution]#
abstract classmethod FromStagedSetup(staged_setup: wfexs_backend.common.StagedSetup, container_factory_classes: Sequence[Type[ContainerFactory]] = [NoContainerFactory], progs_mapping: ProgsMapping | None = None, cache_dir: pathlib.Path | None = None, cache_workflow_dir: pathlib.Path | None = None, cache_workflow_inputs_dir: pathlib.Path | None = None, local_config: EngineLocalConfig | None = None, config_directory: pathlib.Path | None = None) wfexs_backend.workflow_engines.AbstractWorkflowEngineType#
exception wfexs_backend.workflow_engines.WorkflowEngineException#

Bases: wfexs_backend.common.AbstractWfExSException

Exceptions fired by instances of WorkflowEngine

Initialization

Initialize self. See help(type(self)) for accurate signature.

exception wfexs_backend.workflow_engines.WorkflowEngineInstallException#

Bases: wfexs_backend.workflow_engines.WorkflowEngineException

Exceptions fired by instances of WorkflowEngine when the engine could not be installed

Initialization

Initialize self. See help(type(self)) for accurate signature.

class wfexs_backend.workflow_engines.WorkflowEngine(container_factory_clazz: Type[ContainerFactory] = NoContainerFactory, cacheDir: pathlib.Path | None = None, engine_config: EngineLocalConfig | None = None, progs_mapping: ProgsMapping | None = None, engineTweaksDir: pathlib.Path | None = None, cacheWorkflowDir: pathlib.Path | None = None, cacheWorkflowInputsDir: pathlib.Path | None = None, workDir: pathlib.Path | None = None, outputsDir: pathlib.Path | None = None, outputMetaDir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, tempDir: pathlib.Path | None = None, stagedContainersDir: pathlib.Path | None = None, secure_exec: bool = False, allowOther: bool = False, config_directory: pathlib.Path | None = None, engine_mode: wfexs_backend.common.EngineMode = DEFAULT_ENGINE_MODE, writable_containers: bool = False)#

Bases: wfexs_backend.workflow_engines.AbstractWorkflowEngineType

ENGINE_NAME = 'abstract'#
classmethod FromStagedSetup(staged_setup: wfexs_backend.common.StagedSetup, container_factory_classes: Sequence[Type[ContainerFactory]] = [NoContainerFactory], progs_mapping: ProgsMapping | None = None, cache_dir: pathlib.Path | None = None, cache_workflow_dir: pathlib.Path | None = None, cache_workflow_inputs_dir: pathlib.Path | None = None, local_config: EngineLocalConfig | None = None, config_directory: pathlib.Path | None = None) wfexs_backend.workflow_engines.AbstractWorkflowEngineType#

Init method from staged setup instance

Parameters:
  • staged_setup

  • cache_dir

  • cache_workflow_dir

  • cache_workflow_inputs_dir

  • local_config

  • config_directory

getConfiguredContainerType() wfexs_backend.common.ContainerType#
abstract classmethod SupportedContainerTypes() Set[ContainerType]#
abstract classmethod SupportedSecureExecContainerTypes() Set[ContainerType]#
classmethod SupportsContainerType(container_type: wfexs_backend.common.ContainerType) bool#
classmethod SupportsContainerFactory(container_factory_clazz: Type[ContainerFactory]) bool#
classmethod SupportsSecureExecContainerType(container_type: wfexs_backend.common.ContainerType) bool#
classmethod SupportsSecureExecContainerFactory(container_factory_clazz: Type[ContainerFactory]) bool#
abstract identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None]#

This method should return the effective engine version needed to run it when this workflow engine recognizes the workflow type

abstract materializeEngineVersion(engineVersion: wfexs_backend.common.EngineVersion) Tuple[EngineVersion, pathlib.Path, Fingerprint]#

Method to ensure the required engine version is materialized. It should raise an exception when the exact version is unavailable and no replacement could be fetched.

static GetEngineVersion(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) wfexs_backend.workflow_engines.WorkflowEngineVersionStr#

It must return a string in the form of “{symbolic engine name} {version}”

materializeEngine(localWf: wfexs_backend.common.LocalWorkflow, engineVersion: EngineVersion | None = None, do_identify: bool = False) MaterializedWorkflowEngine | None#

Method to ensure the required engine version is materialized. It should raise an exception when the exact version is unavailable and no replacement could be fetched.

abstract materializeWorkflow(matWorfklowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]#

Method to ensure the workflow has been materialized. It returns a possibly updated materialized workflow engine (which carries the local workflow), as well as the list of containers.

For Nextflow it is usually a no-op, but for CWL it requires resolution

sideContainers() Sequence[ContainerTaggedName]#

Containers needed by the engine to work

abstract simpleContainerFileName(imageUrl: wfexs_backend.common.URIType) Sequence[RelPath]#

This method must be implemented to tell which file names the workflow engine expects in its container cache directories when an image is locally materialized (currently only useful for Singularity).

materialize_containers(listOfContainerTags: Sequence[ContainerTaggedName], containersDir: pathlib.Path | None = None, offline: bool = False, force: bool = False, injectable_containers: Sequence[Container] = []) Tuple[ContainerEngineVersionStr, Sequence[Container], ContainerOperatingSystem, ProcessorArchitecture]#
deploy_containers(containers_list: Sequence[Container], containersDir: pathlib.Path | None = None, force: bool = False) Sequence[Container]#
property staged_containers_dir: pathlib.Path#
create_job_directories() Tuple[str, pathlib.Path, pathlib.Path, pathlib.Path]#
abstract launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) Iterator[StagedExecution]#
classmethod ExecuteWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, inputs: Sequence[MaterializedInput], environment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) Iterator[StagedExecution]#
classmethod MaterializeWorkflowAndContainers(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, containersDir: pathlib.Path, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, injectable_containers: Sequence[Container] = [], injectable_operational_containers: Sequence[Container] = [], profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Tuple[MaterializedWorkflowEngine, ContainerEngineVersionStr, ContainerOperatingSystem, ProcessorArchitecture]#
GuessedCardinalityMapping = None#
GuessedOutputKindMapping: Mapping[str, ContentKind] = None#
identifyMaterializedOutputs(matInputs: Sequence[MaterializedInput], expectedOutputs: Sequence[ExpectedOutput], outputsDir: pathlib.Path, outputsMapping: Mapping[SymbolicOutputName, Any] | None = None) Sequence[MaterializedOutput]#

This method is used to identify outputs by either file glob descriptions or matching with a mapping