-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhackathon_demo.py
More file actions
135 lines (110 loc) Β· 5.15 KB
/
Copy pathhackathon_demo.py
File metadata and controls
135 lines (110 loc) Β· 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""
Hackathon Demo Script
Focused demonstration for judges
"""
import asyncio
import time
import sys
from pathlib import Path
# Add src to path
sys.path.append(str(Path(__file__).parent / "src"))
def print_header(title):
"""Print a formatted header"""
print("\n" + "="*70)
print(f"π {title}")
print("="*70)
def print_section(title):
"""Print a formatted section"""
print(f"\nπ {title}")
print("-" * 50)
async def hackathon_demo():
"""Main hackathon demonstration"""
print_header("Hackathon Demo: Vertex AI Agent Builder System")
print("π― Advanced AI-powered startup analysis platform")
try:
# Import system
from src.agents.vertex_ai_orchestrator import VertexAIOrchestrator
print_section("π System Overview")
print("β’ 5 specialized AI agents built with Vertex AI Agent Builder")
print("β’ Google ADK orchestration for workflow management")
print("β’ Real-time updates via Firebase integration")
print("β’ Production-ready error handling and monitoring")
# Initialize system
print_section("π§ Initializing System")
orchestrator = VertexAIOrchestrator()
print("β
Vertex AI Agent Builder system initialized")
# Health check
health_status = await orchestrator.health_check()
print(f"β
System health: {health_status['overall_status']}")
# Demo startup analysis
print_section("π Live Startup Analysis Demo")
demo_startup = {
"company_name": "TechFlow Solutions",
"business_description": "AI-powered workflow automation platform for small businesses",
"industry": "SaaS",
"stage": "Series A",
"founder_name": "Sarah Johnson",
"founder_background": "Former Google engineer with 10 years experience in AI/ML",
"website": "https://techflow.com",
"additional_info": "Raised $5M in seed funding, 100+ customers, $50K MRR"
}
print(f"Analyzing: {demo_startup['company_name']}")
print(f"Business: {demo_startup['business_description']}")
print(f"Founder: {demo_startup['founder_name']}")
# Execute analysis
print("\nπ Executing complete analysis workflow...")
start_time = time.time()
results = await orchestrator.analyze_startup(demo_startup, "hackathon_demo")
end_time = time.time()
total_time = end_time - start_time
print_section("π Analysis Results")
print(f"β±οΈ Total analysis time: {total_time:.2f} seconds")
print(f"π Agents executed: {len(results)}")
# Show results
for agent_name, result in results.items():
status = result.get("status", "unknown")
processing_time = result.get("processing_time", 0)
print(f" {agent_name}: {status} ({processing_time:.2f}s)")
# Show sample analysis
if "data_collection" in results:
data_analysis = results["data_collection"].get("analysis", "")
if data_analysis:
print_section("π Sample Analysis Output")
print(data_analysis[:400] + "..." if len(data_analysis) > 400 else data_analysis)
# Show advanced features
print_section("π― Advanced Features Demonstrated")
print("β
Vertex AI Agent Builder - Professional AI agent creation")
print("β
Google ADK Orchestration - Advanced workflow management")
print("β
Real-time Updates - Live progress tracking via Firebase")
print("β
Error Handling - Robust retry logic and monitoring")
print("β
Parallel Processing - Multiple agents working simultaneously")
print("β
Production Ready - Enterprise-grade architecture")
# Show hackathon advantages
print_section("π Hackathon Advantages")
print("β’ Demonstrates cutting-edge Google AI capabilities")
print("β’ Shows professional, production-ready system")
print("β’ Real-time collaboration and progress tracking")
print("β’ Advanced orchestration and workflow management")
print("β’ Comprehensive error handling and monitoring")
print("β’ Scalable architecture for real-world use")
print_section("π Demo Complete")
print("β
Vertex AI Agent Builder system demonstrated successfully")
print("π Ready for hackathon competition")
print("π Production-ready AI agent system")
return True
except Exception as e:
print(f"β Demo failed: {str(e)}")
print("π§ Please check your configuration and try again")
return False
async def main():
"""Main function"""
success = await hackathon_demo()
if success:
print("\nπ― Demo completed successfully!")
print("π Your Vertex AI Agent Builder system is ready for the hackathon!")
else:
print("\nβ Demo failed. Please check your setup.")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())