-
Notifications
You must be signed in to change notification settings - Fork 126
/
Copy pathgcs_helper.py
119 lines (91 loc) · 3.76 KB
/
gcs_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import List
from google.cloud import storage
from data_validation import client_info
# Leading text of the info-level message write_file() logs after a write.
WRITE_SUCCESS_STRING = "Success! Config output written to"
# Leading text of the info-level message delete_file() logs after a delete.
DELETE_SUCCESS_STRING = "Successfully deleted"
def _is_gcs_path(file_path: str) -> bool:
return True if file_path.startswith("gs://") else False
def get_validation_path(name: str) -> str:
    """Resolve a validation name to the path used to access it.

    A ``gs://`` path is handed back unchanged; any other name is joined onto
    ``./`` with ``os.path.join``.
    """
    return name if _is_gcs_path(name) else os.path.join("./", name)
def get_gcs_bucket(gcs_file_path: str) -> storage.Bucket:
    """Return the ``storage.Bucket`` named in a ``gs://``-prefixed path.

    Args:
        gcs_file_path: Full GCS path including the ``gs://`` prefix, e.g.
            ``gs://bucket/path/to/file.yaml``.

    Returns:
        The bucket object produced by ``storage.Client.bucket`` for the first
        path segment after the prefix.

    Raises:
        ValueError: If ``storage.Client.bucket`` raises ``ValueError``. The
            message includes the offending path and the original error is
            chained as ``__cause__``.
    """
    # Skip the 5-character "gs://" prefix; the bucket is the first segment.
    bucket_name = gcs_file_path[5:].split("/")[0]
    info = client_info.get_http_client_info()
    storage_client = storage.Client(client_info=info)
    try:
        return storage_client.bucket(bucket_name)
    except ValueError as e:
        # Chain explicitly so the underlying traceback is kept as the cause.
        raise ValueError(
            "GCS Path Failure {} -> {}".format(gcs_file_path, e)
        ) from e
def _get_gcs_file_path(gcs_file_path: str) -> str:
"""
Returns relative object file path i.e. `path/to/file.yaml`
given full GCS file path with prefix.
"""
return "".join(gcs_file_path[5:].split("/", 1)[1:])
def _read_gcs_file(file_path: str, download_as_text: bool = False):
    """Download the object addressed by a ``gs://`` path.

    Returns the result of ``blob.download_as_text()`` when
    ``download_as_text`` is true, otherwise the result of
    ``blob.download_as_bytes()``.

    Raises:
        ValueError: If the blob handle obtained from the bucket is falsy.
    """
    bucket = get_gcs_bucket(file_path)
    object_name = _get_gcs_file_path(file_path)
    blob = bucket.blob(object_name)
    # NOTE(review): presumably bucket.blob() always returns a truthy handle,
    # which would make this guard unreachable - confirm before relying on it.
    if not blob:
        raise ValueError(f"Invalid Cloud Storage Path: {file_path}")
    if download_as_text:
        return blob.download_as_text()
    return blob.download_as_bytes()
def _write_gcs_file(file_path: str, data: str):
    """Upload ``data`` to the object addressed by a ``gs://`` path."""
    get_gcs_bucket(file_path).blob(
        _get_gcs_file_path(file_path)
    ).upload_from_string(data)
def _delete_gcs_file(file_path: str):
    """Remove the object addressed by a ``gs://`` path from its bucket."""
    bucket = get_gcs_bucket(file_path)
    object_name = _get_gcs_file_path(file_path)
    bucket.blob(object_name).delete()
def read_file(file_path: str, download_as_text: bool = False):
    """Read a file from GCS or the local filesystem, depending on the path.

    ``download_as_text`` is forwarded to the GCS reader only; a local file is
    always opened in text mode (``"r"``) and returned as a string.
    """
    if not _is_gcs_path(file_path):
        with open(file_path, "r") as local_file:
            return local_file.read()
    return _read_gcs_file(file_path, download_as_text)
def write_file(file_path: str, data: str, include_log: bool = True):
    """Write ``data`` to GCS or the local filesystem, depending on the path.

    Args:
        file_path: Destination; a ``gs://`` path is uploaded to GCS, anything
            else is written locally in text mode.
        data: Text content to write.
        include_log: When true, log an info message after the write.

    For local paths, missing parent directories are created first.
    """
    if _is_gcs_path(file_path):
        _write_gcs_file(file_path, data)
    else:
        parent_dir = os.path.dirname(file_path)
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create directories when there is one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, "w") as file:
            file.write(data)
    if include_log:
        logging.info(f"{WRITE_SUCCESS_STRING} {file_path}")
def delete_file(file_path: str, include_log: bool = True):
    """Remove a file, dispatching to GCS or the local filesystem by path.

    Raises:
        FileNotFoundError: If a local path does not exist.
    """
    if _is_gcs_path(file_path):
        _delete_gcs_file(file_path)
    elif not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    else:
        os.remove(file_path)
    if include_log:
        logging.info(f"{DELETE_SUCCESS_STRING}: {file_path}")
def list_gcs_directory(directory_path: str) -> List[str]:
    """List blob names under a ``gs://`` directory, relative to that directory.

    Args:
        directory_path: Full GCS path (with ``gs://`` prefix) whose object
            path is used as the listing prefix.

    Returns:
        The name of every blob returned by ``list_blobs(prefix=...,
        delimiter="/")``, with the leading prefix removed.
    """
    gcs_prefix = _get_gcs_file_path(directory_path)
    gcs_bucket = get_gcs_bucket(directory_path)
    prefix_len = len(gcs_prefix)
    # Strip only the leading prefix. str.replace would also remove later
    # occurrences of the prefix text inside the blob name itself.
    return [
        blob.name[prefix_len:] if blob.name.startswith(gcs_prefix) else blob.name
        for blob in gcs_bucket.list_blobs(prefix=gcs_prefix, delimiter="/")
    ]