wfexs_backend.wfexs_backend
#
Module Contents#
Classes#
IdentifiedWorkflow | workflow_type: The identified workflow type |
WfExSBackend | WfExS-backend setup class |
API#
- class wfexs_backend.wfexs_backend.IdentifiedWorkflow#
Bases: typing.NamedTuple
workflow_type: The identified workflow type
- workflow_type: wfexs_backend.workflow_engines.WorkflowType = None#
- remote_repo: wfexs_backend.fetchers.RemoteRepo = None#
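As a rough illustration of the shape of this tuple, the sketch below builds a NamedTuple with the same two field names. `FakeWorkflowType`, `FakeRemoteRepo` and `IdentifiedWorkflowSketch` are hypothetical stand-ins invented for this example; the real `WorkflowType` and `RemoteRepo` classes carry more information.

```python
from typing import NamedTuple, Optional


class FakeWorkflowType(NamedTuple):
    # Hypothetical stand-in for wfexs_backend.workflow_engines.WorkflowType
    shortname: str


class FakeRemoteRepo(NamedTuple):
    # Hypothetical stand-in for wfexs_backend.fetchers.RemoteRepo
    repo_url: str
    tag: Optional[str] = None


class IdentifiedWorkflowSketch(NamedTuple):
    # Same field names as IdentifiedWorkflow; the field types are stand-ins
    workflow_type: FakeWorkflowType
    remote_repo: FakeRemoteRepo


identified = IdentifiedWorkflowSketch(
    workflow_type=FakeWorkflowType("cwl"),
    remote_repo=FakeRemoteRepo("https://example.org/repo.git", "main"),
)

# Fields can be read by name or through plain tuple unpacking
wf_type, repo = identified
```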
- exception wfexs_backend.wfexs_backend.WfExSBackendException#
- class wfexs_backend.wfexs_backend.WfExSBackend(local_config: WfExSConfigBlock | None = None, config_directory: pathlib.Path | None = None)#
WfExS-backend setup class
Initialization
Init function
- Parameters:
local_config (dict) – Local setup configuration, telling where caching directories live
- CONFIG_SCHEMA: Final[RelPath] = 'cast(...)'#
- _PassGen: ClassVar[WfExSPassphraseGenerator | None] = None#
- classmethod GetPassGen() wfexs_backend.utils.passphrase_wrapper.WfExSPassphraseGenerator #
- classmethod bootstrap_config(local_config_ro: wfexs_backend.wfexs_backend.WfExSConfigBlock, config_directory: pathlib.Path | None = None, key_prefix: str | None = None) Tuple[bool, WfExSConfigBlock, pathlib.Path] #
- Parameters:
local_config_ro (dict) – Relevant local configuration, like the cache directory.
config_directory – The directory to be used to resolve relative paths
key_prefix – Prefix for the files of newly generated key pairs
- classmethod FromDescription(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, local_config: wfexs_backend.wfexs_backend.WfExSConfigBlock, vault: SecurityContextVault | None = None, config_directory: pathlib.Path | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.workflow.WF #
- Parameters:
workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.
local_config (dict) – Relevant local configuration, like the cache directory.
config_directory
- Returns:
Workflow configuration
- property cacheWorkflowDir: pathlib.Path#
- property cacheROCrateDir: pathlib.Path#
- property cacheTRSFilesDir: pathlib.Path#
- property cacheWorkflowInputsDir: pathlib.Path#
- getCacheHandler(cache_type: wfexs_backend.common.CacheType) Tuple[SchemeHandlerCacheHandler, pathlib.Path | None] #
- instantiateStatefulFetcher(statefulFetcher: Type[StatefulFetcher], setup_block: Mapping[str, Any] | None = None) wfexs_backend.fetchers.StatefulFetcher #
Method to instantiate stateful fetchers once
- instantiateRepoFetcher(repoFetcher: Type[RepoFetcher], setup_block: Mapping[str, Any] | None = None) wfexs_backend.fetchers.RepoFetcher #
Method to instantiate repo fetchers once
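The "instantiate once" behaviour described for both methods can be pictured as a per-class cache of instances. The sketch below is only an assumption about how such a cache might look; `DemoFetcher` and `FetcherRegistry` are hypothetical names, and the real fetcher constructors take different arguments.

```python
from typing import Any, Dict, Mapping, Optional, Type


class DemoFetcher:
    # Hypothetical fetcher; it only remembers the setup block it was given
    def __init__(self, setup_block: Optional[Mapping[str, Any]] = None) -> None:
        self.setup_block = dict(setup_block or {})


class FetcherRegistry:
    # Hypothetical holder keeping one instance per fetcher class
    def __init__(self) -> None:
        self._instances: Dict[Type[Any], Any] = {}

    def instantiate_once(
        self,
        fetcher_class: Type[Any],
        setup_block: Optional[Mapping[str, Any]] = None,
    ) -> Any:
        instance = self._instances.get(fetcher_class)
        if instance is None:
            # First request for this class: build it and remember it
            instance = fetcher_class(setup_block=setup_block)
            self._instances[fetcher_class] = instance
        return instance


registry = FetcherRegistry()
first = registry.instantiate_once(DemoFetcher, {"timeout": 10})
# A second request returns the cached instance; the new setup block is ignored
second = registry.instantiate_once(DemoFetcher, {"timeout": 99})
```

In this sketch the setup block of the first call wins, which is one possible design choice rather than documented behaviour.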
- findAndAddWorkflowEnginesFromModuleName(the_module_name: str = 'wfexs_backend.workflow_engines') None #
- findAndAddWorkflowEnginesFromModule(the_module: types.ModuleType) None #
- addWorkflowEngine(workflowEngineClazz: Type[WorkflowEngine]) None #
- listWorkflowEngineClasses() Sequence[Type[WorkflowEngine]] #
- getWorkflowEngineClass(engine_shortname: str) Type[WorkflowEngine] | None #
- matchWorkflowType(mainEntityProgrammingLanguageUrl: str, mainEntityProgrammingLanguageId: str | None) wfexs_backend.workflow_engines.WorkflowType #
- findAndAddContainerFactoriesFromModuleName(the_module_name: str = 'wfexs_backend.container_factories') None #
- findAndAddContainerFactoriesFromModule(the_module: types.ModuleType) None #
- addContainerFactory(containerFactoryClazz: Type[ContainerFactory]) None #
- listImplementedContainerTypes() Sequence[ContainerType] #
- listContainerFactoryClasses() Sequence[Type[ContainerFactory]] #
- getContainerFactoryClass(container_type: wfexs_backend.common.ContainerType) Type[ContainerFactory] | None #
- findAndAddExportPluginsFromModule(the_module: types.ModuleType) None #
- addExportPlugin(exportClazz: Type[AbstractExportPlugin]) None #
- listExportPluginNames() Sequence[SymbolicName] #
- getExportPluginClass(plugin_id: wfexs_backend.common.SymbolicName) Type[AbstractExportPlugin] | None #
- findAndAddSchemeHandlersFromModuleName(the_module_name: str = 'wfexs_backend.fetchers', fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None #
- findAndAddSchemeHandlersFromModule(the_module: types.ModuleType, fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None #
- addStatefulSchemeHandlers(statefulSchemeHandler: Type[AbstractStatefulFetcher], fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None #
This method adds scheme handlers (aka “fetchers”) from a given stateful fetcher, also adding the needed programs
- addSchemeHandlers(schemeHandlers: Mapping[str, DocumentedProtocolFetcher | DocumentedStatefulProtocolFetcher], fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None #
This method adds scheme handlers (aka “fetchers”) or instantiates stateful scheme handlers (aka “stateful fetchers”)
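One way to picture a scheme handler registry is a mapping from URI scheme to a callable that knows how to fetch that kind of URI. The sketch below is a simplified assumption: `SchemeRegistry`, `demo_fetcher` and the `demo` scheme are hypothetical, and the real handlers are `DocumentedProtocolFetcher` objects with a richer interface.

```python
from typing import Callable, Dict, Mapping
from urllib import parse

# Hypothetical fetcher signature: takes a URI, returns a description string
Fetcher = Callable[[str], str]


def demo_fetcher(uri: str) -> str:
    # Hypothetical fetcher that only reports what it was asked to fetch
    return "fetched " + uri


class SchemeRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, Fetcher] = {}

    def add_scheme_handlers(self, handlers: Mapping[str, Fetcher]) -> None:
        # Schemes are stored in lowercase so lookups are case-insensitive
        for scheme, fetcher in handlers.items():
            self._handlers[scheme.lower()] = fetcher

    def fetch(self, uri: str) -> str:
        scheme = parse.urlparse(uri).scheme.lower()
        fetcher = self._handlers.get(scheme)
        if fetcher is None:
            raise KeyError("no handler registered for scheme " + repr(scheme))
        return fetcher(uri)


scheme_registry = SchemeRegistry()
scheme_registry.add_scheme_handlers({"DEMO": demo_fetcher})
result = scheme_registry.fetch("demo:abc")
```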
- gen_workflow_pid(remote_repo: wfexs_backend.fetchers.RemoteRepo) str #
This method tries to generate the workflow PID by passing the remote repo to each of the registered repo fetchers in turn. The contract is that BuildPIDFromRepo should return None if it does not recognize the repo_url as usable.
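The contract above, where each builder returns None for repositories it does not recognize, can be sketched as a simple first-match loop. The builder functions, the `demo-git:` and `archive:` prefixes, and the fallback to the plain URL are all assumptions made for this illustration.

```python
from typing import Callable, Optional, Sequence

PidBuilder = Callable[[str], Optional[str]]


def build_pid_git_like(repo_url: str) -> Optional[str]:
    # Hypothetical builder: only recognizes URLs under one made-up host
    prefix = "https://git.example.org/"
    if not repo_url.startswith(prefix):
        return None
    return "demo-git:" + repo_url[len(prefix):]


def build_pid_archive_like(repo_url: str) -> Optional[str]:
    # Hypothetical builder: recognizes identifiers that are already PIDs
    if not repo_url.startswith("archive:"):
        return None
    return repo_url


def gen_pid(repo_url: str, builders: Sequence[PidBuilder]) -> str:
    for builder in builders:
        pid = builder(repo_url)
        if pid is not None:
            # The first builder that recognizes the repo decides the PID
            return pid
    # Assumed fallback: reuse the URL when no builder recognizes it
    return repo_url


builders = [build_pid_git_like, build_pid_archive_like]
```

For instance, `gen_pid("https://git.example.org/a/b", builders)` yields `"demo-git:a/b"`, while an unrecognized URL is returned unchanged.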
- newSetup(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, trs_endpoint: str = WF.DEFAULT_TRS_ENDPOINT, params: ParamsBlock | None = None, enabled_profiles: Sequence[str] | None = None, environment: EnvironmentBlock | None = None, outputs: OutputsBlock | None = None, default_actions: Sequence[ExportActionBlock] | None = None, workflow_config: WorkflowConfigBlock | None = None, vault: SecurityContextVault | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.workflow.WF #
Init function, which delegates to the WF class
- createRawWorkDir(nickname_prefix: str | None = None, orcids: Sequence[str] = []) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path] #
This method creates a new, empty, raw working directory
- getOrCreateRawWorkDirFromInstanceId(instanceId: wfexs_backend.common.WfExSInstanceId, nickname: str | None = None, orcids: Sequence[str] = [], create_ok: bool = False) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path] #
This method returns the absolute path to the raw working directory
- parseOrCreateRawWorkDir(uniqueRawWorkDir: pathlib.Path, instanceId: WfExSInstanceId | None = None, nickname: str | None = None, orcids: Sequence[str] = [], create_ok: bool = False) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path] #
This method returns the absolute path to the raw working directory
- normalizeRawWorkingDirectory(uniqueRawWorkDir: pathlib.Path) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path] #
This method returns the id of a working directory, as well as the nickname
- fromWorkDir(workflowWorkingDirectory: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False) wfexs_backend.workflow.WF #
- tryWorkflowURI(workflow_uri: str, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None) wfexs_backend.workflow.WF #
- fromFiles(workflowMetaFilename: pathlib.Path, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF #
- fromPreviousInstanceDeclaration(wfInstance: wfexs_backend.workflow.WF, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF #
- fromPreviousROCrate(workflowROCrateFilenameOrURI: AnyPath | URIType, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False, retrospective_first: bool = True) wfexs_backend.workflow.WF #
- parseAndValidateSecurityContextFile(securityContextsConfigFilename: pathlib.Path) Tuple[ExitVal, SecurityContextConfigBlock] #
- validateConfigFiles(workflowMetaFilename: pathlib.Path | WorkflowMetaConfigBlock, securityContextsConfigFilename: pathlib.Path | None = None) wfexs_backend.common.ExitVal #
- fromDescription(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, vault: SecurityContextVault | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF #
- Parameters:
workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.
creds_config (dict) – Dictionary with the different credential contexts (to be implemented)
paranoidMode
- Returns:
Workflow configuration
- fromForm(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF #
Method mainly used by OpenVRE deployment in iPC
- Parameters:
workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.
paranoidMode
- Returns:
Workflow configuration
- readSecuredWorkdirPassphrase(workdir_passphrase_file: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Tuple[EncryptedFSType, AnyPath, str] #
This method decrypts a crypt4gh file containing a passphrase. The passphrase must have been encrypted with either the WfExS installation public key (used when no private key filename is provided) or the public key paired with the private key stored in the provided file.
- generateSecuredWorkdirPassphrase(workdir_passphrase_file: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, public_key_filenames: Sequence[pathlib.Path] = []) Tuple[EncryptedFSType, AnyPath, str, Sequence[pathlib.Path]] #
This method generates a random passphrase, which is stored in a crypt4gh encrypted file (along with the FUSE filesystem used) using either the default WfExS-backend public key, or the public keys stored in the files provided as a parameter. It returns the FUSE filesystem, the command to be used to mount, the generated passphrase (to be consumed in memory) and the list of public keys used to encrypt the passphrase file, which can be used later for some additional purposes.
- listStagedWorkflows(*args: str, acceptGlob: bool = False, doCleanup: bool = True, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str, datetime.datetime, StagedSetup | None, WF | None]] #
- statusStagedWorkflows(*args: str, acceptGlob: bool = False, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str, datetime.datetime, StagedSetup | None, MarshallingStatus | None]] #
- removeStagedWorkflows(*args: str, acceptGlob: bool = False, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str]] #
- shellFirstStagedWorkflow(*args: str, stdin: IO[str] = sys.stdin, stdout: IO[str] = sys.stdout, stderr: IO[str] = sys.stderr, acceptGlob: bool = False, firstMatch: bool = True, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.common.ExitVal #
- cacheFetch(remote_file: LicensedURI | parse.ParseResult | URIType | Sequence[LicensedURI] | Sequence[parse.ParseResult] | Sequence[URIType], cacheType: wfexs_backend.common.CacheType, offline: bool, ignoreCache: bool = False, registerInCache: bool = True, vault: SecurityContextVault | None = None, sec_context_name: str | None = None, default_clonable: bool = True) wfexs_backend.cache_handler.CachedContent #
This is a pass-through method to the cache handler, which translates from symbolic types of cache to their corresponding directories
- Parameters:
remote_file – The URI to be fetched (if not already cached)
cacheType – The type of cache where to look up the URI
offline – Is the instance working in offline mode? (i.e. raise exceptions when external content is needed)
ignoreCache – Even if the content is cached, discard and re-fetch it
registerInCache – Should the fetched content be registered in the cache?
vault – The security context which has to be passed to the fetchers, in case they have to be used
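The translation from symbolic cache types to directories mentioned for cacheFetch can be pictured as a lookup table from enum members to paths. The sketch below is an assumption for illustration only: `DemoCacheType`, its member values and the directory layout are hypothetical and need not match the real `CacheType`.

```python
import enum
import pathlib
import tempfile
from typing import Dict


class DemoCacheType(enum.Enum):
    # Hypothetical symbolic cache types; values double as directory names
    Input = "input"
    ROCrate = "ro-crate"
    TRS = "trs"
    Workflow = "workflow"


def build_cache_dirs(cache_root: pathlib.Path) -> Dict[DemoCacheType, pathlib.Path]:
    # One subdirectory per symbolic cache type under a common root
    return {cache_type: cache_root / cache_type.value for cache_type in DemoCacheType}


def resolve_cache_dir(
    cache_dirs: Dict[DemoCacheType, pathlib.Path], cache_type: DemoCacheType
) -> pathlib.Path:
    cache_dir = cache_dirs[cache_type]
    # Create the directory lazily, the first time it is requested
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir


with tempfile.TemporaryDirectory() as tmp:
    dirs = build_cache_dirs(pathlib.Path(tmp))
    workflow_dir = resolve_cache_dir(dirs, DemoCacheType.Workflow)
    workflow_dir_name = workflow_dir.name
    workflow_dir_existed = workflow_dir.is_dir()
```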
- instantiateEngine(engineDesc: wfexs_backend.workflow_engines.WorkflowType, stagedSetup: wfexs_backend.common.StagedSetup) wfexs_backend.workflow_engines.AbstractWorkflowEngineType #
- guess_repo_params(wf_url: URIType | parse.ParseResult, fail_ok: bool = False) RemoteRepo | None #
- cacheWorkflow(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None = None, trs_endpoint: str | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, ignoreCache: bool = False, registerInCache: bool = True, offline: bool = False, meta_dir: pathlib.Path | None = None) Tuple[pathlib.Path, RemoteRepo, WorkflowType | None, RepoTag | None] #
Fetch the whole workflow description based on the data obtained from the TRS where it is being published.
If the workflow id is a URL, it is assumed to point to a git repository, and the version is interpreted as a branch, a tag or a specific commit. In that case, the whole TRS fetching machinery is bypassed.
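The distinction between a TRS identifier and a repository URL can be sketched with the standard URL parser. This is a minimal illustration under an assumption: treating anything with both a scheme and a network location as a URL is a simplification chosen for this example, and the function names are hypothetical.

```python
from typing import Union
from urllib import parse


def looks_like_url(workflow_id: str) -> bool:
    # Assumption: a URL has both a scheme and a network location
    parsed = parse.urlparse(workflow_id)
    return parsed.scheme != "" and parsed.netloc != ""


def choose_route(workflow_id: Union[str, int]) -> str:
    # "repository" means the TRS machinery would be bypassed
    if looks_like_url(str(workflow_id)):
        return "repository"
    return "trs"


route_for_url = choose_route("https://github.com/org/repo.git")
route_for_id = choose_route(106)
```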
- TRS_METADATA_FILE: Final[RelPath] = 'cast(...)'#
- TRS_QUERY_CACHE_FILE: Final[RelPath] = 'cast(...)'#
- getWorkflowRepoFromTRS(trs_endpoint: str, workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None, descriptor_type: TRS_Workflow_Descriptor | None, offline: bool = False, ignoreCache: bool = False, meta_dir: pathlib.Path | None = None) Tuple[IdentifiedWorkflow, pathlib.Path | None] #
- Returns:
- doMaterializeRepo(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True, registerInCache: bool = True) Tuple[pathlib.Path, RepoTag] #
- _doMaterializeGitRepo(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True) Tuple[URIType, RepoTag, pathlib.Path, Sequence[URIWithMetadata]] #
- Parameters:
repoURL
repoTag
doUpdate
- Returns:
- _doMaterializeSoftwareHeritageDirOrContent(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True) Tuple[URIType, RepoTag, pathlib.Path, Sequence[URIWithMetadata]] #
- Parameters:
repoURL
repoTag
doUpdate
- Returns:
- getWorkflowBundleFromURI(remote_url: wfexs_backend.common.URIType, expectedEngineDesc: WorkflowType | None = None, offline: bool = False, ignoreCache: bool = False, registerInCache: bool = True) Tuple[IdentifiedWorkflow | None, pathlib.Path, Sequence[URIWithMetadata], RelPath | None] #
- getWorkflowRepoFromROCrateFile(roCrateFile: pathlib.Path, expectedEngineDesc: WorkflowType | None = None) wfexs_backend.wfexs_backend.IdentifiedWorkflow #
- Parameters:
roCrateFile
expectedEngineDesc – If defined, an instance of WorkflowType
- Returns:
- cacheROcrate(roCrateURL: wfexs_backend.common.URIType, offline: bool = False, ignoreCache: bool = False) pathlib.Path #
Download an RO-Crate from WorkflowHub (https://dev.workflowhub.eu/) using the GA4GH TRS API, and save it to the returned path.
- downloadContent(remote_file: LicensedURI | Sequence[LicensedURI], dest: pathlib.Path | CacheType, vault: SecurityContextVault | None = None, offline: bool = False, ignoreCache: bool = False, registerInCache: bool = True, keep_cache_licence: bool = True, default_clonable: bool = True) wfexs_backend.common.MaterializedContent #
Download remote file or directory / dataset.
- Parameters:
remote_file (str) – URL or CURIE to download remote file
contextName
workflowInputs_destdir
offline
- _LicenceMatcher: ClassVar[LicenceMatcher | None] = None#
- classmethod GetLicenceMatcher() wfexs_backend.utils.licences.LicenceMatcher #
- curate_licence_list(licences: Sequence[str], default_licence: LicenceDescription | None = None) Sequence[LicenceDescription] #