wfexs_backend.fetchers.internal.ftp_downloader#

Module Contents#

Classes#

Functions#

asyncio_run

Helper function which abstracts the differences in coroutine handling between Python 3.7 and earlier versions.

API#

wfexs_backend.fetchers.internal.ftp_downloader.asyncio_run(tasks: Tuple[asyncio.Task[CT], ...]) wfexs_backend.fetchers.internal.ftp_downloader.CT#

Helper function which abstracts the differences in coroutine handling between Python 3.7 and earlier versions.

class wfexs_backend.fetchers.internal.ftp_downloader.FTPDownloader(HOST: str, PORT: int = DEFAULT_FTP_PORT, USER: str = DEFAULT_USER, PASSWORD: str = DEFAULT_PASS, max_retries: int = DEFAULT_MAX_RETRIES)#

Initialization

DEFAULT_USER: Final[str] = 'ftp'#
DEFAULT_PASS: Final[str] = 'guest@'#
DEFAULT_FTP_PORT: Final[int] = 21#
DEFAULT_MAX_RETRIES: Final[int] = 5#
__enter__() wfexs_backend.fetchers.internal.ftp_downloader.FTPDownloader#
__exit__(exc_type, exc_val, exc_tb)#
async __download_file_async(client: aioftp.Client, upload_file_path: pathlib.Path, dfdPath: pathlib.Path, dfdStat: Mapping[str, Any]) None#
async _reconnect(client: aioftp.Client) None#
async _download_dir_async(client: aioftp.Client, dfdPath: pathlib.Path, utdPath: pathlib.Path, exclude_ext: Sequence[str]) Sequence[Path]#

This method mirrors a whole directory into a destination one. dfdPath must be absolute.

async _download_file_async(client: aioftp.Client, dfdPath: pathlib.Path, utdPath: pathlib.Path) pathlib.Path#

dfdPath must be absolute

async download_dir_async(download_from_dir: str, upload_to_dir: str, exclude_ext: Sequence[str]) Sequence[Path]#
async download_file_async(download_from_file: str, upload_to_file: str) pathlib.Path#
async download_async(download_from_df: str, upload_to_df: str, exclude_ext: Sequence[str]) Path | Sequence[Path]#

This method returns a Path when a file is fetched, and a list of Path objects when a directory is fetched.

download_dir(download_from_dir: str, upload_to_dir: str = '.', exclude_ext: Sequence[str] = []) Sequence[Path]#
download_file(download_from_file: str, upload_to_file: str) pathlib.Path#
download(download_path: str, upload_path: str, exclude_ext: Sequence[str] = []) Path | Sequence[Path]#
static clear_tasks() None#