wfexs_backend.workflow_engines.nextflow_engine

Module Contents

Classes

Functions

Data

API

wfexs_backend.workflow_engines.nextflow_engine.DEFAULT_STATIC_BASH_CMDS = None
wfexs_backend.workflow_engines.nextflow_engine.DEFAULT_STATIC_PS_CMDS = None
wfexs_backend.workflow_engines.nextflow_engine._tzstring() str
class wfexs_backend.workflow_engines.nextflow_engine.NextflowWorkflowEngine(container_factory_clazz: Type[ContainerFactory] = NoContainerFactory, cacheDir: pathlib.Path | None = None, engine_config: EngineLocalConfig | None = None, progs_mapping: ProgsMapping | None = None, engineTweaksDir: pathlib.Path | None = None, cacheWorkflowDir: pathlib.Path | None = None, cacheWorkflowInputsDir: pathlib.Path | None = None, workDir: pathlib.Path | None = None, outputsDir: pathlib.Path | None = None, outputMetaDir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, tempDir: pathlib.Path | None = None, stagedContainersDir: pathlib.Path | None = None, secure_exec: bool = False, allowOther: bool = False, config_directory: pathlib.Path | None = None, writable_containers: bool = False)

Bases: wfexs_backend.workflow_engines.WorkflowEngine

NEXTFLOW_REPO = 'https://github.com/nextflow-io/nextflow'
DEFAULT_NEXTFLOW_VERSION = 'cast(...)'
DEFAULT_NEXTFLOW_VERSION_WITH_PODMAN = 'cast(...)'
DEFAULT_NEXTFLOW_VERSION_20_04 = 'cast(...)'
DEFAULT_NEXTFLOW_DOCKER_IMAGE = 'nextflow/nextflow'
DEFAULT_NEXTFLOW_ENTRYPOINT = 'main.nf'
NEXTFLOW_CONFIG_FILENAME = 'nextflow.config'
TROJAN_CONFIG_FILENAME = 'force-params-with-trojan.config'
INPUT_DECLARATIONS_FILENAME = 'inputdeclarations.yaml'
NEXTFLOW_IO = 'cast(...)'
DEFAULT_MAX_RETRIES = 5
DEFAULT_MAX_CPUS = 4
ENGINE_NAME = 'nextflow'
SUPPORTED_CONTAINER_TYPES = None
SUPPORTED_SECURE_EXEC_CONTAINER_TYPES = None
classmethod MyWorkflowType() wfexs_backend.workflow_engines.WorkflowType
classmethod HasExplicitOutputs() bool
classmethod SupportedContainerTypes() Set[ContainerType]
classmethod SupportedSecureExecContainerTypes() Set[ContainerType]
property engine_url: wfexs_backend.common.URIType
NXF_VER_PAT: Pattern[str] = 'compile(...)'
identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None]

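When this workflow engine recognizes the workflow type, this method returns the effective engine version needed to run the workflow. The return annotation also admits (None, None), presumably for workflows the engine does not recognize.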
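The two-shaped return value can be illustrated with a small stand-in. This is only a sketch under stated assumptions: identify_sketch is a hypothetical function, not part of wfexs_backend, the "0.0.0" fallback version is a placeholder, and recognition is reduced to checking for the entrypoint named by DEFAULT_NEXTFLOW_ENTRYPOINT. The real method may apply other checks.

```python
from pathlib import Path
from typing import Optional, Tuple

# Value taken from DEFAULT_NEXTFLOW_ENTRYPOINT in this module.
ENTRYPOINT = "main.nf"


def identify_sketch(
    wf_dir: Path,
    engine_ver: Optional[str] = None,
    default_ver: str = "0.0.0",  # placeholder, not a real Nextflow version
) -> Tuple[Optional[str], Optional[Path]]:
    """Hypothetical stand-in mirroring the identifyWorkflow return contract."""
    entry = wf_dir / ENTRYPOINT
    if not entry.is_file():
        # Workflow type not recognized: both members are None.
        return None, None
    # Recognized: report the requested version, or fall back to a default.
    return (engine_ver or default_ver), entry


def describe(wf_dir: Path) -> str:
    """Show how a caller might branch on the result."""
    version, entry = identify_sketch(wf_dir)
    if version is None or entry is None:
        return "not a Nextflow workflow"
    return f"{entry.name} needs engine version {version}"
```

A caller only has to test one member of the tuple against None, since both members are either set or unset together.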

materializeEngineVersion(engineVersion: wfexs_backend.common.EngineVersion) Tuple[EngineVersion, pathlib.Path, Fingerprint]

Ensures the required engine version is materialized. It should raise an exception when the exact version is unavailable and no replacement could be fetched.

runNextflowCommand(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path | None = None, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, nextflow_path: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None]
runLocalNextflowCommand(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, nextflow_install_dir: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None]
runNextflowCommandInDocker(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None]
_get_engine_version_str(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) wfexs_backend.workflow_engines.WorkflowEngineVersionStr
ContConfigPat: Pattern[str] = 'compile(...)'
RegistryPat: Pattern[str] = 'compile(...)'
C_URL_REGEX: Final[Pattern[str]] = 'compile(...)'
C_DOCKER_REGEX: Final[Pattern[str]] = 'compile(...)'
DSLEnablePat: Final[Pattern[str]] = 'compile(...)'
_genDockSingContainerTaggedName(container_tag: str, registries: Mapping[ContainerType, str]) ContainerTaggedName | None
materializeWorkflow(matWorkflowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]]

Ensures the workflow has been materialized. In the case of Nextflow, it returns the input matWorkflowEngine after the checks, together with the list of containers.

simpleContainerFileName(imageUrl: wfexs_backend.common.URIType) Sequence[RelPath]

This method was borrowed from nextflow-io/nextflow and translated to Python.
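As a rough illustration of the kind of naming rule involved, the sketch below flattens an image URL into a single cache file name. It assumes the Nextflow convention is to drop the URL scheme, replace ":" and "/" with "-", and append ".img". The function name simple_name_sketch is hypothetical, special cases such as other file extensions are ignored, and the actual method returns a sequence of relative paths rather than a single string.

```python
def simple_name_sketch(image_url: str) -> str:
    """Hypothetical, simplified flattening of an image URL into a file name."""
    # Drop the scheme (for example "docker://"), if there is one.
    _, sep, rest = image_url.partition("://")
    name = rest if sep else image_url
    # Replace tag and path separators so the result is a flat file name.
    # The ".img" suffix is an assumption about the cache file convention.
    return name.replace(":", "-").replace("/", "-") + ".img"


print(simple_name_sketch("docker://quay.io/biocontainers/samtools:1.9"))
# prints: quay.io-biocontainers-samtools-1.9.img
```

Flattening the URL this way keeps every cached image in one directory while still making the registry, repository and tag visible in the file name.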

structureAsNXFParams(matInputs: Sequence[MaterializedInput], outputsDir: pathlib.Path) Mapping[str, Any]
augmentNextflowInputs(matHash: Mapping[SymbolicParamName, MaterializedInput], allExecutionParams: Mapping[str, Any], prefix: str = '') Sequence[MaterializedInput]

Generate additional MaterializedInput for the implicit params.
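One plausible reading of this description, given the prefix parameter, is a recursive walk over the nested execution parameters that records every dotted name not already declared as an input. The sketch below follows that reading only; ImplicitParam and augment_sketch are hypothetical stand-ins, not the wfexs_backend types or the actual implementation.

```python
import collections.abc
from dataclasses import dataclass
from typing import Any, List, Mapping


@dataclass
class ImplicitParam:
    """Hypothetical stand-in for MaterializedInput (name plus values)."""
    name: str
    values: List[Any]


def augment_sketch(
    known: Mapping[str, Any],
    params: Mapping[str, Any],
    prefix: str = "",
) -> List[ImplicitParam]:
    """Collect parameters present in params but absent from known."""
    extra: List[ImplicitParam] = []
    for key, value in params.items():
        name = prefix + key
        if isinstance(value, collections.abc.Mapping):
            # Nested scope: recurse, extending the dotted prefix.
            extra.extend(augment_sketch(known, value, prefix=name + "."))
        elif name not in known:
            # Leaf not declared as an input: treat it as implicit.
            values = value if isinstance(value, list) else [value]
            extra.append(ImplicitParam(name, values))
    return extra


declared = {"input": "declared elsewhere"}
effective = {"input": "a.fq", "max_cpus": 4, "opts": {"depth": 2}}
print([p.name for p in augment_sketch(declared, effective)])
# prints: ['max_cpus', 'opts.depth']
```

The dotted names mirror how nested Nextflow parameters are usually addressed, which is why the prefix is extended with a "." at each level in this sketch.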

launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, matInputs: Sequence[MaterializedInput], matEnvironment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) Iterator[StagedExecution]
inspectWorkflow(matWorkflowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Sequence[ContainerTaggedName]