# Source code for spack.util.gcs

# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

"""This file contains the definition of the GCS Blob storage Class used to
integrate GCS Blob storage with spack buildcache.
"""

import os
import sys

import llnl.util.tty as tty

[docs]def gcs_client(): """Create a GCS client Creates an authenticated GCS client to access GCS buckets and blobs """ try: import google.auth from import storage except ImportError as ex: tty.error('{0}, google-cloud-storage python module is missing.'.format(ex) + ' Please install to use the gs:// backend.') sys.exit(1) storage_credentials, storage_project = google.auth.default() storage_client = storage.Client(storage_project, storage_credentials) return storage_client
[docs]class GCSBucket(object): """GCS Bucket Object Create a wrapper object for a GCS Bucket. Provides methods to wrap spack related tasks, such as destroy. """ def __init__(self, url, client=None): """Constructor for GCSBucket objects Args: url (str): The url pointing to the GCS bucket to build an object out of client ( A pre-defined storage client that will be used to access the GCS bucket. """ if url.scheme != 'gs': raise ValueError('Can not create GCS bucket connection with scheme {SCHEME}' .format(SCHEME=url.scheme)) self.url = url = self.url.netloc if self.url.path[0] == '/': self.prefix = self.url.path[1:] else: self.prefix = self.url.path self.client = client or gcs_client() self.bucket = None tty.debug('New GCS bucket:') tty.debug(" name: {0}".format( tty.debug(" prefix: {0}".format(self.prefix))
[docs] def exists(self): from import NotFound if not self.bucket: try: self.bucket = self.client.bucket( except NotFound as ex: tty.error("{0}, Failed check for bucket existence".format(ex)) sys.exit(1) return self.bucket is not None
[docs] def create(self): if not self.bucket: self.bucket = self.client.create_bucket(
[docs] def get_blob(self, blob_path): if self.exists(): return self.bucket.get_blob(blob_path) return None
[docs] def blob(self, blob_path): if self.exists(): return self.bucket.blob(blob_path) return None
[docs] def get_all_blobs(self, recursive=True, relative=True): """Get a list of all blobs Returns a list of all blobs within this bucket. Args: relative: If true (default), print blob paths relative to 'build_cache' directory. If false, print absolute blob paths (useful for destruction of bucket) """ tty.debug('Getting GCS blobs... Recurse {0} -- Rel: {1}'.format( recursive, relative)) converter = str if relative: converter = self._relative_blob_name if self.exists(): all_blobs = self.bucket.list_blobs(prefix=self.prefix) blob_list = [] base_dirs = len(self.prefix.split('/')) + 1 for blob in all_blobs: if not recursive: num_dirs = len('/')) if num_dirs <= base_dirs: blob_list.append(converter( else: blob_list.append(converter( return blob_list
def _relative_blob_name(self, blob_name): return os.path.relpath(blob_name, self.prefix)
[docs] def destroy(self, recursive=False, **kwargs): """Bucket destruction method Deletes all blobs within the bucket, and then deletes the bucket itself. Uses GCS Batch operations to bundle several delete operations together. """ from import NotFound tty.debug("Bucket.destroy(recursive={0})".format(recursive)) try: bucket_blobs = self.get_all_blobs(recursive=recursive, relative=False) batch_size = 1000 num_blobs = len(bucket_blobs) for i in range(0, num_blobs, batch_size): with self.client.batch(): for j in range(i, min(i + batch_size, num_blobs)): blob = self.blob(bucket_blobs[j]) blob.delete() except NotFound as ex: tty.error("{0}, Could not delete a blob in bucket {1}.".format( ex, sys.exit(1)
[docs]class GCSBlob(object): """GCS Blob object Wraps some blob methods for spack functionality """ def __init__(self, url, client=None): self.url = url if url.scheme != 'gs': raise ValueError('Can not create GCS blob connection with scheme: {SCHEME}' .format(SCHEME=url.scheme)) self.client = client or gcs_client() self.bucket = GCSBucket(url) self.blob_path = self.url.path.lstrip('/') tty.debug("New GCSBlob") tty.debug(" blob_path = {0}".format(self.blob_path)) if not self.bucket.exists(): tty.warn("The bucket {0} does not exist, it will be created" .format( self.bucket.create()
[docs] def get(self): return self.bucket.get_blob(self.blob_path)
[docs] def exists(self): from import NotFound try: blob = self.bucket.blob(self.blob_path) exists = blob.exists() except NotFound: return False return exists
[docs] def delete_blob(self): from import NotFound try: blob = self.bucket.blob(self.blob_path) blob.delete() except NotFound as ex: tty.error("{0}, Could not delete gcs blob {1}".format(ex, self.blob_path))
[docs] def upload_to_blob(self, local_file_path): blob = self.bucket.blob(self.blob_path) blob.upload_from_filename(local_file_path)
[docs] def get_blob_byte_stream(self): return self.bucket.get_blob(self.blob_path).open(mode='rb')
[docs] def get_blob_headers(self): blob = self.bucket.get_blob(self.blob_path) headers = { 'Content-type': blob.content_type, 'Content-encoding': blob.content_encoding, 'Content-language': blob.content_language, 'MD5Hash': blob.md5_hash } return headers