-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathswagger_generation_cli.py
More file actions
182 lines (159 loc) · 8.73 KB
/
Copy pathswagger_generation_cli.py
File metadata and controls
182 lines (159 loc) · 8.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import json
import os
import sys
import time
import traceback
import requests
from user_config import UserConfigurations
from swagger_generator import SwaggerGeneration
from file_scanner import FileScanner
from framework_identifier import FrameworkIdentifier
from endpoints_extractor import EndpointsExtractor
from faiss_index_generator import GenerateFaissIndex
from nodejs_pipeline.run_swagger_generation import run_swagger_generation as nodejs_swagger_generator
from python_pipeline.run_swagger_generation import run_swagger_generation as python_swagger_generator
from rails_pipeline.run_swagger_generation import run_swagger_generation as ruby_on_rails_swagger_generator
from golang_pipeline.run_swagger_generation import run_swagger_generation as golang_swagger_generator
from utils import get_output_filepath
from telemetry_posthog import PostHogTelemetry
class RunSwagger:
def __init__(self, project_api_key, openai_api_key, ai_chat_id, is_mcp):
self.ai_chat_id = ai_chat_id
self.user_configurations = UserConfigurations(project_api_key, openai_api_key, ai_chat_id, is_mcp)
self.user_config = self.user_configurations.load_user_config()
self.framework_identifier = FrameworkIdentifier()
self.file_scanner = FileScanner()
self.endpoints_extractor = EndpointsExtractor()
self.faiss_index = GenerateFaissIndex()
self.swagger_generator = SwaggerGeneration()
self.telemetry = PostHogTelemetry.from_env()
def run_python_nodejs_ruby(self, framework):
swagger = None
try:
if framework == "django" or framework == "flask" or framework == "fastapi":
swagger = python_swagger_generator(self.user_config['api_host'])
elif framework == "express" or framework == "nestjs":
swagger = nodejs_swagger_generator(self.user_config['api_host'])
elif framework == "ruby_on_rails":
swagger = ruby_on_rails_swagger_generator(self.user_config['api_host'])
elif framework == "golang":
swagger = golang_swagger_generator(self.user_config['api_host'])
except Exception as ex:
traceback.print_exc()
print("Fallback to old procedure")
return swagger
def _resolve_ai_chat_id(self, ai_chat_id):
candidate = (ai_chat_id or "").strip()
if candidate and candidate.lower() != "null":
return candidate
return self.user_config.get("ai_chat_id", "null")
def run(self, ai_chat_id=None):
resolved_ai_chat_id = self._resolve_ai_chat_id(ai_chat_id if ai_chat_id is not None else self.ai_chat_id)
telemetry = self.telemetry
run_id = telemetry.new_run_id()
t0 = time.time()
telemetry.capture("apimesh_run_started", {
"run_id": run_id,
})
file_paths = []
swagger = None
all_endpoints = []
framework = ""
try:
with telemetry.stage(run_id, "scan_repo"):
file_paths = self.file_scanner.get_all_file_paths()
print("\n***************************************************")
with telemetry.stage(run_id, "detect_framework"):
try:
if self.user_config.get('framework', None):
print(f"Using Existing Framework - {self.user_config['framework']}")
framework = self.user_config.get('framework', "")
else:
print("Started framework identification")
framework = self.framework_identifier.get_framework(file_paths)['framework']
self.user_config['framework'] = framework
self.user_configurations.save_user_config(self.user_config)
except Exception as ex:
msg = str(ex)
lowered = msg.lower()
if "insufficient_quota" in lowered or "quota" in lowered:
print("OpenAI quota exceeded. Please check your plan/billing and retry after adding credits.")
else:
print("We do not support this framework currently. Please contact QodexAI support.")
raise
print(f"completed framework identification - {framework}")
print("\n***************************************************")
print("Started finding files related to API information")
try:
with telemetry.stage(run_id, "extract_endpoints"):
swagger = self.run_python_nodejs_ruby(framework)
if swagger:
print("Completed finding files related to API information")
else:
api_files = self.file_scanner.find_api_files(file_paths, framework)
print("Completed finding files related to API information")
for filePath in api_files:
endpoints = self.endpoints_extractor.extract_endpoints_with_gpt(filePath, framework)
all_endpoints.extend(endpoints)
with telemetry.stage(run_id, "generate_swagger", {"fast_path": bool(swagger)}):
if not swagger:
print("\n***************************************************")
print("Started creating faiss index for all files")
faiss_vector = self.faiss_index.create_faiss_index(file_paths, framework)
print("Completed creating faiss index for all files")
print("Fetching authentication related information")
authentication_information = self.faiss_index.get_authentication_related_information(faiss_vector)
print("Completed Fetching authentication related information")
endpoint_related_information = self.endpoints_extractor.get_endpoint_related_information(faiss_vector, all_endpoints)
swagger = self.swagger_generator.create_swagger_json(endpoint_related_information, authentication_information, framework, self.user_config['api_host'])
except Exception:
print("Oops! looks like we encountered an issue. Please try after some time.")
raise
with telemetry.stage(run_id, "render_html"):
output_filepath = get_output_filepath()
self.swagger_generator.save_swagger_json(swagger, output_filepath)
telemetry.capture("apimesh_run_completed", {
"run_id": run_id,
"duration_ms": int((time.time() - t0) * 1000),
"success": True,
})
except Exception as e:
telemetry.capture("apimesh_run_failed", {
"run_id": run_id,
"duration_ms": int((time.time() - t0) * 1000),
"success": False,
"error_type": type(e).__name__,
})
raise
#self.upload_swagger_to_qodex(resolved_ai_chat_id)
return
def upload_swagger_to_qodex(self, ai_chat_id):
qodex_api_key = self.user_config['qodex_api_key']
if qodex_api_key:
print("Uploading swagger to Qodex.AI")
url = "https://api.app.qodex.ai/api/v1/collection_imports/create_with_json"
output_filepath = get_output_filepath()
with open(output_filepath, "r") as file:
swagger_doc = json.load(file)
payload = {
"api_key": qodex_api_key,
"swagger_doc": swagger_doc,
"ai_chat_id": ai_chat_id
}
response = requests.post(url, json=payload)
# Check the response
if response.status_code == 200 or response.status_code == 201:
print("Success:", response.json()) # Or response.text for plain text responses
print("Swagger successfully uploaded to Qodex AI. Please refresh your tab.")
print("We highly recommend you to review the apis before generating test scenarios.")
if str(ai_chat_id) != 'null':
print("Open the following link in your browser or refresh the existing open page to continue further")
print(f"https://app.qodex.ai/ai-agent?chatId={ai_chat_id}")
else:
print(f"Failed with status code {response.status_code}: {response.text}")
return
openai_api_key = sys.argv[1] if len(sys.argv) > 1 else ""
project_api_key = sys.argv[2] if len(sys.argv) > 2 else ""
ai_chat_id = sys.argv[3] if len(sys.argv) > 3 else ""
is_mcp = sys.argv[4] if len(sys.argv) > 4 else False
RunSwagger(project_api_key, openai_api_key, ai_chat_id, is_mcp).run(ai_chat_id)