wfexs_backend.workflow
#
Module Contents#
Classes#
- MaterializedExportAction: The description of an export action which was materialized, so a permanent identifier was obtained, along with some metadata
- DefaultMissing: This is inspired by the example available at https://docs.python.org/3/library/stdtypes.html#str.format_map
- WF: Workflow enaction class
Functions#
- _wakeupEncDir: This method periodically checks whether the directory is still available
API#
- class wfexs_backend.workflow.ExportItem#
Bases:
typing.NamedTuple
- type: wfexs_backend.common.ExportItemType = None#
- class wfexs_backend.workflow.ExportAction#
Bases:
typing.NamedTuple
- action_id: wfexs_backend.common.SymbolicName = None#
- plugin_id: wfexs_backend.common.SymbolicName = None#
- what: Sequence[ExportItem] = None#
- class wfexs_backend.workflow.MaterializedExportAction#
Bases:
typing.NamedTuple
The description of an export action which was materialized, so a permanent identifier was obtained, along with some metadata
- action: wfexs_backend.workflow.ExportAction = None#
- elems: Sequence[AnyContent] = None#
- pids: Sequence[URIWithMetadata] = None#
- when: datetime.datetime = 'astimezone(...)'#
- class wfexs_backend.workflow.DefaultMissing#
Bases:
typing.Dict[wfexs_backend.workflow.KT, wfexs_backend.workflow.VT]
This is inspired by the example available at https://docs.python.org/3/library/stdtypes.html#str.format_map
Initialization
Initialize self. See help(type(self)) for accurate signature.
- __missing__(key: wfexs_backend.workflow.KT) wfexs_backend.workflow.VT #
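The `str.format_map` example referenced above relies on the `__missing__` hook of `dict` subclasses. A minimal sketch of that pattern follows. The class name `KeepMissing` and the choice to hand back the untouched placeholder are assumptions made for illustration, and may differ from what `DefaultMissing` actually returns.

```python
from typing import Dict


class KeepMissing(Dict[str, str]):
    """Hypothetical stand-in: unknown keys map back to their own placeholder."""

    def __missing__(self, key: str) -> str:
        # dict.__getitem__ calls this hook only when the key is absent
        return "{" + key + "}"


# Only "workdir" is known, so the "nickname" placeholder survives formatting
template = "{workdir}/{nickname}"
rendered = template.format_map(KeepMissing(workdir="/tmp/wf"))
```

With this approach a template can be filled in several passes, since placeholders without a value yet are preserved instead of raising `KeyError`.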
- wfexs_backend.workflow._wakeupEncDir(cond: threading.Condition, workDir: pathlib.Path, logger: logging.Logger) None #
This method periodically checks whether the directory is still available
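A periodic availability check driven by a `threading.Condition` could look like the sketch below. It is not the implementation of `_wakeupEncDir`: the function name `watch_directory`, the `state` dictionary used as a stop flag, the `interval` parameter and the return values are all assumptions chosen for illustration.

```python
import logging
import pathlib
import tempfile
import threading
from typing import Dict, List


def watch_directory(
    cond: threading.Condition,
    work_dir: pathlib.Path,
    logger: logging.Logger,
    state: Dict[str, bool],
    interval: float = 0.05,
) -> bool:
    """Return True when asked to stop, False when the directory vanished."""
    with cond:
        while True:
            # wait_for returns the predicate value; False means the timeout expired
            if cond.wait_for(lambda: state["stop"], timeout=interval):
                return True
            if not work_dir.is_dir():
                logger.warning("Directory %s is no longer available", work_dir)
                return False


def demo() -> bool:
    """Start a watcher thread on a temporary directory, then ask it to stop."""
    cond = threading.Condition()
    state = {"stop": False}
    results: List[bool] = []
    logger = logging.getLogger("watch-demo")
    with tempfile.TemporaryDirectory() as tmp:
        watcher = threading.Thread(
            target=lambda: results.append(
                watch_directory(cond, pathlib.Path(tmp), logger, state)
            )
        )
        watcher.start()
        with cond:
            # The flag is changed under the lock, so the wake-up cannot be lost
            state["stop"] = True
            cond.notify_all()
        watcher.join()
    return results[0]
```

Using `wait_for` with a predicate rather than a bare `wait` means the watcher still stops when the notification is sent before it starts waiting.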
- exception wfexs_backend.workflow.WFException#
- exception wfexs_backend.workflow.ExportActionException#
- exception wfexs_backend.workflow.WFWarning#
Bases:
UserWarning
- class wfexs_backend.workflow.WF(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_id: WorkflowId | None = None, version_id: WFVersionId | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, trs_endpoint: str = DEFAULT_TRS_ENDPOINT, params: ParamsBlock | None = None, enabled_profiles: Sequence[str] | None = None, environment: EnvironmentBlock | None = None, outputs: OutputsBlock | None = None, placeholders: PlaceHoldersBlock | None = None, default_actions: Sequence[ExportActionBlock] | None = None, workflow_config: WorkflowConfigBlock | None = None, vault: SecurityContextVault | None = None, instanceId: WfExSInstanceId | None = None, nickname: str | None = None, orcids: Sequence[str] = [], creation: datetime.datetime | None = None, rawWorkDir: pathlib.Path | None = None, paranoid_mode: bool | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Minimal, strict_reproducibility_level: bool = False)#
Workflow enaction class
Initialization
Init function
- Parameters:
wfexs (WfExSBackend) – A WfExSBackend instance
workflow_id (str) – A unique identifier of the workflow. Although it is an integer in WorkflowHub, we cannot assume it is so in all the GA4GH TRS implementations which are exposing workflows.
version_id (str) – An identifier of the workflow version. Although it is an integer in WorkflowHub, we cannot assume the format of the version id, as it could follow semantic versioning, be a UUID, etc.
descriptor_type (str) – The type of descriptor that represents this version of the workflow (e.g. CWL, WDL, NFL, or GALAXY). It is optional; when omitted, it is guessed from the calls to the API. It can be either the short name of the workflow engine, or the name used by GA4GH TRS.
trs_endpoint (str) – The TRS endpoint used to find the workflow.
params (dict) – Optional params for the workflow execution.
outputs (dict)
workflow_config (dict) – Tweaks for workflow enactment, like some overrides
vault (SecurityContextVault) – Dictionary with the different credential contexts, only used to fetch fresh contents
instanceId (str) – The instance id of this working directory
nickname – The nickname of this working directory
creation (datetime.datetime) – The creation timestamp
rawWorkDir (str) – Raw working directory
paranoid_mode (bool) – Should we enable paranoid mode for this workflow?
fail_ok (bool)
- TRS_TOOL_FILES_FILE: Final[RelPath] = 'cast(...)'#
- STAGE_DEFINITION_SCHEMA: Final[RelPath] = 'cast(...)'#
- EXPORT_ACTIONS_SCHEMA: Final[RelPath] = 'cast(...)'#
- STAGED_CRATE_FILE: Final[RelPath] = 'cast(...)'#
- EXECUTION_CRATE_FILE: Final[RelPath] = 'cast(...)'#
- FUSE_SYSTEM_CONF = '/etc/fuse.conf'#
- getPID() str | None #
It provides the most permanent workflow id it can generate from the details in the YAML
- setupWorkdir(doSecureWorkDir: bool, fail_ok: bool = False, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Tuple[bool, pathlib.Path] #
- getStagedSetup() wfexs_backend.common.StagedSetup #
- getMarshallingStatus(reread_stats: bool = False) wfexs_backend.common.MarshallingStatus #
- getMaterializedWorkflow() LocalWorkflow | None #
- static __read_yaml_config(filename: pathlib.Path) wfexs_backend.workflow.WritableWorkflowMetaConfigBlock #
- classmethod __merge_params_from_file(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, base_workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, replaced_parameters_filename: pathlib.Path) Tuple[WritableWorkflowMetaConfigBlock, Mapping[str, Set[str]]] #
- classmethod FromWorkDir(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowWorkingDirectory: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False) wfexs_backend.workflow.WF #
This class method requires an existing staged working directory
- classmethod TryWorkflowURI(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_uri: str, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None) wfexs_backend.workflow.WF #
This class method creates a new staged working directory
- classmethod FromFiles(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowMetaFilename: pathlib.Path, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF #
This class method creates a new staged working directory
- classmethod FromStagedRecipe(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WritableWorkflowMetaConfigBlock, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF #
This class method creates a new staged working directory
- classmethod FromPreviousInstanceDeclaration(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, wfInstance: wfexs_backend.workflow.WF, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF #
This class method creates a new staged working directory based on the declaration of an existing one
- static _transferInputs(payload_dir: pathlib.Path, inputs_dir: pathlib.Path, cached_inputs: Sequence[MaterializedInput]) Sequence[MaterializedInput] #
- classmethod FromPreviousROCrate(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflowROCrateFilename: pathlib.Path, public_name: str, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False, retrospective_first: bool = True) wfexs_backend.workflow.WF #
This class method creates a new staged working directory based on the declaration of an existing one
- classmethod FromDescription(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, vault: wfexs_backend.security_context.SecurityContextVault, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False, cached_repo: Tuple[RemoteRepo, WorkflowType] | None = None, cached_workflow: LocalWorkflow | None = None, cached_inputs: Sequence[MaterializedInput] | None = None, cached_environment: Sequence[MaterializedInput] | None = None, preferred_containers: Sequence[Container] = [], preferred_operational_containers: Sequence[Container] = [], reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF #
This class method might create a new staged working directory
- Parameters:
wfexs (WfExSBackend) – WfExSBackend instance
workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.
paranoidMode (bool)
- Returns:
Workflow configuration
- classmethod FromForm(wfexs: wfexs_backend.wfexs_backend.WfExSBackend, workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF #
- Parameters:
wfexs – WfExSBackend instance
workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.
paranoidMode (bool)
- Returns:
Workflow configuration
- fetchWorkflow(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None, trs_endpoint: str | None, descriptor_type: TRS_Workflow_Descriptor | None, offline: bool = False, ignoreCache: bool = False, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None) None #
Fetch the whole workflow description based on the data obtained from the TRS where it is being published.
If the workflow id is a URL, it is assumed to point to a repository (git, swh, …), and the version represents either the branch, the tag or a specific commit. In that case, the whole TRS fetching machinery is bypassed.
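The routing decision described here can be pictured with a small sketch. The helper name `choose_fetch_route` and its string return values are hypothetical, and the test for "is a URL" (a scheme plus a network location) is an assumption; the actual method may recognise more forms.

```python
from urllib.parse import urlparse


def choose_fetch_route(workflow_id: str) -> str:
    """Hypothetical helper: decide between repository and TRS fetching."""
    parsed = urlparse(workflow_id)
    # Assumption: an id counts as a URL when it has both a scheme and a host
    if parsed.scheme and parsed.netloc:
        return "repository"
    # Anything else is treated as an opaque id to look up at the TRS endpoint
    return "trs"


route_for_repo = choose_fetch_route("https://github.com/example/workflow.git")
route_for_id = choose_fetch_route("107")
```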
- setupEngine(offline: bool = False, ignoreCache: bool = False, initial_engine_version: EngineVersion | None = None, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None) None #
- materializeWorkflowAndContainers(offline: bool = False, ignoreCache: bool = False, injectable_repo: Tuple[RemoteRepo, WorkflowType] | None = None, injectable_workflow: LocalWorkflow | None = None, injectable_containers: Sequence[Container] = [], injectable_operational_containers: Sequence[Container] = [], context_inputs: Sequence[MaterializedInput] = [], context_environment: Sequence[MaterializedInput] = []) None #
- materializeInputs(formatted_params: ParamsBlock | Sequence[Mapping[str, Any]], offline: bool = False, ignoreCache: bool = False, injectable_inputs: Sequence[MaterializedInput] | None = None, lastInput: int = 0) Sequence[MaterializedInput] #
- _buildLicensedURI(remote_file_f: wfexs_backend.workflow.Sch_InputURI_Fetchable, contextName: str | None = None, licences: Tuple[URIType, ...] = DefaultNoLicenceTuple, attributions: Sequence[Attribution] = []) Tuple[LicensedURI | Sequence[LicensedURI], bool] #
- _fetchRemoteFile(remote_file: wfexs_backend.workflow.Sch_InputURI_Fetchable, contextName: str | None, offline: bool, storeDir: pathlib.Path | CacheType, cacheable: bool, inputDestDir: pathlib.Path, globExplode: str | None, prefix: str = '', hardenPrettyLocal: bool = False, prettyRelname: RelPath | None = None, ignoreCache: bool = False, cloneToStore: bool = True) Sequence[MaterializedContent] #
- _formatInputURIFromPlaceHolders(input_uri: wfexs_backend.workflow.Sch_InputURI) wfexs_backend.workflow.Sch_InputURI #
- formatParams(params: ParamsBlock | None, prefix: str = '') Tuple[ParamsBlock | None, Sequence[Sch_Output] | None] #
- _fetchContentWithURIs(inputs: wfexs_backend.workflow.ParamsBlock, linearKey: wfexs_backend.common.SymbolicParamName, workflowInputs_destdir: pathlib.Path, workflowExtrapolatedInputs_destdir: pathlib.Path, lastInput: int = 0, offline: bool = False, ignoreCache: bool = False, cloneToStore: bool = True) Tuple[Sequence[MaterializedInput], int, Sequence[str]] #
- _injectContent(injectable_content: Sequence[MaterializedContent], dest_path: pathlib.Path, pretty_relname: str | None, last_input: int = 1) Tuple[MutableSequence[MaterializedContent], int] #
- fetchInputs(params: ParamsBlock | Sequence[ParamsBlock], workflowInputs_destdir: pathlib.Path, workflowExtrapolatedInputs_destdir: pathlib.Path, prefix: str = '', injectable_inputs_dict: Mapping[str, MaterializedInput] = {}, lastInput: int = 0, offline: bool = False, ignoreCache: bool = False) Tuple[Sequence[MaterializedInput], int, Sequence[str]] #
Fetch the input files for the workflow execution. All the inputs must be URLs or CURIEs from identifiers.org / n2t.net.
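To illustrate the two accepted input forms, the sketch below turns either a URL or a CURIE into something fetchable. The function name `to_resolvable_url`, both regular expressions and the error handling are assumptions for illustration; the only external fact relied upon is that identifiers.org resolves compact identifiers appended to its base URL.

```python
import re

# Assumption: a URL is "scheme://rest", a CURIE is "prefix:accession"
URL_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*://\S+$")
CURIE_PATTERN = re.compile(r"^[A-Za-z0-9_.-]+:\S+$")


def to_resolvable_url(value: str, resolver: str = "https://identifiers.org/") -> str:
    """Hypothetical helper: return a URL for either a URL or a CURIE."""
    if URL_PATTERN.match(value):
        # Already a URL, nothing to resolve
        return value
    if CURIE_PATTERN.match(value):
        # Compact identifiers are resolved by prefixing the resolver base
        return resolver + value
    raise ValueError(f"Neither a URL nor a CURIE: {value!r}")


resolved = to_resolvable_url("pubmed:16333295")
```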
- tryStageWorkflow(offline: bool = False, ignoreCache: bool = False) wfexs_backend.common.StagedSetup #
This method is here to try materializing and identifying a workflow
- stageWorkDir(offline: bool = False, ignoreCache: bool = False) wfexs_backend.common.StagedSetup #
This method is here to simplify the understanding of the needed steps
- workdirToBagit() bagit.Bag #
BEWARE: This is a destructive step! Once it has run, there is no way back!
- DefaultCardinality = '1'#
- OutputClassMapping = None#
- parseExpectedOutputs(outputs_to_inject: Sequence[Sch_Output], outputs: Sequence[Sch_Output] | Mapping[str, Sch_Output], default_synthetic_output: bool) Sequence[ExpectedOutput] #
- parseExportActions(raw_actions: Sequence[ExportActionBlock]) Sequence[ExportAction] #
- executeWorkflow(offline: bool = False) wfexs_backend.workflow_engines.StagedExecution #
- listMaterializedExportActions() Sequence[MaterializedExportAction] #
This method should return the pids generated from the contents
- exportResultsFromFiles(exportActionsFile: pathlib.Path | None = None, securityContextFile: pathlib.Path | None = None, action_ids: Sequence[SymbolicName] = [], fail_ok: bool = False) Tuple[Sequence[MaterializedExportAction], Sequence[Tuple[ExportAction, Exception]]] #
- _curate_orcid_list(orcids: Sequence[str], fail_ok: bool = True) Sequence[ResolvedORCID] #
- _instantiate_export_plugin(action: wfexs_backend.workflow.ExportAction, sec_context: SecurityContextConfig | None, default_licences: Sequence[LicenceDescription], default_orcids: Sequence[ResolvedORCID], default_preferred_id: str | None) wfexs_backend.pushers.AbstractExportPlugin #
This method instantiates a stateful export plugin. Although the licences, ORCIDs and preferred ids are not used at the beginning, they are supplied as default values to the implementation, in case it is able to do something meaningful with them. The licence list should be curated outside this method.
- exportResults(actions: Sequence[ExportAction] | None = None, vault: SecurityContextVault | None = None, action_ids: Sequence[SymbolicName] = [], fail_ok: bool = False, op_licences: Sequence[str] = [], op_orcids: Sequence[str] = []) Tuple[Sequence[MaterializedExportAction], Sequence[Tuple[ExportAction, Exception]]] #
- property staging_recipe: wfexs_backend.workflow.WritableWorkflowMetaConfigBlock#
- marshallConfig(overwrite: bool = False) bool | datetime.datetime #
- __get_combined_globals() Mapping[str, Any] #
This method is needed since workflow engines and container factories are dynamically loaded.
- unmarshallConfig(fail_ok: bool = False) bool | datetime.datetime | None #
- unmarshallStage(offline: bool = False, fail_ok: bool = False, do_full_setup: bool = True) bool | datetime.datetime | None #
- marshallExecute(staged_exec: wfexs_backend.workflow_engines.StagedExecution) bool | datetime.datetime | None #
- unmarshallExecute(offline: bool = True, fail_ok: bool = False) Tuple[bool | datetime.datetime | None, Sequence[StagedExecution]] #
- _unmarshallExecuteFH(meF: IO[str], creation_time: float | None = None) Tuple[MutableSequence[StagedExecution], datetime.datetime] #
Internal method used to unmarshall staged executions metadata.
- Parameters:
meF – open file (or similar), with fileno method
creation_time – when the marshalled execution file was created, measured in number of seconds since epoch
- Returns:
The list of unmarshalled, staged executions
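The parameter notes above suggest why the handle needs a `fileno` method: a timestamp can be read from the open file when `creation_time` is not supplied. The sketch below shows one way to do that. The helper name `creation_datetime`, the fallback to the file modification time and the choice of UTC are assumptions, not the documented behaviour of this method.

```python
import datetime
import os
from typing import IO, Optional


def creation_datetime(
    fh: IO[str], creation_time: Optional[float] = None
) -> datetime.datetime:
    """Hypothetical helper: timezone-aware timestamp for a marshalled file."""
    if creation_time is None:
        # Assumption: fall back to the modification time of the open file
        creation_time = os.fstat(fh.fileno()).st_mtime
    # Seconds since epoch become an aware datetime, avoiding naive local times
    return datetime.datetime.fromtimestamp(creation_time, tz=datetime.timezone.utc)


with open(os.devnull) as handle:
    from_epoch = creation_datetime(handle, 0.0)
    from_file = creation_datetime(handle)
```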
- marshallExport(new_mat_actions: Sequence[MaterializedExportAction]) bool | datetime.datetime | None #
- _unmarshallExportFH(meF: IO[str], creation_time: float | None = None) Tuple[MutableSequence[MaterializedExportAction], datetime.datetime] #
- ExportROCrate2Payloads: Final[Mapping[str, CratableItem]] = None#
- locateExportItems(items: Sequence[ExportItem], licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) Sequence[AnyContent] #
The located paths in the contents should be relative to the working directory
- createStageResearchObject(filename: pathlib.Path | None = None, payloads: wfexs_backend.common.CratableItem = NoCratableItem, licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) pathlib.Path #
Create RO-crate from stage provenance.
- createResultsResearchObject(filename: pathlib.Path | None = None, payloads: wfexs_backend.common.CratableItem = NoCratableItem, licences: Sequence[LicenceDescription] = [], resolved_orcids: Sequence[ResolvedORCID] = [], crate_pid: str | None = None) pathlib.Path #
Create RO-crate from execution results provenance.