-
Notifications
You must be signed in to change notification settings - Fork 8.8k
Expand file tree
/
Copy pathazure_spn_conn.py
More file actions
118 lines (105 loc) · 4.22 KB
/
azure_spn_conn.py
File metadata and controls
118 lines (105 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import time
from common.decorator import singleton
from azure.identity import ClientSecretCredential, AzureAuthorityHosts
from azure.storage.filedatalake import FileSystemClient
from common import settings
# Lookup table from a lower-cased cloud name (the value of the ``AZURE_CLOUD``
# environment variable / ``settings.AZURE["cloud"]``) to the matching
# ``AzureAuthorityHosts`` constant that is handed to ``ClientSecretCredential``
# as its ``authority``.
_CLOUD_AUTHORITY_MAP = {
    "public": AzureAuthorityHosts.AZURE_PUBLIC_CLOUD,
    "china": AzureAuthorityHosts.AZURE_CHINA,
    "government": AzureAuthorityHosts.AZURE_GOVERNMENT,
    "germany": AzureAuthorityHosts.AZURE_GERMANY,
}
@singleton
class RAGFlowAzureSpnBlob:
    """Object-storage adapter for an Azure Data Lake file system.

    Authentication uses a service principal (``ClientSecretCredential``) and
    all operations go through a single ``FileSystemClient`` bound to the
    configured container. Connection parameters are read from environment
    variables, falling back to ``settings.AZURE``.

    The ``bucket`` argument accepted by the public methods exists for
    interface parity with other storage backends: every file operation is
    addressed by ``fnm`` inside the configured container, and ``bucket`` is
    only used in log messages (and forwarded by ``get_presigned_url``).
    """

    def __init__(self):
        """Read the connection settings and open the initial connection."""
        self.conn = None
        self.account_url = os.getenv('ACCOUNT_URL', settings.AZURE["account_url"])
        self.client_id = os.getenv('CLIENT_ID', settings.AZURE["client_id"])
        self.secret = os.getenv('SECRET', settings.AZURE["secret"])
        self.tenant_id = os.getenv('TENANT_ID', settings.AZURE["tenant_id"])
        self.container_name = os.getenv('CONTAINER_NAME', settings.AZURE["container_name"])
        # ``or "public"`` guards against an explicit null/empty value in the
        # settings, which would otherwise crash on ``.lower()``.
        self.cloud = (os.getenv('AZURE_CLOUD', settings.AZURE.get("cloud", "public")) or "public").lower()
        self.__open__()

    def __open__(self):
        """(Re)create the ``FileSystemClient``.

        Any existing client is dropped first. Failures are logged and not
        raised; in that case ``self.conn`` stays ``None``.
        """
        if getattr(self, "conn", None):
            self.__close__()

        try:
            authority = _CLOUD_AUTHORITY_MAP.get(self.cloud)
            if authority is None:
                # Unknown cloud names keep the historical fallback, but make
                # the misconfiguration visible instead of silent.
                logging.warning("Unknown Azure cloud %r, falling back to the public cloud", self.cloud)
                authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
            credentials = ClientSecretCredential(
                tenant_id=self.tenant_id,
                client_id=self.client_id,
                client_secret=self.secret,
                authority=authority,
            )
            self.conn = FileSystemClient(
                account_url=self.account_url,
                file_system_name=self.container_name,
                credential=credentials,
            )
        except Exception:
            logging.exception("Fail to connect %s", self.account_url)

    def __close__(self):
        """Drop the reference to the current client."""
        self.conn = None

    def health(self):
        """Write a small probe file and return the result of the final flush.

        Exceptions are not caught here; they propagate to the caller.
        """
        fnm, binary = "txtxtxtxt1", b"_t@@@1"
        f = self.conn.create_file(fnm)
        f.append_data(binary, offset=0, length=len(binary))
        return f.flush_data(len(binary))

    def put(self, bucket, fnm, binary):
        """Upload ``binary`` as file ``fnm``.

        Up to three attempts are made; after each failure the connection is
        re-opened and the call sleeps for one second.

        Returns:
            The result of ``flush_data`` on success, or ``None`` if every
            attempt failed.
        """
        for _ in range(3):
            try:
                f = self.conn.create_file(fnm)
                f.append_data(binary, offset=0, length=len(binary))
                return f.flush_data(len(binary))
            except Exception:
                logging.exception(f"Fail put {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
                # No early return here: fall through to the next attempt.
        return None

    def rm(self, bucket, fnm):
        """Delete file ``fnm``; failures are logged and not raised."""
        try:
            self.conn.delete_file(fnm)
        except Exception:
            logging.exception(f"Fail rm {bucket}/{fnm}")

    def get(self, bucket, fnm):
        """Download file ``fnm`` and return its content.

        A single attempt is made. On failure the error is logged, the
        connection is re-opened, and ``None`` is returned after a one-second
        pause.
        """
        try:
            client = self.conn.get_file_client(fnm)
            r = client.download_file()
            return r.read()
        except Exception:
            logging.exception(f"fail get {bucket}/{fnm}")
            self.__open__()
            time.sleep(1)
        return None

    def obj_exist(self, bucket, fnm):
        """Return whether file ``fnm`` exists; ``False`` if the check itself fails."""
        try:
            client = self.conn.get_file_client(fnm)
            return client.exists()
        except Exception:
            logging.exception(f"Fail obj_exist {bucket}/{fnm}")
        return False

    def get_presigned_url(self, bucket, fnm, expires):
        """Try to obtain a pre-signed GET URL for ``fnm``.

        Up to ten attempts are made, re-opening the connection and sleeping
        one second after each failure. Returns ``None`` if all attempts fail.
        """
        # NOTE(review): the FileSystemClient API reference does not appear to
        # list a ``get_presigned_url`` method. If it is indeed absent, every
        # attempt raises AttributeError and this returns None after ~10 s --
        # confirm, and consider a SAS-based implementation.
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception:
                logging.exception(f"fail get presigned url {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
        return None