Skip to content

Commit b946f6e

Browse files
committed
fix the ram based workload
1 parent 272a214 commit b946f6e

4 files changed

Lines changed: 136 additions & 45 deletions

File tree

runners/s3-benchrunner-3p/README.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,13 @@ The runner creates a config file with the following settings (documented at http
170170
type = s3 # S3 backend type
171171
provider = AWS # Use AWS S3
172172
env_auth = true # Get credentials from environment
173+
region = us-west-2 # AWS region (from REGION command-line argument)
174+
no_check_bucket = true # Don't check if bucket exists or try to create it
173175
directory_bucket = true # Enable S3 Express (automatically added for S3 Express buckets)
174176
```
175177

178+
The region is set in the config file from the REGION command-line argument, ensuring rclone operates in the correct AWS region.
179+
176180
#### Command-Line Options
177181

178182
The runner automatically configures these rclone flags based on the workload:
@@ -193,13 +197,19 @@ The runner automatically configures these rclone flags based on the workload:
193197

194198
- Example: 100 Gbps → 250 parallel streams
195199

196-
3. **Checksum Control** ([docs](https://rclone.org/s3/#s3-disable-checksum)):
200+
3. **Always Transfer Files** ([docs](https://rclone.org/docs/#ignore-times)):
201+
- `--ignore-times`
202+
203+
- Forces rclone to always transfer files, don't skip based on timestamps
204+
- Essential for benchmarking to ensure consistent measurements across runs
205+
206+
4. **Checksum Control** ([docs](https://rclone.org/s3/#s3-disable-checksum)):
197207
- `--s3-disable-checksum`
198208

199209
- Automatically used when no checksum is specified in workload
200210
- Workloads requiring specific checksums will skip (rclone only supports MD5)
201211

202-
4. **S3 Express Support**:
212+
5. **S3 Express Support**:
203213
- Automatically detects S3 Express buckets (ending with `--x-s3` )
204214
- Adds `directory_bucket = true` to config file
205215
- See [S3 Directory Bucket documentation](https://rclone.org/s3/#s3-directory-bucket)

runners/s3-benchrunner-3p/runner/rclone.py

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ def _derive_rclone_config(self) -> str:
5555
'provider = AWS',
5656
'env_auth = true']
5757

58+
# Add region to config file
59+
# https://rclone.org/s3/#region
60+
if self.config.region:
61+
lines.append(f'region = {self.config.region}')
62+
63+
# Don't check if bucket exists or try to create it
64+
# https://rclone.org/s3/#no-check-bucket
65+
lines.append('no_check_bucket = true')
66+
5867
# Skip workloads that require checksums since rclone doesn't provide
5968
# config file options for checksum control
6069
if self.config.checksum:
@@ -104,7 +113,24 @@ def _derive_rclone_cmd(self) -> Tuple[list[str], Optional[bytes]]:
104113
if self.config.region:
105114
os.environ['AWS_REGION'] = self.config.region
106115

107-
cmd.append('copy')
116+
# Determine the rclone command based on workload type
117+
# For RAM-based uploads (stdin), use rcat which is optimized for streaming
118+
# For RAM-based downloads, use cat which outputs to stdout (we'll redirect to /dev/null)
119+
# For everything else, use copy
120+
use_rcat = (num_tasks == 1 and
121+
first_task.action == 'upload' and
122+
not self.config.files_on_disk)
123+
124+
use_cat = (num_tasks == 1 and
125+
first_task.action == 'download' and
126+
not self.config.files_on_disk)
127+
128+
if use_rcat:
129+
cmd.append('rcat')
130+
elif use_cat:
131+
cmd.append('cat')
132+
else:
133+
cmd.append('copy')
108134

109135
# Add common configuration flags
110136
# For uploads: S3-specific multipart upload concurrency
@@ -115,6 +141,10 @@ def _derive_rclone_cmd(self) -> Tuple[list[str], Optional[bytes]]:
115141
# https://rclone.org/docs/#multi-thread-streams-int
116142
cmd += ['--multi-thread-streams', str(concurrency)]
117143

144+
# Always transfer files, don't skip based on timestamps
145+
# https://rclone.org/docs/#i-ignore-times
146+
cmd += ['--ignore-times']
147+
118148
# Disable checksum when not specified
119149
# https://rclone.org/s3/#s3-disable-checksum
120150
if not self.config.checksum:
@@ -135,22 +165,24 @@ def _derive_rclone_cmd(self) -> Tuple[list[str], Optional[bytes]]:
135165
if first_task.action == 'download':
136166
# src
137167
cmd.append(f'{s3_remote}{self.config.bucket}/{first_task.key}')
138-
# dst
168+
# dst - only for copy command, not for cat
139169
if self.config.files_on_disk:
140170
cmd.append(first_task.key)
141-
else:
142-
cmd.append('-') # output to stdout
171+
# For cat command, no dst needed - outputs to stdout which we redirect
143172

144173
else: # upload
145-
# src
146174
if self.config.files_on_disk:
175+
# For copy command with files on disk
147176
cmd.append(first_task.key)
177+
# dst
178+
cmd.append(
179+
f'{s3_remote}{self.config.bucket}/{first_task.key}')
148180
else:
149-
cmd.append('-') # read from stdin
181+
# For rcat command, stdin is implicit - only specify destination
150182
stdin = self._random_data_for_upload[:first_task.size]
151-
152-
# dst
153-
cmd.append(f'{s3_remote}{self.config.bucket}/{first_task.key}')
183+
# dst only (rcat reads from stdin automatically)
184+
cmd.append(
185+
f'{s3_remote}{self.config.bucket}/{first_task.key}')
154186

155187
else:
156188
# Multiple files - need to use directory operations
@@ -257,20 +289,36 @@ def run(self):
257289
run_kwargs = {'args': self._rclone_cmd,
258290
'input': self._stdin_for_rclone}
259291

292+
# For 'cat' command, redirect stdout to /dev/null
293+
devnull = None
294+
if 'cat' in self._rclone_cmd:
295+
devnull = open('/dev/null', 'w')
296+
run_kwargs['stdout'] = devnull
297+
260298
if self.config.verbose:
261299
# show live output, and immediately raise exception if process fails
262-
print(f'> {subprocess.list2cmdline(self._rclone_cmd)}', flush=True)
300+
print(f'> {subprocess.list2cmdline(self._rclone_cmd)} > /dev/null' if devnull else f'> {subprocess.list2cmdline(self._rclone_cmd)}', flush=True)
263301
run_kwargs['check'] = True
302+
# For verbose mode with cat, still capture stderr
303+
if devnull:
304+
run_kwargs['stderr'] = subprocess.PIPE
264305
else:
265306
# capture output, and only print if there's an error
266-
run_kwargs['capture_output'] = True
267-
268-
result = subprocess.run(**run_kwargs)
269-
if result.returncode != 0:
270-
# show command that failed, and stderr if any
271-
errmsg = f'{subprocess.list2cmdline(self._rclone_cmd)}'
272-
if hasattr(result, 'stderr') and result.stderr:
273-
stderr = result.stderr.decode().strip()
274-
if stderr:
275-
errmsg += f'\n{stderr}'
276-
exit_with_error(errmsg)
307+
if not devnull:
308+
run_kwargs['capture_output'] = True
309+
else:
310+
run_kwargs['stderr'] = subprocess.PIPE
311+
312+
try:
313+
result = subprocess.run(**run_kwargs)
314+
if result.returncode != 0:
315+
# show command that failed, and stderr if any
316+
errmsg = f'{subprocess.list2cmdline(self._rclone_cmd)}'
317+
if hasattr(result, 'stderr') and result.stderr:
318+
stderr = result.stderr.decode().strip()
319+
if stderr:
320+
errmsg += f'\n{stderr}'
321+
exit_with_error(errmsg)
322+
finally:
323+
if devnull:
324+
devnull.close()

runners/s3-benchrunner-3p/runner/s5cmd.py

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,30 +65,48 @@ def _derive_s5cmd_cmd(self) -> Tuple[list[str], Optional[bytes]]:
6565
print(f'> {subprocess.list2cmdline(version_cmd)}', flush=True)
6666
subprocess.run(version_cmd, check=True)
6767

68-
cmd.append('cp')
68+
# Determine the s5cmd command based on workload type
69+
# For RAM-based uploads (stdin), use pipe which is optimized for streaming
70+
# For RAM-based downloads, use cat which outputs to stdout (we'll redirect to /dev/null)
71+
# For everything else, use cp
72+
use_pipe = (num_tasks == 1 and
73+
first_task.action == 'upload' and
74+
not self.config.files_on_disk)
75+
76+
use_cat = (num_tasks == 1 and
77+
first_task.action == 'download' and
78+
not self.config.files_on_disk)
79+
80+
if use_pipe:
81+
cmd.append('pipe')
82+
elif use_cat:
83+
cmd.append('cat')
84+
else:
85+
cmd.append('cp')
86+
6987
cmd += ['--concurrency', str(concurrency)]
7088

7189
if num_tasks == 1:
7290
# Single file operation
7391
if first_task.action == 'download':
7492
# src
7593
cmd.append(f's3://{self.config.bucket}/{first_task.key}')
76-
# dst
94+
# dst - only for cp command, not for cat
7795
if self.config.files_on_disk:
7896
cmd.append(first_task.key)
79-
else:
80-
cmd.append('-') # output to stdout
97+
# For cat command, no dst needed - outputs to stdout which we redirect
8198

8299
else: # upload
83-
# src
84100
if self.config.files_on_disk:
101+
# For cp command with files on disk
85102
cmd.append(first_task.key)
103+
# dst
104+
cmd.append(f's3://{self.config.bucket}/{first_task.key}')
86105
else:
87-
cmd.append('-') # read from stdin
106+
# For pipe command, stdin is implicit - only specify destination
88107
stdin = self._random_data_for_upload[:first_task.size]
89-
90-
# dst
91-
cmd.append(f's3://{self.config.bucket}/{first_task.key}')
108+
# dst only (pipe reads from stdin automatically)
109+
cmd.append(f's3://{self.config.bucket}/{first_task.key}')
92110

93111
else:
94112
# Multiple files - need to use patterns or directory operations
@@ -208,20 +226,36 @@ def run(self):
208226
run_kwargs = {'args': self._s5cmd_cmd,
209227
'input': self._stdin_for_s5cmd}
210228

229+
# For 'cat' command, redirect stdout to /dev/null
230+
devnull = None
231+
if 'cat' in self._s5cmd_cmd:
232+
devnull = open('/dev/null', 'w')
233+
run_kwargs['stdout'] = devnull
234+
211235
if self.config.verbose:
212236
# show live output, and immediately raise exception if process fails
213-
print(f'> {subprocess.list2cmdline(self._s5cmd_cmd)}', flush=True)
237+
print(f'> {subprocess.list2cmdline(self._s5cmd_cmd)} > /dev/null' if devnull else f'> {subprocess.list2cmdline(self._s5cmd_cmd)}', flush=True)
214238
run_kwargs['check'] = True
239+
# For verbose mode with cat, still capture stderr
240+
if devnull:
241+
run_kwargs['stderr'] = subprocess.PIPE
215242
else:
216243
# capture output, and only print if there's an error
217-
run_kwargs['capture_output'] = True
218-
219-
result = subprocess.run(**run_kwargs)
220-
if result.returncode != 0:
221-
# show command that failed, and stderr if any
222-
errmsg = f'{subprocess.list2cmdline(self._s5cmd_cmd)}'
223-
if hasattr(result, 'stderr') and result.stderr:
224-
stderr = result.stderr.decode().strip()
225-
if stderr:
226-
errmsg += f'\n{stderr}'
227-
exit_with_error(errmsg)
244+
if not devnull:
245+
run_kwargs['capture_output'] = True
246+
else:
247+
run_kwargs['stderr'] = subprocess.PIPE
248+
249+
try:
250+
result = subprocess.run(**run_kwargs)
251+
if result.returncode != 0:
252+
# show command that failed, and stderr if any
253+
errmsg = f'{subprocess.list2cmdline(self._s5cmd_cmd)}'
254+
if hasattr(result, 'stderr') and result.stderr:
255+
stderr = result.stderr.decode().strip()
256+
if stderr:
257+
errmsg += f'\n{stderr}'
258+
exit_with_error(errmsg)
259+
finally:
260+
if devnull:
261+
devnull.close()

scripts/utils/build.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ def _build_rclone(work_dir: Path, branch: Optional[str]) -> list[str]:
281281
print("rclone not found in PATH, installing using official installer...")
282282

283283
# Install rclone using the official installation script
284-
run(['sudo', '-v']) # Update sudo timestamp
285284
run(['bash', '-c', 'curl https://rclone.org/install.sh | sudo bash'])
286285

287286
# Check if installation succeeded

0 commit comments

Comments
 (0)