direct_upload_knowledge.py
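"""Upload a knowledge JSON file directly to ClickHouse.

The script analyzes the file with Azure OpenAI, extracts structured patterns
from that analysis, stores everything in a knowledge_base table, and then
verifies the insert.
"""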
import json
import logging
import os
from datetime import datetime

import openai
from clickhouse_connect import get_client
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Azure OpenAI. Note: this is the pre-1.0 openai SDK interface;
# the module-level settings and ChatCompletion API used below were removed
# in openai>=1.0, so this script requires openai<1.0.
openai.api_type = "azure"
openai.api_version = "2024-02-15-preview"
openai.api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
openai.api_key = os.getenv("AZURE_OPENAI_KEY")
OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-35-turbo")
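
# The script expects these variables in a .env file (or the environment).
# AZURE_OPENAI_DEPLOYMENT is optional and defaults to "gpt-35-turbo"; the
# values below are placeholders, not real credentials:
#
#   AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com/
#   AZURE_OPENAI_KEY=<your-key>
#   AZURE_OPENAI_DEPLOYMENT=gpt-35-turbo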

def get_clickhouse_client():
    """Get configured ClickHouse client"""
    return get_client(
        host='20.235.209.193',
        port=8123,
        username='admin',
        password='2286vdaC8LN94RmdTrctyXZPavHcx8',
        database='unify'
    )

def initialize_storage(client):
    """Initialize the knowledge base storage in ClickHouse"""
    try:
        # Create table for knowledge base if it doesn't exist
        client.command("""
            CREATE TABLE IF NOT EXISTS knowledge_base (
                timestamp DateTime,
                knowledge_id String,
                raw_knowledge String,
                analysis String,
                patterns String
            ) ENGINE = MergeTree()
            ORDER BY (timestamp, knowledge_id)
        """)
        logger.info("Knowledge base storage initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing storage: {str(e)}")
        raise
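
# A quick, optional sanity check after initialization (e.g. from a REPL);
# DESCRIBE TABLE is standard ClickHouse SQL:
#
#   client = get_clickhouse_client()
#   initialize_storage(client)
#   print(client.query("DESCRIBE TABLE knowledge_base").result_rows)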

def analyze_knowledge(knowledge_data):
    """Analyze knowledge using Azure OpenAI"""
    system_prompt = """Analyze the provided knowledge data and extract:
    1. Key patterns and relationships
    2. Business rules and constraints
    3. Common data transformations
    4. Relevant metrics and KPIs
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Analyze this knowledge: {json.dumps(knowledge_data)}"}
    ]
    response = openai.ChatCompletion.create(
        engine=OPENAI_DEPLOYMENT,
        messages=messages,
        temperature=0.3,
        max_tokens=1000
    )
    return response.choices[0].message.content

def extract_patterns(analysis):
    """Extract structured patterns from the analysis"""
    messages = [
        {"role": "system", "content": "Convert the analysis into a structured JSON format with patterns, rules, and metrics."},
        {"role": "user", "content": analysis}
    ]
    response = openai.ChatCompletion.create(
        engine=OPENAI_DEPLOYMENT,
        messages=messages,
        temperature=0.1,
        max_tokens=1000
    )
    return response.choices[0].message.content
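
# extract_patterns only *asks* the model for JSON, so the returned string is
# not guaranteed to parse; a minimal, hypothetical guard before relying on it:
#
#   try:
#       json.loads(patterns)
#   except json.JSONDecodeError:
#       logger.warning("Patterns response is not valid JSON; storing raw text")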

def main():
    try:
        # Initialize ClickHouse client
        client = get_clickhouse_client()

        # Initialize storage
        initialize_storage(client)

        # Read knowledge file
        with open('product_demand_knowledge.json', 'r') as f:
            knowledge_data = json.load(f)

        # Generate unique ID
        timestamp = datetime.now()
        knowledge_id = f"k_{timestamp.strftime('%Y%m%d_%H%M%S')}"

        # Analyze knowledge
        logger.info("Analyzing knowledge...")
        analysis = analyze_knowledge(knowledge_data)

        # Extract patterns
        logger.info("Extracting patterns...")
        patterns = extract_patterns(analysis)

        # Store in ClickHouse
        logger.info("Storing in ClickHouse...")
        client.command("""
            INSERT INTO knowledge_base (
                timestamp, knowledge_id, raw_knowledge, analysis, patterns
            ) VALUES (
                %(timestamp)s, %(knowledge_id)s, %(raw_knowledge)s, %(analysis)s, %(patterns)s
            )
        """, parameters={
            'timestamp': timestamp,
            'knowledge_id': knowledge_id,
            'raw_knowledge': json.dumps(knowledge_data),
            'analysis': analysis,
            'patterns': patterns
        })
        logger.info(f"Successfully uploaded knowledge with ID: {knowledge_id}")

        # Verify storage
        result = client.query("""
            SELECT timestamp, knowledge_id, analysis
            FROM knowledge_base
            WHERE knowledge_id = %(knowledge_id)s
            LIMIT 1
        """, parameters={'knowledge_id': knowledge_id})
        if result.result_rows:
            logger.info("Successfully verified knowledge storage")
            logger.info(f"Stored analysis: {result.result_rows[0][2][:200]}...")
    except Exception as e:
        logger.error(f"Error uploading knowledge: {str(e)}")
        raise


if __name__ == "__main__":
    main()
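
# The script reads product_demand_knowledge.json from the working directory.
# Its schema is not fixed by this script (anything json.load accepts works);
# a hypothetical minimal example of the expected shape:
#
#   {
#       "domain": "product_demand",
#       "rules": ["orders over 100 units require manual approval"],
#       "metrics": ["weekly_demand", "fill_rate"]
#   }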