Skip to content

Commit 8e1dea3

Browse files
committed
rm long-term connection support
1 parent 2cf7a98 commit 8e1dea3

File tree

3 files changed

+10
-29
lines changed

3 files changed

+10
-29
lines changed

alluxiofs/client/config.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ def __init__(
1919
worker_data_port: int = ALLUXIO_WORKER_S3_SERVER_PORT_DEFAULT_VALUE,
2020
fallback_to_ufs_enabled=True,
2121
ufs_info_refresh_interval_minutes=2,
22-
concurrency=64,
2322
log_level="INFO",
2423
log_dir=None,
2524
log_tag_allowlist=None,
@@ -73,10 +72,6 @@ def __init__(
7372
fallback_to_ufs_enabled, bool
7473
), "'fallback_to_ufs_enabled' should be a boolean"
7574

76-
assert (
77-
isinstance(concurrency, int) and concurrency > 0
78-
), "'concurrency' should be a positive integer"
79-
8075
assert isinstance(log_level, str) and log_level in [
8176
"DEBUG",
8277
"INFO",
@@ -212,7 +207,6 @@ def __init__(
212207
self.worker_hosts = worker_hosts
213208
self.worker_http_port = worker_http_port
214209
self.worker_data_port = worker_data_port
215-
self.concurrency = concurrency
216210
self.log_level = log_level
217211
self.log_dir = log_dir
218212
self.log_tag_allowlist = log_tag_allowlist

alluxiofs/client/core.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ def __init__(
165165
)
166166
self.logger = TagAdapter(base_logger, {"tag": "[ALLUXIO_CLIENT]"})
167167

168-
self.session = self._create_session(self.config.concurrency)
169168
self.data_manager = None
170169
if self.config.load_balance_domain:
171170
self.loadbalancer = None
@@ -298,7 +297,7 @@ def listdir(self, path):
298297
)
299298
params = {"path": path}
300299

301-
response = self.session.get(
300+
response = requests.get(
302301
LIST_URL_FORMAT.format(
303302
worker_host=worker_host, http_port=worker_http_port
304303
),
@@ -364,7 +363,7 @@ def get_file_status(self, path):
364363
)
365364
params = {"path": path}
366365

367-
response = self.session.get(
366+
response = requests.get(
368367
GET_FILE_STATUS_URL_FORMAT.format(
369368
worker_host=worker_host,
370369
http_port=worker_http_port,
@@ -441,7 +440,7 @@ def submit_load(
441440
"opType": OpType.SUBMIT.value,
442441
"verbose": json.dumps(verbose),
443442
}
444-
response = self.session.get(
443+
response = requests.get(
445444
LOAD_URL_FORMAT.format(
446445
worker_host=worker_host,
447446
http_port=worker_http_port,
@@ -470,7 +469,7 @@ def stop_load(
470469
path
471470
)
472471
params = {"path": path, "opType": OpType.STOP.value}
473-
response = self.session.get(
472+
response = requests.get(
474473
LOAD_URL_FORMAT.format(
475474
worker_host=worker_host,
476475
http_port=worker_http_port,
@@ -1534,7 +1533,7 @@ def _load_file(
15341533
"opType": OpType.SUBMIT.value,
15351534
"verbose": json.dumps(verbose),
15361535
}
1537-
response = self.session.get(
1536+
response = requests.get(
15381537
LOAD_URL_FORMAT.format(
15391538
worker_host=worker_host,
15401539
http_port=worker_http_port,
@@ -1583,7 +1582,7 @@ def _load_file(
15831582
def _load_progress_internal(
15841583
self, load_url: str, params: Dict
15851584
) -> (LoadState, str):
1586-
response = self.session.get(load_url, params=params)
1585+
response = requests.get(load_url, params=params)
15871586
self._check_response(response)
15881587
content = json.loads(response.content.decode("utf-8"))
15891588
if "jobState" not in content:
@@ -1630,7 +1629,7 @@ def _read_page(
16301629
page_length=length,
16311630
)
16321631
self.logger.debug(f"Reading page request {page_url}")
1633-
response = self.session.get(page_url)
1632+
response = requests.get(page_url)
16341633
self._check_response(response)
16351634
return response.content
16361635

@@ -1701,7 +1700,7 @@ def _get_preferred_worker_address(self, full_ufs_path):
17011700
file_path=full_ufs_path,
17021701
)
17031702
try:
1704-
response = self.session.get(url)
1703+
response = requests.get(url)
17051704
response.raise_for_status()
17061705
data = json.loads(response.content)
17071706
ip = data["Host"]
@@ -2202,7 +2201,7 @@ def _get_preferred_worker_host(self, full_ufs_path):
22022201
file_path=full_ufs_path,
22032202
)
22042203
try:
2205-
response = self.session.get(url)
2204+
response = requests.get(url)
22062205
response.raise_for_status()
22072206
data = json.loads(response.content)
22082207
ip = data["Host"]
@@ -2230,7 +2229,7 @@ async def _request(
22302229
data=None,
22312230
) -> Tuple[int, bytes]:
22322231
await self._set_session()
2233-
async with self.session.request(
2232+
async with requests.request(
22342233
method=method.value,
22352234
url=url,
22362235
params=params,

config_template.yaml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,6 @@ log_tag_allowlist: null
9090
# Concurrency Control Configuration
9191
# ----------------------------------------------------------------------------
9292

93-
# concurrency: Maximum number of concurrent operations
94-
# Purpose: Controls the maximum number of concurrent operations the client can execute simultaneously (e.g., concurrent reads, writes)
95-
# Affected Areas:
96-
# - HTTP request connection pool size (pool_connections and pool_maxsize)
97-
# - Thread pool size for concurrent file operations
98-
# Default Value: 64
99-
# Tuning Recommendations:
100-
# - High concurrency scenarios: Consider increasing (e.g., 128 or 256)
101-
# - Resource-constrained scenarios: Consider decreasing (e.g., 32 or 16)
102-
# - Should be adjusted based on actual network bandwidth and Worker node performance
103-
concurrency: 64
104-
10593
# ----------------------------------------------------------------------------
10694
# Local Cache Configuration
10795
# ----------------------------------------------------------------------------

0 commit comments

Comments
 (0)