
download

Functions for downloading output from the Job Attachment CAS.

OutputDownloader

Handler for downloading all output files from the given job, with optional step-level and task-level granularity. If no session is provided, the default credentials path is used. See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials

TODO: The download location is specific to the OS of the submitting machine and matches that machine's profile. The downloading machine might run a different OS, so we need to check for that and apply path mapping rules in that case.

download_job_output(file_conflict_resolution=FileConflictResolution.CREATE_COPY, on_downloading_files=None)

Downloads output files from the S3 bucket to the asset root(s).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_conflict_resolution | Optional[FileConflictResolution] | Resolution method for file conflicts. | CREATE_COPY |
| on_downloading_files | Optional[Callable[[ProgressReportMetadata], bool]] | A callback called periodically to report progress to the caller. The callback returns True if the operation should continue as normal, or False to cancel. | None |

Returns:

| Type | Description |
| --- | --- |
| DownloadSummaryStatistics | The download summary statistics. |
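The callback contract described above (return True to continue, False to cancel) can be sketched as follows. This is a minimal illustration, not the library's code: `ProgressStub` is a hypothetical stand-in for ProgressReportMetadata, and its field names are assumptions.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ProgressStub:
    """Hypothetical stand-in for ProgressReportMetadata; field names are assumed."""
    progress: float  # percentage complete, 0.0 to 100.0
    message: str


def make_cancel_after(limit_percent: float) -> Callable[[ProgressStub], bool]:
    """Build a callback that asks the download to stop once progress reaches a limit."""
    def on_downloading_files(report: ProgressStub) -> bool:
        print(f"{report.progress:5.1f}% {report.message}")
        # True: continue as normal. False: cancel the operation.
        return report.progress < limit_percent
    return on_downloading_files


callback = make_cancel_after(50.0)
# Simulate three progress reports, as a downloader might deliver them.
decisions = [callback(ProgressStub(p, "downloading")) for p in (10.0, 40.0, 60.0)]
```

A callback of this shape would be passed as on_downloading_files. Returning False from it is the only cancellation signal the description above mentions.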

get_output_paths_by_root()

Returns a dict of asset root paths to lists of output paths.

set_root_path(original_root, new_root)

Replaces the root path used for downloading output files with a custom path. By default, the root path is the one saved in the S3 metadata for the output manifest. The new root path is stored as an absolute path.
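As a rough sketch of the idea, assuming the downloader keeps a mapping from root path to output paths: `remap_root` below is a hypothetical helper, not part of the library. Raising ValueError for an unknown root and merging into an existing key are assumptions made for illustration.

```python
import os


def remap_root(outputs_by_root: dict, original_root: str, new_root: str) -> dict:
    """Return a copy of the mapping with original_root replaced by the absolute new_root."""
    if original_root not in outputs_by_root:
        # Assumed behavior: reject roots that are not present in the mapping.
        raise ValueError(f"Unknown root path: {original_root}")
    new_root_abs = os.path.abspath(new_root)
    remapped: dict = {}
    for root, paths in outputs_by_root.items():
        key = new_root_abs if root == original_root else root
        # Merge rather than overwrite if the new root already exists as a key.
        remapped.setdefault(key, []).extend(paths)
    return remapped


outputs = {"/mnt/projects/shot01": ["renders/frame_0001.exr"]}
remapped = remap_root(outputs, "/mnt/projects/shot01", "downloads/shot01")
```

The relative path "downloads/shot01" is resolved against the current working directory, which mirrors the statement that the new root is stored as an absolute path.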

download_file(file, hash_algorithm, local_download_dir, collision_lock, collision_file_dict, s3_bucket, cas_prefix, s3_client=None, session=None, modified_time_override=None, progress_tracker=None, file_conflict_resolution=FileConflictResolution.CREATE_COPY)

Downloads a file from the S3 bucket to the local directory. modified_time_override is ignored if the manifest version used supports timestamps. Returns a tuple of (size in bytes, filename) of the downloaded file.
- A file size of 0 means that this file comes from a manifest version that does not provide file sizes.
- A filename of None indicates that this file has been skipped or has not been downloaded.
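The return convention above can be consumed along these lines. This is a minimal sketch over hand-written sample tuples; `summarize` is a hypothetical helper and no library call is made.

```python
from typing import Optional


def summarize(results: list[tuple[int, Optional[str]]]) -> dict:
    """Tally (size in bytes, filename) tuples using the conventions described above."""
    # A filename of None means the file was skipped or not downloaded.
    downloaded = [(size, name) for size, name in results if name is not None]
    return {
        "downloaded": len(downloaded),
        "skipped": len(results) - len(downloaded),
        # A size of 0 means the manifest version did not provide a file size.
        "known_bytes": sum(size for size, _ in downloaded),
        "size_unknown": sum(1 for size, _ in downloaded if size == 0),
    }


sample = [(1024, "/tmp/a.exr"), (0, "/tmp/b.exr"), (2048, None)]
summary = summarize(sample)
```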

download_files(files, hash_algorithm, local_download_dir, s3_settings, session=None, progress_tracker=None, file_conflict_resolution=FileConflictResolution.CREATE_COPY)

Downloads all files from the S3 bucket in the Job Attachment settings to the specified directory. Returns a list of local paths of downloaded files.

download_files_from_manifests(s3_bucket, manifests_by_root, cas_prefix=None, fs_permission_settings=None, session=None, on_downloading_files=None, logger=None, conflict_resolution=FileConflictResolution.CREATE_COPY)

Given manifests, downloads all files from a CAS in each manifest.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| s3_bucket | str | The name of the S3 bucket. | required |
| manifests_by_root | dict[str, BaseAssetManifest] | A map from each local root path to a corresponding list of tuples of manifest contents and their path. | required |
| cas_prefix | Optional[str] | The CAS prefix of the files. | None |
| session | Optional[Session] | The boto3 session to use. | None |
| on_downloading_files | Optional[Callable[[ProgressReportMetadata], bool]] | A callback called periodically to report progress to the caller. The callback returns True if the operation should continue as normal, or False to cancel. | None |

Returns:

| Type | Description |
| --- | --- |
| DownloadSummaryStatistics | The download summary statistics. |

download_files_in_directory(s3_settings, attachments, farm_id, queue_id, job_id, directory_path, local_download_dir, session=None, on_downloading_files=None)

From a given job's input and output files, downloads all files in the given directory path. For example, directory_path could be "inputs/subdirectory1" and local_download_dir could be "/home/username".
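Selecting the files that fall under a directory path can be sketched with pathlib. This only illustrates the filtering idea under the assumption that manifest paths are relative POSIX-style paths; `paths_in_directory` is a hypothetical helper, and the library's own matching logic may differ.

```python
from pathlib import PurePosixPath


def paths_in_directory(relative_paths: list[str], directory_path: str) -> list[str]:
    """Keep only the paths that sit somewhere below directory_path."""
    directory = PurePosixPath(directory_path)
    # Comparing whole path components avoids matching "inputs/subdirectory10"
    # when the requested directory is "inputs/subdirectory1".
    return [p for p in relative_paths if directory in PurePosixPath(p).parents]


paths = [
    "inputs/subdirectory1/a.txt",
    "inputs/subdirectory1/deep/b.txt",
    "inputs/subdirectory10/c.txt",
    "outputs/d.txt",
]
selected = paths_in_directory(paths, "inputs/subdirectory1")
```

A plain string-prefix check would wrongly include the third path, which is why the sketch compares path components instead.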

get_job_input_output_paths_by_asset_root(s3_settings, attachments, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)

Given the IDs, gets the paths of all input and output files of this job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.

get_job_input_paths_by_asset_root(s3_settings, attachments, session=None)

Gets a dict of grouped paths of all input files of a given job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.

get_job_output_paths_by_asset_root(s3_settings, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)

Gets a dict of grouped paths of all output files of a given job. The grouped paths are separated by asset root. Returns a dict of ManifestPathGroups, with the root path as the key.

get_output_manifests_by_asset_root(s3_settings, farm_id, queue_id, job_id, step_id=None, task_id=None, session_action_id=None, session=None)

Gets output manifests grouped by asset root for job, step, or task outputs, handling both chunked and non-chunked steps.

When session_action_id is provided, retrieves outputs for that specific session action by searching S3 paths containing the session action ID. Session action ID is typically provided with task ID but is not required.

When session_action_id is not provided, retrieves all output manifests for the specified scope (job, step, or task) and merges them chronologically by asset root. This is used for downloading complete job/step/task outputs or syncing step dependencies by WorkerAgent.
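The chronological merge by asset root can be pictured with a small sketch. It assumes each manifest is reduced to a (last_modified, asset_root, files) record and that a later manifest overrides an earlier one for the same path. The record shape and `merge_by_root` are hypothetical, not the library's data model.

```python
from collections import defaultdict


def merge_by_root(records: list[tuple[int, str, dict]]) -> dict:
    """Fold manifests oldest-first so newer entries replace older ones per root."""
    merged: dict = defaultdict(dict)
    for _, root, files in sorted(records, key=lambda record: record[0]):
        # files maps relative path -> content hash; update() lets later manifests win.
        merged[root].update(files)
    return dict(merged)


records = [
    (200, "/proj", {"out/a.exr": "h2"}),
    (100, "/proj", {"out/a.exr": "h1", "out/b.exr": "h3"}),
    (150, "/other", {"log.txt": "h4"}),
]
merged_by_root = merge_by_root(records)
```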

handle_existing_vfs(manifest, session_dir, mount_point, os_user)

Combines the provided manifest with the input manifest of the running VFS at the given mount_point, if one exists. It then kills the running process at that mount so it can be replaced.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| manifest | BaseAssetManifest | The manifest for the new inputs to be mounted. | required |
| mount_point | str | The local directory where the manifest is to be mounted. | required |
| os_user | str | The user running the job. | required |

Returns:

| Type | Description |
| --- | --- |
| BaseAssetManifest | A single manifest containing the merged paths or the original manifest. |

merge_asset_manifests(manifests)

Merge files from multiple manifests into a single list, ensuring that each filename is unique by keeping the one from the last encountered manifest. (Thus, the steps' outputs are downloaded over the input job attachments.)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| manifests | list[AssetManifest] | A list of manifests to be merged. | required |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | When two manifests have different hash algorithms. All manifests must use the same hash algorithm. |

Returns:

| Type | Description |
| --- | --- |
| BaseAssetManifest \| None | A single manifest containing the merged paths of all provided manifests, or None if no manifests were provided. |
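The merge rule described for merge_asset_manifests (last manifest wins per filename, mismatched hash algorithms rejected, None for empty input) can be sketched as below. `ManifestStub` is a hypothetical simplification of a manifest, and `merge_manifests` illustrates the rule rather than reproducing the library's implementation.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ManifestStub:
    """Hypothetical, simplified manifest: a hash algorithm plus path -> hash entries."""
    hash_alg: str
    paths: dict = field(default_factory=dict)


def merge_manifests(manifests: list) -> Optional[ManifestStub]:
    """Merge manifests so that each path keeps the entry from the last manifest seen."""
    if not manifests:
        return None
    first = manifests[0]
    merged: dict = {}
    for manifest in manifests:
        if manifest.hash_alg != first.hash_alg:
            raise NotImplementedError(
                f"Cannot merge manifests using {first.hash_alg} and {manifest.hash_alg}"
            )
        # Later manifests overwrite earlier entries that share a path.
        merged.update(manifest.paths)
    return ManifestStub(first.hash_alg, merged)


inputs = ManifestStub("xxh128", {"scene.blend": "in1", "frame.exr": "old"})
step_output = ManifestStub("xxh128", {"frame.exr": "new"})
result = merge_manifests([inputs, step_output])
```

Passing the input manifest first and step outputs afterwards gives the ordering the description implies: outputs replace inputs that share a filename.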

mount_vfs_from_manifests(s3_bucket, manifests_by_root, boto3_session, session_dir, os_env_vars, fs_permission_settings, cas_prefix=None)

Given manifests, downloads all files from a CAS in those manifests.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| s3_bucket | str | The name of the S3 bucket. | required |
| manifests_by_root | dict[str, BaseAssetManifest] | A map from each local root path to a corresponding list of tuples of manifest contents and their path. | required |
| boto3_session | Session | The boto3 session to use. | required |
| session_dir | Path | The directory that the session is going to use. | required |
| os_env_vars | dict[str, str] | Environment variables to set for launched subprocesses. | required |
| cas_prefix | Optional[str] | The CAS prefix of the files. | None |

Returns:

| Type | Description |
| --- | --- |
| None | None |