12 changes: 7 additions & 5 deletions .env.template
@@ -1,6 +1,8 @@
COGVIEW4_PATH=/share/official_pretrains/hf_home/CogView4-6B
### basic configs
DTYPE=bfloat16
OFFLOAD_TYPE=no_offload
OPENAI_API_KEY=
OPENAI_BASE_URL=
LORA_DIR=
OFFLOAD_TYPE=cpu_model_offload

### cogview4 related configs
COGVIEW4_PATH=THUDM/CogView4-6B
# Optional, only needed when you don't want to use the default transformer in COGVIEW4_PATH
# COGVIEW4_TRANSFORMER_PATH=
3 changes: 1 addition & 2 deletions pyproject.toml
@@ -28,9 +28,8 @@ torch = ["numpy", "torch", "torchvision"]
api = [
"fastapi[standard]~=0.115.11",
"fastapi_cli~=0.0.7",
"pydantic_settings~=2.8.1",
"openai~=1.67",
"pydantic-settings~=2.8",
"pydantic_settings~=2.8.1",
"python-dotenv~=1.0",
]

12 changes: 12 additions & 0 deletions src/cogkit/__init__.py
@@ -1 +1,13 @@
# -*- coding: utf-8 -*-


from cogkit.api.python import generate_image, generate_video
from cogkit.utils import load_lora_checkpoint, load_pipeline, unload_lora_checkpoint

__all__ = [
"generate_image",
"generate_video",
"load_pipeline",
"load_lora_checkpoint",
"unload_lora_checkpoint",
]
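
With these re-exports, image and video generation plus the pipeline/LoRA helpers become importable from the package root. A minimal usage sketch, assuming the model id from the .env template and that `load_pipeline`'s optional arguments default sensibly (the prompt and filename are placeholders):

```python
# Sketch of the new package-level API; model id mirrors the .env template.
from cogkit import generate_image, load_pipeline

pipeline = load_pipeline("THUDM/CogView4-6B")
images = generate_image(prompt="a red panda drinking tea", pipeline=pipeline)
images[0].save("panda.png")  # default output_type="pil" returns PIL images
```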
8 changes: 8 additions & 0 deletions src/cogkit/api/python/__init__.py
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-


from .generation.image import generate_image
from .generation.video import generate_video
from .generation.util import before_generation

__all__ = ["generate_image", "generate_video", "before_generation"]
89 changes: 89 additions & 0 deletions src/cogkit/api/python/generation/image.py
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-


from typing import Literal

import numpy as np
import torch
from PIL import Image

from cogkit.logging import get_logger
from cogkit.utils import (
rand_generator,
)
from diffusers import DiffusionPipeline

from .util import before_generation, guess_resolution

_logger = get_logger(__name__)


def generate_image(
prompt: str,
pipeline: DiffusionPipeline,
num_images_per_prompt: int = 1,
output_type: Literal["pil", "pt", "np"] = "pil",
load_type: Literal["cuda", "cpu_model_offload", "sequential_cpu_offload"] = "cpu_model_offload",
height: int | None = None,
width: int | None = None,
num_inference_steps: int = 50,
guidance_scale: float = 3.5,
seed: int | None = None,
) -> list[Image.Image] | torch.Tensor | np.ndarray:
"""Generates images from a text prompt using a diffusion model pipeline.

This function leverages a diffusion pipeline to create images based on a given text prompt. It supports
customization of image dimensions, inference steps, and guidance scale. The output can be returned in
different formats: PIL images, PyTorch tensors, or NumPy arrays.

Args:
- prompt: The text description used to guide the image generation process.
- pipeline: Preloaded DiffusionPipeline instance.
- num_images_per_prompt: Number of images to generate per prompt. Defaults to 1.
- output_type: Format of the output images. Options are "pil" (PIL.Image), "pt" (PyTorch tensor), or
"np" (NumPy array). Defaults to "pil".
- load_type: Offloading strategy for the model; use "cuda" (no offloading) if you have enough GPU memory. Defaults to "cpu_model_offload".
- height: Desired height of the output images in pixels. If None, inferred from the pipeline.
- width: Desired width of the output images in pixels. If None, inferred from the pipeline.
- num_inference_steps: Number of denoising steps during generation. Defaults to 50.
- guidance_scale: Strength of the prompt guidance (classifier-free guidance scale). Defaults to 3.5.
- seed: Optional random seed for reproducible results. Defaults to None.

Returns:
A list of generated images in the specified format:
- If output_type is "pil": List of PIL.Image.Image objects.
- If output_type is "pt": PyTorch tensor of shape (num_images, 3, height, width) with dtype torch.uint8.
- If output_type is "np": NumPy array of shape (num_images, height, width, 3) with dtype uint8.
"""

height, width = guess_resolution(pipeline, height, width)

_logger.info(f"Generation config: height {height}, width {width}.")

before_generation(pipeline, load_type)

output = pipeline(
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
num_images_per_prompt=num_images_per_prompt,
generator=rand_generator(seed),
output_type=output_type,
).images

if output_type != "pil":
output = (output * 255).round()
if output_type == "pt":
assert output.ndim == 4, f"Expected 4D torch tensor, got {output.ndim}D tensor"
# Dim of image tensor: (num_images, 3, height, width)
assert output.shape[1] == 3, f"Expected 3 channels, got {output.shape[1]} channels"
output = output.to(torch.uint8)
elif output_type == "np":
assert output.ndim == 4, f"Expected 4D numpy array, got {output.ndim}D array"
# Dim of image_np: (num_images, height, width, 3)
assert output.shape[3] == 3, f"Expected 3 channels, got {output.shape[3]} channels"
output = output.astype("uint8")

return output
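
Because the branches above normalize the non-PIL outputs to uint8, a caller can feed the result straight into image I/O. A hedged sketch of the "np" path, assuming `pipeline` was created with `load_pipeline` as above (prompt and filename are placeholders):

```python
# Sketch: requesting NumPy output from generate_image.
import numpy as np
from PIL import Image

arr = generate_image(
    prompt="an isometric voxel castle",
    pipeline=pipeline,
    num_images_per_prompt=2,
    output_type="np",  # (num_images, height, width, 3) uint8, per the contract above
    seed=0,
)
assert arr.dtype == np.uint8 and arr.shape[0] == 2
Image.fromarray(arr[0]).save("castle_0.png")
```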
src/cogkit/api/python/generation/util.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

from typing import Literal

from diffusers import (
CogVideoXDPMScheduler,
@@ -132,18 +133,24 @@ def guess_frames(pipeline: TVideoPipeline, frames: int | None = None) -> tuple[int, int]:
return frames, fps


def before_generation(pipeline: TPipeline) -> None:
def before_generation(
pipeline: TPipeline,
load_type: Literal["cuda", "cpu_model_offload", "sequential_cpu_offload"] = "cpu_model_offload",
) -> None:
if isinstance(pipeline, TVideoPipeline):
pipeline.scheduler = CogVideoXDPMScheduler.from_config(
pipeline.scheduler.config, timestep_spacing="trailing"
)

# * enables CPU offload for the model.
# turns off if you have multiple GPUs or enough GPU memory(such as H100) and it will cost less time in inference
# and enable to("cuda")
pipeline.to("cuda")
# pipeline.enable_model_cpu_offload()
# pipe.enable_sequential_cpu_offload()
# Turn offloading off (load_type="cuda") if you have multiple GPUs or enough GPU memory (e.g. an H100); inference will be faster.
if load_type == "cuda":
pipeline.to("cuda")
elif load_type == "cpu_model_offload":
pipeline.enable_model_cpu_offload()
elif load_type == "sequential_cpu_offload":
pipeline.enable_sequential_cpu_offload()
else:
raise ValueError(f"Unsupported offload type: {load_type}")
if hasattr(pipeline, "vae"):
pipeline.vae.enable_slicing()
pipeline.vae.enable_tiling()
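
The three `load_type` values trade GPU memory for speed: "cuda" keeps every sub-model resident on the GPU, "cpu_model_offload" moves whole sub-models onto the GPU only while they run, and "sequential_cpu_offload" streams weights layer by layer for the smallest footprint at the largest slowdown. One way a caller might choose, as a sketch (the VRAM threshold is an assumption, not a measured requirement):

```python
import torch

def pick_load_type(required_vram_gb: float = 24.0) -> str:
    """Heuristic sketch: pick the fastest strategy that plausibly fits in VRAM."""
    if not torch.cuda.is_available():
        raise RuntimeError("A CUDA device is required for these pipelines.")
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    if total_gb >= required_vram_gb:
        return "cuda"                    # fully resident on GPU: fastest
    if total_gb >= required_vram_gb / 2:
        return "cpu_model_offload"       # whole sub-models swapped in and out
    return "sequential_cpu_offload"      # layer-by-layer streaming: least VRAM, slowest
```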
127 changes: 127 additions & 0 deletions src/cogkit/api/python/generation/video.py
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-


from functools import partial
from typing import Any, List, Literal

import numpy as np
import torch
from diffusers import DiffusionPipeline
from diffusers.pipelines.cogvideo.pipeline_output import CogVideoXPipelineOutput
from PIL import Image

from cogkit.logging import get_logger
from cogkit.types import GenerationMode
from cogkit.utils import (
guess_generation_mode,
rand_generator,
)

from .util import before_generation, guess_frames, guess_resolution

_logger = get_logger(__name__)


def _cast_to_pipeline_output(output: Any) -> CogVideoXPipelineOutput:
if isinstance(output, CogVideoXPipelineOutput):
return output
if isinstance(output, tuple):
return CogVideoXPipelineOutput(frames=output[0])

err_msg = f"Cannot cast a `{output.__class__.__name__}` to a `CogVideoXPipelineOutput`."
raise ValueError(err_msg)


def generate_video(
prompt: str,
pipeline: DiffusionPipeline,
num_videos_per_prompt: int = 1,
output_type: Literal["pil", "pt", "np"] = "pil",
input_image: Image.Image | None = None,
# * params for model loading
load_type: Literal["cuda", "cpu_model_offload", "sequential_cpu_offload"] = "cpu_model_offload",
height: int | None = None,
width: int | None = None,
num_frames: int | None = None,
num_inference_steps: int = 50,
guidance_scale: float = 6.0,
seed: int | None = 42,
) -> tuple[List[Image.Image] | torch.Tensor | np.ndarray, int]:
"""Main function for video generation, supporting both text-to-video and image-to-video generation modes.

Args:
- prompt (str): Text prompt describing the desired video content.
- pipeline (DiffusionPipeline): Pre-loaded diffusion model pipeline.
- num_videos_per_prompt (int, optional): Number of videos to generate per prompt. Defaults to 1.
- output_type (Literal, optional): Output type, one of "pil", "pt", or "np". Defaults to "pil".
- input_image (Image.Image | None, optional): Input image for image-to-video generation. Defaults to None.
- load_type (Literal, optional): Model loading type, one of "cuda", "cpu_model_offload", or
"sequential_cpu_offload". Defaults to "cpu_model_offload".
- height (int | None, optional): Height of output video. If None, will be inferred. Defaults to None.
- width (int | None, optional): Width of output video. If None, will be inferred. Defaults to None.
- num_frames (int | None, optional): Number of frames in generated video. If None, will be inferred.
Defaults to None.
- num_inference_steps (int, optional): Number of inference steps. Defaults to 50.
- guidance_scale (float, optional): Classifier-free guidance scale. Defaults to 6.0.
- seed (int | None, optional): Random seed for generation. Defaults to 42.

Returns:
A tuple containing:
- The generated videos in the requested format: for "pt" and "np" outputs, a batch of shape
(num_videos, num_frames, 3, height, width); for "pil", a list of per-video frame lists.
- Video frame rate (fps).

Raises:
ValueError: When the inferred generation mode is unknown or the pipeline output cannot be cast to a CogVideoXPipelineOutput.

Note:
- Height, width, number of frames, and fps will be automatically inferred if not specified.
"""

task = guess_generation_mode(
pipeline=pipeline,
generation_mode=None,
image=input_image,
)

height, width = guess_resolution(pipeline, height, width)
num_frames, fps = guess_frames(pipeline, num_frames)

_logger.info(
f"Generation config: height {height}, width {width}, num_frames {num_frames}, fps {fps}."
)

before_generation(pipeline, load_type)

pipeline_fn = partial(
pipeline,
height=height,
width=width,
prompt=prompt,
num_videos_per_prompt=num_videos_per_prompt,
num_inference_steps=num_inference_steps,
num_frames=num_frames,
use_dynamic_cfg=True,
guidance_scale=guidance_scale,
output_type=output_type,
generator=rand_generator(seed),
)
if task == GenerationMode.TextToVideo:
pipeline_out = pipeline_fn()
elif task == GenerationMode.ImageToVideo:
pipeline_out = pipeline_fn(image=input_image)
else:
err_msg = f"Unknown generation mode: {task.value}"
raise ValueError(err_msg)

batch_video = _cast_to_pipeline_output(pipeline_out).frames

if output_type in ("pt", "np"):
# Dim of a video: (num_videos, num_frames, 3, height, width)
assert batch_video.ndim == 5, f"Expected 5D array, got {batch_video.ndim}D array"
assert batch_video.shape[2] == 3, (
f"Expected 3 channels, got {batch_video.shape[2]} channels"
)
return batch_video, fps
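
A hedged usage sketch for the video path, pairing the returned fps with diffusers' `export_to_video` helper; `pipeline` is assumed to be a preloaded CogVideoX pipeline (e.g. from `load_pipeline`), and the prompt and filename are placeholders:

```python
# Sketch: text-to-video with PIL output.
from diffusers.utils import export_to_video

videos, fps = generate_video(
    prompt="a paper boat drifting down a rain gutter",
    pipeline=pipeline,
    output_type="pil",
)
# With "pil" output, `videos` holds one list of PIL frames per generated video.
export_to_video(videos[0], "boat.mp4", fps=fps)
```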
39 changes: 19 additions & 20 deletions src/cogkit/api/services/image_generation.py
@@ -5,10 +5,11 @@
import os

import torch
from diffusers import CogView4Pipeline

from cogkit.api.logging import get_logger
from cogkit.api.settings import APISettings
from cogkit.api.python import before_generation, generate_image
from cogkit.utils import load_lora_checkpoint, unload_lora_checkpoint, load_pipeline

_logger = get_logger(__name__)

@@ -17,16 +18,13 @@ class ImageGenerationService(object):
def __init__(self, settings: APISettings) -> None:
self._models = {}
if settings.cogview4_path is not None:
cogview4_pl = CogView4Pipeline.from_pretrained(
torch_dtype = torch.bfloat16 if settings.dtype == "bfloat16" else torch.float32
cogview4_pl = load_pipeline(
settings.cogview4_path,
torch_dtype=torch.bfloat16 if settings.dtype == "bfloat16" else torch.float32,
transformer_path=settings.cogview4_transformer_path,
dtype=torch_dtype,
)
if settings.offload_type == "cpu_model_offolad":
cogview4_pl.enable_model_cpu_offload()
else:
cogview4_pl.to("cuda")
cogview4_pl.vae.enable_slicing()
cogview4_pl.vae.enable_tiling()
before_generation(cogview4_pl, settings.offload_type)
self._models["cogview-4"] = cogview4_pl

### Check if loaded models are supported
@@ -58,33 +56,34 @@ def generate(
if model not in self._models:
raise ValueError(f"Model {model} not loaded")
width, height = list(map(int, size.split("x")))

# TODO: Refactor this to switch by LoRA endpoint API
if lora_path is not None:
adapter_name = os.path.basename(lora_path)
print(f"Loaded LORA weights from {adapter_name}")
self._models[model].load_lora_weights(lora_path)
_logger.info(f"Loaded LORA weights from {adapter_name}")
load_lora_checkpoint(self._models[model], lora_path)
else:
print("Unloading LORA weights")
self._models[model].unload_lora_weights()
_logger.info("Unloading LORA weights")
unload_lora_checkpoint(self._models[model])

image_np = self._models[model](
output = generate_image(
prompt=prompt,
pipeline=self._models[model],
num_images_per_prompt=num_images,
output_type="np",
height=height,
width=width,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
num_images_per_prompt=num_images,
output_type="np",
).images
assert image_np.ndim == 4, f"Expected 4D array, got {image_np.ndim}D array"
)

image_lst = self.postprocess(image_np)
image_lst = self.postprocess(output)
return image_lst

def is_valid_model(self, model: str) -> bool:
return model in self._models

def postprocess(self, image_np: np.ndarray) -> list[np.ndarray]:
image_np = (image_np * 255).round().astype("uint8")
image_lst = np.split(image_np, image_np.shape[0], axis=0)
image_lst = [img.squeeze(0) for img in image_lst]
return image_lst
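
Each array `postprocess` returns is a single (height, width, 3) uint8 image, ready for serialization. A sketch of one plausible downstream step, encoding an image for an OpenAI-style b64_json response (this helper is hypothetical and not part of the PR):

```python
import base64
import io

import numpy as np
from PIL import Image

def to_b64_png(image_np: np.ndarray) -> str:
    """Encode one (height, width, 3) uint8 array as a base64 PNG string."""
    buffer = io.BytesIO()
    Image.fromarray(image_np).save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode("ascii")
```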