download
Functions for downloading output from the Job Attachment CAS.
OutputDownloader
Handler for downloading all output files from the given job, with optional step-level and task-level granularity. If no session is provided, the default credentials path is used; see: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials
TODO: The download location is specific to the OS of the submitting machine, matching that machine's profile. The downloading machine may run a different OS, so we need to check for that and apply path mapping rules in that case.
download_job_output(file_conflict_resolution=FileConflictResolution.CREATE_COPY, on_downloading_files=None)
Downloads output files from the S3 bucket to the asset root(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_conflict_resolution | Optional[FileConflictResolution] | The resolution method for file conflicts. | CREATE_COPY |
| on_downloading_files | Optional[Callable[[ProgressReportMetadata], bool]] | A callback that is called periodically to report progress to the caller. It returns True if the operation should continue as normal, or False to cancel. | None |
Returns:
| Type | Description |
|---|---|
| DownloadSummaryStatistics | The download summary statistics. |
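The on_downloading_files contract (return True to continue, False to cancel) can be illustrated with a small, library-free sketch. FakeProgress, make_cancel_after, and simulate_download are hypothetical stand-ins; the sketch assumes only that the real ProgressReportMetadata carries a percent-complete value, and it does not call the actual downloader.

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class FakeProgress:
    """Hypothetical stand-in for ProgressReportMetadata (only a percentage is modeled)."""

    progress: float  # assumed: percent complete, 0.0 to 100.0


def make_cancel_after(limit: float) -> Callable[[FakeProgress], bool]:
    """Build an on_downloading_files-style callback that cancels once progress reaches `limit`."""

    def on_downloading_files(metadata: FakeProgress) -> bool:
        print(f"Downloaded {metadata.progress:.0f}%")
        # True = continue as normal; False = ask the downloader to cancel.
        return metadata.progress < limit

    return on_downloading_files


def simulate_download(
    reports: Iterable[FakeProgress], callback: Callable[[FakeProgress], bool]
) -> bool:
    """Hypothetical driver: feed reports to the callback; return False if it requested cancellation."""
    for report in reports:
        if not callback(report):
            return False
    return True
```

In real use, the callback is passed as on_downloading_files and the downloader decides how to act on a False return.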
get_output_paths_by_root()
Returns a dict mapping asset root paths to lists of output paths.
set_root_path(original_root, new_root)
Replaces the root path for downloading output files with a custom path. By default, the root path is the one saved in the S3 metadata for the output manifest. The new root path is stored as an absolute path.
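The re-rooting behavior described above can be sketched over a plain {root: [paths]} dict. This is a hypothetical model, not OutputDownloader's internals: the dict shape and the ValueError for an unknown root are assumptions. It shows the two documented points, that output paths move under the new root and that the new root is stored as an absolute path.

```python
import os


def set_root_path(
    outputs_by_root: dict[str, list[str]], original_root: str, new_root: str
) -> dict[str, list[str]]:
    """Hypothetical sketch: re-key the paths under original_root to an absolute new_root."""
    if original_root not in outputs_by_root:
        # Assumption: an unknown root is treated as a caller error.
        raise ValueError(f"Root path {original_root!r} not found in output manifests")
    absolute_new_root = os.path.abspath(new_root)  # store the new root as an absolute path
    remapped = dict(outputs_by_root)  # copy so the caller's dict is left untouched
    remapped[absolute_new_root] = remapped.pop(original_root)
    return remapped
```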
download_file(file, hash_algorithm, local_download_dir, collision_lock, collision_file_dict, s3_bucket, cas_prefix, s3_client=None, session=None, modified_time_override=None, progress_tracker=None, file_conflict_resolution=FileConflictResolution.CREATE_COPY)
Downloads a file from the S3 bucket to the local directory. The modified_time_override argument is ignored if the manifest version in use supports timestamps.
Returns a tuple of (size in bytes, filename) for the downloaded file.
- A file size of 0 means that the file comes from a manifest version that does not provide file sizes.
- A filename of None indicates that the file was skipped or not downloaded.
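A caller that collects download_file results might tally them as follows, applying the two documented rules for the returned tuple. summarize_results and its summary keys are hypothetical; only the (size, filename) shape and the meaning of 0 and None come from the description above.

```python
from typing import Optional


def summarize_results(results: list[tuple[int, Optional[str]]]) -> dict[str, int]:
    """Tally (size_in_bytes, filename) tuples shaped like download_file's return value."""
    summary = {"downloaded": 0, "skipped": 0, "unknown_size": 0, "bytes": 0}
    for size, filename in results:
        if filename is None:
            # None filename: the file was skipped or not downloaded.
            summary["skipped"] += 1
            continue
        summary["downloaded"] += 1
        if size == 0:
            # Per the docs, 0 means the manifest version does not record file sizes.
            summary["unknown_size"] += 1
        summary["bytes"] += size
    return summary
```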
download_files(files, hash_algorithm, local_download_dir, s3_settings, session=None, progress_tracker=None, file_conflict_resolution=FileConflictResolution.CREATE_COPY)
Downloads all files from the S3 bucket in the Job Attachment settings to the specified directory. Returns a list of local paths of downloaded files.
download_files_from_manifests(s3_bucket, manifests_by_root, cas_prefix=None, fs_permission_settings=None, session=None, on_downloading_files=None, logger=None, conflict_resolution=FileConflictResolution.CREATE_COPY)
Given manifests, downloads all files from a CAS in each manifest.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_bucket | str | The name of the S3 bucket. | required |
| manifests_by_root | dict[str, BaseAssetManifest] | A map from each local root path to its corresponding manifest. | required |
| cas_prefix | Optional[str] | The CAS prefix of the files. | None |
| session | Optional[Session] | The boto3 session to use. | None |
| on_downloading_files | Optional[Callable[[ProgressReportMetadata], bool]] | A callback that is called periodically to report progress to the caller. It returns True if the operation should continue as normal, or False to cancel. | None |
Returns:
| Type | Description |
|---|---|
| DownloadSummaryStatistics | The download summary statistics. |
download_files_in_directory(s3_settings, attachments, farm_id, queue_id, job_id, directory_path, local_download_dir, session=None, on_downloading_files=None)
From a given job's input and output files, downloads all files in the given directory path. For example, directory_path might be "inputs/subdirectory1" and local_download_dir might be "/home/username".
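Selecting the files that fall under directory_path can be sketched with pathlib. select_files_in_directory is hypothetical, and the destination layout (the manifest-relative path joined onto local_download_dir) is an assumption rather than the library's documented behavior. Comparing whole path components avoids the classic prefix bug in which "inputs/subdirectory10" would match "inputs/subdirectory1".

```python
from pathlib import PurePosixPath


def select_files_in_directory(
    manifest_paths: list[str], directory_path: str, local_download_dir: str
) -> dict[str, str]:
    """Hypothetical sketch: map manifest-relative paths under directory_path to local destinations."""
    prefix = PurePosixPath(directory_path)
    selected: dict[str, str] = {}
    for rel in manifest_paths:
        rel_path = PurePosixPath(rel)
        # is_relative_to (Python 3.9+) compares whole components, not raw string prefixes.
        if rel_path.is_relative_to(prefix):
            # Assumed layout: keep the full relative path under the local download directory.
            selected[rel] = str(PurePosixPath(local_download_dir) / rel_path)
    return selected
```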
get_job_input_output_paths_by_asset_root(s3_settings, attachments, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)
Using the given IDs, gets the paths of all input and output files of this job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.
get_job_input_paths_by_asset_root(s3_settings, attachments, session=None)
Gets a dict of grouped paths of all input files of a given job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.
get_job_output_paths_by_asset_root(s3_settings, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)
Gets a dict of grouped paths of all output files of a given job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.
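The three functions above all return path groups keyed by asset root, and combining input and output groups is a per-root union. The sketch below simplifies each ManifestPathGroup to a plain list of paths; combine_paths_by_root is hypothetical and does not reproduce the real class, which may carry more information than a path list.

```python
def combine_paths_by_root(*groups: dict[str, list[str]]) -> dict[str, list[str]]:
    """Hypothetical sketch: merge several {root: [paths]} dicts, de-duplicating paths per root."""
    # dict.fromkeys keeps first-seen order while dropping duplicate paths.
    combined: dict[str, dict[str, None]] = {}
    for group in groups:
        for root, paths in group.items():
            combined.setdefault(root, {}).update(dict.fromkeys(paths))
    return {root: list(paths) for root, paths in combined.items()}
```

For example, merging an input group and an output group that share the root "/proj" yields one entry for "/proj" containing each path once.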
get_output_manifests_by_asset_root(s3_settings, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)
Gets output manifests grouped by asset root for job, step, or task outputs, handling both chunked and non-chunked steps.
When session_action_id is provided, retrieves the outputs for that specific session action by searching for S3 paths that contain the session action ID. The session action ID is typically provided along with a task ID, but this pairing is not required.
When session_action_id is not provided, retrieves all output manifests for the specified scope (job, step, or task) and merges them chronologically by asset root. This mode is used to download complete job, step, or task outputs, and by the WorkerAgent to sync step dependencies.
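A chronological merge by asset root can be modeled as sorting manifests by timestamp and letting later entries overwrite earlier ones for the same path. This is a sketch under simplifying assumptions: each manifest is reduced to a hypothetical (timestamp, root, {path: hash}) tuple, and the real function's handling of chunked steps and S3 listing is not modeled.

```python
from datetime import datetime


def merge_chronologically(
    manifests: list[tuple[datetime, str, dict[str, str]]],
) -> dict[str, dict[str, str]]:
    """Hypothetical sketch: merge {path: hash} manifests per asset root, with later manifests winning."""
    merged: dict[str, dict[str, str]] = {}
    # Oldest first, so a newer manifest's entry replaces an older one for the same path.
    for _, root, files in sorted(manifests, key=lambda item: item[0]):
        merged.setdefault(root, {}).update(files)
    return merged
```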
handle_existing_vfs(manifest, session_dir, mount_point, os_user)
Combines the provided manifest with the input manifest of the running VFS at the given mount_point, if one exists, and then kills the running process at that mount point so that it can be replaced.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| manifest | BaseAssetManifest | The manifest for the new inputs to be mounted. | required |
| mount_point | str | The local directory where the manifest is to be mounted. | required |
| os_user | str | The user running the job. | required |

Returns:
| Type | Description |
|---|---|
| BaseAssetManifest | A single manifest containing the merged paths, or the original manifest. |
merge_asset_manifests(manifests)
Merges the files from multiple manifests into a single manifest, ensuring that each file path is unique by keeping the entry from the last manifest encountered. (As a result, the steps' outputs are downloaded over the input job attachments.)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| manifests | list[AssetManifest] | A list of manifests to be merged. | required |

Raises:
| Type | Description |
|---|---|
| NotImplementedError | When two manifests have different hash algorithms. All manifests must use the same hash algorithm. |

Returns:
| Type | Description |
|---|---|
| BaseAssetManifest \| None | A single manifest containing the merged paths of all provided manifests, or None if no manifests were provided. |
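The documented merge rules (last manifest wins per path, mixed hash algorithms raise NotImplementedError, an empty list returns None) can be sketched with a minimal stand-in type. SimpleManifest and merge_manifests are hypothetical; the real BaseAssetManifest stores richer per-file metadata than a {path: hash} dict.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SimpleManifest:
    """Hypothetical stand-in for BaseAssetManifest: a hash algorithm plus {path: hash} entries."""

    hash_alg: str
    files: dict[str, str] = field(default_factory=dict)


def merge_manifests(manifests: list[SimpleManifest]) -> Optional[SimpleManifest]:
    """Sketch of last-one-wins merging with a single shared hash algorithm."""
    if not manifests:
        return None  # nothing to merge
    hash_alg = manifests[0].hash_alg
    merged: dict[str, str] = {}
    for manifest in manifests:
        if manifest.hash_alg != hash_alg:
            raise NotImplementedError(
                f"Cannot merge manifests using {hash_alg!r} and {manifest.hash_alg!r}"
            )
        merged.update(manifest.files)  # a later manifest overwrites a repeated path
    return SimpleManifest(hash_alg, merged)
```

Passing the job inputs first and the step outputs last is what lets outputs replace inputs that share the same path.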
mount_vfs_from_manifests(s3_bucket, manifests_by_root, boto3_session, session_dir, os_env_vars, fs_permission_settings, cas_prefix=None)
Given manifests, mounts the files they reference from a CAS through a virtual file system (VFS).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_bucket | str | The name of the S3 bucket. | required |
| manifests_by_root | dict[str, BaseAssetManifest] | A map from each local root path to its corresponding manifest. | required |
| boto3_session | Session | The boto3 session to use. | required |
| session_dir | Path | The directory that the session is going to use. | required |
| os_env_vars | dict[str, str] | Environment variables to set for launched subprocesses. | required |
| cas_prefix | Optional[str] | The CAS prefix of the files. | None |

Returns:
| Type | Description |
|---|---|
| None | This function does not return a value. |
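The os_env_vars parameter holds environment variables for launched subprocesses. A common way to apply such a mapping is to layer it over the inherited environment, as sketched below. run_with_env is hypothetical, and it is an assumption that the VFS launcher merges os_env_vars with os.environ rather than replacing the environment entirely.

```python
import os
import subprocess


def run_with_env(command: list[str], os_env_vars: dict[str, str]) -> subprocess.CompletedProcess:
    """Hypothetical sketch: launch a subprocess with os_env_vars layered over the current environment."""
    env = {**os.environ, **os_env_vars}  # caller-supplied values override inherited ones
    return subprocess.run(command, env=env, capture_output=True, text=True, check=True)
```

Layering keeps variables such as PATH available to the child process while still letting the caller override specific values.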