forked from move-coop/parsons
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_google_cloud_storage.py
More file actions
103 lines (77 loc) · 3.79 KB
/
test_google_cloud_storage.py
File metadata and controls
103 lines (77 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import unittest
from pathlib import Path
from google.cloud import storage
from parsons import GoogleCloudStorage, Table
from parsons.utilities import files
from test.utils import assert_matching_tables, mark_live_test
TEMP_BUCKET_NAME = "parsons_test"
TEMP_FILE_NAME = "tmp_file_01.txt"
@mark_live_test
class TestGoogleStorageBuckets(unittest.TestCase):
def setUp(self):
self.cloud = GoogleCloudStorage()
# Running into some issues creating and delete too many buckets, so
# will check to see if it already exists
if not self.cloud.bucket_exists(TEMP_BUCKET_NAME):
self.cloud.create_bucket(TEMP_BUCKET_NAME)
# Upload a file
tmp_file_path = files.string_to_temp_file("A little string", suffix=".txt")
self.cloud.put_blob(TEMP_BUCKET_NAME, TEMP_FILE_NAME, tmp_file_path)
def test_list_buckets(self):
# Assert that it finds the correct buckets
bucket_list = self.cloud.list_buckets()
# Make sure that my bucket is in the list
assert TEMP_BUCKET_NAME in bucket_list
def test_bucket_exists(self):
# Assert finds a bucket that exists
assert self.cloud.bucket_exists(TEMP_BUCKET_NAME)
# Assert doesn't find a bucket that doesn't exist
assert not self.cloud.bucket_exists("NOT_A_REAL_BUCKET")
def test_get_bucket(self):
# Assert that a bucket object is returned
assert isinstance(self.cloud.get_bucket(TEMP_BUCKET_NAME), storage.bucket.Bucket)
def test_create_bucket(self):
# Temporary bucket has already been created as part of set up, so just checking
# that it really exists
assert self.cloud.bucket_exists(TEMP_BUCKET_NAME)
def test_delete_bucket(self):
# Create another bucket, delete it and make sure it doesn't exist
self.cloud.create_bucket(TEMP_BUCKET_NAME + "_2")
self.cloud.delete_bucket(TEMP_BUCKET_NAME + "_2")
assert not self.cloud.bucket_exists(TEMP_BUCKET_NAME + "_2")
def test_list_blobs(self):
blob_list = self.cloud.list_blobs(TEMP_BUCKET_NAME)
# Make sure that my file is in the list
assert TEMP_FILE_NAME in blob_list
# Make sure that there is only one file in the bucket
assert len(blob_list) == 1
def test_blob_exists(self):
# Assert that it thinks that the blob exists
assert self.cloud.blob_exists(TEMP_BUCKET_NAME, TEMP_FILE_NAME)
# Assert that it thinks that a non-existent blob doesn't exist
assert not self.cloud.blob_exists(TEMP_BUCKET_NAME, "FAKE_BLOB")
def test_put_blob(self):
# Already being tested as part of setUp
pass
def test_get_blob(self):
# Assert that a blob object is returned
assert isinstance(self.cloud.get_blob(TEMP_BUCKET_NAME, TEMP_FILE_NAME), storage.blob.Blob)
def test_download_blob(self):
# Download blob and ensure that it is the expected file
blob = Path(self.cloud.download_blob(TEMP_BUCKET_NAME, TEMP_FILE_NAME))
assert blob.read_text() == "A little string"
def test_delete_blob(self):
file_name = "delete_me.txt"
# Upload a file
tmp_file_path = files.string_to_temp_file("A little string", suffix=".txt")
self.cloud.put_blob(TEMP_BUCKET_NAME, file_name, tmp_file_path)
# Check that it was deleted.
self.cloud.delete_blob(TEMP_BUCKET_NAME, file_name)
assert not self.cloud.blob_exists(TEMP_BUCKET_NAME, file_name)
def test_get_url(self):
file_name = "delete_me.csv"
input_tbl = Table([["a"], ["1"]])
self.cloud.upload_table(input_tbl, TEMP_BUCKET_NAME, file_name)
url = self.cloud.get_url(TEMP_BUCKET_NAME, file_name)
download_tbl = Table.from_csv(url)
assert_matching_tables(input_tbl, download_tbl)