-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_integration.py
More file actions
123 lines (103 loc) Β· 4.78 KB
/
test_integration.py
File metadata and controls
123 lines (103 loc) Β· 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
"""
Integration test for prayer categorization with actual database
"""
from sqlmodel import Session, select
from models import engine, Prayer, User
from app_helpers.services.archive_first_service import submit_prayer_archive_first
from datetime import datetime
def test_full_integration():
"""Test full prayer submission with categorization"""
print("π§ͺ Testing Full Prayer Categorization Integration")
print("=" * 60)
with Session(engine) as session:
# Create test user if it doesn't exist
test_user = session.exec(
select(User).where(User.display_name == "TestUser")
).first()
if not test_user:
test_user = User(
display_name="TestUser",
created_at=datetime.now()
)
session.add(test_user)
session.commit()
session.refresh(test_user)
print("β
Created test user")
else:
print("β
Using existing test user")
# Test prayer submission with categorization
test_cases = [
{
'text': 'Please pray for my grandmother who is recovering from heart surgery',
'expected_category': 'health'
},
{
'text': 'I have a big presentation at work tomorrow, please pray for success',
'expected_category': 'work'
}
]
for i, test_case in enumerate(test_cases):
print(f"\nπ Test {i+1}: {test_case['text'][:50]}...")
# Submit prayer using archive-first approach with categorization
prayer = submit_prayer_archive_first(
text=test_case['text'],
author=test_user,
generated_prayer=f"Heavenly Father, we lift up this request... (Test {i+1})"
)
print(f" Prayer ID: {prayer.id}")
print(f" Safety Score: {prayer.safety_score}")
print(f" Subject Category: {prayer.subject_category} (expected: {test_case['expected_category']})")
print(f" Specificity: {prayer.specificity_type}")
print(f" Method: {prayer.categorization_method}")
print(f" Archive Path: {prayer.text_file_path}")
# Verify categorization is reasonable
assert prayer.safety_score >= 0.0 and prayer.safety_score <= 1.0
assert prayer.subject_category in ['health', 'work', 'relationships', 'spiritual', 'provision', 'protection', 'guidance', 'gratitude', 'transitions', 'crisis', 'general']
assert prayer.specificity_type in ['specific', 'general', 'mixed', 'unknown']
# Check that archive file was created
if prayer.text_file_path:
import os
assert os.path.exists(prayer.text_file_path), f"Archive file not found: {prayer.text_file_path}"
# Read archive and verify categorization metadata is present
with open(prayer.text_file_path, 'r') as f:
archive_content = f.read()
assert 'Safety Score:' in archive_content
assert 'Category:' in archive_content
assert 'Specificity:' in archive_content
print(" β
Archive contains categorization metadata")
print(" β
Test passed")
print(f"\nπ Full integration test completed successfully!")
print(" - Prayers created with categorization")
print(" - Database fields populated correctly")
print(" - Archive files contain categorization metadata")
print(" - Archive-first architecture working properly")
def test_query_performance():
"""Test that categorization queries work efficiently"""
print(f"\nπ Testing Query Performance")
print("=" * 40)
with Session(engine) as session:
# Test basic categorization queries
health_prayers = session.exec(
select(Prayer).where(Prayer.subject_category == 'health')
).all()
high_safety_prayers = session.exec(
select(Prayer).where(Prayer.safety_score >= 0.9)
).all()
specific_prayers = session.exec(
select(Prayer).where(Prayer.specificity_type == 'specific')
).all()
print(f" Health prayers: {len(health_prayers)}")
print(f" High safety prayers (β₯0.9): {len(high_safety_prayers)}")
print(f" Specific prayers: {len(specific_prayers)}")
print(" β
All categorization queries executed successfully")
if __name__ == '__main__':
try:
test_full_integration()
test_query_performance()
print("\nπ All integration tests passed! The categorization system is fully operational.")
except Exception as e:
print(f"\nβ Integration test failed: {e}")
import traceback
traceback.print_exc()
exit(1)