-
Notifications
You must be signed in to change notification settings - Fork 155
Expand file tree
/
Copy pathclient_config.py
More file actions
159 lines (132 loc) · 6.6 KB
/
client_config.py
File metadata and controls
159 lines (132 loc) · 6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import logging
from v2.nacos.common.constants import Constants
from v2.nacos.common.nacos_exception import NacosException, INVALID_PARAM
from v2.nacos.common.auth import StaticCredentialsProvider
class KMSConfig:
def __init__(self, enabled=False, endpoint='', access_key='', secret_key='', client_key_content='', password=''):
self.enabled = enabled # 是否启用kms
self.endpoint = endpoint # kms服务的地址
self.client_key_content = client_key_content
self.password = password
self.access_key = access_key
self.secret_key = secret_key
class TLSConfig:
def __init__(self, enabled=False, appointed=False, ca_file='', cert_file='',
key_file='', server_name_override=''):
self.enabled = enabled # 是否启用tls
self.appointed = appointed # 指明是否使用预设的配置
self.ca_file = ca_file # CA证书文件的路径
self.cert_file = cert_file # 客户端证书文件的路径
self.key_file = key_file # 私钥文件的路径
self.server_name_override = server_name_override # 服务器名称覆盖(用于测试)
def __str__(self):
return str(self.__dict__)
class GRPCConfig:
def __init__(self, max_receive_message_length=Constants.GRPC_MAX_RECEIVE_MESSAGE_LENGTH,
max_keep_alive_ms=Constants.GRPC_KEEPALIVE_TIME_MILLS,
initial_window_size=Constants.GRPC_INITIAL_WINDOW_SIZE,
initial_conn_window_size=Constants.GRPC_INITIAL_CONN_WINDOW_SIZE,
grpc_timeout=Constants.DEFAULT_GRPC_TIMEOUT_MILLS,
port_offset=Constants.GRPC_PORT_OFFSET,
capability_negotiation_timeout=Constants.GRPC_CAPABILITY_NEGOTIATION_TIMEOUT):
self.max_receive_message_length = max_receive_message_length
self.max_keep_alive_ms = max_keep_alive_ms
self.initial_window_size = initial_window_size
self.initial_conn_window_size = initial_conn_window_size
self.grpc_timeout = grpc_timeout
self.port_offset = port_offset # gRPC port offset, default 1000
self.capability_negotiation_timeout = capability_negotiation_timeout
class ClientConfig:
def __init__(self, server_addresses=None, endpoint=None, namespace_id='', context_path=Constants.WEB_CONTEXT, access_key=None,
secret_key=None, username=None, password=None, app_name='', app_key='', log_dir='', log_level=None,
log_rotation_backup_count=None, app_conn_labels=None, credentials_provider=None):
self.server_list = []
try:
if server_addresses is not None and server_addresses.strip() != "":
for server_address in server_addresses.strip().split(','):
self.server_list.append(server_address.strip())
except Exception:
raise NacosException(INVALID_PARAM, "server_addresses is invalid")
self.endpoint = endpoint
self.endpoint_context_path = Constants.WEB_CONTEXT
self.endpoint_query_header = None
self.namespace_id = namespace_id
self.credentials_provider = credentials_provider if credentials_provider else StaticCredentialsProvider(access_key, secret_key)
if not context_path:
self.context_path = Constants.WEB_CONTEXT
else:
cp = context_path
if not cp.startswith("/"):
cp = "/" + cp
if cp != "/" and cp.endswith("/"):
cp = cp[:-1]
self.context_path = cp
self.username = username # the username for nacos auth
self.password = password # the password for nacos auth
self.app_name = app_name
self.app_key = app_key
self.cache_dir = ''
self.disable_use_config_cache = False
self.log_dir = log_dir
self.log_level = logging.INFO if log_level is None else log_level # the log level for nacos client, default value is logging.INFO: log_level
self.log_rotation_backup_count = 7 if log_rotation_backup_count is None else log_rotation_backup_count
self.timeout_ms = 10 * 1000 # timeout for requesting Nacos server, default value is 10000ms
self.heart_beat_interval = 5 * 1000 # the time interval for sending beat to server,default value is 5000ms
self.kms_config = KMSConfig(enabled=False)
self.tls_config = TLSConfig(enabled=False)
self.grpc_config = GRPCConfig()
self.load_cache_at_start = True
self.update_cache_when_empty = False
self.app_conn_labels = app_conn_labels
self.async_update_service = False
self.update_thread_num = 5
self.ai_transport_mode = "grpc"
self.ai_prompt_cache_update_interval = 10
def set_log_level(self, log_level):
self.log_level = log_level
return self
def set_cache_dir(self, cache_dir):
self.cache_dir = cache_dir
return self
def set_log_dir(self, log_dir):
self.log_dir = log_dir
return self
def set_timeout_ms(self, timeout_ms):
self.timeout_ms = timeout_ms
return self
def set_heart_beat_interval(self, heart_beat_interval):
self.heart_beat_interval = heart_beat_interval
return self
def set_tls_config(self, tls_config: TLSConfig):
self.tls_config = tls_config
return self
def set_kms_config(self, kms_config: KMSConfig):
self.kms_config = kms_config
return self
def set_grpc_config(self, grpc_config: GRPCConfig):
self.grpc_config = grpc_config
return self
def set_load_cache_at_start(self, load_cache_at_start):
self.load_cache_at_start = load_cache_at_start
return self
def set_update_cache_when_empty(self, update_cache_when_empty: bool):
self.update_cache_when_empty = update_cache_when_empty
return self
def set_endpoint_context_path(self, endpoint_context_path):
self.endpoint_context_path = endpoint_context_path
return self
def set_app_conn_labels(self, app_conn_labels: dict):
self.app_conn_labels = app_conn_labels
return self
def set_async_update_service(self, async_update_service: bool):
self.async_update_service = async_update_service
return self
def set_update_thread_num(self, update_thread_num: int):
self.update_thread_num = update_thread_num
return self
def set_ai_transport_mode(self, ai_transport_mode: str):
self.ai_transport_mode = ai_transport_mode
return self
def set_ai_prompt_cache_update_interval(self, interval: int):
self.ai_prompt_cache_update_interval = interval
return self