1
1
import posixpath as path
2
2
from itertools import chain
3
3
from threading import Lock
4
- from typing import Any , Dict , Iterator , List , Optional , Set , Tuple
4
+ from typing import Dict , Iterator , List , Optional , Set , Tuple
5
5
from urllib .parse import urlparse
6
6
from uuid import uuid4
7
7
@@ -144,7 +144,9 @@ def clean_up_partitions(self, database_name: str, table_name: str, where_conditi
144
144
def clean_up_table (self , database_name : str , table_name : str ):
145
145
table_location = self .get_table_location (database_name , table_name )
146
146
147
- if table_location is not None :
147
+ # this check avoid issues for when the table location is an empty string
148
+ # or when the table do not exist and table location is None
149
+ if table_location :
148
150
self .delete_from_s3 (table_location )
149
151
150
152
@available
@@ -161,7 +163,7 @@ def delete_from_s3(self, s3_path: str):
161
163
conn = self .connections .get_thread_connection ()
162
164
client = conn .handle
163
165
bucket_name , prefix = self ._parse_s3_path (s3_path )
164
- if self ._s3_path_exists (client , bucket_name , prefix ):
166
+ if self ._s3_path_exists (bucket_name , prefix ):
165
167
s3_resource = client .session .resource ("s3" , region_name = client .region_name , config = get_boto3_config ())
166
168
s3_bucket = s3_resource .Bucket (bucket_name )
167
169
logger .debug (f"Deleting table data: path='{ s3_path } ', bucket='{ bucket_name } ', prefix='{ prefix } '" )
@@ -195,12 +197,13 @@ def _parse_s3_path(s3_path: str) -> Tuple[str, str]:
195
197
prefix = o .path .lstrip ("/" ).rstrip ("/" ) + "/"
196
198
return bucket_name , prefix
197
199
198
- @staticmethod
199
- def _s3_path_exists (client : Any , s3_bucket : str , s3_prefix : str ) -> bool :
200
+ def _s3_path_exists (self , s3_bucket : str , s3_prefix : str ) -> bool :
200
201
"""Checks whether a given s3 path exists."""
201
- response = client .session .client (
202
- "s3" , region_name = client .region_name , config = get_boto3_config ()
203
- ).list_objects_v2 (Bucket = s3_bucket , Prefix = s3_prefix )
202
+ conn = self .connections .get_thread_connection ()
203
+ client = conn .handle
204
+ with boto3_client_lock :
205
+ s3_client = client .session .client ("s3" , region_name = client .region_name , config = get_boto3_config ())
206
+ response = s3_client .list_objects_v2 (Bucket = s3_bucket , Prefix = s3_prefix )
204
207
return True if "Contents" in response else False
205
208
206
209
def _join_catalog_table_owners (self , table : agate .Table , manifest : Manifest ) -> agate .Table :
0 commit comments