wfexs_backend.workflow_engines.nextflow_engine
#
Module Contents#
Classes#
Functions#
Data#
API#
- wfexs_backend.workflow_engines.nextflow_engine.DEFAULT_STATIC_BASH_CMDS = None#
- wfexs_backend.workflow_engines.nextflow_engine.DEFAULT_STATIC_PS_CMDS = None#
- class wfexs_backend.workflow_engines.nextflow_engine.NextflowWorkflowEngine(container_factory_clazz: Type[ContainerFactory] = NoContainerFactory, cacheDir: pathlib.Path | None = None, engine_config: EngineLocalConfig | None = None, progs_mapping: ProgsMapping | None = None, engineTweaksDir: pathlib.Path | None = None, cacheWorkflowDir: pathlib.Path | None = None, cacheWorkflowInputsDir: pathlib.Path | None = None, workDir: pathlib.Path | None = None, outputsDir: pathlib.Path | None = None, outputMetaDir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, tempDir: pathlib.Path | None = None, stagedContainersDir: pathlib.Path | None = None, secure_exec: bool = False, allowOther: bool = False, config_directory: pathlib.Path | None = None, writable_containers: bool = False)#
Bases:
wfexs_backend.workflow_engines.WorkflowEngine
- NEXTFLOW_REPO = 'https://github.com/nextflow-io/nextflow'#
- DEFAULT_NEXTFLOW_VERSION = 'cast(...)'#
- DEFAULT_NEXTFLOW_VERSION_WITH_PODMAN = 'cast(...)'#
- DEFAULT_NEXTFLOW_VERSION_20_04 = 'cast(...)'#
- DEFAULT_NEXTFLOW_DOCKER_IMAGE = 'nextflow/nextflow'#
- DEFAULT_NEXTFLOW_ENTRYPOINT = 'main.nf'#
- NEXTFLOW_CONFIG_FILENAME = 'nextflow.config'#
- TROJAN_CONFIG_FILENAME = 'force-params-with-trojan.config'#
- INPUT_DECLARATIONS_FILENAME = 'inputdeclarations.yaml'#
- NEXTFLOW_IO = 'cast(...)'#
- DEFAULT_MAX_RETRIES = 5#
- DEFAULT_MAX_CPUS = 4#
- ENGINE_NAME = 'nextflow'#
- SUPPORTED_CONTAINER_TYPES = None#
- SUPPORTED_SECURE_EXEC_CONTAINER_TYPES = None#
- classmethod MyWorkflowType() wfexs_backend.workflow_engines.WorkflowType #
- classmethod SupportedContainerTypes() Set[ContainerType] #
- classmethod SupportedSecureExecContainerTypes() Set[ContainerType] #
- property engine_url: wfexs_backend.common.URIType#
- identifyWorkflow(localWf: wfexs_backend.common.LocalWorkflow, engineVer: EngineVersion | None = None) Tuple[EngineVersion, LocalWorkflow] | Tuple[None, None] #
This method should return the effective engine version needed to run it when this workflow engine recognizes the workflow type
- materializeEngineVersion(engineVersion: wfexs_backend.common.EngineVersion) Tuple[EngineVersion, pathlib.Path, Fingerprint] #
Method to ensure the required engine version is materialized It should raise an exception when the exact version is unavailable, and no replacement could be fetched
- runNextflowCommand(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path | None = None, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, nextflow_path: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None] #
- runLocalNextflowCommand(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, nextflow_install_dir: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None] #
- runNextflowCommandInDocker(nextflow_version: wfexs_backend.common.EngineVersion, commandLine: Sequence[str], containers_path: pathlib.Path, workdir: pathlib.Path | None = None, intermediateDir: pathlib.Path | None = None, stdoutFilename: pathlib.Path | None = None, stderrFilename: pathlib.Path | None = None, runEnv: Mapping[str, str] | None = None) Tuple[ExitVal, str | None, str | None] #
- _get_engine_version_str(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine) wfexs_backend.workflow_engines.WorkflowEngineVersionStr #
- _genDockSingContainerTaggedName(container_tag: str, registries: Mapping[ContainerType, str]) ContainerTaggedName | None #
- materializeWorkflow(matWorkflowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Tuple[MaterializedWorkflowEngine, Sequence[ContainerTaggedName]] #
Method to ensure the workflow has been materialized. In the case of Nextflow, it returns the input matWorkflowEngine after the checks, as well as the list of containers
- simpleContainerFileName(imageUrl: wfexs_backend.common.URIType) Sequence[RelPath] #
This method was borrowed from nextflow-io/nextflow and translated to Python
- structureAsNXFParams(matInputs: Sequence[MaterializedInput], outputsDir: pathlib.Path) Mapping[str, Any] #
- augmentNextflowInputs(matHash: Mapping[SymbolicParamName, MaterializedInput], allExecutionParams: Mapping[str, Any], prefix: str = '') Sequence[MaterializedInput] #
Generate additional MaterializedInput for the implicit params.
- launchWorkflow(matWfEng: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, matInputs: Sequence[MaterializedInput], matEnvironment: Sequence[MaterializedInput], outputs: Sequence[ExpectedOutput], profiles: Sequence[str] | None = None) Iterator[StagedExecution] #
- inspectWorkflow(matWorkflowEngine: wfexs_backend.workflow_engines.MaterializedWorkflowEngine, consolidatedWorkflowDir: pathlib.Path, offline: bool = False, profiles: Sequence[str] | None = None, context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) Sequence[ContainerTaggedName] #