-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathagent_generation.py
More file actions
1400 lines (1148 loc) · 58.2 KB
/
agent_generation.py
File metadata and controls
1400 lines (1148 loc) · 58.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Image/Video Generation Agent
Features:
1. Integrates existing image generation and video generation functionality
2. Injects foreign face references and clothing descriptions before generation
3. Uses target foreign faces for generation
Usage:
python agent_generation.py clip1_script.json # Generate all keyframes
python agent_generation.py clip1_script.json --mode video # Generate all videos
python agent_generation.py clip1_script.json --shot 9 # Generate specific shot
"""
import os
import sys
import json
import argparse
import time
from pathlib import Path
from datetime import datetime
# Add project root directory to path: the directory containing this file is
# put at the front of sys.path (once) so that imports resolve against it first.
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
# Third-party dependencies are imported defensively: when any of them is
# missing, installation instructions are printed and the process exits with
# status 1 instead of surfacing the ImportError.
try:
    from google import genai
    from google.genai import types
    from PIL import Image
except ImportError:
    print("Error: Required libraries google-genai and Pillow not installed")
    print("Please run: pip install google-genai Pillow")
    sys.exit(1)
class GenerationAgent:
"""Image/Video Generation Agent"""
def __init__(self, script_json="clip1_script.json", character_mapping="character_mapping.json",
             reference_dir="reference_images", memory_agent=None, style="realistic"):
    """
    Initialize configuration, empty data containers and the Gemini client
    Args:
        script_json: Script JSON file path
        character_mapping: Character mapping configuration file path
        reference_dir: Reference image directory
        memory_agent: MemoryAllocationAgent instance (optional)
        style: Generation style (realistic, lego, disney, anime, clay, japanese_anime)
    Raises:
        ValueError: If the GENAI_API_KEY environment variable is unset or empty
    """
    self.script_json = script_json
    self.character_mapping_file = character_mapping
    self.reference_dir = reference_dir
    self.memory_agent = memory_agent
    self.style = style
    # Data containers, filled in later by load_script_data() / load_character_mappings()
    self.script_data = None
    self.character_mappings = []
    self.shots_data = []
    # Created here (not only in load_script_data) so the attribute always exists
    self.major_scene_map = {}
    # Generation configuration - will be dynamically read from video metadata
    self.target_width = None
    self.target_height = None
    self.aspect_ratio = None
    self.duration_seconds = 8
    # Save error information (for intelligent review agent), keyed by shot id
    self.last_errors = {}
    # Initialize Gemini client (done last; it is the only step that can fail)
    api_key = os.environ.get("GENAI_API_KEY")
    if not api_key:
        raise ValueError("Error: Environment variable GENAI_API_KEY not found")
    self.client = genai.Client(api_key=api_key)
def load_script_data(self):
    """
    Read the script JSON and derive aspect-ratio settings and shot records
    Populates script_data, target_width / target_height / aspect_ratio,
    major_scene_map and shots_data (disabled scenes are skipped).
    Returns:
        The parsed script dictionary
    """
    print(f"Reading: {self.script_json}")
    with open(self.script_json, 'r', encoding='utf-8') as handle:
        self.script_data = json.load(handle)
    script = self.script_data

    # ========== Stage 2: Read video aspect ratio metadata ==========
    ratio_meta = script.get("video_metadata", {}).get("aspect_ratio", {})
    if not ratio_meta:
        # Metadata absent or empty: fall back to the 16:9 defaults
        print(f"\n⚠️ Video aspect ratio metadata not found, using default 16:9")
        self.target_width = 1920
        self.target_height = 1080
        self.aspect_ratio = "16:9"
    else:
        self.target_width = ratio_meta.get("width", 1920)
        self.target_height = ratio_meta.get("height", 1080)
        self.aspect_ratio = ratio_meta.get("aspect_ratio", "16:9")
        rule = '=' * 60
        print(f"\n{rule}")
        print(f"--> Loaded aspect ratio info from video metadata:")
        print(f"{rule}")
        print(f" Resolution: {self.target_width}x{self.target_height}")
        print(f" Aspect Ratio: {self.aspect_ratio}")
        print(f"{rule}")

    # Index the major scenes by their scene_id
    major_scenes = script.get("major_scenes", {}).get("major_scenes", [])
    self.major_scene_map = {}
    for major in major_scenes:
        self.major_scene_map[major["scene_id"]] = major

    # Scene fields copied verbatim (defaulting to ""), in output order
    text_fields = (
        "lighting_setup", "color_grading", "composition", "mood_atmosphere",
        "shot_size", "camera_angle", "camera_height", "horizontal_angle",
        "focal_length", "depth_of_field", "tech_device", "camera_movement",
        "subject_movement",
    )

    # Convert every enabled scene into the shots_data record format
    self.shots_data = []
    for scene in script.get("scenes", []):
        if scene.get("_disabled", False):
            continue
        start_time = scene.get("start_time", 0.0)
        # The first major scene whose [start_time, end_time] contains this shot
        owning_scene_id = next(
            (major["scene_id"] for major in major_scenes
             if major["start_time"] <= start_time <= major["end_time"]),
            None,
        )
        json_content = {name: scene.get(name, "") for name in text_fields}
        json_content["duration"] = scene.get("duration", 0.0)
        json_content["t2i_prompt"] = scene.get("I2V Prompt", "")
        json_content["language_to_one_shot"] = scene.get("Language_to_One_Shot_Prompt", "")
        json_content["time_range"] = scene.get("time_range", "")
        json_content["start_time"] = start_time
        json_content["end_time"] = scene.get("end_time", 0.0)
        json_content["major_scene_id"] = owning_scene_id
        self.shots_data.append({
            "id": scene.get("scene_index", 0),
            "json_content": json_content
        })

    print(f"✅ Loaded {len(self.shots_data)} shots")
    return self.script_data
def load_character_mappings(self):
    """
    Load the character mapping configuration file
    Stores the file's 'mappings' list (empty when the key is absent) in
    character_mappings.
    Returns:
        True when the file was read, False when it does not exist
    """
    mapping_path = self.character_mapping_file
    if os.path.exists(mapping_path):
        print(f"Reading: {mapping_path}")
        with open(mapping_path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        self.character_mappings = payload.get('mappings', [])
        print(f"✅ Loaded {len(self.character_mappings)} character mappings")
        return True
    # Missing file is reported, not raised
    print(f"⚠️ Character mapping file not found: {mapping_path}")
    return False
def load_memory_allocation(self):
    """
    Load an existing memory allocation, or build and save a new one
    A MemoryAllocationAgent is created on demand when none was supplied.
    If memory_allocation.json exists in the working directory and the agent
    loads it successfully, nothing else is done; otherwise the agent loads
    its inputs, allocates memory for everything and saves the result.
    Returns:
        True
    """
    agent = self.memory_agent
    if agent is None:
        # No agent was injected: build one from this agent's own settings
        from agent_memory import MemoryAllocationAgent
        agent = MemoryAllocationAgent(
            script_json=self.script_json,
            character_mapping=self.character_mapping_file,
            reference_dir=self.reference_dir
        )
        self.memory_agent = agent

    # Prefer an allocation that is already on disk
    if os.path.exists("memory_allocation.json"):
        print("Loading memory allocation...")
        if agent.load_memory_allocation():
            return True

    # Nothing usable on disk: run the full allocation pipeline in order
    print("Allocating new memory...")
    for step_name in ("load_script_data", "load_character_mappings",
                      "allocate_all_memory", "save_memory_allocation"):
        getattr(agent, step_name)()
    return True
def get_character_mapping(self, character_id):
    """
    Look up the mapping entry for a character ID
    Args:
        character_id: Character ID (e.g., @character_01)
    Returns:
        The first mapping whose 'video_character' equals character_id,
        or None if there is no such mapping
    """
    matching = (
        entry for entry in self.character_mappings
        if entry['video_character'] == character_id
    )
    return next(matching, None)
def build_enhanced_prompt(self, shot_data, character_refs, clothing_refs):
    """
    Assemble the full generation prompt for a shot
    The prompt is, in order: fixed composition rules, composition guidance
    chosen from the shot size, consistency instructions (with the wardrobe
    descriptions of the shot's major scene) and the shot's JSON content.
    Args:
        shot_data: Shot data
        character_refs: List of character reference image paths (not read here)
        clothing_refs: List of clothing reference image paths (not read here)
    Returns:
        Complete prompt string
    """
    content_json = shot_data["json_content"]
    major_scene_id = content_json.get("major_scene_id")
    shot_specifics = json.dumps(content_json, indent=2)

    # Rules that apply to every shot
    composition_rules = f"""
**CRITICAL COMPOSITION RULES (MUST FOLLOW)**:
0. **ASPECT RATIO REQUIREMENT**:
- MUST generate image with EXACT aspect ratio: {self.aspect_ratio}
- Target resolution: {self.target_width}x{self.target_height}
- CRITICAL: All generated content MUST maintain this aspect ratio
1. **SINGLE SHOT ONLY - ABSOLUTE REQUIREMENT**:
- Generate ONLY ONE continuous shot/frame
- DO NOT generate multiple images arranged together
- DO NOT create split-screen compositions
- DO NOT create diptych, triptych, or grid layouts
- DO NOT arrange multiple shots side-by-side or stacked
- MUST be a single, unified scene
2. **NO TEXT/GRAPHICS - ABSOLUTE PROHIBITION**:
- DO NOT include any text overlays, subtitles, titles, captions
- DO NOT add watermarks, logos, or signatures
- DO NOT include visible text anywhere in the image
- DO NOT add graphics, arrows, or UI elements
- DO NOT include dialogue bubbles or text boxes
- DO NOT render any dialogue, speech, or conversation as visible text
- The image must be completely text-free
- IGNORE any dialogue quotes or speech content in descriptions - these are for context only, NOT to be rendered as text
3. **PURE CINEMATIC SCENE**:
- Focus purely on the cinematic scene itself
- Show only the environment, characters, lighting, and action
- No artificial composition layouts or frames
"""

    # Pick guidance by substring match on the lower-cased shot size;
    # close-up terms are tested before wide terms
    size_text = content_json.get("shot_size", "").lower()

    def mentions(terms):
        return any(term in size_text for term in terms)

    close_up_terms = ("extreme close-up", "ecu", "close-up", "cu", "medium close-up", "mcu")
    wide_terms = ("wide shot", "wide", "establishing", "extreme wide")
    if mentions(close_up_terms):
        composition_guidance = f"""
**COMPOSITION GUIDANCE FOR CLOSE-UP SHOTS**:
- CRITICAL: The subject MUST occupy the ENTIRE {self.aspect_ratio} frame
- DO NOT leave empty space above head or below shoulders
- Frame the subject TIGHTLY from edge to edge
"""
    elif mentions(wide_terms):
        composition_guidance = f"""
**COMPOSITION GUIDANCE FOR WIDE SHOTS**:
- CRITICAL: The scene MUST fill the ENTIRE {self.aspect_ratio} frame
- Show the full environment but keep subjects LARGE enough to be visible
- NO empty sky areas or blank floor space
"""
    else:
        composition_guidance = f"""
**COMPOSITION GUIDANCE**:
- CRITICAL: The frame MUST be completely filled in {self.aspect_ratio} format
- Subject(s) should occupy 70-90% of the frame
- NO empty or wasted space anywhere
"""

    # Character and clothing consistency section; wardrobe entries follow it
    sections = [f"""
**CRITICAL CONSISTENCY INSTRUCTIONS**:
This shot belongs to Major Scene: {major_scene_id}
**CHARACTER FACE REFERENCES**:
You have been provided with reference images showing the target faces to use.
- Use these for: Exact facial features, hairstyles, skin tones
- CRITICAL: The generated characters MUST match these face references
**CLOTHING DNA COMPLIANCE**:
The scene wardrobe contains EXACT clothing DNA specifications. You MUST follow ALL dimensions:
**EXACT CLOTHING SPECIFICATIONS FOR THIS SCENE**:
"""]

    if major_scene_id:
        wardrobes = self.script_data.get("scene_wardrobe", {}).get("scene_wardrobes", {})
        if major_scene_id in wardrobes:
            per_character = wardrobes[major_scene_id].get('character_wardrobe', {})
            for char_id, wardrobe in per_character.items():
                description = wardrobe.get('full_description', '')
                if not description:
                    continue
                # Mappings are keyed with a leading '@'; name the target face when one exists
                mapping = self.get_character_mapping(f"@{char_id}")
                if mapping:
                    heading = f"**{char_id} (Face: {mapping.get('target_name', 'Unknown')})**"
                else:
                    heading = f"**{char_id}**"
                sections.append(f"\n{heading}:\n{description}\n")

    sections.append("""
**ZERO TOLERANCE FOR VARIATIONS**:
- All shots in this major scene MUST use identical clothing DNA
- NO frame-to-frame variations allowed
- Consistency is MANDATORY
""")

    return "".join(
        [composition_rules, composition_guidance]
        + sections
        + ["\n\nSHOT SPECIFICS:\n", shot_specifics]
    )
def detect_characters_in_shot(self, shot_data):
    """
    Detect characters appearing in shot
    A mapped character counts as present when its ID or its
    'video_character_name' occurs as a substring of the shot's
    'subject_movement' or 't2i_prompt' text.
    Args:
        shot_data: Shot data
    Returns:
        List of detected character IDs, in mapping order
    """
    content_json = shot_data["json_content"]
    # Missing or None fields are treated as empty text
    text_to_check = ""
    if content_json.get("subject_movement"):
        text_to_check += str(content_json["subject_movement"]) + " "
    if content_json.get("t2i_prompt"):
        text_to_check += str(content_json["t2i_prompt"])

    detected_characters = []
    for mapping in self.character_mappings:
        char_id = mapping['video_character']
        char_name = mapping.get('video_character_name') or ''
        # An empty string is a substring of every text, so blank IDs/names
        # must be skipped explicitly or they would match every shot.
        # NOTE(review): plain substring matching means an ID such as
        # "@character_1" also matches "@character_10" - confirm the ID format.
        id_found = bool(char_id) and char_id in text_to_check
        name_found = bool(char_name) and char_name in text_to_check
        if id_found or name_found:
            detected_characters.append(char_id)
    return detected_characters
def generate_image_for_shot(self, shot_id):
    """
    Generate keyframe image for a single shot and save it as shot_<shot_id>.png
    When a memory agent is configured, reference images and the prompt come
    from the shot's memory package; otherwise they are derived from the shot
    data, the character mappings and files in reference_dir. Failures are
    reported on stdout and recorded via _save_error_info.
    Args:
        shot_id: Shot ID
    Returns:
        True on success, False on failure
    """
    import io
    import traceback

    # Get memory package
    memory_package = None
    use_memory = bool(self.memory_agent)
    if use_memory:
        memory_package = self.memory_agent.get_shot_memory(shot_id)
        if not memory_package:
            print(f"❌ Memory package for Shot {shot_id} does not exist")
            return False

    # Find shot data
    shot_data = next((shot for shot in self.shots_data if shot["id"] == shot_id), None)
    if not shot_data:
        print(f"❌ Shot {shot_id} does not exist")
        return False

    print(f"\n{'='*70}")
    print(f"Generating keyframe for Shot {shot_id}")
    print(f"{'='*70}")

    # Delete old image (to avoid detecting previously generated files)
    old_image_path = f"shot_{shot_id}.png"
    if os.path.exists(old_image_path):
        try:
            os.remove(old_image_path)
            print(f"🗑️ Deleted old image: {old_image_path}")
        except Exception as e:
            print(f"⚠️ Unable to delete old image: {e}")

    # Prepare reference images
    ref_images = []

    def add_reference(path, ok_prefix, fail_prefix, ok_suffix=""):
        """Open path and append it to ref_images; best effort, returns success."""
        if not path or not os.path.exists(path):
            return False
        try:
            ref_images.append(Image.open(path))
        except Exception as e:
            # A broken reference image must not abort the generation
            print(f"{fail_prefix}: {e}")
            return False
        print(f"{ok_prefix}{os.path.basename(path)}{ok_suffix}")
        return True

    if use_memory:
        # Use memory package
        print(f"Generating using memory package")
        print(f"Scene: {memory_package.get('major_scene', 'N/A')}")
        print(f"Characters: {', '.join(memory_package.get('characters', []))}")

        # ========== Load reference images in new priority order ==========
        # 1. Environment reference image (first priority, single image)
        env_loaded = add_reference(
            memory_package.get("environment_ref", ""),
            " ✅ [1/1] Loaded environment reference: ",
            " ⚠️ Unable to load environment reference",
        )

        # 2. Clothing reference images (second priority, main characters -> supporting characters)
        clothing_refs = memory_package.get("clothing_refs", [])
        clothing_loaded = 0
        for i, ref_path in enumerate(clothing_refs, 1):
            if add_reference(
                ref_path,
                f" ✅ [{i}/{len(clothing_refs)}] Loaded clothing reference: ",
                " ⚠️ Unable to load clothing reference",
            ):
                clothing_loaded += 1

        # 3. Character montage (third priority, remaining quota)
        character_refs = memory_package.get("character_refs", [])
        portrait_total = len(clothing_refs) + len(character_refs)
        montage_loaded = 0
        for i, ref_path in enumerate(character_refs, 1):
            if add_reference(
                ref_path,
                f" ✅ [{len(clothing_refs)+i}/{portrait_total}] Loaded character montage: ",
                " ⚠️ Unable to load character montage",
            ):
                montage_loaded += 1

        # Print total statistics (counts of images actually loaded, so the
        # total always agrees with len(ref_images))
        env_count = 1 if env_loaded else 0
        print(f"\n 📊 Image statistics for this generation:")
        print(f" Environment: {env_count} image")
        print(f" Clothing: {clothing_loaded} images")
        print(f" Montage: {montage_loaded} images")
        print(f" Total: {env_count + clothing_loaded + montage_loaded} images")
        print(f" Portrait type: {clothing_loaded + montage_loaded} images (limit 5)")

        # Build enhanced prompt
        prompt_str = self.build_enhanced_prompt_from_memory(
            shot_id, memory_package, ref_images
        )
        print(f"\nTotal reference images: {len(ref_images)} (loaded from memory package)")
        print(f"Prompt length: {len(prompt_str)} characters")

        # Truncate prompt if it exceeds the safe limit to avoid EMPTY_PARTS from Gemini
        # NOTE(review): truncation drops the END of the prompt - confirm that
        # the trailing section is the least important one.
        MAX_PROMPT_CHARS = 4000
        if len(prompt_str) > MAX_PROMPT_CHARS:
            print(f"⚠️ Prompt exceeds {MAX_PROMPT_CHARS} chars, truncating to reduce EMPTY_PARTS risk")
            prompt_str = prompt_str[:MAX_PROMPT_CHARS]
    else:
        # Original logic (without using memory package)
        content_json = shot_data["json_content"]
        major_scene_id = content_json.get("major_scene_id")
        print(f"Scene: {major_scene_id if major_scene_id else 'N/A'}")

        # Detect characters
        detected_characters = self.detect_characters_in_shot(shot_data)
        print(f"Detected characters: {', '.join(detected_characters) if detected_characters else 'None'}")

        # 1. Load environment reference image
        if major_scene_id:
            add_reference(
                os.path.join(self.reference_dir, f"{major_scene_id}_environment.png"),
                "✅ Loaded environment reference: ",
                "⚠️ Unable to load environment reference",
            )

        # 2. Load foreign face reference images
        for char_id in detected_characters:
            mapping = self.get_character_mapping(char_id)
            if mapping:
                add_reference(
                    mapping.get('target_face'),
                    f"✅ Loaded foreign face: {mapping.get('target_name', 'Unknown')} (",
                    "⚠️ Unable to load foreign face",
                    ok_suffix=")",
                )

        # 3. Load clothing reference images
        for char_id in detected_characters:
            clean_id = char_id.replace('@', '')
            add_reference(
                os.path.join(self.reference_dir, f"{clean_id}_clothing.png"),
                "✅ Loaded clothing reference: ",
                "⚠️ Unable to load clothing reference",
            )

        print(f"Total reference images: {len(ref_images)}")
        # Build enhanced prompt (without using memory package)
        prompt_str = self.build_enhanced_prompt(shot_data, detected_characters, [])
        print(f"Prompt length: {len(prompt_str)} characters")

    # Prepare input
    input_contents = [prompt_str] + ref_images

    def fail(response, error_type):
        """Report a failed generation, record it for the review agent, return False."""
        print(f"\n❌ Shot {shot_id} generation failed")
        self._print_detailed_error(response, shot_id, error_type)
        self._save_error_info(shot_id, response, error_type)
        return False

    # Only these finish reasons are true error states
    error_finish_reasons = {"SAFETY", "RECITATION", "IMAGE_SAFETY", "MAX_TOKENS", "BLOCK_REASON_UNSPECIFIED"}

    # Call model for generation
    try:
        # Note: generate_content() does not support aspect_ratio parameter
        # Aspect ratio control is completely implemented through strict rules in prompt
        response = self.client.models.generate_content(
            model="gemini-3-pro-image-preview",
            contents=input_contents,
            config=types.GenerateContentConfig(
                response_modalities=['IMAGE'],
            )
        )

        if not response.candidates:
            return fail(response, "NO_CANDIDATES")

        for candidate in response.candidates:
            # Check finish_reason. It may be an enum member whose str()
            # includes the class name (e.g. "FinishReason.SAFETY"), so the
            # bare member name is what gets compared; plain strings work too.
            finish_reason = getattr(candidate, 'finish_reason', None)
            if finish_reason:
                reason_name = getattr(finish_reason, 'name', None) or str(finish_reason).rsplit('.', 1)[-1]
                if reason_name in error_finish_reasons:
                    return fail(response, f"FINISH_REASON: {finish_reason}")

            # Check if content / parts exist
            if not hasattr(candidate, 'content'):
                return fail(response, "NO_CONTENT")
            if not candidate.content:
                return fail(response, "EMPTY_CONTENT")
            if not hasattr(candidate.content, 'parts'):
                return fail(response, "NO_PARTS")
            if not candidate.content.parts:
                return fail(response, "EMPTY_PARTS")

            # Process image data normally: the first inline image wins
            for part in candidate.content.parts:
                if getattr(part, 'inline_data', None):
                    image = Image.open(io.BytesIO(part.inline_data.data))
                    # Save directly without any processing
                    filename = f"shot_{shot_id}.png"
                    image.save(filename)
                    print(f"✅ Successfully saved: {filename}")
                    print(f" Image size: {image.size}")
                    print(f" Target aspect ratio: {self.aspect_ratio}")
                    return True

        # Every candidate had parts, but none carried image data
        print(f"⚠️ Shot {shot_id} did not generate valid image")
        self._print_detailed_error(response, shot_id, "NO_VALID_IMAGE")
        # Save error information for intelligent review agent
        self._save_error_info(shot_id, response, "NO_VALID_IMAGE")
        return False
    except Exception as e:
        print(f"\n❌ Shot {shot_id} generation failed: {e}")
        traceback.print_exc()
        # Save error information for intelligent review agent
        self._save_error_info(shot_id, None, str(e), traceback.format_exc())
        # Try to get more information from the exception
        if "NoneType" in str(e) and "content.parts" in str(e):
            print(f"\n{'='*70}")
            print(f"💡 Error Analysis:")
            print(f"{'='*70}")
            print(f"Possible causes:")
            print(f" 1. Gemini safety review blocked generation (content involves violence, pornography, etc.)")
            print(f" 2. Gemini content policy violation (copyright, trademark issues)")
            print(f" 3. Prompt too long or format issue")
            print(f" 4. Reference images do not meet requirements")
            print(f" 5. API quota or rate limiting issue")
            print(f"\nSuggestions:")
            print(f" - Check if script description contains sensitive content")
            print(f" - Check if reference images are appropriate")
            print(f" - Check if prompt length is reasonable")
            print(f"{'='*70}")
        return False
def _print_detailed_error(self, response, shot_id, error_type):
    """
    Print a diagnostic report for a failed Gemini response
    The report covers prompt feedback (block reason, safety ratings), each
    candidate (finish reason, content status, safety ratings) and the start
    of any response text. Attributes missing from the response are skipped.
    Args:
        response: Gemini API response object
        shot_id: Shot ID
        error_type: Error type identifier
    """
    rule = '=' * 70
    print(f"\n{rule}")
    print(f"📋 Gemini Detailed Error Information - Shot {shot_id}")
    print(f"{rule}")
    print(f"Error type: {error_type}")

    # 1. Check prompt_feedback (contains reasons for being blocked)
    feedback = getattr(response, 'prompt_feedback', None)
    if feedback:
        print(f"\n🔍 Prompt Feedback:")
        block_reason = getattr(feedback, 'block_reason', None)
        if block_reason:
            print(f" Block reason: {block_reason}")
            if block_reason == "SAFETY":
                print(f" ⚠️ Content violates safety policy")
                print(f" Possible reasons: violence, gore, pornography, hate speech, etc.")
            elif block_reason == "BLOCK_REASON_UNSPECIFIED":
                print(f" ⚠️ Content blocked (reason unspecified)")
            else:
                print(f" ⚠️ Block reason: {block_reason}")
        prompt_ratings = getattr(feedback, 'safety_ratings', None)
        if prompt_ratings:
            print(f"\n🛡️ Safety ratings:")
            for rating in prompt_ratings:
                category = getattr(rating, 'category', "UNKNOWN")
                probability = getattr(rating, 'probability', "UNKNOWN")
                print(f" {category}: {probability}")
                # Flag ratings whose probability mentions HIGH or MEDIUM
                probability_text = str(probability)
                if 'HIGH' in probability_text or 'MEDIUM' in probability_text:
                    print(f" ⚠️ Detected {probability} risk content")

    # 2. Check candidates
    candidates = getattr(response, 'candidates', None)
    if candidates:
        print(f"\n📊 Candidates information:")
        print(f" Candidate count: {len(candidates)}")
        # Finish reasons with an explanatory hint, compared with == in this order
        finish_hints = (
            ("FINISH_REASON_UNSPECIFIED", " ℹ️ Reason unspecified (possibly blocked by content policy)"),
            ("RECITATION", " ⚠️ Possibly involves copyrighted content (citing protected content)"),
            ("SAFETY", " ⚠️ Blocked for safety reasons"),
            ("MAX_TOKENS", " ⚠️ Generated content too long"),
            ("IMAGE_SAFETY", " ⚠️ Image safety review failed"),
        )
        for number, candidate in enumerate(candidates, start=1):
            print(f"\n Candidate #{number}:")

            finish_reason = getattr(candidate, 'finish_reason', None)
            if finish_reason:
                print(f" Finish reason: {finish_reason}")
                for known_reason, hint in finish_hints:
                    if finish_reason == known_reason:
                        print(hint)
                        break

            # Content status
            if not hasattr(candidate, 'content'):
                print(f" Content: No content attribute")
            elif not candidate.content:
                print(f" Content: None (content is empty)")
            elif hasattr(candidate.content, 'parts') and candidate.content.parts:
                print(f" Content Parts: {len(candidate.content.parts)} parts")
            else:
                print(f" Content Parts: Empty or does not exist")

            # Candidate-level safety ratings
            candidate_ratings = getattr(candidate, 'safety_ratings', None)
            if candidate_ratings:
                print(f" Safety ratings:")
                for rating in candidate_ratings:
                    category = getattr(rating, 'category', "UNKNOWN")
                    probability = getattr(rating, 'probability', "UNKNOWN")
                    print(f" {category}: {probability}")

    # 3. Try to get response text (if any)
    if hasattr(response, 'text') and response.text:
        print(f"\n📝 Response text (first 500 characters):")
        print(f" {response.text[:500]}")

    print(f"{rule}")
    print(f"💡 Suggestion: Modify prompt or reference images based on the above error information")
    print(f"{rule}")
def _save_error_info(self, shot_id, response=None, error_type="", error_trace=""):
    """
    Record the latest error for a shot (for intelligent review agent)
    The record replaces any earlier one stored for the same shot.
    Args:
        shot_id: Shot ID
        response: Gemini response object (if any)
        error_type: Error type
        error_trace: Error stack trace
    """
    record = dict(
        shot_id=shot_id,
        timestamp=datetime.now().isoformat(),
        error_type=error_type,
        trace=error_trace,
    )
    # Response details are attached only when a (truthy) response is available
    if response:
        record["response_details"] = self._extract_response_details(response)
    self.last_errors[shot_id] = record
def _extract_response_details(self, response):
    """
    Extract detailed information from response object (for intelligent review agent)
    All values are converted to plain strings, numbers and booleans.
    Attributes that are absent or None are tolerated, so summarising an
    error response can never itself raise.
    Args:
        response: Gemini response object
    Returns:
        Dictionary with an optional "prompt_feedback" entry (block_reason,
        safety_ratings) and an optional "candidates" list (index,
        finish_reason, has_content, parts_count / parts_empty, safety_ratings)
    """
    def serialize_ratings(ratings):
        """Convert rating objects to dicts; a None or empty list gives []."""
        return [
            {
                "category": str(getattr(rating, 'category', "UNKNOWN")),
                "probability": str(getattr(rating, 'probability', "UNKNOWN")),
            }
            for rating in (ratings or [])
        ]

    details = {}

    # Extract prompt_feedback
    feedback = getattr(response, 'prompt_feedback', None)
    if feedback:
        details["prompt_feedback"] = {
            "block_reason": str(feedback.block_reason) if hasattr(feedback, 'block_reason') else None,
            # safety_ratings may exist but be None, hence the getattr/None handling
            "safety_ratings": serialize_ratings(getattr(feedback, 'safety_ratings', None)),
        }

    # Extract candidates information
    candidates = getattr(response, 'candidates', None)
    if candidates:
        details["candidates"] = []
        for idx, candidate in enumerate(candidates):
            candidate_info = {
                "index": idx,
                "finish_reason": str(candidate.finish_reason) if hasattr(candidate, 'finish_reason') else None
            }
            # Check content and parts
            content = getattr(candidate, 'content', None)
            if content:
                candidate_info["has_content"] = True
                parts = getattr(content, 'parts', None)
                if parts:
                    candidate_info["parts_count"] = len(parts)
                else:
                    candidate_info["parts_empty"] = True
            else:
                candidate_info["has_content"] = False
            # safety_ratings (only recorded when non-empty)
            candidate_ratings = getattr(candidate, 'safety_ratings', None)
            if candidate_ratings:
                candidate_info["safety_ratings"] = serialize_ratings(candidate_ratings)
            details["candidates"].append(candidate_info)
    return details
def get_last_error_info(self, shot_id):
    """
    Return the most recently saved error record for a shot (for intelligent review agent)
    Args:
        shot_id: Shot ID
    Returns:
        Error information dictionary, or None if nothing is recorded for the shot
    """
    try:
        return self.last_errors[shot_id]
    except KeyError:
        return None
def generate_video_for_shot(self, shot_id, duration_seconds=None):
"""
Generate video clip for a single shot (using Gemini Veo 3)
Args:
shot_id: Shot ID
duration_seconds: Video duration in seconds, defaults to duration in shot data
Returns:
True on success, False on failure
"""
# Get memory package
if self.memory_agent:
memory_package = self.memory_agent.get_shot_memory(shot_id)
if not memory_package:
print(f"❌ Memory package for Shot {shot_id} does not exist")
return False
use_memory = True
else:
print(f"❌ Video generation requires Memory Allocation Agent support")
return False
print(f"\n{'='*70}")
print(f"Generating video clip for Shot {shot_id}")
print(f"{'='*70}")
# Get video duration
if duration_seconds is None:
duration_seconds = memory_package.get("narrative", {}).get("duration", 8)
# Veo 3 API only supports integers 4, 6, 8 seconds
# Convert float to nearest valid integer
if isinstance(duration_seconds, float):
# Round to nearest integer
rounded_duration = round(duration_seconds)
# Ensure within valid range (4, 6, 8)
if rounded_duration <= 4:
duration_seconds = 4
elif rounded_duration <= 6:
duration_seconds = 6
else:
duration_seconds = 8
print(f"⚠️ Video duration adjusted to: {duration_seconds} seconds (original: {rounded_duration} seconds)")
# Ensure it's an integer
duration_seconds = int(duration_seconds)
print(f"Video duration: {duration_seconds} seconds")
print(f"Generating using memory package")
print(f"Scene: {memory_package.get('major_scene', 'N/A')}")
print(f"Characters: {', '.join(memory_package.get('characters', []))}")
# Prepare reference images
ref_images = []
# 1. Load environment reference image
env_ref = memory_package.get("environment_ref", "")
if env_ref and os.path.exists(env_ref):
try:
ref_images.append(Image.open(env_ref))
print(f"✅ Loaded environment reference: {os.path.basename(env_ref)}")
except Exception as e:
print(f"⚠️ Unable to load environment reference: {e}")
# 2. Load foreign face reference images
for char_ref in memory_package.get("character_refs", []):
if os.path.exists(char_ref):
try:
ref_images.append(Image.open(char_ref))
print(f"✅ Loaded foreign face: {os.path.basename(char_ref)}")
except Exception as e:
print(f"⚠️ Unable to load foreign face: {e}")
# 3. Load clothing reference images
for clothing_ref in memory_package.get("clothing_refs", []):
if os.path.exists(clothing_ref):
try:
ref_images.append(Image.open(clothing_ref))
print(f"✅ Loaded clothing reference: {os.path.basename(clothing_ref)}")
except Exception as e:
print(f"⚠️ Unable to load clothing reference: {e}")
print(f"Total reference images: {len(ref_images)}")
# Build video generation prompt
narrative = memory_package.get("narrative", {})
visual_dna = memory_package.get("visual_dna", {})
character_mappings = memory_package.get("character_mappings", {})
# Video generation prompt - emphasize action and camera movement
video_prompt = f"""Generate a {duration_seconds} second cinematic video clip with the following specifications:
**NARRATIVE CONTENT**:
- Action/Subject Movement: {narrative.get('action', '')}
- Camera Movement: {narrative.get('camera_movement', '')}
**CHARACTER MAPPINGS**:
"""
for char_id, char_info in character_mappings.items():
target_name = char_info.get('target_name', 'Unknown')
video_prompt += f"\n**{char_id}** → {target_name}\n"
video_prompt += f" - Clothing: {char_info.get('clothing', 'N/A')}\n"
video_prompt += f"""
**VISUAL DNA**:
- Lighting: {visual_dna.get('lighting', '')}
- Color Grading: {visual_dna.get('color', '')}
- Mood/Atmosphere: {visual_dna.get('mood', '')}
- Shot Size: {visual_dna.get('shot_size', '')}
- Camera Angle: {visual_dna.get('camera_angle', '')}
- Camera Height: {visual_dna.get('camera_height', '')}
- Focal Length: {visual_dna.get('focal_length', '')}
- Depth of Field: {visual_dna.get('depth_of_field', '')}
**TECHNICAL SPECS**:
- Aspect Ratio: {self.aspect_ratio} ({self.target_width}x{self.target_height})
- Duration: {duration_seconds} seconds
- Frame Rate: 30 fps
- Style: Photorealistic cinematic video
**CRITICAL REQUIREMENTS**: