wfexs_backend.wfexs_backend

Contents

wfexs_backend.wfexs_backend#

Module Contents#

Classes#

IdentifiedWorkflow

workflow_type: The identified workflow type

WfExSBackend

WfExS-backend setup class

API#

class wfexs_backend.wfexs_backend.IdentifiedWorkflow#

Bases: typing.NamedTuple

workflow_type: The identified workflow type

workflow_type: wfexs_backend.workflow_engines.WorkflowType = None#
remote_repo: wfexs_backend.fetchers.RemoteRepo = None#
exception wfexs_backend.wfexs_backend.WfExSBackendException#

Bases: wfexs_backend.common.AbstractWfExSException

class wfexs_backend.wfexs_backend.WfExSBackend(local_config: WfExSConfigBlock | None = None, config_directory: pathlib.Path | None = None)#

WfExS-backend setup class

Initialization

Init function

Parameters:

local_config (dict) – Local setup configuration, telling where caching directories live

DEFAULT_PASSPHRASE_LENGTH: Final[int] = 4#
CRYPT4GH_SECTION: Final[str] = 'crypt4gh'#
CRYPT4GH_PRIVKEY_KEY: Final[str] = 'key'#
CRYPT4GH_PUBKEY_KEY: Final[str] = 'pub'#
CRYPT4GH_PASSPHRASE_KEY: Final[str] = 'passphrase'#
ID_JSON_FILENAME: Final[str] = '.id.json'#
SCHEMAS_REL_DIR: Final[str] = 'schemas'#
CONFIG_SCHEMA: Final[RelPath] = 'cast(...)'#
_PassGen: ClassVar[WfExSPassphraseGenerator | None] = None#
classmethod GetPassGen() wfexs_backend.utils.passphrase_wrapper.WfExSPassphraseGenerator#
classmethod generate_passphrase() str#
classmethod bootstrap_config(local_config_ro: wfexs_backend.wfexs_backend.WfExSConfigBlock, config_directory: pathlib.Path | None = None, key_prefix: str | None = None) Tuple[bool, WfExSConfigBlock, pathlib.Path]#
Parameters:
  • local_config_ro (dict) – Relevant local configuration, like the cache directory.

  • config_directory – The filename to be used to resolve relative paths

  • key_prefix – Prefix for the files of newly generated key pairs

classmethod FromDescription(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, local_config: wfexs_backend.wfexs_backend.WfExSConfigBlock, vault: SecurityContextVault | None = None, config_directory: pathlib.Path | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.workflow.WF#
Parameters:

  • workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.

  • local_config (dict) – Relevant local configuration, like the cache directory.

  • config_directory

Returns:

Workflow configuration

property cacheWorkflowDir: pathlib.Path#
property cacheROCrateDir: pathlib.Path#
property cacheTRSFilesDir: pathlib.Path#
property cacheWorkflowInputsDir: pathlib.Path#
getCacheHandler(cache_type: wfexs_backend.common.CacheType) Tuple[SchemeHandlerCacheHandler, pathlib.Path | None]#
instantiateStatefulFetcher(statefulFetcher: Type[StatefulFetcher], setup_block: Mapping[str, Any] | None = None) wfexs_backend.fetchers.StatefulFetcher#

Method to instantiate stateful fetchers once

instantiateRepoFetcher(repoFetcher: Type[RepoFetcher], setup_block: Mapping[str, Any] | None = None) wfexs_backend.fetchers.RepoFetcher#

Method to instantiate repo fetchers once

findAndAddWorkflowEnginesFromModuleName(the_module_name: str = 'wfexs_backend.workflow_engines') None#
findAndAddWorkflowEnginesFromModule(the_module: types.ModuleType) None#
addWorkflowEngine(workflowEngineClazz: Type[WorkflowEngine]) None#
listWorkflowEngines() Sequence[str]#
listWorkflowEngineClasses() Sequence[Type[WorkflowEngine]]#
getWorkflowEngineClass(engine_shortname: str) Type[WorkflowEngine] | None#
matchWorkflowType(mainEntityProgrammingLanguageUrl: str, mainEntityProgrammingLanguageId: str | None) wfexs_backend.workflow_engines.WorkflowType#
findAndAddContainerFactoriesFromModuleName(the_module_name: str = 'wfexs_backend.container_factories') None#
findAndAddContainerFactoriesFromModule(the_module: types.ModuleType) None#
addContainerFactory(containerFactoryClazz: Type[ContainerFactory]) None#
listImplementedContainerTypes() Sequence[ContainerType]#
listContainerFactoryClasses() Sequence[Type[ContainerFactory]]#
getContainerFactoryClass(container_type: wfexs_backend.common.ContainerType) Type[ContainerFactory] | None#
findAndAddExportPluginsFromModuleName(the_module_name: str = 'wfexs_backend.pushers') None#
findAndAddExportPluginsFromModule(the_module: types.ModuleType) None#
addExportPlugin(exportClazz: Type[AbstractExportPlugin]) None#
listExportPluginNames() Sequence[SymbolicName]#
getExportPluginClass(plugin_id: wfexs_backend.common.SymbolicName) Type[AbstractExportPlugin] | None#
findAndAddSchemeHandlersFromModuleName(the_module_name: str = 'wfexs_backend.fetchers', fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None#
findAndAddSchemeHandlersFromModule(the_module: types.ModuleType, fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None#
addStatefulSchemeHandlers(statefulSchemeHandler: Type[AbstractStatefulFetcher], fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None#

This method adds scheme handlers (aka “fetchers”) from a given stateful fetcher, also adding the needed programs

SCHEME_PAT: Final[Pattern[str]] = 'compile(...)'#
addSchemeHandlers(schemeHandlers: Mapping[str, DocumentedProtocolFetcher | DocumentedStatefulProtocolFetcher], fetchers_setup_block: Mapping[str, Mapping[str, Any]] | None = None) None#

This method adds scheme handlers (aka “fetchers”) or instantiates stateful scheme handlers (aka “stateful fetchers”)

gen_workflow_pid(remote_repo: wfexs_backend.fetchers.RemoteRepo) str#

This method tries to generate the workflow PID by passing the remote repo to each one of the registered repo fetchers. The contract is that BuildPIDFromRepo should return None if it does not recognize the repo_url as usable.

describeFetchableSchemes() Sequence[Tuple[str, str, int]]#
newSetup(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, trs_endpoint: str = WF.DEFAULT_TRS_ENDPOINT, params: ParamsBlock | None = None, enabled_profiles: Sequence[str] | None = None, environment: EnvironmentBlock | None = None, outputs: OutputsBlock | None = None, default_actions: Sequence[ExportActionBlock] | None = None, workflow_config: WorkflowConfigBlock | None = None, vault: SecurityContextVault | None = None, public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.workflow.WF#

Init function, which delegates to the WF class

createRawWorkDir(nickname_prefix: str | None = None, orcids: Sequence[str] = []) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path]#

This method creates a new, empty, raw working directory

getOrCreateRawWorkDirFromInstanceId(instanceId: wfexs_backend.common.WfExSInstanceId, nickname: str | None = None, orcids: Sequence[str] = [], create_ok: bool = False) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path]#

This method returns the absolute path to the raw working directory

parseOrCreateRawWorkDir(uniqueRawWorkDir: pathlib.Path, instanceId: WfExSInstanceId | None = None, nickname: str | None = None, orcids: Sequence[str] = [], create_ok: bool = False) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path]#

This method returns the absolute path to the raw working directory

normalizeRawWorkingDirectory(uniqueRawWorkDir: pathlib.Path) Tuple[WfExSInstanceId, str, datetime.datetime, Sequence[str], pathlib.Path]#

This method returns the id of a working directory, as well as the nickname

fromWorkDir(workflowWorkingDirectory: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, fail_ok: bool = False) wfexs_backend.workflow.WF#
getDefaultParanoidMode() bool#
enableDefaultParanoidMode() None#
tryWorkflowURI(workflow_uri: str, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None) wfexs_backend.workflow.WF#
fromFiles(workflowMetaFilename: pathlib.Path, securityContextsConfigFilename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF#
fromPreviousInstanceDeclaration(wfInstance: wfexs_backend.workflow.WF, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False) wfexs_backend.workflow.WF#
fromPreviousROCrate(workflowROCrateFilenameOrURI: AnyPath | URIType, securityContextsConfigFilename: pathlib.Path | None = None, replaced_parameters_filename: pathlib.Path | None = None, nickname_prefix: str | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, secure: bool = True, paranoidMode: bool = False, reproducibility_level: wfexs_backend.utils.rocrate.ReproducibilityLevel = ReproducibilityLevel.Metadata, strict_reproducibility_level: bool = False, retrospective_first: bool = True) wfexs_backend.workflow.WF#
parseAndValidateSecurityContextFile(securityContextsConfigFilename: pathlib.Path) Tuple[ExitVal, SecurityContextConfigBlock]#
validateConfigFiles(workflowMetaFilename: pathlib.Path | WorkflowMetaConfigBlock, securityContextsConfigFilename: pathlib.Path | None = None) wfexs_backend.common.ExitVal#
fromDescription(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, vault: SecurityContextVault | None = None, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF#
Parameters:

  • workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.

  • creds_config (dict) – Dictionary with the different credential contexts (to be implemented)

  • paranoidMode

Returns:

Workflow configuration

fromForm(workflow_meta: wfexs_backend.workflow.WorkflowMetaConfigBlock, orcids: Sequence[str] = [], public_key_filenames: Sequence[pathlib.Path] = [], private_key_filename: pathlib.Path | None = None, paranoidMode: bool = False) wfexs_backend.workflow.WF#

Method mainly used by OpenVRE deployment in iPC

Parameters:

  • workflow_meta (dict) – The configuration describing both the workflow and the inputs to use when it is being instantiated.

  • paranoidMode

Returns:

Workflow configuration

getFusermountParams() Tuple[AnyPath, int]#
readSecuredWorkdirPassphrase(workdir_passphrase_file: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Tuple[EncryptedFSType, AnyPath, str]#

This method decrypts a crypt4gh file containing a passphrase which has been encrypted with either the WfExS installation public key (if the filename is not provided), or the public key paired with the provided private key stored in the file.

generateSecuredWorkdirPassphrase(workdir_passphrase_file: pathlib.Path, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None, public_key_filenames: Sequence[pathlib.Path] = []) Tuple[EncryptedFSType, AnyPath, str, Sequence[pathlib.Path]]#

This method generates a random passphrase, which is stored in a crypt4gh encrypted file (along with the FUSE filesystem used) using either the default WfExS-backend public key, or the public keys stored in the files provided as a parameter. It returns the FUSE filesystem, the command to be used to mount, the generated passphrase (to be consumed in memory) and the list of public keys used to encrypt the passphrase file, which can be used later for some additional purposes.

listStagedWorkflows(*args: str, acceptGlob: bool = False, doCleanup: bool = True, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str, datetime.datetime, StagedSetup | None, WF | None]]#
statusStagedWorkflows(*args: str, acceptGlob: bool = False, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str, datetime.datetime, StagedSetup | None, MarshallingStatus | None]]#
removeStagedWorkflows(*args: str, acceptGlob: bool = False, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) Iterator[Tuple[WfExSInstanceId, str]]#
shellFirstStagedWorkflow(*args: str, stdin: IO[str] = sys.stdin, stdout: IO[str] = sys.stdout, stderr: IO[str] = sys.stderr, acceptGlob: bool = False, firstMatch: bool = True, private_key_filename: pathlib.Path | None = None, private_key_passphrase: str | None = None) wfexs_backend.common.ExitVal#
cacheFetch(remote_file: LicensedURI | parse.ParseResult | URIType | Sequence[LicensedURI] | Sequence[parse.ParseResult] | Sequence[URIType], cacheType: wfexs_backend.common.CacheType, offline: bool, ignoreCache: bool = False, registerInCache: bool = True, vault: SecurityContextVault | None = None, sec_context_name: str | None = None, default_clonable: bool = True) wfexs_backend.cache_handler.CachedContent#

This is a pass-through method to the cache handler, which translates from symbolic types of cache to their corresponding directories

Parameters:
  • remote_file – The URI to be fetched (if not already cached)

  • cacheType – The type of cache where to look up the URI

  • offline – Is the instance working in offline mode? (i.e. raise exceptions when external content is needed)

  • ignoreCache – Even if the content is cached, discard and re-fetch it

  • registerInCache – Should the fetched content be registered in the cache?

  • vault – The security context which has to be passed to the fetchers, in case they have to be used

instantiateEngine(engineDesc: wfexs_backend.workflow_engines.WorkflowType, stagedSetup: wfexs_backend.common.StagedSetup) wfexs_backend.workflow_engines.AbstractWorkflowEngineType#
guess_repo_params(wf_url: URIType | parse.ParseResult, fail_ok: bool = False) RemoteRepo | None#
cacheWorkflow(workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None = None, trs_endpoint: str | None = None, descriptor_type: TRS_Workflow_Descriptor | None = None, ignoreCache: bool = False, registerInCache: bool = True, offline: bool = False, meta_dir: pathlib.Path | None = None) Tuple[pathlib.Path, RemoteRepo, WorkflowType | None, RepoTag | None]#

Fetch the whole workflow description based on the data obtained from the TRS where it is being published.

If the workflow id is a URL, it is supposed to be a git repository, and the version will represent either the branch, tag or specific commit. So, the whole TRS fetching machinery is bypassed.

TRS_METADATA_FILE: Final[RelPath] = 'cast(...)'#
TRS_QUERY_CACHE_FILE: Final[RelPath] = 'cast(...)'#
getWorkflowRepoFromTRS(trs_endpoint: str, workflow_id: wfexs_backend.workflow.WorkflowId, version_id: WFVersionId | None, descriptor_type: TRS_Workflow_Descriptor | None, offline: bool = False, ignoreCache: bool = False, meta_dir: pathlib.Path | None = None) Tuple[IdentifiedWorkflow, pathlib.Path | None]#
Returns:

doMaterializeRepo(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True, registerInCache: bool = True) Tuple[pathlib.Path, RepoTag]#
_doMaterializeGitRepo(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True) Tuple[URIType, RepoTag, pathlib.Path, Sequence[URIWithMetadata]]#
Parameters:
  • repoURL

  • repoTag

  • doUpdate

Returns:

_doMaterializeSoftwareHeritageDirOrContent(repo: wfexs_backend.fetchers.RemoteRepo, doUpdate: bool = True) Tuple[URIType, RepoTag, pathlib.Path, Sequence[URIWithMetadata]]#
Parameters:
  • repoURL

  • repoTag

  • doUpdate

Returns:

getWorkflowBundleFromURI(remote_url: wfexs_backend.common.URIType, expectedEngineDesc: WorkflowType | None = None, offline: bool = False, ignoreCache: bool = False, registerInCache: bool = True) Tuple[IdentifiedWorkflow | None, pathlib.Path, Sequence[URIWithMetadata], RelPath | None]#
getWorkflowRepoFromROCrateFile(roCrateFile: pathlib.Path, expectedEngineDesc: WorkflowType | None = None) wfexs_backend.wfexs_backend.IdentifiedWorkflow#
Parameters:
  • roCrateFile

  • expectedEngineDesc – If defined, an instance of WorkflowType

Returns:

DEFAULT_RO_EXTENSION: Final[str] = '.crate.zip'#
cacheROcrate(roCrateURL: wfexs_backend.common.URIType, offline: bool = False, ignoreCache: bool = False) pathlib.Path#

Download RO-crate from WorkflowHub (https://dev.workflowhub.eu/) using GA4GH TRS API and save RO-Crate in path.

Parameters:
  • roCrateURL (str) – location path to save RO-Crate

  • offline (bool) – Are we in offline mode?

Returns:

downloadContent(remote_file: LicensedURI | Sequence[LicensedURI], dest: pathlib.Path | CacheType, vault: SecurityContextVault | None = None, offline: bool = False, ignoreCache: bool = False, registerInCache: bool = True, keep_cache_licence: bool = True, default_clonable: bool = True) wfexs_backend.common.MaterializedContent#

Download remote file or directory / dataset.

Parameters:
  • remote_file (str) – URL or CURIE to download remote file

  • contextName

  • workflowInputs_destdir

  • offline

_LicenceMatcher: ClassVar[LicenceMatcher | None] = None#
classmethod GetLicenceMatcher() wfexs_backend.utils.licences.LicenceMatcher#
curate_licence_list(licences: Sequence[str], default_licence: LicenceDescription | None = None) Sequence[LicenceDescription]#