-
Notifications
You must be signed in to change notification settings - Fork 450
Expand file tree
/
Copy pathidea2video_pipeline.py
More file actions
257 lines (218 loc) · 10.4 KB
/
idea2video_pipeline.py
File metadata and controls
257 lines (218 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import os
import logging
from agents import Screenwriter, CharacterExtractor, CharacterPortraitsGenerator
from pipelines.script2video_pipeline import Script2VideoPipeline
from interfaces import CharacterInScene
from typing import List, Dict, Optional
import asyncio
import json
from moviepy import VideoFileClip, concatenate_videoclips
import yaml
from langchain.chat_models import init_chat_model
import importlib
class Idea2VideoPipeline:
def __init__(
self,
chat_model: str,
image_generator: str,
video_generator: str,
working_dir: str,
custom_assets: Optional[Dict[str, List[Dict[str, str]]]] = None,
):
self.chat_model = chat_model
self.image_generator = image_generator
self.video_generator = video_generator
self.working_dir = working_dir
self.custom_assets = custom_assets or {"sample_images": [], "sample_videos": []}
os.makedirs(self.working_dir, exist_ok=True)
self.screenwriter = Screenwriter(chat_model=self.chat_model)
self.character_extractor = CharacterExtractor(chat_model=self.chat_model)
self.character_portraits_generator = CharacterPortraitsGenerator(image_generator=self.image_generator)
@classmethod
def init_from_config(
cls,
config_path: str,
):
with open(config_path, "r") as f:
config = yaml.safe_load(f)
chat_model_args = config["chat_model"]["init_args"]
chat_model = init_chat_model(**chat_model_args)
image_generator_cls_module, image_generator_cls_name = config["image_generator"]["class_path"].rsplit(".", 1)
image_generator_cls = getattr(importlib.import_module(image_generator_cls_module), image_generator_cls_name)
image_generator_args = config["image_generator"]["init_args"]
image_generator = image_generator_cls(**image_generator_args)
video_generator_cls_module, video_generator_cls_name = config["video_generator"]["class_path"].rsplit(".", 1)
video_generator_cls = getattr(importlib.import_module(video_generator_cls_module), video_generator_cls_name)
video_generator_args = config["video_generator"]["init_args"]
video_generator = video_generator_cls(**video_generator_args)
# Load custom assets if provided
custom_assets = config.get("assets", {"sample_images": [], "sample_videos": []})
return cls(
chat_model=chat_model,
image_generator=image_generator,
video_generator=video_generator,
working_dir=config["working_dir"],
custom_assets=custom_assets,
)
async def extract_characters(
self,
story: str,
):
save_path = os.path.join(self.working_dir, "characters.json")
if os.path.exists(save_path):
with open(save_path, "r", encoding="utf-8") as f:
characters = json.load(f)
characters = [CharacterInScene.model_validate(character) for character in characters]
print(f"🚀 Loaded {len(characters)} characters from existing file.")
else:
characters = await self.character_extractor.extract_characters(story)
with open(save_path, "w", encoding="utf-8") as f:
json.dump([character.model_dump() for character in characters], f, ensure_ascii=False, indent=4)
print(f"✅ Extracted {len(characters)} characters from story and saved to {save_path}.")
return characters
async def generate_character_portraits(
self,
characters: List[CharacterInScene],
character_portraits_registry: Optional[Dict[str, Dict[str, Dict[str, str]]]],
style: str,
):
character_portraits_registry_path = os.path.join(self.working_dir, "character_portraits_registry.json")
if character_portraits_registry is None:
if os.path.exists(character_portraits_registry_path):
with open(character_portraits_registry_path, 'r', encoding='utf-8') as f:
character_portraits_registry = json.load(f)
else:
character_portraits_registry = {}
tasks = [
self.generate_portraits_for_single_character(character, style)
for character in characters
if character.identifier_in_scene not in character_portraits_registry
]
if tasks:
for future in asyncio.as_completed(tasks):
character_portraits_registry.update(await future)
with open(character_portraits_registry_path, 'w', encoding='utf-8') as f:
json.dump(character_portraits_registry, f, ensure_ascii=False, indent=4)
print(f"✅ Completed character portrait generation for {len(characters)} characters.")
else:
print("🚀 All characters already have portraits, skipping portrait generation.")
return character_portraits_registry
async def develop_story(
self,
idea: str,
user_requirement: str,
):
save_path = os.path.join(self.working_dir, "story.txt")
if os.path.exists(save_path):
with open(save_path, "r", encoding="utf-8") as f:
story = f.read()
print(f"🚀 Loaded story from existing file.")
else:
print("🧠 Developing story...")
story = await self.screenwriter.develop_story(idea=idea, user_requirement=user_requirement)
with open(save_path, "w", encoding="utf-8") as f:
f.write(story)
print(f"✅ Developed story and saved to {save_path}.")
return story
async def write_script_based_on_story(
self,
story: str,
user_requirement: str,
):
save_path = os.path.join(self.working_dir, "script.json")
if os.path.exists(save_path):
with open(save_path, "r", encoding="utf-8") as f:
script = json.load(f)
print(f"🚀 Loaded script from existing file.")
else:
print("🧠 Writing script based on story...")
script = await self.screenwriter.write_script_based_on_story(story=story, user_requirement=user_requirement)
with open(save_path, "w", encoding="utf-8") as f:
json.dump(script, f, ensure_ascii=False, indent=4)
print(f"✅ Written script based on story and saved to {save_path}.")
return script
async def generate_portraits_for_single_character(
self,
character: CharacterInScene,
style: str,
):
character_dir = os.path.join(self.working_dir, "character_portraits", f"{character.idx}_{character.identifier_in_scene}")
os.makedirs(character_dir, exist_ok=True)
front_portrait_path = os.path.join(character_dir, "front.png")
if os.path.exists(front_portrait_path):
pass
else:
front_portrait_output = await self.character_portraits_generator.generate_front_portrait(character, style)
front_portrait_output.save(front_portrait_path)
side_portrait_path = os.path.join(character_dir, "side.png")
if os.path.exists(side_portrait_path):
pass
else:
side_portrait_output = await self.character_portraits_generator.generate_side_portrait(character, front_portrait_path)
side_portrait_output.save(side_portrait_path)
back_portrait_path = os.path.join(character_dir, "back.png")
if os.path.exists(back_portrait_path):
pass
else:
back_portrait_output = await self.character_portraits_generator.generate_back_portrait(character, front_portrait_path)
back_portrait_output.save(back_portrait_path)
print(f"☑️ Completed character portrait generation for {character.identifier_in_scene}.")
return {
character.identifier_in_scene: {
"front": {
"path": front_portrait_path,
"description": f"A front view portrait of {character.identifier_in_scene}.",
},
"side": {
"path": side_portrait_path,
"description": f"A side view portrait of {character.identifier_in_scene}.",
},
"back": {
"path": back_portrait_path,
"description": f"A back view portrait of {character.identifier_in_scene}.",
},
}
}
async def __call__(
self,
idea: str,
user_requirement: str,
style: str,
):
story = await self.develop_story(idea=idea, user_requirement=user_requirement)
characters = await self.extract_characters(story=story)
character_portraits_registry = await self.generate_character_portraits(
characters=characters,
character_portraits_registry=None,
style=style,
)
scene_scripts = await self.write_script_based_on_story(story=story, user_requirement=user_requirement)
all_video_paths = []
for idx, scene_script in enumerate(scene_scripts):
scene_working_dir = os.path.join(self.working_dir, f"scene_{idx}")
os.makedirs(scene_working_dir, exist_ok=True)
script2video_pipeline = Script2VideoPipeline(
chat_model=self.chat_model,
image_generator=self.image_generator,
video_generator=self.video_generator,
working_dir=scene_working_dir,
custom_assets=self.custom_assets,
)
final_video_path = await script2video_pipeline(
script=scene_script,
user_requirement=user_requirement,
style=style,
characters=characters,
character_portraits_registry=character_portraits_registry,
)
all_video_paths.append(final_video_path)
final_video_path = os.path.join(self.working_dir, "final_video.mp4")
if os.path.exists(final_video_path):
print(f"🚀 Skipped concatenating videos, already exists.")
else:
print(f"🎬 Starting concatenating videos...")
video_clips = [VideoFileClip(final_video_path) for final_video_path in all_video_paths]
final_video = concatenate_videoclips(video_clips)
final_video.write_videofile(final_video_path)
print(f"☑️ Concatenated videos, saved to {final_video_path}.")
return final_video_path