Skip to content

Commit a66034b

Browse files
committed
Add replay from trace strategy
Add trace replay capability to GuideLLM for reproducing real-world request patterns from trace files. This enables time-based request rate replay and synthetic prompt generation matching trace token counts.

- Add TraceReplayStrategy for scheduling requests at precise timestamps
- Add ReplayProfile for configuring trace-based benchmarking
- Add TraceSyntheticDatasetDeserializer for generating prompts from traces
- Support max_requests truncation to limit trace length

This is a minimal implementation to address issue 597. Full Mooncake format support, E2E tests, and documentation will follow in subsequent PRs.

Signed-off-by: Vincent Gimenes <vincent.gimenes@gmail.com>
1 parent e52e92c commit a66034b

File tree

10 files changed

+1295
-35
lines changed

10 files changed

+1295
-35
lines changed

src/guidellm/benchmark/entrypoints.py

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ async def resolve_request_loader(
247247
data_num_workers: int | None,
248248
random_seed: int,
249249
console: Console | None = None,
250+
max_requests: int | None = None,
250251
**dataloader_kwargs: dict[str, Any] | None,
251252
) -> DataLoader[GenerationRequest]:
252253
"""
@@ -270,6 +271,7 @@ async def resolve_request_loader(
270271
:param data_num_workers: Number of worker processes for data loading
271272
:param random_seed: Seed for reproducible random operations
272273
:param console: Console instance for progress reporting, or None
274+
:param max_requests: If set, first data source loads at most this many rows.
273275
:param dataloader_kwargs: Additional arguments passed to DataLoader initialization
274276
:return: Configured DataLoader instance for GenerationRequest objects
275277
:raises ValueError: If request formatter type is not registered in
@@ -306,6 +308,17 @@ async def resolve_request_loader(
306308
data_finalizer,
307309
)
308310

311+
# When max_requests is set, limit the first data source to that many rows at load
312+
if max_requests is not None and data:
313+
if max_requests < 1:
314+
raise ValueError(
315+
"max_requests must be >= 1 when set for data truncation, "
316+
f"got {max_requests}"
317+
)
318+
data_args = list(data_args) if data_args else [{} for _ in data]
319+
if len(data_args) >= 1:
320+
data_args[0] = {**data_args[0], "max_rows": max_requests}
321+
309322
request_loader: DataLoader[GenerationRequest] = DataLoader(
310323
data=data,
311324
data_args=data_args,
@@ -352,6 +365,7 @@ async def resolve_profile(
352365
max_global_error_rate: float | None,
353366
over_saturation: dict[str, Any] | None = None,
354367
console: Console | None = None,
368+
data: list[Any] | None = None,
355369
) -> Profile:
356370
"""
357371
Resolve and configure a benchmark profile with rate and constraint settings.
@@ -373,6 +387,7 @@ async def resolve_profile(
373387
:param max_global_error_rate: Maximum global error rate threshold before stopping
374388
:param over_saturation: Over-saturation detection configuration (dict)
375389
:param console: Console instance for progress reporting, or None
390+
:param data: Optional list of data sources.
376391
:return: Configured Profile instance ready for benchmarking
377392
:raises ValueError: If constraints are provided with a pre-configured Profile
378393
"""
@@ -400,6 +415,7 @@ async def resolve_profile(
400415
random_seed=random_seed,
401416
rampup_duration=rampup,
402417
constraints={**constraints},
418+
data=data,
403419
)
404420
elif constraints:
405421
raise ValueError(
@@ -488,24 +504,58 @@ async def benchmark_generative_text(
488504
processor = await resolve_processor(
489505
processor=args.processor, model=model, console=console
490506
)
491-
request_loader = await resolve_request_loader(
492-
data=args.data,
493-
model=model,
494-
data_args=args.data_args,
495-
data_samples=args.data_samples,
496-
processor=processor,
497-
processor_args=args.processor_args,
498-
data_column_mapper=args.data_column_mapper,
499-
data_preprocessors=args.data_preprocessors,
500-
data_preprocessors_kwargs=args.data_preprocessors_kwargs,
501-
data_finalizer=args.data_finalizer,
502-
data_collator=args.data_collator,
503-
data_sampler=args.data_sampler,
504-
data_num_workers=args.data_num_workers,
505-
random_seed=args.random_seed,
506-
console=console,
507-
**(args.dataloader_kwargs or {}),
508-
)
507+
508+
# Build common kwargs for resolve_profile and resolve_request_loader
509+
profile_kwargs = {
510+
"profile": args.profile,
511+
"rate": args.rate,
512+
"random_seed": args.random_seed,
513+
"rampup": args.rampup,
514+
"constraints": constraints,
515+
"max_seconds": args.max_seconds,
516+
"max_requests": args.max_requests,
517+
"max_errors": args.max_errors,
518+
"max_error_rate": args.max_error_rate,
519+
"max_global_error_rate": args.max_global_error_rate,
520+
"over_saturation": args.over_saturation,
521+
"console": console,
522+
}
523+
loader_kwargs = {
524+
"data": args.data,
525+
"model": model,
526+
"data_args": args.data_args,
527+
"data_samples": args.data_samples,
528+
"processor": processor,
529+
"processor_args": args.processor_args,
530+
"data_column_mapper": args.data_column_mapper,
531+
"data_preprocessors": args.data_preprocessors,
532+
"data_preprocessors_kwargs": args.data_preprocessors_kwargs,
533+
"data_finalizer": args.data_finalizer,
534+
"data_collator": args.data_collator,
535+
"data_sampler": args.data_sampler,
536+
"data_num_workers": args.data_num_workers,
537+
"random_seed": args.random_seed,
538+
"console": console,
539+
}
540+
541+
# For replay profile: resolve profile first to apply max_seconds filtering,
542+
# then use the filtered count for the data loader. This ensures the data
543+
# loader and scheduler both work with the same filtered request count.
544+
if args.profile == "replay":
545+
profile = await resolve_profile(**profile_kwargs, data=args.data) # type: ignore[arg-type]
546+
effective_max_requests = (
547+
profile.constraints.get("max_requests")
548+
if profile.constraints
549+
else args.max_requests
550+
)
551+
request_loader = await resolve_request_loader(
552+
**loader_kwargs, max_requests=effective_max_requests
553+
) # type: ignore[arg-type]
554+
else:
555+
request_loader = await resolve_request_loader(
556+
**loader_kwargs, max_requests=args.max_requests
557+
) # type: ignore[arg-type]
558+
profile = await resolve_profile(**profile_kwargs, data=None) # type: ignore[arg-type]
509559

510560
warmup = TransientPhaseConfig.create_from_value(args.warmup)
511561
cooldown = TransientPhaseConfig.create_from_value(args.cooldown)
@@ -521,21 +571,6 @@ async def benchmark_generative_text(
521571
),
522572
status="success",
523573
)
524-
525-
profile = await resolve_profile(
526-
profile=args.profile,
527-
rate=args.rate,
528-
random_seed=args.random_seed,
529-
rampup=args.rampup,
530-
constraints=constraints,
531-
max_seconds=args.max_seconds,
532-
max_requests=args.max_requests,
533-
max_errors=args.max_errors,
534-
max_error_rate=args.max_error_rate,
535-
max_global_error_rate=args.max_global_error_rate,
536-
over_saturation=args.over_saturation,
537-
console=console,
538-
)
539574
output_formats = await resolve_output_formats(
540575
outputs=args.outputs, output_dir=args.output_dir, console=console
541576
)

src/guidellm/benchmark/profiles.py

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from abc import ABC, abstractmethod
1515
from collections.abc import Generator
16+
from pathlib import Path
1617
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal
1718

1819
import numpy as np
@@ -37,6 +38,8 @@
3738
SchedulingStrategy,
3839
SynchronousStrategy,
3940
ThroughputStrategy,
41+
TraceReplayStrategy,
42+
load_relative_timestamps,
4043
)
4144
from guidellm.schemas import PydanticClassRegistryMixin
4245

@@ -48,13 +51,14 @@
4851
"ConcurrentProfile",
4952
"Profile",
5053
"ProfileType",
54+
"ReplayProfile",
5155
"SweepProfile",
5256
"SynchronousProfile",
5357
"ThroughputProfile",
5458
]
5559

5660
ProfileType = Annotated[
57-
Literal["synchronous", "concurrent", "throughput", "async", "sweep"],
61+
Literal["synchronous", "concurrent", "throughput", "async", "sweep", "replay"],
5862
"Profile type identifiers for polymorphic deserialization",
5963
]
6064

@@ -328,6 +332,121 @@ def next_strategy(
328332
return SynchronousStrategy()
329333

330334

335+
@Profile.register("replay")
class ReplayProfile(Profile):
    """
    Replay a trace file:
    schedule each request at start_time + time_scale * relative_timestamp[i].

    For this profile, the ``rate`` argument is interpreted as time_scale (scale factor
    applied to relative timestamps), not as requests per second.

    When ``constraints["max_requests"]`` is set, the trace is truncated at load time:
    only the first max_requests rows are loaded from the file for both timestamps (here)
    and request data (in the data loader). This keeps timestamps and requests aligned.
    The trace file is read twice: once by the data pipeline for request payloads, and
    once here for relative timestamps.
    """

    # Polymorphic discriminator for Profile deserialization.
    type_: Literal["replay"] = "replay"  # type: ignore[assignment]
    relative_timestamps: list[float] = Field(
        description="Request start times relative to first event (first = 0)",
    )
    time_scale: float = Field(
        default=1.0,
        gt=0,
        description="Scale factor applied to relative timestamps",
    )
    max_seconds_filter: float | None = Field(
        default=None,
        description=(
            "Original max_seconds value used as a load-time filter "
            "(not a runtime constraint)"
        ),
    )

    @classmethod
    def resolve_args(
        cls,
        rate_type: str,
        rate: list[float] | None,
        random_seed: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Resolve constructor arguments for a replay profile from CLI-level inputs.

        :param rate_type: Unused for replay; accepted for interface parity
        :param rate: If set, rate[0] is used as time_scale (not requests/second)
        :param random_seed: Unused for replay; accepted for interface parity
        :param kwargs: Must contain ``data`` (trace file path as first element);
            may contain ``constraints`` with ``max_seconds``/``max_requests``
        :return: Keyword arguments suitable for constructing a ReplayProfile
        :raises ValueError: If the trace file is missing, max_requests is invalid,
            or no timestamps remain after filtering
        """
        _ = (rate_type, random_seed)  # unused
        data = kwargs.get("data")
        if not data or not data[0]:
            raise ValueError("Replay profile requires data (path to trace file)")
        path = Path(data[0]) if isinstance(data[0], str) else data[0]
        if not path.exists():
            raise ValueError(f"Replay trace file not found: {path}")
        # Copy so we never mutate the caller-supplied constraints dict in place;
        # the max_requests rewrite and max_seconds pop below are local to the
        # profile's own constraint set.
        constraints = dict(kwargs.get("constraints") or {})
        max_requests = constraints.get("max_requests")
        if max_requests is not None and max_requests < 1:
            raise ValueError(
                "max_requests must be >= 1 when set for replay profile, "
                f"got {max_requests}"
            )

        # For replay profile, rate is interpreted as time_scale (not requests per
        # second)
        time_scale = rate[0] if rate and len(rate) > 0 else 1.0

        # Load all timestamps first (max_requests applied after max_seconds filtering)
        relative_timestamps = load_relative_timestamps(path)

        # Filter by max_seconds (applied in simulated time via time_scale)
        max_seconds = constraints.get("max_seconds")
        if max_seconds is not None and max_seconds > 0:
            relative_timestamps = [
                ts for ts in relative_timestamps if ts * time_scale <= max_seconds
            ]

        # Truncate by max_requests on top of any max_seconds filtering
        if max_requests is not None:
            relative_timestamps = relative_timestamps[:max_requests]

        if not relative_timestamps:
            raise ValueError(
                "No timestamps remain after applying max_seconds and max_requests "
                "filters. The trace is empty or all events were filtered out."
            )

        # Set max_requests to the actual count after filtering to prevent benchmark hang
        # and eliminate race conditions between request completion and injection.
        constraints["max_requests"] = len(relative_timestamps)

        # Remove max_seconds to avoid runtime MaxDurationConstraint canceling
        # in-flight requests
        constraints.pop("max_seconds", None)

        return {
            "relative_timestamps": relative_timestamps,
            "time_scale": time_scale,
            "constraints": constraints,
            # Preserve the original max_seconds value for reporting, since it was
            # consumed as a load-time filter rather than a runtime constraint.
            "max_seconds_filter": max_seconds
            if max_seconds and max_seconds > 0
            else None,
        }

    @property
    def strategy_types(self) -> list[str]:
        """Single strategy type: trace replay."""
        return ["trace"]

    def next_strategy(
        self,
        prev_strategy: SchedulingStrategy | None,
        prev_benchmark: Benchmark | None,
    ) -> TraceReplayStrategy | None:
        """
        Return the single trace replay strategy on the first call, None after.

        :param prev_strategy: Previously executed strategy, or None on first call
        :param prev_benchmark: Unused; accepted for interface parity
        :return: TraceReplayStrategy on first invocation, None thereafter
        """
        _ = prev_benchmark
        # Replay has a single strategy; return it once, then None
        if prev_strategy is not None:
            return None
        return TraceReplayStrategy(
            relative_timestamps=self.relative_timestamps,
            time_scale=self.time_scale,
        )
448+
449+
331450
@Profile.register("concurrent")
332451
class ConcurrentProfile(Profile):
333452
"""

src/guidellm/data/deserializers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
SyntheticTextDataset,
2626
SyntheticTextDatasetDeserializer,
2727
)
28+
from .trace_synthetic import TraceSyntheticDatasetDeserializer
2829

2930
__all__ = [
3031
"ArrowFileDatasetDeserializer",
@@ -46,4 +47,5 @@
4647
"SyntheticTextDatasetDeserializer",
4748
"TarFileDatasetDeserializer",
4849
"TextFileDatasetDeserializer",
50+
"TraceSyntheticDatasetDeserializer",
4951
]

0 commit comments

Comments (0)