wfexs_backend.workflow

Module Contents

Classes

ExportItem

ExportAction

MaterializedExportAction

The description of a materialized export action (that is, one for which a permanent identifier was obtained), along with some metadata.

DefaultMissing

This is inspired by the example available at https://docs.python.org/3/library/stdtypes.html#str.format_map

WF

Workflow enaction class

Functions

_wakeupEncDir

This function periodically checks whether the directory is still available.

API

class wfexs_backend.workflow.ExportItem

Bases: typing.NamedTuple

type: wfexs_backend.common.ExportItemType = None
block: str | None = None
name: SymbolicParamName | SymbolicOutputName | None = None
class wfexs_backend.workflow.ExportAction

Bases: typing.NamedTuple

action_id: wfexs_backend.common.SymbolicName = None
plugin_id: wfexs_backend.common.SymbolicName = None
what: Sequence[ExportItem] = None
context_name: SymbolicName | None = None
setup: SecurityContextConfig | None = None
preferred_scheme: str | None = None
preferred_id: str | None = None
licences: Sequence[str] = []
title: str | None = None
description: str | None = None
custom_metadata: Mapping[str, Any] | None = None
community_custom_metadata: Mapping[str, Any] | None = None
class wfexs_backend.workflow.MaterializedExportAction

Bases: typing.NamedTuple

The description of a materialized export action (that is, one for which a permanent identifier was obtained), along with some metadata.

action: wfexs_backend.workflow.ExportAction = None
elems: Sequence[AnyContent] = None
pids: Sequence[URIWithMetadata] = None
when: datetime.datetime = 'astimezone(...)'
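
The rendered default of `when` (`'astimezone(...)'`) suggests that the field defaults to the result of a call ending in `.astimezone()`. Assuming it is written as a plain `NamedTuple` field default, that expression is evaluated once, when the class is defined, so every instance that omits `when` shares the same timestamp. The sketch below uses a hypothetical stand-in class (not the WfExS one) to show that behaviour and the usual remedy of passing `when` explicitly:

```python
import datetime
from typing import NamedTuple, Sequence


class MaterializedActionSketch(NamedTuple):
    """Hypothetical stand-in mirroring a few MaterializedExportAction fields."""

    action: str
    pids: Sequence[str] = ()
    # This default expression runs ONCE, when the class body is executed
    when: datetime.datetime = datetime.datetime.now().astimezone()


first = MaterializedActionSketch(action="a1")
second = MaterializedActionSketch(action="a2")
# Both instances carry the very same class-definition timestamp object
print(first.when is second.when)  # prints: True

# Passing the timestamp explicitly records the actual materialization time
third = MaterializedActionSketch(action="a3", when=datetime.datetime.now().astimezone())
```
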
class wfexs_backend.workflow.DefaultMissing

Bases: typing.Dict[wfexs_backend.workflow.KT, wfexs_backend.workflow.VT]

This is inspired by the example available at https://docs.python.org/3/library/stdtypes.html#str.format_map

Initialization

Initialize self. See help(type(self)) for accurate signature.

__missing__(key: wfexs_backend.workflow.KT) wfexs_backend.workflow.VT
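
The `str.format_map` recipe that `DefaultMissing` is inspired by is a `dict` subclass whose `__missing__` supplies a value for absent keys, so formatting does not raise `KeyError`. A minimal sketch of that recipe follows; the class name `DefaultMissingSketch` is hypothetical, and returning the key itself mirrors the Python documentation example, which may differ from what WfExS actually returns.

```python
from typing import Dict


class DefaultMissingSketch(Dict[str, str]):
    """dict that returns the key itself for missing entries (Python docs recipe)."""

    def __missing__(self, key: str) -> str:
        # dict.__getitem__ calls this for absent keys instead of raising KeyError
        return key


template = "{name} was born in {country}"
print(template.format_map(DefaultMissingSketch(name="Guido")))
# prints: Guido was born in country
```
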
wfexs_backend.workflow._wakeupEncDir(cond: threading.Condition, workDir: pathlib.Path, logger: logging.Logger) None

This function periodically checks whether the directory is still available.
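
The signature (a `threading.Condition`, the working directory and a logger) suggests a background thread that sleeps on the condition and periodically touches the directory, presumably so that an encrypted, FUSE-mounted working directory stays reachable. The sketch below is an assumption-based illustration, not the WfExS code: the function name, the `interval` parameter, the probe counter and the convention that `notify()` on the condition means "stop" are all hypothetical.

```python
import logging
import pathlib
import threading


def wakeup_enc_dir_sketch(
    cond: threading.Condition,
    work_dir: pathlib.Path,
    logger: logging.Logger,
    interval: float = 60.0,
) -> int:
    """Probe work_dir every `interval` seconds until cond is notified.

    Returns the number of successful probes (hypothetical, for illustration).
    """
    probes = 0
    with cond:
        while True:
            # wait() releases the lock while sleeping; it returns False only on timeout
            if cond.wait(timeout=interval):
                logger.debug("Stop requested, leaving the wake-up loop")
                return probes
            try:
                # Reading one directory entry forces an access to the (possibly FUSE) mount
                next(work_dir.iterdir(), None)
                probes += 1
            except OSError as exc:
                logger.warning("Working directory %s is no longer available: %s", work_dir, exc)
                return probes
```

A caller would run it in a daemon thread and, when done, acquire the condition and call `notify()` to end the loop.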

exception wfexs_backend.workflow.WFException

Bases: wfexs_backend.common.AbstractWfExSException

exception wfexs_backend.workflow.ExportActionException

Bases: wfexs_backend.common.AbstractWfExSException

exception wfexs_backend.workflow.WFWarning

Bases: UserWarning
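
Because `WFWarning` derives from `UserWarning`, callers can control it with the standard `warnings` machinery instead of `try`/`except`, while the exceptions can be caught through their common base. The sketch below uses stand-in classes with the documented bases (they are not imported from `wfexs_backend`, and `AbstractWfExSException` is assumed here to derive from `Exception`); `risky_step` and its messages are hypothetical.

```python
import warnings


class AbstractWfExSException(Exception):
    """Stand-in for wfexs_backend.common.AbstractWfExSException (assumed base)."""


class WFException(AbstractWfExSException):
    pass


class WFWarning(UserWarning):
    pass


def risky_step(fail: bool) -> str:
    # Hypothetical step: warn on a recoverable issue, raise on a fatal one
    if fail:
        raise WFException("fatal workflow error")
    warnings.warn("recoverable workflow issue", WFWarning)
    return "done"


# Record WFWarning instances instead of printing them
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", WFWarning)
    result = risky_step(fail=False)

# Catch every WfExS-style error through the common base class
try:
    risky_step(fail=True)
except AbstractWfExSException as exc:
    error_message = str(exc)
```

Running with `warnings.simplefilter("error", WFWarning)` turns the same warnings into exceptions, which can be handy in strict test runs.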

class wfexs_backend.workflow.WF(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_id: WorkflowId | None = None, version_id: WFVersionId | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, trs_endpoint: str = DEFAULT_TRS_ENDPOINT, params: ParamsBlock | None = None, enabled_profiles: Sequence[str] | None = None, environment: EnvironmentBlock | None = None, outputs: OutputsBlock | None = None, placeholders: PlaceHoldersBlock | None = None, default_actions: Sequence[ExportActionBlock] | None = None, workflow_config: WorkflowConfigBlock | None = None, vault: SecurityContextVault | None = None, instanceId: WfExSInstanceId | None = None, nickname: str | None = None, orcids: Sequence[str] = [], creation: datetime.datetime | None = None, rawWorkDir: pathlib.Path | None = None, paranoid_mode: bool | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Minimal, strict_reproducibility_level: bool = False)

Workflow enaction class

Initialization

Init function

Parameters:
  • wfexs (WfExSBackend) – A WfExSBackend instance

  • workflow_id (str) – A unique identifier of the workflow. Although it is an integer in WorkflowHub, we cannot assume it is so in all the GA4GH TRS implementations which are exposing workflows.

  • version_id (str) – An identifier of the workflow version. Although it is an integer in WorkflowHub, we cannot assume the format of the version id, as it could follow semantic versioning, be a UUID, etc.

  • descriptor_type (str) – The type of descriptor that represents this version of the workflow (e.g. CWL, WDL, NFL, or GALAXY). It is optional; when it is not provided, it is guessed from the calls to the API. It can be either the short name of the workflow engine, or the name used by GA4GH TRS.

  • trs_endpoint (str) – The TRS endpoint used to find the workflow.

  • params (dict) – Optional params for the workflow execution.

  • outputs (dict)

  • workflow_config (dict) – Tweaks for workflow enactment, like some overrides.

  • vault (SecurityContextVault) – Dictionary with the different credential contexts, only used to fetch fresh contents.

  • instanceId (str) – The instance id of this working directory.

  • nickname (str) – The nickname of this working directory.

  • creation (datetime.datetime) – The creation timestamp.

  • rawWorkDir (pathlib.Path) – Raw working directory.

  • paranoid_mode (bool) – Whether to enable paranoid mode for this workflow.

  • fail_ok (bool)

TRS_TOOL_FILES_FILE: Final[RelPath] = 'cast(...)'
STAGE_DEFINITION_SCHEMA: Final[RelPath] = 'cast(...)'
EXPORT_ACTIONS_SCHEMA: Final[RelPath] = 'cast(...)'
STAGED_CRATE_FILE: Final[RelPath] = 'cast(...)'
EXECUTION_CRATE_FILE: Final[RelPath] = 'cast(...)'
DEFAULT_TRS_ENDPOINT: Final[str] = 'https://dev.workflowhub.eu/ga4gh/trs/v2/'
TRS_TOOLS_PATH: Final[str] = 'tools/'
FUSE_SYSTEM_CONF = '/etc/fuse.conf'
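
`DEFAULT_TRS_ENDPOINT` ends with a slash and `TRS_TOOLS_PATH` is the relative `tools/` segment, so they can be concatenated following the GA4GH TRS v2 path layout (`tools/{id}/versions/{version_id}`, plus `.../{type}/files` for the file listing). The helper names below are hypothetical and the ids are illustrative; the sketch mainly shows why identifiers are percent-encoded, since, as the `WF` parameters note, workflow and version ids are not guaranteed to be plain integers.

```python
from urllib.parse import quote

DEFAULT_TRS_ENDPOINT = "https://dev.workflowhub.eu/ga4gh/trs/v2/"
TRS_TOOLS_PATH = "tools/"


def trs_version_url(endpoint: str, workflow_id: object, version_id: object) -> str:
    """Hypothetical helper: TRS v2 URL for tools/{id}/versions/{version_id}."""
    if not endpoint.endswith("/"):
        endpoint += "/"
    # safe="" also percent-encodes "/" and "#", which non-numeric TRS ids may contain
    tool = quote(str(workflow_id), safe="")
    version = quote(str(version_id), safe="")
    return f"{endpoint}{TRS_TOOLS_PATH}{tool}/versions/{version}"


def trs_files_url(endpoint: str, workflow_id: object, version_id: object, descriptor_type: str) -> str:
    """Hypothetical helper: TRS v2 URL listing the files of one descriptor type."""
    base = trs_version_url(endpoint, workflow_id, version_id)
    return f"{base}/{quote(descriptor_type, safe='')}/files"


print(trs_version_url(DEFAULT_TRS_ENDPOINT, 107, 1))
# prints: https://dev.workflowhub.eu/ga4gh/trs/v2/tools/107/versions/1
```
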
getPID() str | None

Returns the most permanent workflow identifier that can be generated from the details in the YAML configuration.

setupWorkdir(doSecureWorkDir: bool, fail_ok: bool = False, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Tuple[bool, pathlib.Path]
unmountWorkdir() None
cleanup() None
getStagedSetup() wfexs_backend.common.StagedSetup
getMarshallingStatus(reread_stats: bool = False) wfexs_backend.common.MarshallingStatus
getMaterializedWorkflow() LocalWorkflow | None
getMaterializedContainers() Sequence[Container]
enableParanoidMode() None
static __read_yaml_config(filename: pathlib.Path) wfexs_backend.workflow.WritableWorkflowMetaConfigBlock
classmethod __merge_params_from_file(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, base_workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, replaced_parameters_filename: pathlib.Path) Tuple[WritableWorkflowMetaConfigBlock, Mapping[str, Set[str]]]
classmethod FromWorkDir(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowWorkingDirectory: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False) wfexs_backend.workflow.WF

This class method requires an existing staged working directory.

classmethod TryWorkflowURI(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_uri: str, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None) wfexs_backend.workflow.WF

This class method creates a new staged working directory.

classmethod FromFiles(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowMetaFilename: pathlib.Path, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF

This class method creates a new staged working directory.

classmethod FromStagedRecipe(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WritableWorkflowMetaConfigBlock, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF

This class method creates a new staged working directory.

classmethod FromPreviousInstanceDeclaration(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, wfInstance: wfexs_backend.workflow.WF, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF

This class method creates a new staged working directory based on the declaration of an existing one.

static _transferInputs(payload_dir: pathlib.Path, inputs_dir: pathlib.Path, cached_inputs: Sequence[MaterializedInput]) Sequence[MaterializedInput]
classmethod FromPreviousROCrate(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowROCrateFilename: pathlib.Path, public_name: str, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False, retrospective_first: bool = True) wfexs_backend.workflow.WF

This class method creates a new staged working directory based on the declaration of an existing one.

classmethod FromDescription(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, vault: wfexs_backend.security_context.SecurityContextVault, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF

This class method may create a new staged working directory.

Parameters:
  • wfexs (WfExSBackend) – WfExSBackend instance

  • workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.

  • paranoidMode (bool)

Returns:

The workflow instance.

classmethod FromForm(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF

Parameters:
  • wfexs (WfExSBackend) – WfExSBackend instance

  • workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.

  • paranoidMode (bool)

Returns:

The workflow instance.

fetchWorkflow(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None, trs_endpoint: str | None, descriptor_type: TRS_Workflow_Descriptor | None, offline: bool = False, ignoreCache: bool = False, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None) None

Fetch the whole workflow description based on the data obtained from the TRS where it is being published.

If the workflow id is a URL, it is assumed to point to a repository (git, swh, …), and the version represents either the branch, the tag or a specific commit. In that case, the whole TRS fetching machinery is bypassed.
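
The dispatch described above can be sketched as follows. This is a hypothetical illustration, not the WfExS implementation: `FetchPlan` and `plan_workflow_fetch` are invented names, and the rule "any identifier with a URI scheme is a repository reference" is an assumption that covers `https:`, `git:` and `swh:` style identifiers.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlparse


class FetchPlan(NamedTuple):
    """Hypothetical summary of how a workflow would be fetched."""

    via_trs: bool
    workflow_id: str
    version: Optional[str]


def plan_workflow_fetch(workflow_id: object, version_id: Optional[object]) -> FetchPlan:
    wid = str(workflow_id)
    version = None if version_id is None else str(version_id)
    # A URI scheme (https:, git:, swh:, ...) marks a repository reference:
    # the version is then a branch, tag or commit, and TRS is bypassed
    if urlparse(wid).scheme:
        return FetchPlan(via_trs=False, workflow_id=wid, version=version)
    # Otherwise the identifier is resolved through the TRS endpoint
    return FetchPlan(via_trs=True, workflow_id=wid, version=version)
```
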

setupEngine(offline: bool = False, ignoreCache: bool = False, initial_engine_version: EngineVersion | None = None, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None) None
materializeWorkflowAndContainers(offline: bool = False, ignoreCache: bool = False, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None, injectable_containers: Sequence[Container] = [], injectable_operational_containers: Sequence[Container] = [], context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) None
materializeInputs(formatted_params: ParamsBlock | Sequence[Mapping[str, Any]], offline: bool = False, ignoreCache: bool = False, injectable_inputs: Sequence[MaterializedInput] | None = None, lastInput: int = 0) Sequence[MaterializedInput]
_buildLicensedURI(remote_file_f: wfexs_backend.workflow.Sch_InputURI_Fetchable, contextName: str | None = None, licences: Tuple[URIType, ...] = DefaultNoLicenceTuple, attributions: Sequence[Attribution] = []) Tuple[LicensedURI | Sequence[LicensedURI], bool]
_fetchRemoteFile(remote_file: wfexs_backend.workflow.Sch_InputURI_Fetchable, contextName: str | None, offline: bool, storeDir: pathlib.Path | CacheType, cacheable: bool, inputDestDir: pathlib.Path, globExplode: str | None, prefix: str = '', hardenPrettyLocal: bool = False, prettyRelname: RelPath | None = None, ignoreCache: bool = False, cloneToStore: bool = True) Sequence[MaterializedContent]
_formatStringFromPlaceHolders(the_string: str, placeholders: PlaceHoldersBlock | None = None) str
_formatInputURIFromPlaceHolders(input_uri: wfexs_backend.workflow.Sch_InputURI) wfexs_backend.workflow.Sch_InputURI
formatParams(params: ParamsBlock | None, prefix: str = '') Tuple[ParamsBlock | None, Sequence[Sch_Output] | None]
_fetchContentWithURIs(inputs: wfexs_backend.workflow.ParamsBlock, linearKey: wfexs_backend.common.SymbolicParamName, workflowInputs_destdir: pathlib.Path, workflowExtrapolatedInputs_destdir: pathlib.Path, lastInput: int = 0, offline: bool = False, ignoreCache: bool = False, cloneToStore: bool = True) Tuple[Sequence[MaterializedInput], int, Sequence[str]]
_injectContent(injectable_content: Sequence[MaterializedContent], dest_path: pathlib.Path, pretty_relname: str | None, last_input: int = 1) Tuple[MutableSequence[MaterializedContent], int]
fetchInputs(params: ParamsBlock | Sequence[ParamsBlock], workflowInputs_destdir: pathlib.Path, workflowExtrapolatedInputs_destdir: pathlib.Path, prefix: str = '', injectable_inputs_dict: Mapping[str, MaterializedInput] = {}, lastInput: int = 0, offline: bool = False, ignoreCache: bool = False) Tuple[Sequence[MaterializedInput], int, Sequence[str]]

Fetch the input files for the workflow execution. All the inputs must be URLs or CURIEs from identifiers.org / n2t.net.

Parameters:
  • params (dict) – Optional params for the workflow execution.

  • workflowInputs_destdir

  • workflowExtrapolatedInputs_destdir

  • prefix (str)

  • lastInput

  • offline
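
Both meta-resolvers mentioned above accept a `prefix:accession` CURIE appended to their base URL (for instance `https://identifiers.org/taxonomy:9606`). A minimal sketch of normalizing inputs under that rule follows; the function name and the heuristic (a value with a scheme and a network location is a URL, anything else with a colon is a CURIE) are assumptions, not necessarily how `fetchInputs` decides.

```python
from urllib.parse import urlparse

IDENTIFIERS_ORG = "https://identifiers.org/"
N2T_NET = "https://n2t.net/"


def to_resolvable_url(value: str, resolver: str = IDENTIFIERS_ORG) -> str:
    """Return value unchanged if it is a URL, else resolve it as a CURIE (hypothetical)."""
    parsed = urlparse(value)
    if parsed.scheme and parsed.netloc:
        # Already a URL such as https://... or ftp://...
        return value
    prefix, sep, accession = value.partition(":")
    if not sep or not prefix or not accession:
        raise ValueError(f"{value!r} is neither a URL nor a CURIE")
    # Meta-resolvers take the CURIE verbatim after their base URL
    return f"{resolver}{prefix}:{accession}"
```
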

tryStageWorkflow(offline: bool = False, ignoreCache: bool = False) wfexs_backend.common.StagedSetup

This method tries to materialize and identify a workflow.

stageWorkDir(offline: bool = False, ignoreCache: bool = False) wfexs_backend.common.StagedSetup

This method exists to make the needed steps easier to understand.

workdirToBagit() bagit.Bag

BEWARE: This is a destructive step! Once run, there is no going back!
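
The destructiveness follows from how BagIt (RFC 8493) works: the existing payload is moved into a `data/` subdirectory and tag files (`bagit.txt`, a checksum manifest) are written at the top level, so the original layout of the working directory is gone. The `bagit.Bag` return type indicates the method relies on the `bagit` Python library, whose `bagit.make_bag()` converts a directory in place. The stdlib-only sketch below, with a hypothetical function name, shows the same transformation while omitting the `bag-info.txt` and tag manifests that `bagit.make_bag()` also writes:

```python
import hashlib
import pathlib
import shutil


def make_minimal_bag(bag_dir: pathlib.Path) -> pathlib.Path:
    """Convert bag_dir IN PLACE into a minimal BagIt (RFC 8493) bag."""
    bag_dir = pathlib.Path(bag_dir)
    # Stage the payload in a temporary sibling first, so an existing entry
    # called "data" does not clash with the payload directory
    staging = bag_dir / ".bag-staging"  # hypothetical name
    staging.mkdir()
    for entry in list(bag_dir.iterdir()):
        if entry != staging:
            shutil.move(str(entry), str(staging / entry.name))
    data_dir = bag_dir / "data"
    staging.rename(data_dir)

    # One "<sha256>  <path relative to the bag>" line per payload file
    lines = []
    for path in sorted(p for p in data_dir.rglob("*") if p.is_file()):
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        lines.append(f"{digest}  {path.relative_to(bag_dir).as_posix()}\n")
    (bag_dir / "manifest-sha256.txt").write_text("".join(lines), encoding="utf-8")

    # Bag declaration required by RFC 8493
    (bag_dir / "bagit.txt").write_text(
        "BagIt-Version: 1.0\nTag-File-Character-Encoding: UTF-8\n", encoding="utf-8"
    )
    return bag_dir
```

After the call, anything that expected `inputs/a.txt` at the top of the directory must now look under `data/inputs/a.txt`, which is exactly why the step cannot simply be undone by the workflow machinery.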

DefaultCardinality = '1'
CardinalityMapping: Mapping[str, Tuple[int, int]] = None
OutputClassMapping = None
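
These attributes suggest that output cardinalities are written as short strings (the default being `'1'`) and translated into `(minimum, maximum)` pairs. The mapping below is a hypothetical illustration using the familiar `?`, `*` and `+` symbols; the actual keys and bounds of `CardinalityMapping` are not shown on this page.

```python
import sys
from typing import Mapping, Tuple

DEFAULT_CARDINALITY = "1"

# Hypothetical symbols; the real CardinalityMapping contents are not documented here
CARDINALITY_SKETCH: Mapping[str, Tuple[int, int]] = {
    "1": (1, 1),
    "?": (0, 1),
    "*": (0, sys.maxsize),
    "+": (1, sys.maxsize),
}


def cardinality_ok(count: int, cardinality: str = DEFAULT_CARDINALITY) -> bool:
    """Check whether `count` produced items satisfy the declared cardinality."""
    low, high = CARDINALITY_SKETCH[cardinality]
    return low <= count <= high
```
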
parseExpectedOutputs(outputs_to_inject: Sequence[Sch_Output], outputs: Sequence[Sch_Output] | Mapping[str, Sch_Output], default_synthetic_output: bool) Sequence[ExpectedOutput]
parseExportActions(raw_actions: Sequence[ExportActionBlock]) Sequence[ExportAction]
executeWorkflow(offline: bool = False) wfexs_backend.workflow_engines.StagedExecution
queueExecution(offline: bool = False) str
listMaterializedExportActions() Sequence[MaterializedExportAction]

This method should return the PIDs (persistent identifiers) generated from the contents.

exportResultsFromFiles(exportActionsFile: pathlib.Path | None = None, securityContextFile: pathlib.Path | None = None, action_ids: Sequence[SymbolicName] = [], fail_ok: bool = False) Tuple[Sequence[MaterializedExportAction], Sequence[Tuple[ExportAction, Exception]]]
_curate_orcid_list(orcids: Sequence[str], fail_ok: bool = True) Sequence[ResolvedORCID]
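
`_curate_orcid_list` turns raw ORCID strings into resolved records. Whether it validates checksums before resolving them is not documented here; as an illustration of one offline check a curator can apply, the sketch below implements the ISO 7064 MOD 11-2 check character that ORCID identifiers carry. The function and constant names are hypothetical.

```python
import re

ORCID_RE = re.compile(r"^([0-9]{4})-([0-9]{4})-([0-9]{4})-([0-9]{3}[0-9X])$")
ORCID_URI_PREFIX = "https://orcid.org/"


def orcid_check_digit(base_digits: str) -> str:
    """ISO 7064 MOD 11-2 check character for the first 15 ORCID digits."""
    total = 0
    for ch in base_digits:
        total = (total + int(ch)) * 2
    result = (12 - total % 11) % 11
    return "X" if result == 10 else str(result)


def is_valid_orcid(orcid: str) -> bool:
    """Hypothetical curation check: ORCID layout plus check character."""
    # Accept both the bare identifier and its https://orcid.org/ URI form
    if orcid.startswith(ORCID_URI_PREFIX):
        orcid = orcid[len(ORCID_URI_PREFIX):]
    match = ORCID_RE.match(orcid)
    if match is None:
        return False
    digits = "".join(match.groups())
    return orcid_check_digit(digits[:15]) == digits[15]
```

For example, `is_valid_orcid("0000-0002-1825-0097")` returns `True`, while changing the last digit to `8` makes it return `False`.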
_instantiate_export_plugin(action: wfexs_backend.workflow.ExportAction, sec_context: SecurityContextConfig | None, default_licences: Sequence[LicenceDescription], default_orcids: Sequence[ResolvedORCID], default_preferred_id: str | None) wfexs_backend.pushers.AbstractExportPlugin

This method instantiates a stateful export plugin. Although the licences, ORCIDs and preferred ids are not used at the beginning, they are supplied as default values to the implementation, in case it is able to do something meaningful with them. The licence list should be curated outside this method.

exportResults(actions: Sequence[ExportAction] | None = None, vault: SecurityContextVault | None = None, action_ids: Sequence[SymbolicName] = [], fail_ok: bool = False, op_licences: Sequence[str] = [], op_orcids: Sequence[str] = []) Tuple[Sequence[MaterializedExportAction], Sequence[Tuple[ExportAction, Exception]]]
property staging_recipe: wfexs_backend.workflow.WritableWorkflowMetaConfigBlock
marshallConfig(overwrite: bool = False) bool | datetime.datetime
__get_combined_globals() Mapping[str, Any]

This method is needed since workflow engines and container factories are dynamically loaded.

unmarshallConfig(fail_ok: bool = False) bool | datetime.datetime | None
marshallStage(exist_ok: bool = True, overwrite: bool = False) bool | datetime.datetime | None
unmarshallStage(offline: bool = False, fail_ok: bool = False, do_full_setup: bool = True) bool | datetime.datetime | None
marshallExecute(staged_exec: wfexs_backend.workflow_engines.StagedExecution) bool | datetime.datetime | None
unmarshallExecute(offline: bool = True, fail_ok: bool = False) Tuple[bool | datetime.datetime | None, Sequence[StagedExecution]]
_unmarshallExecuteFH(meF: IO[str], creation_time: float | None = None) Tuple[MutableSequence[StagedExecution], datetime.datetime]

Internal method used to unmarshall staged executions metadata.

Parameters:
  • meF – An open file (or similar file-like object) with a fileno method

  • creation_time – When the marshalled execution file was created, measured in seconds since the epoch

Returns:

The list of unmarshalled, staged executions
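
The `fileno` requirement on `meF` hints at how a missing `creation_time` can be recovered: from the open descriptor via `os.fstat`. The helper below is a hypothetical sketch of that fallback and of converting seconds since the epoch into a timezone-aware `datetime.datetime`; whether WfExS uses the modification time or another timestamp is an assumption (`st_mtime` is used here).

```python
import datetime
import os
from typing import IO, Optional


def marshalled_file_time(meF: IO[str], creation_time: Optional[float] = None) -> datetime.datetime:
    """Hypothetical helper: timestamp of a marshalled file as an aware datetime."""
    if creation_time is None:
        # Fall back to the file's modification time, read through its descriptor
        creation_time = os.fstat(meF.fileno()).st_mtime
    # Seconds since the epoch -> UTC-aware datetime
    return datetime.datetime.fromtimestamp(creation_time, tz=datetime.timezone.utc)
```
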

marshallExport(new_mat_actions: Sequence[MaterializedExportAction]) bool | datetime.datetime | None
unmarshallExport(offline: bool = True, fail_ok: bool = False) bool | datetime.datetime | None
_unmarshallExportFH(meF: IO[str], creation_time: float | None = None) Tuple[MutableSequence[MaterializedExportAction], datetime.datetime]
ExportROCrate2Payloads: Final[Mapping[str, CratableItem]] = None
locateExportItems(items: Sequence[ExportItem], licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) Sequence[AnyContent]

The located paths in the contents should be relative to the working directory.
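
Keeping located paths relative to the working directory makes the exported description independent of where that directory lives. A minimal sketch of that normalization with `pathlib` follows; the function name is hypothetical, and both treating relative inputs as relative to the working directory and rejecting paths outside it are assumed policies.

```python
import pathlib


def relative_to_workdir(path: pathlib.Path, work_dir: pathlib.Path) -> pathlib.Path:
    """Express `path` relative to `work_dir`, refusing paths that escape it."""
    if not path.is_absolute():
        # Relative inputs are interpreted against the working directory
        path = work_dir / path
    # resolve() follows symlinks and collapses "..", so the check uses real locations
    resolved = path.resolve()
    base = work_dir.resolve()
    try:
        return resolved.relative_to(base)
    except ValueError:
        raise ValueError(f"{path} is outside the working directory {work_dir}") from None
```
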

createStageResearchObject(filename: pathlib.Path | None = None, payloads: wfexs_backend.common.CratableItem = NoCratableItem, licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) pathlib.Path

Create an RO-Crate from the stage provenance.

createResultsResearchObject(filename: pathlib.Path | None = None, payloads: wfexs_backend.common.CratableItem = NoCratableItem, licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) pathlib.Path

Create an RO-Crate from the execution results provenance.