diff --git a/06_gpu_and_ml/embeddings/__dynamo.py b/06_gpu_and_ml/embeddings/__dynamo.py new file mode 100644 index 000000000..e019b8c73 --- /dev/null +++ b/06_gpu_and_ml/embeddings/__dynamo.py @@ -0,0 +1,100 @@ +import subprocess +import os +import asyncio +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor +from time import perf_counter +from typing import Iterator, Sequence, Tuple, List + +import modal + + +# ────────────────────────────── Constants ────────────────────────────── +HF_SECRET = modal.Secret.from_name("huggingface-secret") +VOL_NAME = "example-embedding-data" +VOL_MNT = Path("/data") +data_volume = modal.Volume.from_name(VOL_NAME, create_if_missing=True) +MODEL_REPO = VOL_MNT / "dynamo_repo" # will hold model.plan + config +container_repo = Path("~/dynamo/container").expanduser() +backend = "tensorrt_llm" +image_path = container_repo / f"Dockerfile.{backend}" +# image with dynamo + torch + dynamoclient (tiny helper) +dynamo_IMAGE = ( + modal.Image.from_dockerfile(image_path) + # .add_local_dir("/dynamo", remote_path="/dynamo") + .env( + { + "DYNAMOHOMEDEBIAN_FRONTEND": "noninteractive", + } + ) + # .run_commands("apt-get install -yq python3-dev python3-pip python3-venv libucx0") + # .apt_install("pip") + # .run_commands("pip install ai-dynamo[all]==0.2.1") + .env( + { + "HF_HOME": VOL_MNT.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Tell dynamo where the repo will be mounted + "MODEL_REPO": MODEL_REPO.as_posix(), + } + ) + .entrypoint([]) +) + +app = modal.App( + "clip-dynamo-embed", + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + secrets=[HF_SECRET], +) + +with dynamo_IMAGE.imports(): + import torch, torchvision # noqa: F401 – for torchscript + from transformers import CLIPVisionModel, CLIPImageProcessorFast + from torchvision.io import read_image + from torchvision.transforms.functional import to_pil_image + import dynamoclient.http as httpclient + + +@app.cls( + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + timeout=24 * 60 * 60, # if using a large HF dataset, this may need to be longer + cpu=4, # HuggingFace will use multi-process parallelism to download + gpu="H100:2", # HuggingFace will use multi-process parallelism to download +) +class Server: + @modal.enter() + def startup(self): + self._proc = subprocess.Popen( + "cd $DYNAMO_HOME/examples/multimodal && " + "dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml" + ) + + @modal.method() + def infer( + self, + ): + import subprocess, textwrap, sys + + # Entire cURL command as **one** shell string. + curl_cmd = textwrap.dedent("""\ + curl http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{"model":"llava-hf/llava-1.5-7b-hf","messages":[{"role":"user","content":[{"type":"text","text":"What is in this image?"},{"type":"image_url","image_url":{"url":"http://images.cocodataset.org/test2017/000000155781.jpg"}}]}],"max_tokens":300,"stream":false}' + """) + + # Launch the command; `shell=True` is required because we pass a single string. 
+ proc = subprocess.Popen( + curl_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, # return strings instead of bytes + ) + + +@app.local_entrypoint() +def main(): + x = Server() + x.infer.map([1, 2]) diff --git a/06_gpu_and_ml/embeddings/_dynamo2.py b/06_gpu_and_ml/embeddings/_dynamo2.py new file mode 100644 index 000000000..9d43583f0 --- /dev/null +++ b/06_gpu_and_ml/embeddings/_dynamo2.py @@ -0,0 +1,81 @@ +import subprocess +import os +import asyncio +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor +from time import perf_counter +from typing import Iterator, Sequence, Tuple, List + +import modal + + +# ────────────────────────────── Constants ────────────────────────────── +HF_SECRET = modal.Secret.from_name("huggingface-secret") +VOL_NAME = "example-embedding-data" +VOL_MNT = Path("/data") +data_volume = modal.Volume.from_name(VOL_NAME, create_if_missing=True) +MODEL_REPO = VOL_MNT / "dynamo_repo" # will hold model.plan + config + +# image with dynamo + torch + dynamoclient (tiny helper) +dynamo_IMAGE = modal.Image.from_dockerfile( + "/home/ec2-user/dynamo/container/Dockerfile.tensorrt_llm" +) + + +app = modal.App( + "clip-dynamo-embed22", + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + secrets=[HF_SECRET], +) + +with dynamo_IMAGE.imports(): + import torch, torchvision # noqa: F401 – for torchscript + from transformers import CLIPVisionModel, CLIPImageProcessorFast + from torchvision.io import read_image + from torchvision.transforms.functional import to_pil_image + import dynamoclient.http as httpclient + + +@app.cls( + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + timeout=24 * 60 * 60, # if using a large HF dataset, this may need to be longer + cpu=4, # HuggingFace will use multi-process parallelism to download + gpu="H100:2", # HuggingFace will use multi-process parallelism to download +) +class Server: + @modal.enter() + def startup(self): + self._proc = subprocess.Popen( + "cd $DYNAMO_HOME/examples/multimodal && " + "dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml" + ) + + @modal.method() + def infer(self, in_idx: int): + import subprocess, textwrap, sys + + # Entire cURL command as **one** shell string. + curl_cmd = textwrap.dedent("""\ + curl http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{"model":"llava-hf/llava-1.5-7b-hf","messages":[{"role":"user","content":[{"type":"text","text":"What is in this image?"},{"type":"image_url","image_url":{"url":"http://images.cocodataset.org/test2017/000000155781.jpg"}}]}],"max_tokens":300,"stream":false}' + """) + + # Launch the command; `shell=True` is required because we pass a single string. 
+ proc = subprocess.Popen( + curl_cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, # return strings instead of bytes + ) + return proc + + +@app.local_entrypoint() +def main(): + x = Server() + for status in x.infer.map([1, 2]): + print(status) diff --git a/06_gpu_and_ml/embeddings/clean_mp.py b/06_gpu_and_ml/embeddings/clean_mp.py new file mode 100644 index 000000000..0f7697e92 --- /dev/null +++ b/06_gpu_and_ml/embeddings/clean_mp.py @@ -0,0 +1,488 @@ +import asyncio +import os +import shutil +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import perf_counter +from typing import Iterator, List, Sequence, Tuple + +import modal + +vol_name = "example-embedding-data" +vol_mnt = Path("/data") +TH_CACHE_DIR = vol_mnt / "model-compile-cache" +# If this is set to something other than `None`, will cancel max_containers parameters below +buffer_containers = None # 1 + +hf_secret = modal.Secret.from_name("huggingface-secret") +data_volume = modal.Volume.from_name(vol_name, create_if_missing=True) +th_compile_kwargs = {"mode": "reduce-overhead", "fullgraph": True, "dynamic": False} +app_cfg = {"buffer_containers": buffer_containers} if buffer_containers else {} + +# ### Define the image +th_compile_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + # TODO: some of this required for the full embedding demo with data setup + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "torch", # torch.compile + "transformers", # CLIPVisionModel etc. + "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": vol_mnt.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Enables speedy caching across containers + "TORCHINDUCTOR_CACHE_DIR": TH_CACHE_DIR.as_posix(), + "TORCHINDUCTOR_FX_GRAPH_CACHE": "1", + "TORCHINDUCTOR_AUTOGRAD_CACHE": "1", + } + ) +) + +# Initialize the app +app = modal.App( + "example-multiprocessing-embedder", + image=th_compile_image, + volumes={vol_mnt: data_volume}, + secrets=[hf_secret], +) + +# Imports inside the container +with th_compile_image.imports(): + import torch + from torch import Tensor + from torchvision.io import read_image + from transformers import CLIPImageProcessorFast, CLIPVisionConfig, CLIPVisionModel + + +# ## Data +# @dataclass(frozen=True, slots=True) +class _WorkerCfg: + """Serialisable package for workers""" + + def __init__( + self, + model_config, + state_dict, + preprocessor, + compile_cache, + device_id, + input_shape, + ): + self.model_config = model_config + self.state_dict = state_dict + self.preprocessor = preprocessor + self.compile_cache = compile_cache + self.device_id = device_id + self.input_shape = input_shape + + +def chunked(seq: list[os.PathLike], subseq_size: int) -> Iterator[list[os.PathLike]]: + """ + Helper function that chunks a sequence into subsequences of length `subseq_size`. 
+ """ + for i in range(0, len(seq), subseq_size): + yield seq[i : i + subseq_size] + + +# ## Worker (Process) + + +def _fmt_msg(rank, time_in_queue, cuda_time, inf_time, cudactxptr): + msg = f"Process {rank} (PID={os.getpid()}, ctx=0c{cudactxptr}" + msg += f"\n\ttime in queue: {time_in_queue:.2E}" + msg += f"\n\tto-cuda time: {cuda_time:.2E}" + msg += f"\n\tinference time: {inf_time:.2E}" + return msg + + +def _worker_loop( + rank: int, + pinned_bufs: List[torch.Tensor], + free_q: "torch.multiprocessing.SimpleQueue", + ready_q: "torch.multiprocessing.SimpleQueue", + out_q: "torch.multiprocessing.SimpleQueue", + cfg: "_WorkerCfg", + verbose: bool = False, +) -> None: + """ + Single‑GPU worker executed in a subprocess + + Parameters + ---------- + rank : int + Worker index for logging only. + pinned_bufs : List[Tensor] + Shared pinned‑host ring buffer. + free_q / ready_q / out_q + IPC queues coordinating slots & outputs. + cfg : _WorkerCfg + Frozen dataclass with model weights & hyper‑parameters. + """ + with torch.no_grad(): + ######### + ## Printing stuff────────────────────────────────── + import ctypes + from ctypes.util import find_library + + from torch.compiler import load_cache_artifacts + from torch.compiler._cache import CacheInfo + from torch.serialization import safe_globals + + major, minor = torch.cuda.get_device_capability(torch.cuda.current_device()) + if major > 8: + torch.set_float32_matmul_precision("high") + + def cuda_context_ptr(): + CUcontext = ctypes.c_void_p + ctx = CUcontext() + libcuda = ctypes.CDLL(find_library("cuda")) + libcuda.cuInit(0) + libcuda.cuCtxGetCurrent(ctypes.byref(ctx)) + return ctx.value # integer handle + + # ─── per‑process CUDA initialisation ────────────────────────────────── + st = perf_counter() + torch.cuda.set_device(cfg.device_id) + stream = torch.cuda.Stream() + + # Make sure cache is loaded + with safe_globals([CacheInfo]): + load_cache_artifacts(cfg.compile_cache.read_bytes()) + + # Instantiate model (1-2s) + model = CLIPVisionModel(cfg.model_config).eval().cuda() + model.load_state_dict(cfg.state_dict) + # Compile (2-4s) + model = torch.compile(model, **th_compile_kwargs) + model( + **cfg.preprocessor( + images=torch.randn(*cfg.input_shape), + device=model.device, + return_tensors="pt", + ) + ) + + # TODO: unsure how necessary this is but seemed to reduce initial post time + with torch.cuda.stream(stream): + for buf in pinned_bufs: + _ = pinned_bufs[0].cuda(non_blocking=True) # launch async H2D + torch.cuda.current_stream().wait_stream(stream) # ensure it finishes + torch.cuda.synchronize() # belt-and-suspenders + + if verbose: + print( + f"Worker{rank}: loaded and compiled model in {perf_counter() - st:2E}" + ) + + # ───── loop ────────────────────────────────────────────────────────── + while True: + # Look out for a batch in the ready_q + item = ready_q.get() + if item is None: + break + # `slot` is just the index of the pinned buffer + slot, t_post = item + time_in_queue = perf_counter() - t_post + + # ─── H2D async copy ──────────────────────────────────────────── + t_cpu2gpu = perf_counter() + with torch.cuda.stream(stream): + batch_gpu = pinned_bufs[slot].cuda(non_blocking=True) + torch.cuda.current_stream().wait_stream(stream) + cuda_time = perf_counter() - t_cpu2gpu + + # Pinned buffer is free now + free_q.put(slot) + + # ─── forward pass ───────────────────────────────────────────── + t_inf_start = perf_counter() + embed = model( + **cfg.preprocessor(images=batch_gpu, return_tensors="pt") + ).pooler_output + inf_time = 
perf_counter() - t_inf_start + # Output the CPU embedding ptr, a message, some times + out_q.put( + ( + embed.cpu(), + _fmt_msg( + rank, time_in_queue, cuda_time, inf_time, cuda_context_ptr() + ), + inf_time, + ) + ) + + +# ## Inference App +@app.cls( + image=th_compile_image, + volumes={vol_mnt: data_volume}, + timeout=5 * 60, # 5min timeout for large models + batches + cpu=8, + memory=20 * 1024, # MB -> GB + include_source=True, + **app_cfg, +) +class MPEngine: + model_name: str = modal.parameter() + batch_size: int = modal.parameter(default=100) + n_engines: int = modal.parameter(default=1) + concurrency: int = modal.parameter(default=1) + im_c: int = modal.parameter(default=3) + im_h: int = modal.parameter(default=224) + im_w: int = modal.parameter(default=224) + threads_per_core: int = modal.parameter(default=4) + verbose_inference: bool = modal.parameter(default=False) + # Cannot currently gracefully set ENV vars from local_entrypoint + cache_dir: Path = TH_CACHE_DIR + # For logging + name: str = "MPEngine" + + def get_compile_bytes(self, model, input_shape, preprocessor, compile_cache): + """ + Check for compiled model cache; if nothing found, trigger re-trace + in principal should only happen once per container but it's hard to + "convince" torchInductor no further compilation is needed... + """ + if compile_cache.exists(): + artifact_bytes = compile_cache.read_bytes() + opt_msg = "" + else: + print("Parent compilation...", end="") + # Compile once to save the bytes + model = torch.compile(model, **th_compile_kwargs) + # Force trace + model(**preprocessor(images=torch.randn(*input_shape), return_tensors="pt")) + # Write the bytes + artifact_bytes, _ = torch.compiler.save_cache_artifacts() + compile_cache.parent.mkdir(parents=True, exist_ok=True) + compile_cache.write_bytes(artifact_bytes) + print("done.") + opt_msg = " and re-compile the" + out_msg = f"\n\ttime to load{opt_msg} model in Parent: " + return artifact_bytes, out_msg + + @modal.enter() + async def init_engines(self): + import torch.multiprocessing as mp + + torch.set_grad_enabled(False) + + msg = "New container!" + # (0) Setup + + mp.set_start_method("spawn", force=True) + ctx = mp.get_context("spawn") + + # This makes sure n-th container finds the torch.compile cache created by the first one + data_volume.reload() + + # This is where we will cache torch.compile artifacts + compile_cache: Path = Path(self.cache_dir) / ( + self.model_name.replace("/", "_") + "_compiled_model_cache.pt" + ) + + # (1.a) Load original model weights into cpu ram + load_st = perf_counter() + input_shape = (self.batch_size, self.im_c, self.im_h, self.im_w) + base = CLIPVisionModel.from_pretrained(self.model_name) + preprocessor = CLIPImageProcessorFast.from_pretrained( + self.model_name, usefast=True + ) + + # (1.b) Compile if necessary + # Note: tried passing artifact_bytes directly to worker, + # this makes more sense to me but didn't formally compare + artifact_bytes, out_msg = self.get_compile_bytes( + base, input_shape, preprocessor, compile_cache + ) + msg += f"{out_msg}{perf_counter() - load_st:.2E}" + + worker_cfg = _WorkerCfg( + model_config=base.config, + state_dict=base.state_dict(), + preprocessor=preprocessor, + compile_cache=compile_cache, + device_id=0, + input_shape=input_shape, + ) + + # (2) Pinned ring buffer init + proc_st = perf_counter() + buf_depth = 5 * self.concurrency # 5 reasonable?? 
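+        # NOTE (descriptive comment, added for clarity): each slot below is a pinned
+        # (page-locked) host tensor. Pinned memory is what lets the workers run
+        # `.cuda(non_blocking=True)` on a side stream as a truly asynchronous
+        # host-to-device copy (see `_worker_loop`). Keeping a few slots per concurrent
+        # input means the CPU-side batch loaders rarely block waiting for a free slot;
+        # the 5x factor is a heuristic, not a tuned value.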
+ buf_shape = (self.batch_size, self.im_c, self.im_h, self.im_w) + # Persistent CPU memory buffer (TODO: better/possible to do in CUDA memory?) + self.pinned_bufs: List[Tensor] = [ + torch.empty(buf_shape, dtype=torch.uint8, pin_memory=True) + for _ in range(buf_depth) + ] + + self.free_q = ctx.SimpleQueue() + self.ready_q = ctx.SimpleQueue() + self.out_q = ctx.SimpleQueue() + + for idx in range(buf_depth): + self.free_q.put(idx) + + # (3) Start processes + self.procs = [ + ctx.Process( + target=_worker_loop, + args=( + i, + self.pinned_bufs, + self.free_q, + self.ready_q, + self.out_q, + worker_cfg, + ), + daemon=False, + ) + for i in range(self.concurrency) + ] + msg += f"\n\ttime to init buffers: {perf_counter() - proc_st:.2E}" + st = perf_counter() + for p in self.procs: + p.start() + msg += f"\n\ttime to startup workers: {perf_counter() - st:.2E}" + print(msg) + + def _read_images(self, paths: Sequence[os.PathLike]) -> Tensor: + """ + Creates a batch *th.Tensor + """ + + def _load(p: os.PathLike) -> Tensor: + return read_image(str(vol_mnt / p)) + + with ThreadPoolExecutor(max_workers=os.cpu_count() * 4) as pool: + images = list(pool.map(_load, paths)) + return torch.stack(images) + + @modal.method() + async def embed(self, batch_paths: List[os.PathLike]) -> Tuple[float, int]: + """ + Encode images and return latency + count. + TODO: add/save embeddings to FAISS vector DB would be sweet. + """ + with torch.no_grad(): + etime = perf_counter() + slot = self.free_q.get() # blocking until a slot is free + buf = self.pinned_bufs[slot] + + # ── disk → host (CPU) ────────────────────────────────────────── + buf.copy_(self._read_images(batch_paths)) + self.ready_q.put((slot, perf_counter())) # hand off to worker + + # ── wait for worker result ───────────────────────────────────── + embedding, msg, inf_time = self.out_q.get() + + if self.verbose_inference: + print(msg) + # This is somehow significantly faster than the others but OVERALL slower (???) + # And here are are even penalizing this method for batch creation time. + total_embed_time = perf_counter() - etime + return total_embed_time, len(batch_paths) # inf_time + + @modal.exit() + def _shutdown(self) -> None: + """ + Gracefully terminate workers. Doesn't work oftentimes... + """ + for _ in self.procs: + self.ready_q.put(None) # poison pill + for p in self.procs: + p.join(timeout=3) + + +# ## Backbone + + +@app.local_entrypoint() +def main(): + im_cap = 10000 + million_image_test = False # overrides im_cap! 
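+    # When True, the JPEG path list is tiled up to one million entries below, overriding `im_cap`.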
+ + gpu = "A10G" + max_containers = 1 # NOTE: this is ignored if buffer_containers is not None + + hf_dataset_name = "microsoft/cats_vs_dogs" + model_name: str = "openai/clip-vit-base-patch16" + batch_size: int = 500 + im_c: int = 3 + im_h: int = 224 + im_w: int = 224 + threads_per_core: int = 4 + verbose_inference: bool = True + + allow_concurrent_inputs = 2 + n_engines: int = 2 + + start_time = perf_counter() + + datadir = Path("extracted") / hf_dataset_name + im_path_list = [ + x.path + for x in data_volume.listdir(datadir.as_posix()) + if x.path.endswith(".jpg") + ] + + # Dataset extension or pruning + if million_image_test: + print("WARNING: `million_image_test` FLAG RECEIVED!") + mil = int(1e6) + while len(im_path_list) < mil: + im_path_list += im_path_list + im_path_list = im_path_list[:mil] + elif len(im_path_list) > im_cap: + im_path_list = im_path_list[:im_cap] + + n_ims = len(im_path_list) + + app_cfg = {} if buffer_containers else {"max_containers": max_containers} + + embedder = MPEngine.with_options( + gpu=gpu, + allow_concurrent_inputs=allow_concurrent_inputs, + **app_cfg, + )( + model_name=model_name, + batch_size=batch_size, + n_engines=n_engines, + concurrency=allow_concurrent_inputs, + im_c=im_c, + im_h=im_h, + im_w=im_w, + threads_per_core=threads_per_core, + verbose_inference=verbose_inference, + ) + + times, batchsizes = [], [] + for time, batchsize in embedder.embed.map(chunked(im_path_list, batch_size)): + times.append(time) + batchsizes.append(batchsize) + + if n_ims > 0: + total_duration = perf_counter() - start_time + total_throughput = n_ims / total_duration + embed_throughputs = [ + batchsize / time for batchsize, time in zip(batchsizes, times) + ] + avg_throughput = sum(embed_throughputs) / len(embed_throughputs) + + log_msg = ( + f"{embedder.name}{gpu}::batch_size={batch_size}::" + f"n_ims={n_ims}::concurrency={allow_concurrent_inputs}\n" + f"\tTotal time:\t{total_duration / 60:.2f} min\n" + f"\tOverall throughput:\t{total_throughput:.2f} im/s\n" + f"\tSingle-model throughput (avg):\t{avg_throughput:.2f} im/s\n" + ) + + print(log_msg) diff --git a/06_gpu_and_ml/embeddings/dict2csv.py b/06_gpu_and_ml/embeddings/dict2csv.py new file mode 100644 index 000000000..93f60f2b7 --- /dev/null +++ b/06_gpu_and_ml/embeddings/dict2csv.py @@ -0,0 +1,26 @@ +import modal + +# Create a persisted dict - the data gets retained between app runs +data = modal.Dict.from_name("laion2B", create_if_missing=True) +keys = sorted(list(data.keys())) +experiments = list({key.split("-")[0] for key in keys}) +ms = ["exp-start", "embedder-init", "embedding-begin", "embedding-complete"] +for exp in experiments: + try: + msg = f"Experiment: {exp}" + ms1 = ms[0] + for idx in range(1, len(ms)): + ms0 = ms[idx] + dur = (data[f"{exp}-{ms0}"] - data[f"{exp}-{ms1}"]) / 1e9 + msg += f"\n\t{ms0} duration: {dur:.2E}s" + if "M" in exp: + n_images = 1e6 + else: + n_images = 23410 + msg += f"\n\t\tThroughput: {n_images / dur: .2f} images/second" + print(msg) + except: + if msg: + msg = "Incomplete Experiment: " + f"\n\t {msg}" + print(msg) + continue diff --git a/06_gpu_and_ml/embeddings/embedding_racetrack.py b/06_gpu_and_ml/embeddings/embedding_racetrack.py deleted file mode 100644 index 0d51b8a23..000000000 --- a/06_gpu_and_ml/embeddings/embedding_racetrack.py +++ /dev/null @@ -1,418 +0,0 @@ -# --- -# cmd: ["modal", "run", "06_gpu_and_ml/embeddings/embedding_racetrack.py::main"] -# --- - -# # Modal Cookbook: Recipe for Inference Throughput Maximization -# In certain applications, the bottom line comes 
to throughput: process a set of inputs as fast as possible. -# Let's explore how to maximize throughput by using Modal on an embedding example, and see just how fast -# we can encode the [Microsoft Cats & Dogs dataset](https://huggingface.co/datasets/microsoft/cats_vs_dogs) -# using the [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"). - -# ## Conclusions -# ### BLUF (Bottom Line Up Front) -# Set concurrency (`max_concurrent_inputs`) to 4, and set `batch_size` between 50-500. -# To set `max_containers`, divide the total number of inputs by `max_concurrent_inputs*batchsize` -# (note: if you have a massive dataset, keep an eye out for diminishing returns on `max_containers`; but -# Modal should handle that for you!). -# Be sure to preprocess your data in the same manner that the model is expecting (e.g., resizing images). -# If you only want to use one container, increase `batch_size` until you are maxing -# out the GPU (but keep concurrency, `max_concurrent_inputs`, capped around 4). The example herein achieves -# upward of 750 images / second overall throughput (not including initial Volume setup time). - -# ### Why? -# While batchsize maximizes GPU utilization, the time to form a batch (ie reading images) -# will ultimately overtake inference, whether due to I/O, sending data across a wire, etc. -# We can make up for this by using idle GPU cores to store additional copies of the model: -# this _GPU packing_ is achieved via an async queue and the [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency") -# decorator. Once you nail down `batch_size` you can crank up the number of containers to distribute the -# computational load. High values of concurrency has diminishing returns, we believe, -# because we are already throttling the CPU with multi-threaded dataloading. The demo herein -# achieves upward of 750 images / second, and that will increase for larger datasets where the model loading -# time becomes increasingly negligable. - -# ## Local env imports -# Import everything we need for the locally-run Python (everything in our local_entrypoint function at the bottom). -import asyncio -import os -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from time import perf_counter -from typing import Iterator, TypeVar - -import modal - -# ## Key Parameters -# There are three ways to parallelize inference for this usecase: via batching (which happens internal to Infinity), -# by packing individual GPU(s) with multiple copies of the model, and by fanning out across multiple containers. -# Here are some parameters for controlling these factors: -# * `max_concurrent_inputs` sets the [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency") argument for the inference app. This takes advantage of the asynchronous nature of the Infinity embedding inference app. -# * `gpu` is a string specifying the GPU to be used. -# * `max_containers` caps the number of containers allowed to spin-up. 
-# * `memory_request` amount of RAM requested per container -# * `core_request` number of logical cores requested per container -# * `threads_per_core` oversubscription factor for parallelized I/O (image reading) -# * `batch_size` is a parameter passed to the [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"), and it means the usual thing for machine learning inference: a group of images are processed through the neural network together. -# * `image_cap` caps the number of images used in this example (e.g. for debugging/testing) -max_concurrent_inputs: int = 4 -gpu: str = "L4" -max_containers: int = 50 -memory_request: float = 5 * 1024 # MB->GB -core_request: float = 4 -threads_per_core: int = 8 -batch_size: int = 100 -image_cap: int = -1 - -# This timeout caps the maximum time a single function call is allowed to take. In this example, that -# includes reading a batch-worth of data and running inference on it. When `batch_size` is large (e.g. 5000) -# and with a large value of `max_concurrent_inputs`, where a batch may sit in a queue for a while, -# this could take several minutes. -timeout_seconds: int = 5 * 60 - -# ## Data and Model Specification -# This model parameter should point to a model on HuggingFace that is supported by Infinity. -# Note that your selected model might require specialized imports when -# designing the image in the next section. This [OpenAI model](https://huggingface.co/openai/clip-vit-base-patch16 "OpenAI ViT") -# takes about 4-10s to load into memory. -model_name = "openai/clip-vit-base-patch16" # 599 MB -model_input_shape = (224, 224) - -# We will use a high-performance [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") -# both to cache model weights and to store images we want to encode. The details of -# setting this volume up are below. Here, we just need to name it so that we can instantiate -# the Modal application. 
-# You may need to [set up a secret](https://modal.com/secrets/) to access HuggingFace datasets -hf_secret = modal.Secret.from_name("huggingface-secret") -# Change this global variable to use a different HF dataset: -hf_dataset_name = "microsoft/cats_vs_dogs" -# This name is important for referencing the volume in other apps or for [browsing](https://modal.com/storage): -vol_name = "example-embedding-data" -# This is the location within the container that this Volume will be mounted: -vol_mnt = Path("/data") -# Finally, the Volume object can be created: -data_volume = modal.Volume.from_name(vol_name, create_if_missing=True) - - -# ## Define the image -infinity_image = ( - modal.Image.debian_slim(python_version="3.10") - .pip_install( - [ - "pillow", # for Infinity input typehint - "datasets", # for huggingface data download - "hf_transfer", # for fast huggingface data download - "tqdm", # progress bar for dataset download - "infinity_emb[all]==0.0.76", # for Infinity inference lib - "sentencepiece", # for this particular chosen model - "torchvision", # for fast image loading - ] - ) - .env( - { - "HF_HOME": vol_mnt.as_posix(), # For model and data caching in our Volume - "HF_HUB_ENABLE_HF_TRANSFER": "1", # For fast data transfer - } - ) -) - -# Initialize the app -app = modal.App( - "example-infinity-embedder", - image=infinity_image, - volumes={vol_mnt: data_volume}, - secrets=[hf_secret], -) - -# Imports inside the container -with infinity_image.imports(): - from infinity_emb import AsyncEmbeddingEngine, EngineArgs - from infinity_emb.primitives import Dtype, InferenceEngine - from PIL.Image import Image - from torchvision.io import read_image - from torchvision.transforms.functional import to_pil_image - -## Dataset Downloading and Setup -# ## Data setup -# We use a [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") -# to store images we want to encode. We download them from Huggingface into a Volume and then preprocess -# them to 224 x 224 JPEGs. The selected model, `openai/clip-vit-base-patch16`, was trained on 224 x 224 -# sized images. If you skip this preprocess resize step, Infinity will handle image resizing for you- -# at a severe penalty to inference throughput. - -# Note that Modal Volumes are optimized for datasets on the order of 50,000 - 500,000 -# files and directories. If you have a larger dataset, you may need to consider other storage -# options such as a [CloudBucketMount](https://modal.com/docs/examples/rosettafold). - - -@app.function( - image=infinity_image, - volumes={vol_mnt: data_volume}, - max_containers=1, # We only want one container to handle volume setup - cpu=core_request, # HuggingFace will use multi-process parallelism to download - timeout=timeout_seconds, # if using a large HF dataset, this may need to be longer -) -def catalog_jpegs(dataset_namespace: str, cache_dir: str, image_cap: int): - """ - This function checks the volume for JPEGs and, if needed, calls `download_to_volume` - which pulls a HuggingFace dataset into the mounted volume. - """ - - def download_to_volume(dataset_namespace: str, cache_dir: str): - """ - This function caches a hugginface dataset to the path specified in your `HF_HOME` environment - variable, which we set when creating the image so as to point to a Modal Volume. 
- """ - from datasets import load_dataset - from torchvision.io import write_jpeg - from torchvision.transforms import Compose, PILToTensor, Resize - from tqdm import tqdm - - # Load cache to HF_HOME - ds = load_dataset( - dataset_namespace, - split="train", - num_proc=os.cpu_count(), # this will be capped by huggingface based on the number of shards - ) - - # Create an `extraction` cache dir where we will create explicit JPEGs - mounted_cache_dir = vol_mnt / cache_dir - mounted_cache_dir.mkdir(exist_ok=True, parents=True) - - # Preprocessing pipeline: resize now instead of on-the-fly - preprocessor = Compose( - [ - Resize(model_input_shape), - PILToTensor(), - ] - ) - - def preprocess_img(idx, example): - """ - Applies preprocessor and write as jpeg with TurboJPEG (via torchvision). - """ - # Define output path - write_path = mounted_cache_dir / f"img{idx:07d}.jpg" - if write_path.is_file(): - return - - # Here, `example["image"]` is a `PIL.Image.Image` - preprocessed = preprocessor(example["image"].convert("RGB")) - - # Write to modal.Volume - write_jpeg(preprocessed, write_path) - - # This is a parallelized pre-processing loop that opens compressed images, - # preprocesses them to the size expected by our model, and writes as a JPEG. - for idx, ex in tqdm(enumerate(ds), total=len(ds), desc="Caching images"): - if (image_cap > 0) and (idx >= image_cap): - break - preprocess_img(idx, ex) - - data_volume.commit() - - ds_preptime_st = perf_counter() - - def list_all_jpegs(subdir: os.PathLike = "/") -> list[os.PathLike]: - """ - Searches a subdir within your volume for all JPEGs. - """ - return [ - x.path - for x in data_volume.listdir(subdir.as_posix()) - if x.path.endswith(".jpg") - ] - - # Check for extracted-JPEG cache dir within the volume - if (vol_mnt / cache_dir).is_dir(): - im_path_list = list_all_jpegs(cache_dir) - n_ims = len(im_path_list) - else: - n_ims = 0 - print("The cache dir was not found...") - - # If needed, download dataset to a vol - if (n_ims < image_cap) or (n_ims == 0): - print(f"Found {n_ims} JPEGs; checking for more on HuggingFace.") - download_to_volume(dataset_namespace, cache_dir) - # Try again - im_path_list = list_all_jpegs(cache_dir) - n_ims = len(im_path_list) - - # [optional] Cap the number of images to process - print(f"Found {n_ims} JPEGs in the Volume.", end="") - if image_cap > 0: - im_path_list = im_path_list[: min(image_cap, len(im_path_list))] - print(f"using {len(im_path_list)}.") - - # Time it - ds_time_elapsed = perf_counter() - ds_preptime_st - return im_path_list, ds_time_elapsed - - -T = TypeVar("T") # generic type for chunked typehints - - -def chunked(seq: list[T], subseq_size: int) -> Iterator[list[T]]: - """ - Helper function that chunks a sequence into subsequences of length `subseq_size`. - """ - for i in range(0, len(seq), subseq_size): - yield seq[i : i + subseq_size] - - -# ## Inference app -# Here we define an app.cls that wraps Infinity's AsyncEmbeddingEngine. -# Note that the variable `max_concurrent_inputs` is used to set `max_inputs` -# in (1) the [modal.concurrent](https://modal.com/docs/guide/concurrent-inputs#input-concurrency) -# decorator, and (2) the `n_engines` class property. -# In `init_engines`, we are creating exactly one inference -# engine for each concurrently-passed batch of data. This is critical for packing a GPU with -# multiple simultaneously operating models. 
The [@modal.enter](https://modal.com/docs/reference/modal.enter#modalenter) -# decorator ensures that this method is called once per container, on startup (and `exit` is -# run once, on shutdown). -@app.cls( - gpu=gpu, - cpu=core_request, - memory=5 * 1024, # MB -> GB - image=infinity_image, - volumes={vol_mnt: data_volume}, - timeout=timeout_seconds, - max_containers=max_containers, -) -@modal.concurrent(max_inputs=max_concurrent_inputs) -class InfinityEngine: - n_engines: int = max_concurrent_inputs - - @modal.enter() - async def init_engines(self): - """ - On container start, starts `self.n_engines` copies of the selected model - and puts them in an async queue. - """ - print(f"Loading {self.n_engines} models... ", end="") - self.engine_queue: asyncio.Queue[AsyncEmbeddingEngine] = asyncio.Queue() - start = perf_counter() - for _ in range(self.n_engines): - engine = AsyncEmbeddingEngine.from_args( - EngineArgs( - model_name_or_path=model_name, - batch_size=batch_size, - model_warmup=False, - engine=InferenceEngine.torch, - dtype=Dtype.float16, - device="cuda", - ) - ) - await engine.astart() - await self.engine_queue.put(engine) - print(f"Took {perf_counter() - start:.4}s.") - - def read_batch(self, im_path_list: list[os.PathLike]) -> list["Image"]: - """ - Read a batch of data. Infinity is expecting PIL.Image.Image type - inputs, but it's faster to read from disk with torchvision's `read_image` - and convert to PIL than it is to read directly with PIL. - - This process is parallelized over the batch with multithreaded data reading. - The number of threads is 4 per core, which is based on the batchsize. - """ - - def readim(impath: os.PathLike): - """Read with torch, convert back to PIL for Infinity""" - return to_pil_image(read_image(str(vol_mnt / impath))) - - with ThreadPoolExecutor( - max_workers=os.cpu_count() * threads_per_core - ) as executor: - images = list(executor.map(readim, im_path_list)) - - return images - - @modal.method() - async def embed(self, images: list[os.PathLike]) -> tuple[float, float]: - """ - This is the workhorse function. We select a model, prepare a batch, - execute inference, and return the time elapsed. You probably want - to return the embeddings in your usecase. - """ - # (0) Grab an engine from the queue - engine = await self.engine_queue.get() - - try: - # (1) Load batch of image data - st = perf_counter() - images = self.read_batch(images) - batch_elapsed = perf_counter() - st - - # (2) Encode the batch - st = perf_counter() - embedding, _ = await engine.image_embed(images=images) - embed_elapsed = perf_counter() - st - finally: - # No matter what happens, return the engine to the queue - await self.engine_queue.put(engine) - - # (3) Housekeeping - print(f"Time to load batch: {batch_elapsed:.2f}s") - print(f"Time to embed batch: {embed_elapsed:.2f}s") - - # (4) You may wish to return the embeddings themselves here - return embed_elapsed, len(images) - - @modal.exit() - async def exit(self) -> None: - """ - Shut down each of the engines. - """ - for _ in range(self.n_engines): - engine = await self.engine_queue.get() - await engine.astop() - - -# ## Local Entrypoint -# This backbone code is run on your machine. It starts up the app, -# catalogs the data, and via the remote `map` call, parses the data -# with the Infinity embedding engine. The embedder.embed executions -# across the batches are autoscaled depending on the app parameters -# `max_containers` and `max_concurrent_inputs`. 
-@app.local_entrypoint() -def main(): - start_time = perf_counter() - - # (1) Catalog data: modify `catalog_jpegs` to fetch batches of your data. - extracted_path = Path("extracted") / hf_dataset_name - im_path_list, vol_setup_time = catalog_jpegs.remote( - dataset_namespace=hf_dataset_name, cache_dir=extracted_path, image_cap=image_cap - ) - print(f"Took {vol_setup_time:.2f}s to setup volume.") - n_ims = len(im_path_list) - - # (2) Init the model inference app - start_time = perf_counter() - embedder = InfinityEngine() - - # (3) Embed batches via remote `map` call - times, batchsizes = [], [] - for time, batchsize in embedder.embed.map(chunked(im_path_list, batch_size)): - times.append(time) - batchsizes.append(batchsize) - - # (4) Log - if n_ims > 0: - total_duration = perf_counter() - start_time - total_throughput = n_ims / total_duration - embed_throughputs = [ - batchsize / time for batchsize, time in zip(batchsizes, times) - ] - avg_throughput = sum(embed_throughputs) / len(embed_throughputs) - - log_msg = ( - f"EmbeddingRacetrack{gpu}::batch_size={batch_size}::" - f"n_ims={n_ims}::concurrency={max_concurrent_inputs}::" - f"max_containers={max_containers}::cores={core_request}\n" - f"\tTotal time:\t{total_duration / 60:.2f} min\n" - f"\tVolume setup time:\t{vol_setup_time / 60:.2f} min\n" - f"\tOverall throughput:\t{total_throughput:.2f} im/s\n" - f"\tEmbedding-only throughput (avg):\t{avg_throughput:.2f} im/s\n" - ) - - print(log_msg) diff --git a/06_gpu_and_ml/embeddings/image_embedding_infinity.py b/06_gpu_and_ml/embeddings/image_embedding_infinity.py new file mode 100644 index 000000000..8cf3ead21 --- /dev/null +++ b/06_gpu_and_ml/embeddings/image_embedding_infinity.py @@ -0,0 +1,557 @@ +# --- +# cmd: ["modal", "run", "06_gpu_and_ml/embeddings/image_embedding_infinity.py::main"] +# --- + +# # A Recipe for Throughput Maximization: GPU Packing with Infinity Inference +# In certain applications, the bottom line comes to *throughput*: process a batch of inputs as fast as possible. +# This example presents a Modal recipe for maximizing image embedding throughput using the +# [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"), +# a popular inference engine that manages asychronous queuing and model serving. +# +# Check out [this example](https://modal.com/docs/examples/image_embedding_th_compile) to see how +# to use Modal to natively accomplish these features and achieve even higher throughput (nearly 2x)! +# +# TODO: remove buffer container discussion/replace with min_containers +# ## Conclusions +# ### BLUF (Bottom Line Up Front) +# Set concurrency (`max_concurrent_inputs`) to 2, and set `batch_size` around 100. +# To get maximum throughput at any cost, set buffer_containers to 10; otherwise set it to None +# and set max_containers based on your budget. +# Be sure to preprocess your data in the same manner that the model is expecting (e.g., resizing images). +# If you only want to use one container, increase `batch_size` until you are maxing +# out the GPU (but keep concurrency, `max_concurrent_inputs`, capped around 2). The example herein achieves +# around 700 images / second overall throughput, embedding the entire +# [cats vs dogs](https://huggingface.co/datasets/microsoft/cats_vs_dogs) +# dataset in about 30s. + +# ### Why? +# While batch size maximizes GPU utilization, the time to form a batch (ie reading images) +# will ultimately overtake inference, whether due to I/O, sending data across a wire, etc. 
+# We can make up for this by using idle GPU cores to store additional copies of the model:
+# this high-level form of _GPU packing_ is achieved via an async queue and the
+# [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency")
+# decorator, called indirectly through [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options).
+
+# Once you nail down an effective `batch_size` for your problem, you can crank up the number of containers
+# to distribute the computational load. Set buffer_containers > 0 so that Modal continuously spins up more
+# and more containers until the task is complete; otherwise set it to None, and use max_containers to cap
+# the number of containers allowed.
+
+# High values of concurrency have diminishing returns, we believe,
+# because we are already throttling the CPU with multi-threaded dataloading, and because of the way
+# Infinity handles batches of inputs. We have a [more advanced example](https://modal.com/docs/examples/image_embedding_th_compile)
+# that directly uses torch.compile (without Infinity), which uses Modal's native capabilities to
+# manage queuing etc., and can take better advantage of the concurrency feature.
+
+# The demo herein should achieve around 700 images/second (around 200-300 images/second per model),
+# but that number will increase dramatically for larger datasets where more and more containers spawn.
+# We have clocked 9500 images/second using Infinity on a 1M image dataset, and twice that in our torch.compile
+# example.
+
+# ## Local env imports
+# Import everything we need for the locally-run Python (everything in our `local_entrypoint` function at the bottom).
+import asyncio
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+from time import perf_counter
+from typing import Iterator
+
+import modal
+
+# ## Key Parameters
+# There are three ways to parallelize inference for this use case: via batching,
+# by packing individual GPU(s) with multiple copies of the model, and by fanning out across multiple containers.
+#
+# Modal provides two ways to dynamically parameterize classes: through
+# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options)
+# and through
+# [modal.parameter](https://modal.com/docs/reference/modal.parameter#modalparameter).
+# The `app.local_entrypoint()` main function at the bottom of this example uses these
+# features to dynamically construct the inference engine class wrapper; a short sketch of
+# the call pattern appears just before the Volume setup below. One feature
+# that is not currently supported via `with_options` is the `buffer_containers` parameter.
+# This tells Modal to pre-emptively warm a number of containers before they are strictly
+# needed. In other words, it tells Modal to continuously fire up more and more containers
+# until throughput is saturated.
+
+# ## Dataset, Model, and Image Setup
+# This example uses HuggingFace to download data and models. We will use a high-performance
+# [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume")
+# both to cache model weights and to store the
+# [image dataset](https://huggingface.co/datasets/microsoft/cats_vs_dogs)
+# that we want to embed.
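+# As a quick sketch of how the two mechanisms from the Key Parameters section combine
+# (this mirrors the call made in `main()` at the bottom of the file; the argument values
+# here are illustrative only, and the snippet is commented out because `InfinityEngine`
+# is defined further down):
+#
+#     embedder = InfinityEngine.with_options(gpu="A10G", max_containers=10).with_concurrency(
+#         max_inputs=2
+#     )(model_name="openai/clip-vit-base-patch16", batch_size=100, n_engines=2)
+#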
+ +# ### Volume Initialization +# You may need to [set up a secret](https://modal.com/secrets/) to access HuggingFace datasets +hf_secret = modal.Secret.from_name("huggingface-secret") + +# This name is important for referencing the volume in other apps or for +# [browsing](https://modal.com/storage): +vol_name = "example-embedding-data" + +# This is the location within the container where this Volume will be mounted: +vol_mnt = Path("/data") + +# Finally, the Volume object can be created: +data_volume = modal.Volume.from_name(vol_name, create_if_missing=True) + +# ### Define the image +infinity_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + "pillow", # for Infinity input typehint + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "infinity_emb[all]==0.0.76", # for Infinity inference lib + "sentencepiece", # for this particular chosen model + "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": vol_mnt.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + } + ) +) + +# Initialize the app +app = modal.App( + "example-infinity-embedder", + image=infinity_image, + volumes={vol_mnt: data_volume}, + secrets=[hf_secret], +) + +# Imports inside the container +with infinity_image.imports(): + from infinity_emb import AsyncEmbeddingEngine, EngineArgs + from infinity_emb.primitives import Dtype, InferenceEngine + from PIL.Image import Image + from torchvision.io import read_image + from torchvision.transforms.functional import to_pil_image + +# ## Dataset Setup +# We use a [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# to store the images we want to encode. For your usecase, can simply replace the +# function `catalog_jpegs` with any function that returns a list of image paths. Just make +# sure that it's returning the _paths_: we are going to +# [map](https://modal.com/docs/reference/modal.Function#map) these inputs between containers +# so that the inference class can simply read them directly from the Volume. If you are +# shipping the images themselves across the wire, that will likely bottleneck throughput. + +# Note that Modal Volumes are optimized for datasets on the order of 50,000 - 500,000 +# files and directories. If you have a larger dataset, you may need to consider other storage +# options such as a [CloudBucketMount](https://modal.com/docs/examples/rosettafold). + +# A note on preprocessing: Infinity will handle resizing and other preprocessing in case +# your images are not the same size as what the model is expecting; however, this will +# significantly degrade throughput. We recommend batch-processing (if possible). + + +@app.function( + image=infinity_image, + volumes={vol_mnt: data_volume}, + max_containers=1, # We only want one container to handle volume setup + cpu=4, # HuggingFace will use multi-process parallelism to download + timeout=24 * 60 * 60, # if using a large HF dataset, this may need to be longer +) +def catalog_jpegs( + dataset_namespace: str, # a HuggingFace path like `microsoft/cats_vs_dogs` + cache_dir: str, # a subdir where the JPEGs will be extracted into the volume long-form + image_cap: int, # hard cap on the number of images to be processed (e.g. 
for timing, debugging)
+    model_input_shape: tuple[int, int, int],  # JPEGs will be preprocessed to this shape
+    threads_per_cpu: int = 4,  # threads per CPU for I/O oversubscription
+) -> tuple[
+    list[os.PathLike],  # the function returns a list of paths,
+    float,  # and the time it took to prepare
+]:
+    """
+    This function checks the volume for JPEGs and, if needed, calls `download_to_volume`,
+    which pulls a HuggingFace dataset into the mounted volume, preprocessing along the way.
+    """
+
+    def download_to_volume(dataset_namespace: str, cache_dir: str):
+        """
+        This function:
+        (1) caches a HuggingFace dataset to the path specified in your `HF_HOME` environment
+        variable, which is pointed to a Modal Volume during creation of the image above.
+        (2) unpacks the dataset and preprocesses the images; this could be done in several different
+        ways, but we want to do it all once upfront so as not to confound the timing tests later.
+        """
+        from datasets import load_dataset
+        from torchvision.io import write_jpeg
+        from torchvision.transforms import Compose, PILToTensor, Resize
+        from tqdm import tqdm
+
+        # Load dataset cache to HF_HOME
+        ds = load_dataset(
+            dataset_namespace,
+            split="train",
+            num_proc=os.cpu_count(),  # this will be capped by HuggingFace based on the number of shards
+        )
+
+        # Create an `extraction` cache dir where we will create explicit JPEGs
+        mounted_cache_dir = vol_mnt / cache_dir
+        mounted_cache_dir.mkdir(exist_ok=True, parents=True)
+
+        # Preprocessing pipeline: resize in bulk now instead of on-the-fly later
+        preprocessor = Compose(
+            [
+                Resize(model_input_shape),
+                PILToTensor(),
+            ]
+        )
+
+        def preprocess_img(idx, example):
+            """
+            Applies the preprocessor and writes a JPEG with TurboJPEG (via TorchVision).
+            """
+            # Define output path
+            write_path = mounted_cache_dir / f"img{idx:07d}.jpg"
+            # Skip if already done
+            if write_path.is_file():
+                return
+
+            # Process
+            preprocessed = preprocessor(example["image"].convert("RGB"))
+
+            # Write to modal.Volume
+            write_jpeg(preprocessed, write_path)
+
+        # Note: the optimization of this loop really depends on your preprocessing stack.
+        # You could use a ProcessPoolExecutor if there is significant work per image, or even
+        # GPU acceleration and batch preprocessing. We keep it simple here for the example.
+        futures = []
+        with ThreadPoolExecutor(max_workers=os.cpu_count() * threads_per_cpu) as executor:
+            for idx, ex in enumerate(ds):
+                if image_cap > 0 and idx >= image_cap:
+                    break
+                futures.append(executor.submit(preprocess_img, idx, ex))
+
+        # Progress bar over completed futures
+        for _ in tqdm(
+            as_completed(futures), total=len(futures), desc="Caching images"
+        ):
+            pass  # iterate to drain; call fut.result() here if you want worker exceptions to surface
+
+        # Save changes
+        data_volume.commit()
+
+    ds_preptime_st = perf_counter()
+
+    def list_all_jpegs(subdir: os.PathLike = "/") -> list[os.PathLike]:
+        """
+        Searches a subdir within your volume for all JPEGs.
+ """ + return [ + x.path + for x in data_volume.listdir(subdir.as_posix()) + if x.path.endswith(".jpg") + ] + + # Check for extracted-JPEG cache dir within the modal.Volume + if (vol_mnt / cache_dir).is_dir(): + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + else: + n_ims = 0 + print("The cache dir was not found...") + + # If needed, download dataset to a modal.Volume + if (n_ims < image_cap) or (n_ims == 0): + print(f"Found {n_ims} JPEGs; checking for more on HuggingFace.") + download_to_volume(dataset_namespace, cache_dir) + # Try again + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + + # [optional] Cap the number of images to process + print(f"Found {n_ims} JPEGs in the Volume.", end="") + if image_cap > 0: + im_path_list = im_path_list[: min(image_cap, len(im_path_list))] + print(f"using {len(im_path_list)}.") + + # Time it + ds_time_elapsed = perf_counter() - ds_preptime_st + return im_path_list, ds_time_elapsed + + +def chunked(seq: list[os.PathLike], subseq_size: int) -> Iterator[list[os.PathLike]]: + """ + Helper function that chunks a sequence into subsequences of length `subseq_size`. + """ + for i in range(0, len(seq), subseq_size): + yield seq[i : i + subseq_size] + + +# ## Inference app +# Here we define a [modal.cls](https://modal.com/docs/reference/modal.Cls#modalcls) +# that wraps [Infinity's AsyncEmbeddingEngine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"). +# Some important observations: +# 1. Infinity handles asynchronous queuing internally. This is actually redundant with Modal's +# concurrency feature, but we found that using them together still helps. +# In [another example](https://modal.com/docs/examples/image_embedding_th_compile), +# we show how to achieve a similar setup without Infinity. +# 2. The variable `allow_concurrent_inputs` passed to the `main` local_entrypoint is +# used to set both the number of concurrent inputs (via with_options) and the class variable +# `n_engines` (via modal.parameters). If you aren't using `with_options` you can use the +# [modal.concurrent](https://modal.com/docs/guide/concurrent-inputs#input-concurrency) +# decorator directly. +# 3. In `init_engines`, we are creating exactly one Infinity inference +# engine for each concurrently-passed batch of data. This is a high-level version of GPU packing suitable +# for use with a high-level inference engine like Infinity. +# 4. The [@modal.enter](https://modal.com/docs/reference/modal.enter#modalenter) +# decorator ensures that this method is called once per container, on startup (and `exit` is +# run once, on shutdown). + + +@app.cls( + image=infinity_image, + volumes={vol_mnt: data_volume}, + cpu=4, + memory=5 * 1024, # MB -> GB +) +class InfinityEngine: + model_name: str = modal.parameter() + batch_size: int = modal.parameter(default=100) + n_engines: int = modal.parameter(default=1) + threads_per_core: int = modal.parameter(default=8) + verbose_inference: bool = modal.parameter(default=False) + # For logging + name: str = "InfinityEngine" + + @modal.enter() + async def init_engines(self): + """ + On container start, starts `self.n_engines` copies of the selected model + and puts them in an async queue. + """ + print(f"Loading {self.n_engines} models... 
", end="") + self.engine_queue: asyncio.Queue[AsyncEmbeddingEngine] = asyncio.Queue() + start = perf_counter() + for _ in range(self.n_engines): + engine = AsyncEmbeddingEngine.from_args( + EngineArgs( + model_name_or_path=self.model_name, + batch_size=self.batch_size, + model_warmup=False, + engine=InferenceEngine.torch, + dtype=Dtype.float16, + device="cuda", + ) + ) + await engine.astart() + await self.engine_queue.put(engine) + print(f"Took {perf_counter() - start:.4}s.") + + def read_batch(self, im_path_list: list[os.PathLike]) -> list["Image"]: + """ + Read a batch of data. Infinity is expecting PIL.Image.Image type + inputs, but it's faster to read from disk with torchvision's `read_image` + and convert to PIL than it is to read directly with PIL. + + This process is parallelized over the batch with multithreaded data reading. + """ + + def readim(impath: os.PathLike): + """Read with torch, convert back to PIL for Infinity""" + return to_pil_image(read_image(str(vol_mnt / impath))) + + with ThreadPoolExecutor( + max_workers=os.cpu_count() * self.threads_per_core + ) as executor: + images = list(executor.map(readim, im_path_list)) + + return images + + @modal.method() + async def embed(self, images: list[os.PathLike]) -> tuple[float, float]: + """ + This is the workhorse function. We select a model from the queue, prepare + a batch, execute inference, and return the time elapsed. + + NOTE: we throw away the embeddings here; you probably want to return + them or save them directly to a modal.Volume. + """ + # (0) Grab an engine from the queue + engine = await self.engine_queue.get() + + try: + # (1) Load batch of image data + st = perf_counter() + images = self.read_batch(images) + batch_elapsed = perf_counter() - st + + # (2) Encode the batch + st = perf_counter() + # Infinity Engine is async + embedding, _ = await engine.image_embed(images=images) + embed_elapsed = perf_counter() - st + finally: + # No matter what happens, return the engine to the queue + await self.engine_queue.put(engine) + + # (3) Housekeeping + if self.verbose_inference: + print(f"Time to load batch: {batch_elapsed:.2f}s") + print(f"Time to embed batch: {embed_elapsed:.2f}s") + + # (4) You may wish to return the embeddings themselves here + return embed_elapsed, len(images) + + @modal.exit() + async def exit(self) -> None: + """ + Shut down each of the engines. + """ + for _ in range(self.n_engines): + engine = await self.engine_queue.get() + await engine.astop() + + +# ## Local Entrypoint +# This is the backbone of the example: it parses inputs, grabs a list of data, instantiates +# the InfinityEngine embedder application, and passes data to it via `map`. +# +# Inputs: +# * `gpu` is a string specifying the GPU to be used. +# * `max_containers` caps the number of containers allowed to spin-up. Note that this cannot +# be used with `buffer_containers`: *if you want to use this, set* `buffer_containers=None` *above!* +# * `allow_concurrent_inputs` sets the [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency") +# argument for the inference app via the +# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options) API. +# This takes advantage of the asynchronous nature of the Infinity embedding inference app. +# * `threads_per_core` oversubscription factor for parallelized I/O (image reading). 
+# * `batch_size` is a parameter passed to the [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"),
+# and it means the usual thing for machine learning inference: a group of images are processed
+# through the neural network together.
+# * `model_name` is a HuggingFace model path a la [openai/clip-vit-base-patch16](https://huggingface.co/openai/clip-vit-base-patch16 "OpenAI ViT");
+# Infinity will automatically load it and prepare it for asynchronous serving.
+# * `image_cap` caps the number of images used in this example (e.g. for debugging/testing).
+# * `hf_dataset_name` is a HuggingFace dataset path a la "microsoft/cats_vs_dogs".
+# * `log_file` (optional) points to a local path where a CSV of times will be logged.
+#
+# These three parameters are used to pre-process images to the correct size in a big batch
+# before inference. However, if you have the wrong numbers or aren't sure, Infinity will
+# automatically handle resizing (at a cost to throughput).
+# * `im_chan`: the number of color channels your model is expecting (probably 3)
+# * `im_height`: the number of pixels tall your model is expecting the images to be
+# * `im_width`: the number of pixels wide your model is expecting the images to be
+#
+@app.local_entrypoint()
+def main(
+    # with_options parameters:
+    gpu: str = "A10G",
+    max_containers: int = 50,
+    allow_concurrent_inputs: int = 2,
+    # modal.parameters:
+    n_models: int = None,  # defaults to match `allow_concurrent_inputs`
+    model_name: str = "openai/clip-vit-base-patch16",
+    batch_size: int = 100,
+    im_chan: int = 3,
+    im_height: int = 224,
+    im_width: int = 224,
+    # data
+    image_cap: int = -1,
+    hf_dataset_name: str = "microsoft/cats_vs_dogs",
+    million_image_test: bool = False,
+    # logging (optional)
+    log_file: str = None,  # TODO: remove local logging from example
+):
+    start_time = perf_counter()
+
+    # (0) Catalog data: modify `catalog_jpegs` to fetch batches of your data paths.
+    extracted_path = Path("extracted") / hf_dataset_name
+    im_path_list, vol_setup_time = catalog_jpegs.remote(
+        dataset_namespace=hf_dataset_name,
+        cache_dir=extracted_path,
+        image_cap=image_cap,
+        model_input_shape=(im_chan, im_height, im_width),
+    )
+    print(f"Took {vol_setup_time:.2f}s to setup volume.")
+    if million_image_test:
+        print("WARNING: `million_image_test` FLAG RECEIVED! 
RESETTING BSZ ETC!") + mil = int(1e6) + while len(im_path_list) < mil: + im_path_list += im_path_list + im_path_list = im_path_list[:mil] + n_ims = len(im_path_list) + + # (1) Init the model inference app + # No inputs to with_options if none provided or buffer_used aboe + container_config = {"max_containers": max_containers} + # Build the engine + start_time = perf_counter() + embedder = InfinityEngine.with_options( + gpu=gpu, **container_config + ).with_concurrency(max_inputs=allow_concurrent_inputs)( + batch_size=batch_size, + n_engines=n_models if n_models else allow_concurrent_inputs, + model_name=model_name, + ) + + # (2) Embed batches via remote `map` call + times, batchsizes = [], [] + for time, batchsize in embedder.embed.map(chunked(im_path_list, batch_size)): + times.append(time) + batchsizes.append(batchsize) + + # (3) Log + if n_ims > 0: + total_duration = perf_counter() - start_time + total_throughput = n_ims / total_duration + embed_throughputs = [ + batchsize / time for batchsize, time in zip(batchsizes, times) + ] + avg_throughput = sum(embed_throughputs) / len(embed_throughputs) + + log_msg = ( + f"{embedder.name}{gpu}::batch_size={batch_size}::" + f"n_ims={n_ims}::concurrency={allow_concurrent_inputs}::" + f"\tTotal time:\t{total_duration / 60:.2f} min\n" + f"\tOverall throughput:\t{total_throughput:.2f} im/s\n" + f"\tSingle-model throughput (avg):\t{avg_throughput:.2f} im/s\n" + ) + + print(log_msg) + + if log_file is not None: + local_logfile = Path(log_file).expanduser() + local_logfile.parent.mkdir(parents=True, exist_ok=True) + + import csv + + csv_exists = local_logfile.exists() + with open(local_logfile, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not csv_exists: + # write header + writer.writerow( + [ + "batch_size", + "concurrency", + "max_containers", + "gpu", + "n_images", + "total_time", + "total_throughput", + "avg_model_throughput", + ] + ) + # write your row + writer.writerow( + [ + batch_size, + allow_concurrent_inputs, + max_containers, + gpu, + n_ims, + total_duration, + total_throughput, + avg_throughput, + ] + ) diff --git a/06_gpu_and_ml/embeddings/image_embedding_th_compile.py b/06_gpu_and_ml/embeddings/image_embedding_th_compile.py new file mode 100644 index 000000000..12737c22a --- /dev/null +++ b/06_gpu_and_ml/embeddings/image_embedding_th_compile.py @@ -0,0 +1,665 @@ +# --- +# cmd: ["modal", "run", "06_gpu_and_ml/embeddings/image_embedding_th_compile.py::main"] +# --- +# TODO: deprecation warnings at the beginning?? +# TODO: add torch.inference_mode everywhere to both scripts... +# TODO: remove all the bugger stuff +# # A Recipe for Throughput Maximization: GPU Packing with torch.compile +# In certain applications, the bottom line comes to *throughput*: process a batch of inputs as fast as possible. +# This example presents a Modal recipe for maximizing image embedding throughput, +# taking advantage of Modal's concurrency features. +# ### BLUF (Bottom Line Up Front) +# recipe ABC +# ### Why? +# compile discussion +# ## Local env imports +# # Import everything we need for the locally-run Python (everything in our local_entrypoint function at the bottom). 
+import asyncio +import csv +import os +import shutil +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import perf_counter, time_ns +from typing import Iterator + +import modal + +# ## Key Parameters +# There are three ways to parallelize inference for this usecase: via batching, +# by packing individual GPU(s) with multiple copies of the model, and by fanning out across multiple containers. +# +# Modal provides two ways to dynamically parameterize classes: through +# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options) +# and through +# [modal.parameter](https://modal.com/docs/reference/modal.parameter#modalparameter). +# The app.local_entrypoint() main function at the bottom of this example uses these +# features to dynamically construct the inference engine class wrapper. One feature +# that is not currently support via `with_options` is the `buffer_containers` parameter. +# This tells Modal to pre-emptively warm a number of containers before they are strictly +# needed. In other words it tells Modal to continuously fire up more and more containers +# until throughput is saturated. + +# buffer_containers: int = 1000 # (50,) + +# ## Dataset, Model, and Image Setup +# This example uses HuggingFace to download data and models. We will use a high-performance +# [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# to (1) cache model weights, (2) store the +# [image dataset](https://huggingface.co/datasets/microsoft/cats_vs_dogs) +# that we want to embed, and (3) cache torch.compile kernels and artifacts. + + +# ### Volume Initialization +# You may need to [set up a secret](https://modal.com/secrets/) to access HuggingFace datasets +hf_secret = modal.Secret.from_name("huggingface-secret") + + +# Create a persisted dict - the data gets retained between app runs +racetrack_dict = modal.Dict.from_name("laion2B", create_if_missing=True) + +# This name is important for referencing the volume in other apps or for +# [browsing](https://modal.com/storage): +data_volume = modal.Volume.from_name("example-embedding-data", create_if_missing=True) + +# The location within the volume where torch.compile's caching backends should point to: +# This is the location within the container where this Volume will be mounted: +vol_mnt = Path("/data") +TH_CACHE_DIR = vol_mnt / "model-compile-cache" + +# ### Define the image +th_compile_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "torch", # torch.compile + "transformers", # CLIPVisionModel etc. 
+ "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": vol_mnt.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Enables speedy caching across containers + "TORCHINDUCTOR_CACHE_DIR": TH_CACHE_DIR.as_posix(), + "TORCHINDUCTOR_FX_GRAPH_CACHE": "1", + "TORCHINDUCTOR_AUTOGRAD_CACHE": "1", + } + ) +) + +# Initialize the app +app = modal.App( + "example-compiled-embedder", + image=th_compile_image, + volumes={vol_mnt: data_volume}, + secrets=[hf_secret], +) + +# Imports inside the container +with th_compile_image.imports(): + import torch + from torch.serialization import safe_globals + from torchvision.io import read_image + from transformers import CLIPImageProcessorFast, CLIPVisionModel + + +# ## Dataset Setup +# We use a [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# to store the images we want to encode. For your usecase, can simply replace the +# function `catalog_jpegs` with any function that returns a list of image paths. Just make +# sure that it's returning the _paths_: we are going to +# [map](https://modal.com/docs/reference/modal.Function#map) these inputs between containers +# so that the inference class can simply read them directly from the Volume. If you are +# shipping the images themselves across the wire, that will likely bottleneck throughput. + +# Note that Modal Volumes are optimized for datasets on the order of 50,000 - 500,000 +# files and directories. If you have a larger dataset, you may need to consider other storage +# options such as a [CloudBucketMount](https://modal.com/docs/examples/rosettafold). + + +@app.function( + image=th_compile_image, + volumes={vol_mnt: data_volume}, + max_containers=1, # We only want one container to handle volume setup + cpu=4, # HuggingFace will use multi-process parallelism to download + timeout=10 * 60, # if using a large HF dataset, this may need to be longer +) +def catalog_jpegs( + dataset_namespace: str, # a HuggingFace path like `microsoft/cats_vs_dogs` + cache_dir: str, # a subdir where the JPEGs will be extracted into the volume long-form + image_cap: int, # hard cap on the number of images to be processed (e.g. for timing, debugging) + model_input_shape: tuple[int, int, int], # JPEGs will be preprocessed to this shape + threads_per_core: int = 8, # threads per CPU for I/O oversubscription + n_million_image_test: float = None, +) -> tuple[ + list[os.PathLike], # the function returns a list of paths, + float, # and the time it took to prepare +]: + """ + This function checks the volume for JPEGs and, if needed, calls `download_to_volume` + which pulls a HuggingFace dataset into the mounted volume, preprocessing along the way. + """ + + def download_to_volume(dataset_namespace: str, cache_dir: str): + """ + This function: + (1) caches a HuggingFace dataset to the path specified in your `HF_HOME` environment + variable, which is pointed to a Modal Volume during creation of the image above. + (2) unpacks the dataset and preprocesses them; this could be done in several different + ways, but we want to do it all once upfront so as not to confound the timing tests later. 
+ """ + from datasets import load_dataset + from torchvision.io import write_jpeg + from torchvision.transforms import Compose, PILToTensor, Resize + from tqdm import tqdm + + # Load dataset cache to HF_HOME + ds = load_dataset( + dataset_namespace, + split="train", + num_proc=os.cpu_count(), # this will be capped by huggingface based on the number of shards + ) + + # Create an `extraction` cache dir where we will create explicit JPEGs + mounted_cache_dir = vol_mnt / cache_dir + mounted_cache_dir.mkdir(exist_ok=True, parents=True) + + # Preprocessing pipeline: resize in bulk now instead of on-the-fly later + preprocessor = Compose( + [ + Resize(model_input_shape), + PILToTensor(), + ] + ) + + def preprocess_img(idx, example): + """ + Applies preprocessor and write as jpeg with TurboJPEG (via TorchVision). + """ + # Define output path + write_path = mounted_cache_dir / f"img{idx:07d}.jpg" + # Skip if already done + if write_path.is_file(): + return + + # Process + preprocessed = preprocessor(example["image"].convert("RGB")) + + # Write to modal.Volume + write_jpeg(preprocessed, write_path) + + # Note: the optimization of this loop really depends on your preprocessing stack. + # You could use ProcessPool if there is significant work per image, or even + # GPU acceleration and batch preprocessing. We keep it simple here for the example. + futures = [] + with ThreadPoolExecutor( + max_workers=os.cpu_count * threads_per_core + ) as executor: + for idx, ex in enumerate(ds): + if image_cap > 0 and idx >= image_cap: + break + futures.append(executor.submit(preprocess_img, idx, ex)) + + # Progress bar over completed futures + for _ in tqdm( + as_completed(futures), total=len(futures), desc="Caching images" + ): + pass # result() is implicitly called by as_completed() + + # Save changes + data_volume.commit() + + ds_preptime_st = perf_counter() + + def list_all_jpegs(subdir: os.PathLike = "/") -> list[os.PathLike]: + """ + Searches a subdir within your volume for all JPEGs. + """ + return [ + x.path + for x in data_volume.listdir(subdir.as_posix()) + if x.path.endswith(".jpg") + ] + + # Check for extracted-JPEG cache dir within the modal.Volume + if (vol_mnt / cache_dir).is_dir(): + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + else: + n_ims = 0 + print("The cache dir was not found...") + + # If needed, download dataset to a modal.Volume + if (n_ims < image_cap) or (n_ims == 0): + print(f"Found {n_ims} JPEGs; checking for more on HuggingFace.") + download_to_volume(dataset_namespace, cache_dir) + # Try again + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + + # [optional] Cap the number of images to process + print(f"Found {n_ims} JPEGs in the Volume.", end="") + if image_cap > 0: + im_path_list = im_path_list[: min(image_cap, len(im_path_list))] + + print(f"Took {perf_counter() - ds_preptime_st:.2f}s to setup volume.") + if n_million_image_test > 0: + print(f"WARNING: `{n_million_image_test} million_image_test` FLAG RECEIVED!") + mil = int(n_million_image_test * 1e6) + while len(im_path_list) < mil: + im_path_list += im_path_list + im_path_list = im_path_list[:mil] + + return im_path_list + + +def chunked(seq: list[os.PathLike], subseq_size: int) -> Iterator[list[os.PathLike]]: + """ + Helper function that chunks a sequence into subsequences of length `subseq_size`. 
+ """ + for i in range(0, len(seq), subseq_size): + yield seq[i : i + subseq_size] + + +# ## Inference app +# Here we define a [modal.cls](https://modal.com/docs/reference/modal.Cls#modalcls) +# that wraps an AsyncQueue of `torch.compile`'d models. +# Some important notes: +# 1. We let Modal handle management of concurrent inputs via the `input_concurrency` +# parameter, which we pass to the class constructor in our `main` local_entrypoint below. This +# parameter sets both the number of concurrent inputs (via with_options) and the class variable +# `n_engines` (via modal.parameters). If you aren't using `with_options` you can use the +# [modal.concurrent](https://modal.com/docs/guide/concurrent-inputs#input-concurrency) +# decorator directly. +# 2. In `init_engines`, we are compiling one copy of the model for each concurrently-passed +# batch of data. Higher level inference engines like +# [Infinity](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity") +# handle this under the hood, at a cost to throughput. +# 3. The [@modal.enter](https://modal.com/docs/reference/modal.enter#modalenter) +# decorator ensures that this method is called once per container, on startup (and `exit` is +# run once, on shutdown). + + +# If buffer_containers is set, use it, otherwise rely on `with_options`. +@app.cls( + image=th_compile_image, + volumes={vol_mnt: data_volume}, + cpu=2.5, + memory=2.5 * 1024, # MB -> GB + buffer_containers=50, +) +class TorchCompileEngine: + model_name: str = modal.parameter() + batch_size: int = modal.parameter(default=100) + n_engines: int = modal.parameter(default=1) + model_input_chan: int = modal.parameter(default=3) + model_input_imheight: int = modal.parameter(default=224) + model_input_imwidth: int = modal.parameter(default=224) + threads_per_core: int = modal.parameter(default=8) + exp_tag: str = modal.parameter(default="default-tag") + # Cannot currently gracefully set ENV vars from local_entrypoint + cache_dir: Path = TH_CACHE_DIR + # For logging + name: str = "TorchCompileEngine" + + def init_th(self): + """ + Have to manually turn this on for torch.compile. + """ + major, minor = torch.cuda.get_device_capability(torch.cuda.current_device()) + torch.set_grad_enabled(False) + if major > 8: + torch.set_float32_matmul_precision("high") + + @modal.enter() + async def init_engines(self): + """ + Once per container start, `self.n_engines` models will be initialized + (one for each concurrently served input via Modal). The first container + needs to compute a trace and cache the kernels to our modal.Volume; sub- + sequent containers can use that cache (which takes 50%-60% the time + as the first torch.compile call). + """ + key = f"{self.exp_tag}-first.ctr.start" + racetrack_dict[key] = racetrack_dict.get(key, time_ns()) + + # (0) Setup + # Torch backend + self.init_th() + # This makes sure n-th container finds the cache created by the first one + data_volume.reload() + # This is where we will cache torch.compile artifacts + compile_cache: Path = Path(self.cache_dir) / ( + self.model_name.replace("/", "_") + "_compiled_model_cache.pt" + ) + # Condense modal.parameter values + model_input_shape = ( + self.batch_size, + self.model_input_chan, + self.model_input_imwidth, + self.model_input_imwidth, + ) + + from torch.compiler._cache import CacheInfo + + # This tells torch to dynamically decide whether to recompile from scratch + # or to check for a cache (we want it to check for a cache!) 
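+        # (In recent PyTorch releases, "eager_on_recompile" means: reuse a valid cached
+        # compiled graph when one exists, but fall back to eager execution rather than
+        # triggering a fresh recompile for unseen input shapes.)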
+ torch.compiler.set_stance("eager_on_recompile") + + # (1) Load raw model weights and preprocessor once per container + base = CLIPVisionModel.from_pretrained(self.model_name) + self.preprocessor = CLIPImageProcessorFast.from_pretrained( + self.model_name, usefast=True + ) + + # Only save what we need + config = base.config + state = base.state_dict() + del base + + # (2) Check for trace artifacts cache + if compile_cache.is_file(): + cache = compile_cache.read_bytes() + with safe_globals([CacheInfo]): + torch.compiler.load_cache_artifacts(cache) + + # (3) Build an Async Queue of compiled models + self.engine_queue = asyncio.Queue() + + for idx in range(self.n_engines): + # (3.a) Build a CLIPVisionModel model from weights + model = CLIPVisionModel(config).eval().cuda() + model.load_state_dict(state) + + # Uses cache under the hood (if available) + compiled_model = torch.compile( + model, + mode="reduce-overhead", + fullgraph=True, + ) + + # (3.b) Cache the trace only in the 1st container for the 1st model copy + if (idx == 0) and (not compile_cache.is_file()): + # Complete the trace with an inference + compiled_model( + **self.preprocessor( + images=torch.randn(model_input_shape), + device=compiled_model.device, + return_tensors="pt", + ) + ) + # Extract and save artifacts + compile_cache.parent.mkdir(exist_ok=True, parents=True) + artifact_bytes, cache_info = torch.compiler.save_cache_artifacts() + compile_cache.write_bytes(artifact_bytes) + + await self.engine_queue.put(compiled_model) + + # (4) initialize threadpool for dataloading + self.executor = ThreadPoolExecutor( + max_workers=os.cpu_count() * self.threads_per_core, + thread_name_prefix="img-io", + ) + + @staticmethod + def readim(impath: os.PathLike): + """ + Prepends this container's volume mount location to the image path. + """ + return read_image(str(vol_mnt / impath)) + + @modal.method() + async def embed( + self, images: list[os.PathLike], *args, **kwargs + ) -> tuple[float, float]: + """ + This is the workhorse function. We select a model from the queue, prepare + a batch, execute inference, and return the time elapsed. + + NOTE: we throw away the embeddings here; you probably want to return + them or save them directly to a modal.Volume. + + TODO: do image loading first before awaiting queue + """ + + try: + # (0) Load batch of image data + st = perf_counter() + images = self.preprocessor( + images=torch.stack(list(self.executor.map(self.readim, images))), + device="cuda:0", + return_tensors="pt", + ) + batch_elapsed = perf_counter() - st + + # (1) Grab an engine from the queue + engine = await self.engine_queue.get() + + # (2) Encode the batch + st = perf_counter() + embedding = engine(**images).pooler_output + embed_elapsed = perf_counter() - st + + finally: + # No matter what happens, return the engine to the queue + await self.engine_queue.put(engine) + + # (3) You may wish to return the embeddings themselves here + return batch_elapsed, embed_elapsed, len(images) + + @modal.exit() + async def exit(self) -> None: + """ + trying to get less printouts?... + """ + self.executor.shutdown(wait=True) + racetrack_dict[f"{self.exp_tag}-last.ctr.complete"] = time_ns() + return + + +# This modal.function is a helper that you probably don't need to call: +# it deletes the torch.compile cache dir we use for sharing a cache across +# containers (for measuring startup times). 
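+#
+# If you do need it, a usage sketch (run from the repository root; the `::function` suffix
+# selects this specific function):
+#
+#     modal run 06_gpu_and_ml/embeddings/image_embedding_th_compile.py::destroy_th_compile_cache
+#
+# Alternatively, the `destroy_cache` flag on `main` below calls it for you.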
+ + +@app.function( + image=th_compile_image, + volumes={vol_mnt: data_volume}, +) +def destroy_th_compile_cache(): + """ + For timing purposes: deletes torch compile cache dir. + """ + if TH_CACHE_DIR.exists(): + num_files = sum(1 for f in TH_CACHE_DIR.rglob("*") if f.is_file()) + + print( + "\t*** DESTROYING model cache! You sure you wanna do that?! " + f"({num_files} files)" + ) + shutil.rmtree(TH_CACHE_DIR.as_posix()) + else: + print( + f"\t***destroy_cache was called, but path doesnt exist:\n\t{TH_CACHE_DIR}" + ) + return + + +# ## Local Entrypoint +# This is the backbone of the example: it parses inputs, grabs a list of data, instantiates +# the TorchCompileEngine embedder application, and passes data to it via `map`. +# +# Inputs: +# * `gpu` is a string specifying the GPU to be used. +# * `max_containers` caps the number of containers allowed to spin-up. Note that this cannot +# be used with `buffer_containers`: *if you want to use this, set* `buffer_containers=None` *above!* +# * `input_concurrency` sets the [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency") +# argument for the inference app via the +# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options) API. +# * `threads_per_core` oversubscription factor for parallelized I/O (image reading). +# * `batch_size` determines how many images are passed to individual instances of the model at a time. +# * `model_name` a HuggingFace model path a la [openai/clip-vit-base-patch16]([OpenAI model](https://huggingface.co/openai/clip-vit-base-patch16 "OpenAI ViT")); +# It needs to be wrappable by HuggingFace's CLIPVisionModel class. +# * `image_cap` caps the number of images used in this example (e.g. for debugging/testing) +# * `hf_dataset_name` a HuggingFace data path a la "microsoft/cats_vs_dogs" +# * `log_file` (optional) points to a local path where a CSV of times will be logged +# * `destroy_cache` (optional) destroys the torch.compile cache e.g. for timing/debugging +# +# These three parameters are used to pre-process images to the correct size in a big batch +# before inference and for torch.compile to optimize the trace for your predicted batch size. +# * `model_input_chan`: the number of color channels your model is expecting (probably 3) +# * `model_input_imheight`: the number of pixels tall your model is expecting the images to be +# * `model_input_imwidth`: the number of color channels your model is expecting (probably 3) + + +@app.local_entrypoint() +def main( + # APP CONFIG + gpu: str = "any", + max_containers: int = None, # this gets overridden if buffer_containers is not None + input_concurrency: int = 2, + # MODEL CONFIG + n_models: int = None, # defaults to match `allow_concurrent_parameters` + model_name: str = "openai/clip-vit-base-patch16", + batch_size: int = 32, + # DATA CONFIG + im_chan: int = 3, + im_height: int = 224, + im_width: int = 224, + hf_dataset_name: str = "microsoft/cats_vs_dogs", + image_cap: int = -1, + n_million_image_test: float = 0, + # torch.compile cache + destroy_cache: bool = False, + exp_tag: str = "default-tag", + log_file: str = "/home/ec2-user/modal-examples/06_gpu_and_ml/embeddings/_triton.csv", +): + start_time = perf_counter() + racetrack_dict[f"{exp_tag}-exp.start"] = time_ns() + + # (0.a) Catalog data: modify `catalog_jpegs` to fetch batches of your data paths. 
+ extracted_path = Path("extracted") / hf_dataset_name + im_path_list = catalog_jpegs.remote( + dataset_namespace=hf_dataset_name, + cache_dir=extracted_path, + image_cap=image_cap, + model_input_shape=(im_chan, im_height, im_width), + n_million_image_test=n_million_image_test, + ) + print(f"Embedding {len(im_path_list)} images at batchsize {batch_size}.") + + # (0.b) This destroys cache for timing purposes - you probably don't want to do this! + if destroy_cache: + destroy_th_compile_cache.remote() + + # (1) Init the model inference app + + # Build the engine + racetrack_dict[f"{exp_tag}-embedder.init"] = time_ns() + + container_config = {"max_containers": max_containers} if max_containers else {} + embedder = TorchCompileEngine.with_concurrency( + max_inputs=input_concurrency + ).with_options(gpu=gpu, **container_config)( + batch_size=batch_size, + n_engines=n_models if n_models else input_concurrency, + model_name=model_name, + model_input_chan=im_chan, + model_input_imheight=im_height, + model_input_imwidth=im_width, + exp_tag=exp_tag, + ) + n_ims = len(im_path_list) + # (2) Embed batches via remote `map` call + # (2) Embed batches via remote `map` call + preptimes, inftimes, batchsizes = [], [], [] + # embedder.embed.spawn_map(chunked(im_path_list, batch_size)) + for preptime, inftime, batchsize in embedder.embed.map( + chunked(im_path_list, batch_size) + ): + preptimes.append(preptime) + inftimes.append(inftime) + batchsizes.append(batchsize) + + # (3) Log & persist results + if n_ims > 0: + total_duration = perf_counter() - start_time # end-to-end wall-clock + overall_throughput = n_ims / total_duration # imgs / s, wall-clock + + # per-container metrics + inf_throughputs = [bs / t if t else 0 for bs, t in zip(batchsizes, inftimes)] + prep_throughputs = [bs / t if t else 0 for bs, t in zip(batchsizes, preptimes)] + + avg_inf_throughput = sum(inf_throughputs) / len(inf_throughputs) + best_inf_throughput = max(inf_throughputs) + + avg_prep_throughput = sum(prep_throughputs) / len(prep_throughputs) + best_prep_throughput = max(prep_throughputs) + + total_prep_time = sum(preptimes) + total_inf_time = sum(inftimes) + + log_msg = ( + f"{embedder.name}{gpu}::batch_size={batch_size}::" + f"n_ims={n_ims}::concurrency={input_concurrency}\n" + f"\tTotal wall time:\t{total_duration / 60:.2f} min\n" + f"\tOverall throughput:\t{overall_throughput:.2f} im/s\n" + f"\tPrep time (sum):\t{total_prep_time:.2f} s\n" + f"\tInference time (sum):\t{total_inf_time:.2f} s\n" + f"\tPrep throughput (avg/best):\t{avg_prep_throughput:.2f} / " + f"{best_prep_throughput:.2f} im/s\n" + f"\tInfer throughput (avg/best):\t{avg_inf_throughput:.2f} / " + f"{best_inf_throughput:.2f} im/s\n" + ) + print(log_msg) + + # ── optional CSV ─────────────────────────────────────────────────────────── + if log_file: + path = Path(log_file).expanduser() + path.parent.mkdir(parents=True, exist_ok=True) + + header = [ + "batch_size", + "concurrency", + "n_models", + "max_containers", + "gpu", + "n_images", + "total_wall_time", + "overall_throughput", + "total_prep_time", + "total_inf_time", + "avg_prep_thpt", + "best_prep_thpt", + "avg_inf_thpt", + "best_inf_thpt", + ] + row = [ + batch_size, + input_concurrency, + n_models, + max_containers, + gpu, + n_ims, + total_duration, + overall_throughput, + total_prep_time, + total_inf_time, + avg_prep_throughput, + best_prep_throughput, + avg_inf_throughput, + best_inf_throughput, + ] + + write_header = not path.exists() + with path.open("a", newline="", encoding="utf-8") as f: + writer = 
csv.writer(f) + if write_header: + writer.writerow(header) + writer.writerow(row) diff --git a/06_gpu_and_ml/embeddings/modal_dynamo.py b/06_gpu_and_ml/embeddings/modal_dynamo.py new file mode 100644 index 000000000..2ff2382f7 --- /dev/null +++ b/06_gpu_and_ml/embeddings/modal_dynamo.py @@ -0,0 +1,231 @@ +import asyncio +import os +import subprocess +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from time import perf_counter +from typing import Iterator, List, Sequence, Tuple + +import modal + +# ────────────────────────────── Constants ────────────────────────────── +HF_SECRET = modal.Secret.from_name("huggingface-secret") +VOL_NAME = "example-embedding-data" +VOL_MNT = Path("/data") +data_volume = modal.Volume.from_name(VOL_NAME, create_if_missing=True) +MODEL_REPO = VOL_MNT / "dynamo_repo" # will hold model.plan + config + +# image with dynamo + torch + dynamoclient (tiny helper) +dynamo_IMAGE = ( + modal.Image.from_registry("nvcr.io/nvidia/pytorch:25.04-py3", add_python="3.12") + .env( + { + "DEBIAN_FRONTEND": "noninteractive", + } + ) + ######################################################################################## + # Build Rust etc. + .run_commands("apt-get update") + .apt_install("curl", "build-essential", "pkg-config", "git", "libssl-dev", "pip") + # Remove any old cargo versions + .run_commands("apt-get purge -y cargo rustc || true") + .run_commands( + "curl -sSf https://sh.rustup.rs | sh -s -- -y " + "--profile minimal --default-toolchain 1.87.0" + ) + .env( + { + "PATH": "/root/.cargo/bin:$PATH", # make rustup’s cargo first + "RUSTUP_HOME": "/root/.rustup", + "CARGO_HOME": "/root/.cargo", + } + ) + .run_commands( + "pip install --upgrade pip 'hatchling>=1.24' 'hatch-fancy-pypi-readme>=22.5'" + ) + .pip_install("uv") + .run_commands("uv pip install --system nixl") + ######################################################################################## + # Build dynamo + .run_commands("git clone https://github.com/ai-dynamo/dynamo.git") + # TODO: condense :) + .apt_install("clang", "libclang-dev", "llvm-dev", "pkg-config", "cmake") + .run_commands( + "source $HOME/.cargo/env && cd /dynamo && cargo build --release --locked" + ) # --features cuda + .run_commands("cd /dynamo/lib/bindings/python && uv pip install --system .") + .run_commands("cd /dynamo && uv pip install --system .[all]") + ######################################################################################## + # extra stuff to absorb + .apt_install("nats-server", "etcd-server") + .env( + { + "HF_HOME": VOL_MNT.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Tell dynamo where the repo will be mounted + "MODEL_REPO": MODEL_REPO.as_posix(), + "DYNAMO_HOME": "/dynamo", + } + ) + .entrypoint([]) +) + +app = modal.App( + "clip-dynamo-embed", + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + secrets=[HF_SECRET], +) + +with dynamo_IMAGE.imports(): + import torch # noqa: F401 – for torchscript + + +def _env(key: str, default: str): + return os.environ.get(key, default) + + +DYNAMO_PORT = int(_env("DYNAMO_PORT", "8000")) +NATS_PORT = int(_env("NATS_PORT", "4222")) +ETCD_PORT = int(_env("ETCD_PORT", "2379")) + + +@app.cls( + image=dynamo_IMAGE, + volumes={VOL_MNT: data_volume}, + timeout=24 * 60 * 60, + cpu=4, + gpu="H100:2", +) +class Server: + @modal.enter() + def startup(self): + # 1. 
Launch infra + self.nats = subprocess.Popen( + ["nats-server", "-js", "--trace", f"--port={NATS_PORT}"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + self.etcd = subprocess.Popen( + [ + "etcd", + f"--advertise-client-urls=http://0.0.0.0:{ETCD_PORT}", + f"--listen-client-urls=http://0.0.0.0:{ETCD_PORT}", + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + # 2. Launch Dynamo + workdir = Path(os.environ["DYNAMO_HOME"]) / "examples" / "multimodal" + self.dynamo = subprocess.Popen( + [ + "dynamo", + "serve", + "graphs.agg:Frontend", + "-f", + "configs/agg.yaml", + "--port", + str(DYNAMO_PORT), # if Dynamo supports it + ], + cwd=str(workdir), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + # 3. Wait for each service + self.wait_for_port("localhost", NATS_PORT, "nats") + self.wait_for_port("localhost", ETCD_PORT, "etcd") + self.wait_for_port("localhost", DYNAMO_PORT, "dynamo") + + def wait_for_port( + self, host: str, port: int, service_name: str, timeout_min: float = 1.2 + ): + import socket + + from tqdm import tqdm + + checkrate_hz = 2 + t = int(timeout_min * 60 * checkrate_hz) + bar = tqdm(range(t), total=t, desc=f"Waiting for {service_name} heartbeat") + for tick in bar: + try: + with socket.create_connection((host, port), timeout=1): + bar.close() + tqdm.write( + f"\n\t{service_name} is ready on {host}:{port} after {tick / checkrate_hz}s.\n" + ) + return + except OSError: + time.sleep(1 / checkrate_hz) + + raise RuntimeError(f"{service_name} failed to become ready on {host}:{port}.") + + @modal.exit() + def shutdown(self): + self.nats.terminate() + self.etcd.terminate() + self.dynamo.terminate() + + def _dump_proc_logs(self): + for name in ["dynamo", "nats", "etcd"]: + proc = getattr(self, name) + if proc and proc.poll() is not None: # crashed + print(f"\n⚠️ {name} exited with {proc.returncode}. Last 40 lines:") + lines = proc.stdout.readlines()[-40:] + print("".join(l.decode(errors="replace") for l in lines)) + + @modal.method() + def infer( + self, + in_idx: int, + image_url: str = "http://images.cocodataset.org/test2017/000000155781.jpg", + ): + import subprocess, textwrap, sys + + # Entire cURL command as **one** shell string. + url = f"http://localhost:{DYNAMO_PORT}/v1/chat/completions" + payload = { + "model": "llava-hf/llava-1.5-7b-hf", + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is in this image?"}, + {"type": "image_url", "image_url": {"url": image_url}}, + ], + } + ], + "max_tokens": 300, + "stream": False, + } + import requests + + try: + resp = requests.post(url, json=payload, timeout=60) + resp.raise_for_status() + return resp.json() # ✅ pickle-able result + except Exception as e: + # On failure, dump crashed service logs to help debugging + self._dump_proc_logs() + raise RuntimeError(f"Request to {url} failed: {e}") from e + # # Launch the command; `shell=True` is required because we pass a single string. 
+ # result = subprocess.run( + # curl_cmd, + # shell=True, + # capture_output=True, # grabs both stdout & stderr + # text=True, + # check=True, # raises if curl exits non-zero + # ) + # return { + # "stdout": result.stdout, + # "stderr": result.stderr, + # "returncode": result.returncode, + # } + + +@app.local_entrypoint() +def main(): + x = Server() + print(x.infer.remote(1)) diff --git a/06_gpu_and_ml/embeddings/racetrack.py b/06_gpu_and_ml/embeddings/racetrack.py new file mode 100644 index 000000000..3d22cba23 --- /dev/null +++ b/06_gpu_and_ml/embeddings/racetrack.py @@ -0,0 +1,116 @@ +import time +import asyncio +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import perf_counter +from typing import Iterator + +import modal + +# # Volumes +data_volume = modal.Volume.from_name("example-embedding-data") +results_volume = modal.Volume.from_name("racetrack", create_if_missing=True) +DATA_DIR = Path("/data") +RESULTS_DIR = DATA_DIR / "racetrack-results" +HF_HOME = DATA_DIR / "hf" +TH_CACHE_DIR = DATA_DIR / "model-compile-cache" + +# # Images + +infinity_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + "pillow", # for Infinity input typehint + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "infinity_emb[all]==0.0.76", # for Infinity inference lib + "sentencepiece", # for this particular chosen model + "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": HF_HOME.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + } + ) +) + +th_compile_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "torch", # torch.compile + "transformers", # CLIPVisionModel etc. + "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": HF_HOME.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Enables speedy caching across containers + "TORCHINDUCTOR_CACHE_DIR": TH_CACHE_DIR.as_posix(), + "TORCHINDUCTOR_FX_GRAPH_CACHE": "1", + "TORCHINDUCTOR_AUTOGRAD_CACHE": "1", + } + ) +) + +# # Timer class + + +class InferenceTimer: + @modal.enter() + def check_for_logfile(self): + self.start_time = time.perf_counter() + self.n_inferences = 0 + self.end_time = None + if not hasattr(self, 'logfile'): + raise ValueError("No `self.logfile` found. " \ + "Can't currently inherit constructors, so " \ + "all child classes of InferenceTimer must " \ + "have their own `logfile` modal.parameter!") + + @modal.exit() + def log_stats(self): + self.end_time= time.perf_counter() + + with csv. 
self.logfile + with open(local_logfile, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not csv_exists: + # write header + writer.writerow( + [ + "start_time", + "concurrency", + "max_containers", + "gpu", + "n_images", + "total_time", + "total_throughput", + "avg_model_throughput", + ] + ) + # write your row + writer.writerow( + [ + batch_size, + allow_concurrent_inputs, + max_containers, + gpu, + n_ims, + total_duration, + total_throughput, + avg_throughput, + ] + ) diff --git a/06_gpu_and_ml/embeddings/times.py b/06_gpu_and_ml/embeddings/times.py new file mode 100644 index 000000000..3b9a612ba --- /dev/null +++ b/06_gpu_and_ml/embeddings/times.py @@ -0,0 +1,68 @@ +import modal, time, pprint, statistics as st + +app = modal.App("timetest-demo") +data = modal.Dict.from_name("timetest", create_if_missing=True) + + +def epoch_ns() -> int: + """UTC wall-clock in integer nanoseconds.""" + return time.time_ns() + + +@app.cls(image=modal.Image.debian_slim()) +class Engine: + @modal.enter() + def startup(self): + # record when this *specific* container came alive + data["ctr_started_ns"] = epoch_ns() + + @modal.method() + def do_stuff(self, host_pre_call_ns: int) -> int: + """ + host_pre_call_ns is sent by the host right before .remote(). + We store both host and container clocks to cross-check skew. + """ + now_ns = epoch_ns() + # Persist both numbers so any later container can read them + data["ctr_method_ns"] = now_ns + data["host_pre_call_ns_echo"] = host_pre_call_ns + return now_ns # also returned to host for convenience + + @modal.exit() + def shutdown(self): + data["ctr_destroyed_ns"] = epoch_ns() + + +@app.local_entrypoint() +def main(): + # ===== host side ===== + host_pre_call_ns = epoch_ns() + data["host_pre_call_ns"] = host_pre_call_ns + + engine = Engine() + # invoke the remote method, sending the host timestamp along + ctr_method_ns = engine.do_stuff.remote(host_pre_call_ns) + host_post_call_ns = epoch_ns() + data["host_post_call_ns"] = host_post_call_ns + + # -------- pretty-print results -------- + def ns_to_s(n): + return f"{n / 1e9:,.6f} s" + + def delta(a, b): + return ns_to_s(data[a] - data[b]) + + print("\n=== Cross-container timing ===") + print( + f"Host pre-call ➔ container method : {delta('ctr_method_ns', 'host_pre_call_ns')}" + ) + print( + f"Container up ➔ method executed : {delta('ctr_method_ns', 'ctr_started_ns')}" + ) + print( + f"Method return ➔ host post-call : {delta('host_post_call_ns', 'ctr_method_ns')}" + ) + + # Check clock skew (container – host) at call time + skew_ns = data["ctr_method_ns"] - data["host_pre_call_ns_echo"] + print(f"Observed host ⇄ container clock skew : {skew_ns / 1e6:.3f} ms") diff --git a/06_gpu_and_ml/embeddings/tmp.py b/06_gpu_and_ml/embeddings/tmp.py new file mode 100644 index 000000000..33267a233 --- /dev/null +++ b/06_gpu_and_ml/embeddings/tmp.py @@ -0,0 +1,561 @@ +# --- +# cmd: ["modal", "run", "06_gpu_and_ml/embeddings/image_embedding_infinity.py::main"] +# --- + +# # A Recipe for Throughput Maximization: GPU Packing with Infinity Inference +# In certain applications, the bottom line comes to *throughput*: process a batch of inputs as fast as possible. +# This example presents a Modal recipe for maximizing image embedding throughput using the +# [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"), +# a popular inference engine that manages asychronous queuing and model serving. 
+# +# Check out [this example](https://modal.com/docs/examples/image_embedding_th_compile) to see how +# to use Modal to natively accomplish these features and achieve even higher throughput (nearly 2x)! +# +# TODO: remove buffer container discussion/replace with min_containers +# ## Conclusions +# ### BLUF (Bottom Line Up Front) +# Set concurrency (`max_concurrent_inputs`) to 2, and set `batch_size` around 100. +# To get maximum throughput at any cost, set buffer_containers to 10; otherwise set it to None +# and set max_containers based on your budget. +# Be sure to preprocess your data in the same manner that the model is expecting (e.g., resizing images). +# If you only want to use one container, increase `batch_size` until you are maxing +# out the GPU (but keep concurrency, `max_concurrent_inputs`, capped around 2). The example herein achieves +# around 700 images / second overall throughput, embedding the entire +# [cats vs dogs](https://huggingface.co/datasets/microsoft/cats_vs_dogs) +# dataset in about 30s. + +# ### Why? +# While batch size maximizes GPU utilization, the time to form a batch (ie reading images) +# will ultimately overtake inference, whether due to I/O, sending data across a wire, etc. +# We can make up for this by using idle GPU cores to store additional copies of the model: +# this high-level form of _GPU packing_ is achieved via an async queue and the +# [@modal.concurrent(max_inputs:int) ](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency") +# decorator, called indirectly through [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options). + +# Once you nail down an effective `batch_size` for your problem, you can crank up the number of containers +# to distribute the computational load. Set buffer_containers > 0 so that Modal continuously spins up more +# and more containers until the task is complete; otherwise set it to None, and use max_containers to cap +# the number of containers allowed. + +# High values of concurrency has diminishing returns, we believe, +# because we are already throttling the CPU with multi-threaded dataloading, and because of the way +# Infinity handles batches of inputs. We have a [more advanced example](https://modal.com/docs/examples/image_embedding_th_compile) +# that directly uses torch.compile (without Infinity), which uses Modal's native capabilities to +# manage queuing etc., and can take better advantage of the concurrency feature. + +# The demo herein should achieve around 700 images/second (around 200-300 images/second per model), +# but that number will increase dramatically for larger datasets where more and more containers spawn. +# We have clocked 9500 images/second using Infinity on a 1M image dataset, and twice that in our torch.compile +# example. + +# ## Local env imports +# Import everything we need for the locally-run Python (everything in our local_entrypoint function at the bottom). +import asyncio +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import perf_counter +from typing import Iterator + +import modal + +# ## Key Parameters +# There are three ways to parallelize inference for this usecase: via batching, +# by packing individual GPU(s) with multiple copies of the model, and by fanning out across multiple containers. 
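+#
+# Concretely, the three levers in this example are (a sketch with illustrative values):
+#
+#     batch_size = 100       # images embedded per forward pass through one model copy
+#     n_engines = 2          # model copies packed onto each GPU, one per concurrent input
+#     max_containers = 50    # cap on how many containers the work can fan out across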
+# +# Modal provides two ways to dynamically parameterize classes: through +# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options) +# and through +# [modal.parameter](https://modal.com/docs/reference/modal.parameter#modalparameter). +# The app.local_entrypoint() main function at the bottom of this example uses these +# features to dynamically construct the inference engine class wrapper. One feature +# that is not currently support via `with_options` is the `buffer_containers` parameter. +# This tells Modal to pre-emptively warm a number of containers before they are strictly +# needed. In other words it tells Modal to continuously fire up more and more containers +# until throughput is saturated. + +# ## Dataset, Model, and Image Setup +# This example uses HuggingFace to download data and models. We will use a high-performance +# [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# both to cache model weights and to store the +# [image dataset](https://huggingface.co/datasets/microsoft/cats_vs_dogs) +# that we want to embed. + +# ### Volume Initialization +# You may need to [set up a secret](https://modal.com/secrets/) to access HuggingFace datasets +hf_secret = modal.Secret.from_name("huggingface-secret") + +# This name is important for referencing the volume in other apps or for +# [browsing](https://modal.com/storage): +vol_name = "example-embedding-data" + +# This is the location within the container where this Volume will be mounted: +vol_mnt = Path("/data") + +# Finally, the Volume object can be created: +data_volume = modal.Volume.from_name(vol_name, create_if_missing=True) + +# ### Define the image +infinity_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + [ + "pillow", # for Infinity input typehint + "datasets", # for huggingface data download + "hf_transfer", # for fast huggingface data download + "tqdm", # progress bar for dataset download + "infinity_emb[all]==0.0.76", # for Infinity inference lib + "sentencepiece", # for this particular chosen model + "torchvision", # for fast image loading + ] + ) + .env( + { + # For fast HuggingFace model and data caching and download in our Volume + "HF_HOME": vol_mnt.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + } + ) +) + +# Initialize the app +app = modal.App( + "example-infinity-embedder", + image=infinity_image, + volumes={vol_mnt: data_volume}, + secrets=[hf_secret], +) + +# Imports inside the container +with infinity_image.imports(): + from infinity_emb import AsyncEmbeddingEngine, EngineArgs + from infinity_emb.primitives import Dtype, InferenceEngine + from PIL.Image import Image + from torchvision.io import read_image + from torchvision.transforms.functional import to_pil_image + +# ## Dataset Setup +# We use a [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# to store the images we want to encode. For your usecase, can simply replace the +# function `catalog_jpegs` with any function that returns a list of image paths. Just make +# sure that it's returning the _paths_: we are going to +# [map](https://modal.com/docs/reference/modal.Function#map) these inputs between containers +# so that the inference class can simply read them directly from the Volume. If you are +# shipping the images themselves across the wire, that will likely bottleneck throughput. + +# Note that Modal Volumes are optimized for datasets on the order of 50,000 - 500,000 +# files and directories. 
If you have a larger dataset, you may need to consider other storage +# options such as a [CloudBucketMount](https://modal.com/docs/examples/rosettafold). + +# A note on preprocessing: Infinity will handle resizing and other preprocessing in case +# your images are not the same size as what the model is expecting; however, this will +# significantly degrade throughput. We recommend batch-processing (if possible). + + +@app.function( + image=infinity_image, + volumes={vol_mnt: data_volume}, + max_containers=1, # We only want one container to handle volume setup + cpu=4, # HuggingFace will use multi-process parallelism to download + timeout=24 * 60 * 60, # if using a large HF dataset, this may need to be longer +) +def catalog_jpegs( + dataset_namespace: str, # a HuggingFace path like `microsoft/cats_vs_dogs` + cache_dir: str, # a subdir where the JPEGs will be extracted into the volume long-form + image_cap: int, # hard cap on the number of images to be processed (e.g. for timing, debugging) + model_input_shape: tuple[int, int, int], # JPEGs will be preprocessed to this shape + threads_per_cpu: int = 4, # threads per CPU for I/O oversubscription +) -> tuple[ + list[os.PathLike], # the function returns a list of paths, + float, # and the time it took to prepare +]: + """ + This function checks the volume for JPEGs and, if needed, calls `download_to_volume` + which pulls a HuggingFace dataset into the mounted volume, preprocessing along the way. + """ + + def download_to_volume(dataset_namespace: str, cache_dir: str): + """ + This function: + (1) caches a HuggingFace dataset to the path specified in your `HF_HOME` environment + variable, which is pointed to a Modal Volume during creation of the image above. + (2) unpacks the dataset and preprocesses them; this could be done in several different + ways, but we want to do it all once upfront so as not to confound the timing tests later. + """ + from datasets import load_dataset + from torchvision.io import write_jpeg + from torchvision.transforms import Compose, PILToTensor, Resize + from tqdm import tqdm + + # Load dataset cache to HF_HOME + ds = load_dataset( + dataset_namespace, + split="train", + num_proc=os.cpu_count(), # this will be capped by huggingface based on the number of shards + ) + + # Create an `extraction` cache dir where we will create explicit JPEGs + mounted_cache_dir = vol_mnt / cache_dir + mounted_cache_dir.mkdir(exist_ok=True, parents=True) + + # Preprocessing pipeline: resize in bulk now instead of on-the-fly later + preprocessor = Compose( + [ + Resize(model_input_shape), + PILToTensor(), + ] + ) + + def preprocess_img(idx, example): + """ + Applies preprocessor and write as jpeg with TurboJPEG (via TorchVision). + """ + # Define output path + write_path = mounted_cache_dir / f"img{idx:07d}.jpg" + # Skip if already done + if write_path.is_file(): + return + + # Process + preprocessed = preprocessor(example["image"].convert("RGB")) + + # Write to modal.Volume + write_jpeg(preprocessed, write_path) + + # Note: the optimization of this loop really depends on your preprocessing stack. + # You could use ProcessPool if there is significant work per image, or even + # GPU acceleration and batch preprocessing. We keep it simple here for the example. 
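+        # As a hypothetical sketch, a process-based variant would look like the lines below;
+        # note that `preprocess_img` would then need to be a module-level function (not a
+        # closure) so it can be pickled and sent to worker processes:
+        #
+        #     from concurrent.futures import ProcessPoolExecutor
+        #     with ProcessPoolExecutor(max_workers=os.cpu_count()) as pool:
+        #         futures = [pool.submit(preprocess_img, idx, ex) for idx, ex in enumerate(ds)]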
+ futures = [] + with ThreadPoolExecutor(max_workers=os.cpu_count * threads_per_cpu) as executor: + for idx, ex in enumerate(ds): + if image_cap > 0 and idx >= image_cap: + break + futures.append(executor.submit(preprocess_img, idx, ex)) + + # Progress bar over completed futures + for _ in tqdm( + as_completed(futures), total=len(futures), desc="Caching images" + ): + pass # result() is implicitly called by as_completed() + + # Save changes + data_volume.commit() + + ds_preptime_st = perf_counter() + + def list_all_jpegs(subdir: os.PathLike = "/") -> list[os.PathLike]: + """ + Searches a subdir within your volume for all JPEGs. + """ + return [ + x.path + for x in data_volume.listdir(subdir.as_posix()) + if x.path.endswith(".jpg") + ] + + # Check for extracted-JPEG cache dir within the modal.Volume + if (vol_mnt / cache_dir).is_dir(): + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + else: + n_ims = 0 + print("The cache dir was not found...") + + # If needed, download dataset to a modal.Volume + if (n_ims < image_cap) or (n_ims == 0): + print(f"Found {n_ims} JPEGs; checking for more on HuggingFace.") + download_to_volume(dataset_namespace, cache_dir) + # Try again + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + + # [optional] Cap the number of images to process + print(f"Found {n_ims} JPEGs in the Volume.", end="") + if image_cap > 0: + im_path_list = im_path_list[: min(image_cap, len(im_path_list))] + print(f"using {len(im_path_list)}.") + + # Time it + ds_time_elapsed = perf_counter() - ds_preptime_st + return im_path_list, ds_time_elapsed + + +def chunked(seq: list[os.PathLike], subseq_size: int) -> Iterator[list[os.PathLike]]: + """ + Helper function that chunks a sequence into subsequences of length `subseq_size`. + """ + for i in range(0, len(seq), subseq_size): + yield seq[i : i + subseq_size] + + +# ## Inference app +# Here we define a [modal.cls](https://modal.com/docs/reference/modal.Cls#modalcls) +# that wraps [Infinity's AsyncEmbeddingEngine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"). +# Some important observations: +# 1. Infinity handles asynchronous queuing internally. This is actually redundant with Modal's +# concurrency feature, but we found that using them together still helps. +# In [another example](https://modal.com/docs/examples/image_embedding_th_compile), +# we show how to achieve a similar setup without Infinity. +# 2. The variable `allow_concurrent_inputs` passed to the `main` local_entrypoint is +# used to set both the number of concurrent inputs (via with_options) and the class variable +# `n_engines` (via modal.parameters). If you aren't using `with_options` you can use the +# [modal.concurrent](https://modal.com/docs/guide/concurrent-inputs#input-concurrency) +# decorator directly. +# 3. In `init_engines`, we are creating exactly one Infinity inference +# engine for each concurrently-passed batch of data. This is a high-level version of GPU packing suitable +# for use with a high-level inference engine like Infinity. +# 4. The [@modal.enter](https://modal.com/docs/reference/modal.enter#modalenter) +# decorator ensures that this method is called once per container, on startup (and `exit` is +# run once, on shutdown). 
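+#
+# The packing pattern itself is small: each call to `embed` borrows one of the `n_engines`
+# model copies from an asyncio.Queue and always returns it, so concurrent inputs never share
+# a model. In outline (this mirrors the `embed` method below):
+#
+#     engine = await self.engine_queue.get()
+#     try:
+#         embedding, _ = await engine.image_embed(images=images)
+#     finally:
+#         await self.engine_queue.put(engine)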
+ + +container_config = { + "min_containers": 2, + # "max_containers": 2, + # "buffer_containers": 2, +} + + +@app.cls( + image=infinity_image, + volumes={vol_mnt: data_volume}, + cpu=4, + memory=5 * 1024, # MB -> GB + gpu="A10G", + allow_concurrent_inputs=2, + **container_config, +) +class InfinityEngine: + model_name: str = modal.parameter(default="openai/clip-vit-base-patch16") + batch_size: int = modal.parameter(default=100) + n_engines: int = modal.parameter(default=1) + threads_per_core: int = modal.parameter(default=8) + verbose_inference: bool = modal.parameter(default=False) + # For logging + name: str = "InfinityEngine" + + @modal.enter() + async def init_engines(self): + """ + On container start, starts `self.n_engines` copies of the selected model + and puts them in an async queue. + """ + print(f"Loading {self.n_engines} models... ", end="") + self.engine_queue: asyncio.Queue[AsyncEmbeddingEngine] = asyncio.Queue() + start = perf_counter() + for _ in range(self.n_engines): + engine = AsyncEmbeddingEngine.from_args( + EngineArgs( + model_name_or_path=self.model_name, + batch_size=self.batch_size, + model_warmup=False, + engine=InferenceEngine.torch, + dtype=Dtype.float16, + device="cuda", + ) + ) + await engine.astart() + await self.engine_queue.put(engine) + print(f"Took {perf_counter() - start:.4}s.") + + def read_batch(self, im_path_list: list[os.PathLike]) -> list["Image"]: + """ + Read a batch of data. Infinity is expecting PIL.Image.Image type + inputs, but it's faster to read from disk with torchvision's `read_image` + and convert to PIL than it is to read directly with PIL. + + This process is parallelized over the batch with multithreaded data reading. + """ + + def readim(impath: os.PathLike): + """Read with torch, convert back to PIL for Infinity""" + return to_pil_image(read_image(str(vol_mnt / impath))) + + with ThreadPoolExecutor( + max_workers=os.cpu_count() * self.threads_per_core + ) as executor: + images = list(executor.map(readim, im_path_list)) + + return images + + @modal.method() + async def embed(self, images: list[os.PathLike]) -> tuple[float, float]: + """ + This is the workhorse function. We select a model from the queue, prepare + a batch, execute inference, and return the time elapsed. + + NOTE: we throw away the embeddings here; you probably want to return + them or save them directly to a modal.Volume. + """ + # (0) Grab an engine from the queue + engine = await self.engine_queue.get() + + try: + # (1) Load batch of image data + st = perf_counter() + images = self.read_batch(images) + batch_elapsed = perf_counter() - st + + # (2) Encode the batch + st = perf_counter() + # Infinity Engine is async + embedding, _ = await engine.image_embed(images=images) + embed_elapsed = perf_counter() - st + finally: + # No matter what happens, return the engine to the queue + await self.engine_queue.put(engine) + + # (3) Housekeeping + if self.verbose_inference: + print(f"Time to load batch: {batch_elapsed:.2f}s") + print(f"Time to embed batch: {embed_elapsed:.2f}s") + + # (4) You may wish to return the embeddings themselves here + return embed_elapsed, len(images) + + @modal.exit() + async def exit(self) -> None: + """ + Shut down each of the engines. + """ + for _ in range(self.n_engines): + engine = await self.engine_queue.get() + await engine.astop() + + +# ## Local Entrypoint +# This is the backbone of the example: it parses inputs, grabs a list of data, instantiates +# the InfinityEngine embedder application, and passes data to it via `map`. 
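+#
+# In outline (a condensed sketch of the full entrypoint below):
+#
+#     im_path_list, _ = catalog_jpegs.remote(...)   # stage JPEGs in the Volume, list their paths
+#     embedder = InfinityEngine()                   # configured via the class-level defaults above
+#     for embed_time, n_in_batch in embedder.embed.map(chunked(im_path_list, batch_size)):
+#         ...                                       # aggregate timings into throughput numbers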
+#
+# Inputs:
+# * `gpu` is a string specifying the GPU to be used.
+# * `max_containers` caps the number of containers allowed to spin up. Note that this cannot
+# be used with `buffer_containers`: *if you want to use this, set* `buffer_containers=None` *above!*
+# * `allow_concurrent_inputs` sets the [@modal.concurrent(max_inputs: int)](https://modal.com/docs/guide/concurrent-inputs#input-concurrency "Modal: input concurrency")
+# argument for the inference app via the
+# [modal.cls.with_options](https://modal.com/docs/reference/modal.Cls#with_options) API.
+# This takes advantage of the asynchronous nature of the Infinity embedding inference app.
+# * `threads_per_core` is the oversubscription factor for parallelized I/O (image reading).
+# * `batch_size` is a parameter passed to the [Infinity inference engine](https://github.com/michaelfeil/infinity "github/michaelfeil/infinity"),
+# and it means the usual thing for machine learning inference: a group of images are processed
+# through the neural network together.
+# * `model_name` is a HuggingFace model path a la [openai/clip-vit-base-patch16](https://huggingface.co/openai/clip-vit-base-patch16 "OpenAI ViT");
+# Infinity will automatically load it and prepare it for asynchronous serving.
+# * `image_cap` caps the number of images used in this example (e.g. for debugging/testing).
+# * `hf_dataset_name` is a HuggingFace dataset path a la "microsoft/cats_vs_dogs".
+# * `log_file` (optional) points to a local path where a CSV of times will be logged.
+#
+# These three parameters are used to pre-process images to the correct size in a big batch
+# before inference. However, if you have the wrong numbers or aren't sure, Infinity will
+# automatically handle resizing (at a cost to throughput).
+# * `im_chan`: the number of color channels your model is expecting (probably 3)
+# * `im_height`: the number of pixels tall your model is expecting the images to be
+# * `im_width`: the number of pixels wide your model is expecting the images to be
+#
+@app.local_entrypoint()
+def main(
+    # with_options parameters:
+    gpu: str = "A10G",
+    min_containers: int = 1,
+    max_containers: int = 50,  # warning: this gets overridden if buffer_containers is not None
+    allow_concurrent_inputs: int = 2,
+    # modal.parameters:
+    n_models: int = None,  # defaults to match `allow_concurrent_inputs`
+    model_name: str = "openai/clip-vit-base-patch16",
+    batch_size: int = 100,
+    im_chan: int = 3,
+    im_height: int = 224,
+    im_width: int = 224,
+    # data
+    image_cap: int = -1,
+    hf_dataset_name: str = "microsoft/cats_vs_dogs",
+    million_image_test: bool = False,
+    # logging (optional)
+    log_file: str = None,  # TODO: remove local logging from example
+):
+    start_time = perf_counter()
+
+    # (0) Catalog data: modify `catalog_jpegs` to fetch batches of your data paths.
+    extracted_path = Path("extracted") / hf_dataset_name
+    im_path_list, vol_setup_time = catalog_jpegs.remote(
+        dataset_namespace=hf_dataset_name,
+        cache_dir=extracted_path,
+        image_cap=image_cap,
+        model_input_shape=(im_chan, im_height, im_width),
+    )
+    print(f"Took {vol_setup_time:.2f}s to setup volume.")
+    if million_image_test:
+        print("WARNING: `million_image_test` flag received! Padding the image list to 1M entries.")
+        mil = int(1e6)
+        while len(im_path_list) < mil:
+            im_path_list += im_path_list
+        im_path_list = im_path_list[:mil]
+    n_ims = len(im_path_list)
+
+    # (1) Init the model inference app.
+    # No `with_options` overrides are applied here; the class is instantiated with its defaults.
+
+    # Build the engine
+    start_time = perf_counter()
+    embedder = InfinityEngine()
+    # (2) Embed batches via remote `map` call
+    times, batchsizes = [], []
+    for time, batchsize in embedder.embed.map(chunked(im_path_list, batch_size)):
+        times.append(time)
+        batchsizes.append(batchsize)
+
+    # (3) Log
+    if n_ims > 0:
+        total_duration = perf_counter() - start_time
+        total_throughput = n_ims / total_duration
+        embed_throughputs = [
+            batchsize / time for batchsize, time in zip(batchsizes, times)
+        ]
+        avg_throughput = sum(embed_throughputs) / len(embed_throughputs)
+
+        log_msg = (
+            f"{embedder.name}{gpu}::batch_size={batch_size}::"
+            f"n_ims={n_ims}::concurrency={allow_concurrent_inputs}\n"
+            f"\tTotal time:\t{total_duration / 60:.2f} min\n"
+            f"\tOverall throughput:\t{total_throughput:.2f} im/s\n"
+            f"\tSingle-model throughput (avg):\t{avg_throughput:.2f} im/s\n"
+        )
+
+        print(log_msg)
+
+        if log_file is not None:
+            local_logfile = Path(log_file).expanduser()
+            local_logfile.parent.mkdir(parents=True, exist_ok=True)
+
+            import csv
+
+            csv_exists = local_logfile.exists()
+            with open(local_logfile, "a", newline="", encoding="utf-8") as f:
+                writer = csv.writer(f)
+                if not csv_exists:
+                    # write header
+                    writer.writerow(
+                        [
+                            "batch_size",
+                            "concurrency",
+                            "max_containers",
+                            "gpu",
+                            "n_images",
+                            "total_time",
+                            "total_throughput",
+                            "avg_model_throughput",
+                        ]
+                    )
+                # write your row
+                writer.writerow(
+                    [
+                        batch_size,
+                        allow_concurrent_inputs,
+                        max_containers,
+                        gpu,
+                        n_ims,
+                        total_duration,
+                        total_throughput,
+                        avg_throughput,
+                    ]
+                )
diff --git a/06_gpu_and_ml/embeddings/triton_wire.py b/06_gpu_and_ml/embeddings/triton_wire.py
new file mode 100644
index 000000000..8be7d8bf6
--- /dev/null
+++ b/06_gpu_and_ml/embeddings/triton_wire.py
@@ -0,0 +1,782 @@
+#!/usr/bin/env python3

+# # Maximizing throughput on Triton Inference Server


+# ## Local env imports
+# Import everything we need for the locally-run Python (everything in our local_entrypoint function at the bottom).
+import asyncio +import csv +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import perf_counter +from typing import Iterator, List, Sequence, Tuple + +import modal + +# ────────────────────────────── Constants ────────────────────────────── +HF_SECRET = modal.Secret.from_name("huggingface-secret") +VOL_NAME = "example-embedding-data" +VOL_MNT = Path("/data") +data_volume = modal.Volume.from_name(VOL_NAME, create_if_missing=True) +MODEL_REPO = VOL_MNT / "triton_repo" # will hold model.plan + config + + +IN_NAME, IN_PATH = "clip_input", "/clip_input" +OUT_NAME, OUT_PATH = "clip_output", "/clip_output" +DTYPE = "FP16" + + +# image with Triton + torch + tritonclient (tiny helper) +TRITON_IMAGE = ( + modal.Image.from_registry( + "nvcr.io/nvidia/tritonserver:24.03-py3", add_python="3.10" + ) + .pip_install("uv") + .run_commands( + "uv pip install --system --no-cache-dir torch torchvision torchaudio " + "--index-url https://download.pytorch.org/whl/cu121" + ) + .run_commands( + "uv pip install --system --no-cache-dir transformers pillow tritonclient[all] " + "tqdm hf_transfer tensorrt onnx " + ) + .run_commands("uv pip install --system pynvml") + .env( + { + "HF_HOME": VOL_MNT.as_posix(), + "HF_HUB_ENABLE_HF_TRANSFER": "1", + # Tell Triton where the repo will be mounted + "MODEL_REPO": MODEL_REPO.as_posix(), + } + ) + .entrypoint([]) +) + +app = modal.App( + "clip-triton-embed", + image=TRITON_IMAGE, + volumes={VOL_MNT: data_volume}, + secrets=[HF_SECRET], +) + +with TRITON_IMAGE.imports(): + import numpy as np + import torch # noqa: F401 – for torchscript + import torchvision + import tritonclient.grpc as grpcclient + from torchvision.io import read_image + from tqdm import tqdm + from transformers import CLIPImageProcessorFast, CLIPVisionModel + from tritonclient.utils import shared_memory as shm + + +# ## Dataset Setup +# We use a [Modal Volume](https://modal.com/docs/guide/volumes#volumes "Modal.Volume") +# to store the images we want to encode. For your usecase, can simply replace the +# function `catalog_jpegs` with any function that returns a list of image paths. Just make +# sure that it's returning the _paths_: we are going to +# [map](https://modal.com/docs/reference/modal.Function#map) these inputs between containers +# so that the inference class can simply read them directly from the Volume. If you are +# shipping the images themselves across the wire, that will likely bottleneck throughput. + +# Note that Modal Volumes are optimized for datasets on the order of 50,000 - 500,000 +# files and directories. If you have a larger dataset, you may need to consider other storage +# options such as a [CloudBucketMount](https://modal.com/docs/examples/rosettafold). + + +@app.function( + image=TRITON_IMAGE, + volumes={VOL_MNT: data_volume}, + max_containers=1, # We only want one container to handle volume setup + cpu=4, # HuggingFace will use multi-process parallelism to download + timeout=10 * 60, # if using a large HF dataset, this may need to be longer +) +def catalog_jpegs( + dataset_namespace: str, # a HuggingFace path like `microsoft/cats_vs_dogs` + cache_dir: str, # a subdir where the JPEGs will be extracted into the volume long-form + image_cap: int, # hard cap on the number of images to be processed (e.g. 
for timing, debugging)
+    model_input_shape: tuple[int, int, int],  # JPEGs will be preprocessed to this shape
+    threads_per_core: int = 8,  # threads per CPU for I/O oversubscription
+    n_million_image_test: float = 0,  # if > 0, pad the image list to this many million entries
+) -> list[os.PathLike]:  # the function returns a list of JPEG paths
+    """
+    This function checks the volume for JPEGs and, if needed, calls `download_to_volume`,
+    which pulls a HuggingFace dataset into the mounted volume, preprocessing along the way.
+    """
+
+    def download_to_volume(dataset_namespace: str, cache_dir: str):
+        """
+        This function:
+        (1) caches a HuggingFace dataset to the path specified in your `HF_HOME` environment
+        variable, which is pointed to a Modal Volume during creation of the image above.
+        (2) unpacks the dataset and preprocesses the images; this could be done in several different
+        ways, but we want to do it all once upfront so as not to confound the timing tests later.
+        """
+        from datasets import load_dataset
+        from torchvision.io import write_jpeg
+        from torchvision.transforms import Compose, PILToTensor, Resize
+        from tqdm import tqdm
+
+        # Load dataset cache to HF_HOME
+        ds = load_dataset(
+            dataset_namespace,
+            split="train",
+            num_proc=os.cpu_count(),  # this will be capped by huggingface based on the number of shards
+        )
+
+        # Create an `extraction` cache dir where we will create explicit JPEGs
+        mounted_cache_dir = VOL_MNT / cache_dir
+        mounted_cache_dir.mkdir(exist_ok=True, parents=True)
+
+        # Preprocessing pipeline: resize in bulk now instead of on-the-fly later
+        preprocessor = Compose(
+            [
+                Resize(model_input_shape[-2:]),  # Resize expects (height, width)
+                PILToTensor(),
+            ]
+        )
+
+        def preprocess_img(idx, example):
+            """
+            Applies the preprocessor and writes a JPEG with TurboJPEG (via TorchVision).
+            """
+            # Define output path
+            write_path = mounted_cache_dir / f"img{idx:07d}.jpg"
+            # Skip if already done
+            if write_path.is_file():
+                return
+
+            # Process
+            preprocessed = preprocessor(example["image"].convert("RGB"))
+
+            # Write to modal.Volume
+            write_jpeg(preprocessed, write_path)
+
+        # Note: the optimization of this loop really depends on your preprocessing stack.
+        # You could use a ProcessPool if there is significant work per image, or even
+        # GPU acceleration and batch preprocessing. We keep it simple here for the example.
+        futures = []
+        with ThreadPoolExecutor(
+            max_workers=os.cpu_count() * threads_per_core
+        ) as executor:
+            for idx, ex in enumerate(ds):
+                if image_cap > 0 and idx >= image_cap:
+                    break
+                futures.append(executor.submit(preprocess_img, idx, ex))
+
+        # Progress bar over completed futures
+        for _ in tqdm(
+            as_completed(futures), total=len(futures), desc="Caching images"
+        ):
+            pass  # result() is implicitly called by as_completed()
+
+        # Save changes
+        data_volume.commit()
+
+    ds_preptime_st = perf_counter()
+
+    def list_all_jpegs(subdir: os.PathLike = "/") -> list[os.PathLike]:
+        """
+        Searches a subdir within your volume for all JPEGs.
+ """ + return [ + x.path + for x in data_volume.listdir(subdir.as_posix()) + if x.path.endswith(".jpg") + ] + + # Check for extracted-JPEG cache dir within the modal.Volume + if (VOL_MNT / cache_dir).is_dir(): + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + else: + n_ims = 0 + print("The cache dir was not found...") + + # If needed, download dataset to a modal.Volume + if (n_ims < image_cap) or (n_ims == 0): + print(f"Found {n_ims} JPEGs; checking for more on HuggingFace.") + download_to_volume(dataset_namespace, cache_dir) + # Try again + im_path_list = list_all_jpegs(cache_dir) + n_ims = len(im_path_list) + + # [optional] Cap the number of images to process + print(f"Found {n_ims} JPEGs in the Volume.", end="") + if image_cap > 0: + im_path_list = im_path_list[: min(image_cap, len(im_path_list))] + + print(f"Took {perf_counter() - ds_preptime_st:.2f}s to setup volume.") + if n_million_image_test > 0: + print(f"WARNING: `{n_million_image_test} million_image_test` FLAG RECEIVED!") + mil = int(n_million_image_test * 1e6) + while len(im_path_list) < mil: + im_path_list += im_path_list + im_path_list = im_path_list[:mil] + + return im_path_list + + +def chunked(seq: list[os.PathLike], subseq_size: int) -> Iterator[list[os.PathLike]]: + """ + Helper function that chunks a sequence into subsequences of length `subseq_size`. + """ + for i in range(0, len(seq), subseq_size): + yield seq[i : i + subseq_size] + + +# ────────────────────── Triton Server wrapper class ───────────────────── +@app.cls( + image=TRITON_IMAGE, + volumes={VOL_MNT: data_volume}, + cpu=4, + memory=2.5 * 1024, # MB -> GB + buffer_containers=20, +) +class TritonServer: + model_name: str = modal.parameter() + batch_size: int = modal.parameter(default=1) + n_engines: int = modal.parameter(default=1) + triton_backend: str = modal.parameter(default="tensorrt") + + model_input_chan: int = modal.parameter(default=3) + model_input_imheight: int = modal.parameter(default=224) + model_input_imwidth: int = modal.parameter(default=224) + output_dim: int = modal.parameter(default=768) + + force_rebuild: bool = modal.parameter(default=False) + name: str = "Triton" + + def set_names( + self, + ): + """ + Turn 'openai/clip-vit-base-patch16' → 'openai_clip-vit-base-patch16' + (slashes, spaces and dots are not allowed in model dir names) + """ + safe_name = ( + self.model_name.replace("/", "_").replace(" ", "_").replace(".", "_") + ) + self.triton_model_name = f"{safe_name}_cc{self.n_engines}_bsz{self.batch_size}" + self.in_shape = ( + self.batch_size, + self.model_input_chan, + self.model_input_imheight, + self.model_input_imwidth, + ) + + @modal.enter() + async def _start_triton(self): + self.set_names() + self.build_triton_repo() + + import subprocess + import time + + import tritonclient.http as http + + self._client = http.InferenceServerClient(url="localhost:8000") + # start triton in background + self._proc = subprocess.Popen( + [ + "tritonserver", + f"--model-repository={MODEL_REPO}", + "--exit-on-error=true", + "--model-control-mode=none", # autoload + *self.gpu_pool_flags(), + ] + ) + + # Load + if "--model-control-mode=explicit" in self._proc.args: + self._client.load_model(self.triton_model_name) # ← added line + + # Heartbeat + self._client = grpcclient.InferenceServerClient(url="localhost:8001") + + minutes_wait = 2 + check_rate_hz = 2 + n_iter = minutes_wait * 60 * check_rate_hz + for idx in tqdm( + range(n_iter), total=n_iter, desc="Waiting for server hearbeat" + ): + try: + if 
self._client.is_model_ready(self.triton_model_name): + break + except Exception: + pass + time.sleep(1 / check_rate_hz) + if (idx / check_rate_hz) == int(idx / check_rate_hz): + print(".", end="") + else: + raise RuntimeError("Triton failed to become ready") + + self.executor = ThreadPoolExecutor( + max_workers=os.cpu_count() * 8, + thread_name_prefix="img-io", + ) + + def gpu_pool_flags(self, headroom_pct: float = 0.10): + """ + Return CLI flag strings that give Triton ~90 % of every visible GPU + and ¼ of that amount for pinned host memory. + """ + import pynvml + + pynvml.nvmlInit() + flags = [] + total_pool = 0 + + for idx in range(pynvml.nvmlDeviceGetCount()): + h = pynvml.nvmlDeviceGetHandleByIndex(idx) + total = pynvml.nvmlDeviceGetMemoryInfo(h).total # bytes + pool = int(total * (1 - headroom_pct)) + flags.append(f"--cuda-memory-pool-byte-size={idx}:{pool}") + total_pool = max(total_pool, pool) # use biggest for pin + flags.append(f"--pinned-memory-pool-byte-size={total_pool // 4}") + return flags + + def build_triton_repo( + self, + version: str = "1", + fp16: bool = True, + ): + """ + Build a Triton-ready repo for CLIP vision encoder. + + Parameters + ---------- + model_name : str HuggingFace model id + version : str Triton version directory + fp16 : bool Trace / build in FP16 mode + engine : str 'pytorch' → TorchScript + PyTorch backend + 'tensorrt' → TensorRT engine + TensorRT backend + """ + import subprocess, torch, json, os + from pathlib import Path + from textwrap import dedent + from torchvision.io import read_image + from torch import jit + from torch.onnx import export as onnx_export + + repo_dir = Path(MODEL_REPO) / self.triton_model_name / version + repo_dir.mkdir(parents=True, exist_ok=True) + + # ------------------------------------------------------------------ # + # 0. short-circuit if artifacts & config already exist + artifact = repo_dir / ( + "model.pt" if self.triton_backend == "pytorch" else "model.plan" + ) + cfg_file = Path(MODEL_REPO) / self.triton_model_name / "config.pbtxt" + if artifact.exists() and cfg_file.exists() and (not self.force_rebuild): + print("Model repo already complete – skip build.") + return + + # ------------------------------------------------------------------ # + print("Building torch model...", end="") + st = perf_counter() + + # 1. Build Torch module (used for *both* backends) + class ClipEmbedder(torch.nn.Module): + def __init__(self, hf_name: str, fp16: bool): + super().__init__() + self.clip = CLIPVisionModel.from_pretrained(hf_name) + if fp16: + self.clip.half() + self.clip.eval() + + @torch.no_grad() + def forward(self, pixels: torch.Tensor): + return self.clip(pixel_values=pixels).pooler_output + + model = ClipEmbedder(self.model_name, fp16).eval().cuda() + example = torch.randn( + self.in_shape, + device="cuda", + dtype=torch.float16 if fp16 else torch.float32, + ) + print(f"took {perf_counter() - st:.2E}s") + # ------------------------------------------------------------------ # + # 2. 
Write backend-specific artifact
+        if self.triton_backend == "pytorch":
+            print("Doing torch trace...", end="")
+            st = perf_counter()
+            traced = torch.jit.trace(model, example, strict=False).cpu()
+            # Rename IO so we have input0 / output0
+            graph = traced.inlined_graph
+            g_inputs, g_outputs = list(graph.inputs()), list(graph.outputs())
+            g_inputs[0].setDebugName("input0")
+            g_outputs[0].setDebugName("output0")
+            traced.save(artifact)
+            # Free GPU memory
+            del model, traced
+            torch.cuda.empty_cache()
+            print(f"took {perf_counter() - st:.2E}s")
+
+        elif self.triton_backend == "tensorrt":
+            onnx_path = repo_dir / "model.onnx"
+            print("Exporting ONNX... ", end="")
+            st = perf_counter()
+            onnx_export(
+                model.cpu(),  # ONNX export must run on CPU
+                example.cpu(),
+                onnx_path,
+                input_names=["input0"],
+                output_names=["output0"],
+                dynamic_axes={"input0": {0: "batch"}, "output0": {0: "batch"}},
+                opset_version=17,
+            )
+            print(f"took {perf_counter() - st:.2E}s")
+
+            size_str = "x".join(map(str, self.in_shape))
+            print(f"SIZESTR=={size_str}")
+
+            st = perf_counter()
+            plan_path = repo_dir / "model.plan"
+            # --fp16 flag assumes GPU supports it; change to --fp32 if not
+            cmd = [
+                "/usr/src/tensorrt/bin/trtexec",
+                f"--onnx={onnx_path}",
+                f"--saveEngine={plan_path}",
+                "--fp16" if fp16 else "--fp32",
+                f"--minShapes=input0:1x{'x'.join(map(str, self.in_shape[1:]))}",
+                f"--optShapes=input0:{size_str}",
+                f"--maxShapes=input0:{size_str}",
+                "--workspace=4096",
+                "--verbose",
+            ]
+            print("Running:\n\t", " ".join(cmd))
+            subprocess.run(cmd, check=True)
+            print(f"\n\t.....->took {perf_counter() - st:.2E}s")
+
+        else:
+            raise ValueError(
+                f"Triton backend `{self.triton_backend}` not "
+                "recognized; try `pytorch` or `tensorrt`"
+            )
+        # ------------------------------------------------------------------ #
+        # 3. Generate config.pbtxt
+        dtype = "TYPE_FP16" if fp16 else "TYPE_FP32"
+        cfg_text = self.make_config(
+            name=self.triton_model_name,
+            dtype=dtype,
+            output_dim=self.output_dim,
+            instances=self.n_engines,
+        )
+        cfg_file.write_text(cfg_text)
+
+        data_volume.commit()  # persist for future containers
+        print(f"✓ wrote {artifact.name} + config for backend='{self.triton_backend}'")
+
+    def make_config(
+        self,
+        name: str,
+        dtype: str,
+        output_dim: int,
+        instances: int = 1,
+    ) -> str:
+        """Return a minimal, left-aligned Triton config.pbtxt."""
+        from textwrap import dedent
+
+        # Config basics: choose a backend
+        cfg = f"""\
+        name: "{name}"
+        backend: "{self.triton_backend}"
+        max_batch_size: {self.batch_size}
+        """
+        # Set inputs/outputs info
+        cfg += f"""\
+        input [
+          {{
+            name: "input0"
+            data_type: {dtype}
+            dims: [ {", ".join(map(str, self.in_shape[1:]))} ]
+          }}
+        ]
+
+        output [
+          {{
+            name: "output0"
+            data_type: {dtype}
+            dims: [ {output_dim} ]
+          }}
+        ]
+        """
+        # Multi-model concurrency within a single (each) GPU
+        cfg += f"""
+        instance_group [
+          {{ kind: KIND_GPU, count: {instances} }}
+        ]

+        """
+
+        cfg += f"""
+        optimization {{ execution_accelerators {{
+          gpu_execution_accelerator : [ {{
+            name : "{self.triton_backend}"
+            parameters {{ key: "precision_mode" value: "{dtype}" }}
+            parameters {{ key: "max_workspace_size_bytes" value: "1073741824" }}
+          }}]
+        }}}}
+        """
+        return dedent(cfg)
+
+    @staticmethod
+    def readim(impath: os.PathLike):
+        """
+        Prepends this container's volume mount location to the image path.
+ """ + return read_image(str(VOL_MNT / impath)) + + # -------------------------------------------------------------------------- + def _ensure_region(self, name, path, byte_size): + """Create or grow a system-SHM block and remember its handle.""" + + if not hasattr(self, "_shm"): + self._shm = {} + + # first time + if name not in self._shm: + self._shm[name] = shm.create_shared_memory_region(name, path, byte_size) + self._client.register_system_shared_memory(name, path, byte_size) + return + + # # need more space + # cur = shm.get_shared_memory_byte_size(self._shm[name]) + # if byte_size > cur: + # self._client.unregister_system_shared_memory(name, path) + # shm.destroy_shared_memory_region(path) + # self._shm[name] = shm.create_shared_memory_region(name, path, byte_size) + # self._client.register_system_shared_memory(name, path, byte_size) + + # -------------------------------------------------------------------------- + def _load_batch(self, img_paths): + batch = ( + torch.stack(list(self.executor.map(self.readim, img_paths))).to( + torch.float16 + ) + / 255 + ).numpy() + # input SHM + self._ensure_region(IN_NAME, IN_PATH, batch.nbytes) + shm.set_shared_memory_region(self._shm[IN_NAME], [batch]) + + # output SHM (same batch-size) + out_bytes = batch.shape[0] * self.output_dim * batch.dtype.itemsize + self._ensure_region(OUT_NAME, OUT_PATH, out_bytes) + + return batch.shape, batch.nbytes, out_bytes + + # -------------------------------------------------------------------------- + @modal.method() + async def embed(self, imgs: list[os.PathLike]) -> tuple[float, int]: + t0 = perf_counter() + in_shape, in_bytes, out_bytes = self._load_batch(imgs) + t_prep = perf_counter() - t0 + + # Build request + inp = grpcclient.InferInput("input0", in_shape, DTYPE) + inp.set_shared_memory(IN_NAME, in_bytes) + + out = grpcclient.InferRequestedOutput("output0") + out.set_shared_memory(OUT_NAME, out_bytes) + + # Inference + t1 = perf_counter() + self._client.infer(self.triton_model_name, [inp], outputs=[out]) + t_inf = perf_counter() - t1 + + # # (If you need the vectors:) + # vecs = shm.get_contents_as_numpy( + # self._shm[OUT_NAME], (in_shape[0], self.output_dim), DTYPE + # ) + + print(f"\tBatchCreate={t_prep * 1e3:.1f} ms\n\tInference={t_inf * 1e3:.1f} ms") + return t_prep, t_inf, len(imgs) + + @modal.exit() + def _cleanup(self): + if hasattr(self, "_proc"): + self._proc.terminate() + + +@app.function(image=TRITON_IMAGE, volumes={VOL_MNT: data_volume}) +def destroy_triton_cache(): + """ + For timing purposes: deletes torch compile cache dir. + """ + import shutil + + if MODEL_REPO.exists(): + num_files = sum(1 for f in MODEL_REPO.rglob("*") if f.is_file()) + + print( + "\t*** DESTROYING model cache! You sure you wanna do that?! 
" + f"({num_files} files)" + ) + shutil.rmtree(MODEL_REPO.as_posix()) + else: + print(f"\t***destroy_cache was called, but path doesnt exist:\n\t{MODEL_REPO}") + return + + +# ───────────────────────────── Local entrypoint ───────────────────────── +# +@app.local_entrypoint() +def main( + # with_options parameters: + gpu: str = "A10G", + min_containers: int = 1, + max_containers: int = 50, # this gets overridden if buffer_containers is not None + input_concurrency: int = 2, + # modal.parameters: + n_models: int = None, # defaults to match `allow_concurrent_parameters` + model_name: str = "openai/clip-vit-base-patch16", + batch_size: int = 512, + im_chan: int = 3, + im_height: int = 224, + im_width: int = 224, + # data + image_cap: int = -1, + hf_dataset_name: str = "microsoft/cats_vs_dogs", + n_million_image_test: float = 0, + # triton cache + destroy_cache: bool = False, + # logging (optional) + log_file: str = "/home/ec2-user/modal-examples/06_gpu_and_ml/embeddings/_triton.csv", + triton_backend: str = "pytorch", + n_gpu: int = 1, + force_rebuild: bool = False, +): + start_time = perf_counter() + + # (0.a) Catalog data: modify `catalog_jpegs` to fetch batches of your data paths. + extracted_path = Path("extracted") / hf_dataset_name + im_path_list = catalog_jpegs.remote( + dataset_namespace=hf_dataset_name, + cache_dir=extracted_path, + image_cap=image_cap, + model_input_shape=(im_chan, im_height, im_width), + n_million_image_test=n_million_image_test, + ) + print(f"Embedding {len(im_path_list)} images at batchsize {batch_size}.") + + n_ims = len(im_path_list) + + # (0.b) This destroys cache for timing purposes - you probably don't want to do this! + + if destroy_cache: + destroy_triton_cache.remote() + + # (1.a) Init the model inference app + # No inputs to with_options if none provided or buffer_used aboe + buffer_containers = None + make_empty = (buffer_containers is not None) or (max_containers is None) + # Build the engine + start_time = perf_counter() + + embedder = TritonServer.with_concurrency( + max_inputs=input_concurrency, + ).with_options( + gpu=f"{gpu}:{n_gpu}", + max_containers=max_containers, + )( + batch_size=batch_size, + n_engines=input_concurrency, + triton_backend=triton_backend, + model_name=model_name, + model_input_chan=im_chan, + model_input_imheight=im_height, + model_input_imwidth=im_width, + force_rebuild=force_rebuild, + ) + + # (2) Embed batches via remote `map` call + preptimes, inftimes, batchsizes = [], [], [] + # embedder.embed.spawn_map(chunked(im_path_list, batch_size)) + for preptime, inftime, batchsize in embedder.embed.map( + chunked(im_path_list, batch_size) + ): + preptimes.append(preptime) + inftimes.append(inftime) + batchsizes.append(batchsize) + + # (3) Log & persist results + if n_ims > 0: + total_duration = perf_counter() - start_time # end-to-end wall-clock + overall_throughput = n_ims / total_duration # imgs / s, wall-clock + + # per-container metrics + inf_throughputs = [bs / t if t else 0 for bs, t in zip(batchsizes, inftimes)] + prep_throughputs = [bs / t if t else 0 for bs, t in zip(batchsizes, preptimes)] + + avg_inf_throughput = sum(inf_throughputs) / len(inf_throughputs) + best_inf_throughput = max(inf_throughputs) + + avg_prep_throughput = sum(prep_throughputs) / len(prep_throughputs) + best_prep_throughput = max(prep_throughputs) + + total_prep_time = sum(preptimes) + total_inf_time = sum(inftimes) + + log_msg = ( + f"{embedder.name}{gpu}::batch_size={batch_size}::" + f"n_ims={n_ims}::concurrency={input_concurrency}\n" + f"\tTotal 
wall time:\t{total_duration / 60:.2f} min\n" + f"\tOverall throughput:\t{overall_throughput:.2f} im/s\n" + f"\tPrep time (sum):\t{total_prep_time:.2f} s\n" + f"\tInference time (sum):\t{total_inf_time:.2f} s\n" + f"\tPrep throughput (avg/best):\t{avg_prep_throughput:.2f} / " + f"{best_prep_throughput:.2f} im/s\n" + f"\tInfer throughput (avg/best):\t{avg_inf_throughput:.2f} / " + f"{best_inf_throughput:.2f} im/s\n" + ) + print(log_msg) + + # ── optional CSV ─────────────────────────────────────────────────────────── + if log_file: + path = Path(log_file).expanduser() + path.parent.mkdir(parents=True, exist_ok=True) + + header = [ + "batch_size", + "concurrency", + "max_containers", + "gpu", + "n_images", + "total_wall_time", + "overall_throughput", + "total_prep_time", + "total_inf_time", + "avg_prep_thpt", + "best_prep_thpt", + "avg_inf_thpt", + "best_inf_thpt", + ] + row = [ + batch_size, + input_concurrency, + max_containers, + gpu, + n_ims, + total_duration, + overall_throughput, + total_prep_time, + total_inf_time, + avg_prep_throughput, + best_prep_throughput, + avg_inf_throughput, + best_inf_throughput, + ] + + write_header = not path.exists() + with path.open("a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if write_header: + writer.writerow(header) + writer.writerow(row)
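+
+# ## Calling the deployed embedder
+# If you deploy this app (`modal deploy`) instead of invoking the local entrypoint, you can
+# call the embedding method from any other Python process. A minimal sketch, assuming the
+# deployed app keeps the name "clip-triton-embed" used above (the image path is illustrative):
+#
+#     import modal
+#
+#     TritonServer = modal.Cls.from_name("clip-triton-embed", "TritonServer")
+#     server = TritonServer(model_name="openai/clip-vit-base-patch16", batch_size=512)
+#     prep_s, infer_s, n_imgs = server.embed.remote(["extracted/img0000000.jpg"])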