-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproject_code.py
More file actions
280 lines (230 loc) · 10 KB
/
project_code.py
File metadata and controls
280 lines (230 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import os
import tkinter as tk
from tkinter import simpledialog, messagebox, filedialog, scrolledtext
import pywhatkit
import cv2
from cvzone.HandTrackingModule import HandDetector
import geocoder
import boto3
import webbrowser
import requests
import cohere
import io
import base64
# AWS clients for EC2 and S3. No credentials or region are passed here, so
# boto3's own default configuration is what applies.
ec2_client = boto3.client('ec2')
s3_client = boto3.client('s3')

# API keys are read from the environment so real secrets never have to be
# written into this file; the original placeholder strings are kept as
# fallbacks. (The previous `COHERE_API_KEY = #yourkey` had no right-hand side
# at all, which is a SyntaxError that stopped the module from loading.)
COHERE_API_KEY = os.environ.get('COHERE_API_KEY', '#yourkey')
cohere_client = cohere.Client(COHERE_API_KEY)

# SerpAPI key, sent as the `api_key` query parameter of search requests.
SERPAPI_API_KEY = os.environ.get('SERPAPI_API_KEY', '#yourkey')

# Google Vision API key, sent as the `key` query parameter of annotate requests.
VISION_API_KEY = os.environ.get('VISION_API_KEY', '#yourapikey')
def describe_image(image_path):
    """Label an image with the Google Vision API and summarise the labels.

    The file is read, base64-encoded and sent to the `images:annotate`
    endpoint with a LABEL_DETECTION feature. The returned label descriptions
    are turned into a prompt for `generate_text_with_cohere`.

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        str: The generated summary, the literal "No labels detected." when the
        API returns no labels, or the text of any exception that occurred
        (errors are reported as strings rather than raised).
    """
    try:
        # Load the image into memory and encode it for the JSON payload.
        with open(image_path, 'rb') as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode('utf-8')

        request_payload = {
            "requests": [
                {
                    "image": {"content": encoded_image},
                    "features": [
                        {
                            "type": "LABEL_DETECTION",
                            "maxResults": 10,  # Adjust the number of results as needed
                        }
                    ],
                }
            ]
        }

        url = f'https://vision.googleapis.com/v1/images:annotate?key={VISION_API_KEY}'
        # A timeout keeps a stalled connection from freezing the caller forever.
        response = requests.post(url, json=request_payload, timeout=30)

        # Parse the body once; a non-JSON error page must not mask the status code.
        try:
            body = response.json()
        except ValueError:
            body = {}
        if not isinstance(body, dict):
            body = {}

        if response.status_code != 200 or 'error' in body:
            message = body.get('error', {}).get('message', 'Unknown error')
            raise RuntimeError(f"Error {response.status_code}: {message}")

        # The API can answer HTTP 200 and still report a failure for the
        # individual image inside responses[0]['error'].
        per_image_results = body.get('responses') or [{}]
        first_result = per_image_results[0]
        if 'error' in first_result:
            message = first_result['error'].get('message', 'Unknown error')
            raise RuntimeError(f"Error {response.status_code}: {message}")

        label_annotations = first_result.get('labelAnnotations', [])
        if not label_annotations:
            return "No labels detected."

        labels = [label['description'] for label in label_annotations]
        prompt = f"Based on the image labels: {' '.join(labels)}\nGenerate a summary:"
        return generate_text_with_cohere(prompt)
    except Exception as e:
        # Callers display the return value directly, so failures come back as text.
        return str(e)
def select_image():
    """Let the user pick an image and show its description in `result_text`.

    Does nothing when the file dialog is cancelled.
    """
    # Tk expects the patterns of one file type as a sequence (or a
    # space-separated string); the previous semicolon-joined string was
    # treated as a single malformed pattern.
    image_path = filedialog.askopenfilename(
        filetypes=[("Image Files", ("*.jpg", "*.jpeg", "*.png"))]
    )
    if not image_path:
        return

    result = describe_image(image_path)
    # "1.0" is the text-widget index for line 1, character 0.
    result_text.delete("1.0", tk.END)
    result_text.insert(tk.END, result)
def send_email():
    """Prompt for a recipient and a message, then send an email via pywhatkit.

    The sender address and password come from the SENDER_EMAIL and
    EMAIL_PASSWORD environment variables. The outcome is reported in a
    message box. Cancelling either prompt aborts without sending.
    """
    sender_email = os.environ.get('SENDER_EMAIL')
    password = os.environ.get('EMAIL_PASSWORD')
    if not sender_email or not password:
        messagebox.showerror("Configuration Error", "SENDER_EMAIL and EMAIL_PASSWORD environment variables must be set.")
        return

    receiver_email = simpledialog.askstring("Receiver Email", "Enter receiver's email:")
    if not receiver_email:
        # Dialog cancelled or left empty: there is nobody to send to.
        return

    message = simpledialog.askstring("Message", "Enter your message:")
    if message is None:
        # Dialog cancelled; an empty string is still allowed as a message.
        return

    subject = 'DOSS Technical Training Project'
    try:
        pywhatkit.send_mail(sender_email, password, subject, message, receiver_email)
        messagebox.showinfo("Success", "Email sent successfully!")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to send email: {str(e)}")
def get_location_info():
    """Show the city and address for the machine's IP-based location.

    Looks up coordinates with `geocoder.ip('me')`, reverse-geocodes them once
    through the OSM provider, and shows the result in a message box. An error
    box is shown instead when either lookup yields nothing.
    """
    coordinate = geocoder.ip('me').latlng
    if not coordinate:
        messagebox.showerror("Location Info", "Could not determine your location.")
        return

    # One reverse lookup serves both the city and the address; the previous
    # code issued the same request twice.
    place = geocoder.osm(coordinate, method='reverse')
    details = place.json
    if not details:
        messagebox.showerror("Location Info", "Could not look up the address for your location.")
        return

    city = place.city
    nearby_places = details.get('address')
    location_info = f"City: {city}\nNearby Places: {nearby_places}"
    messagebox.showinfo("Location Info", location_info)
def capture_hand_gestures():
    """Show the camera feed with hand detection until Enter is pressed.

    Frames are read from capture device 0, passed through
    `HandDetector.findHands`, and displayed in a window titled
    "Hand Gestures". The loop ends when key code 13 (Enter) is pressed or
    when a frame cannot be read. The camera and windows are always released.
    """
    model = HandDetector()
    cap = cv2.VideoCapture(0)
    try:
        while True:
            status, photo = cap.read()
            if not status:
                # No frame available (camera missing, busy or unplugged);
                # passing the empty frame on would raise.
                messagebox.showerror("Camera Error", "Could not read a frame from the camera.")
                break
            # Return value unused, as before; presumably the detector draws
            # onto `photo` in place - confirm against the cvzone version in use.
            model.findHands(photo)
            cv2.imshow("Hand Gestures", photo)
            if cv2.waitKey(10) == 13:
                break
    finally:
        # Runs on every exit path so the device is not left locked.
        cv2.destroyAllWindows()
        cap.release()
def list_ec2_instances():
    """Print every EC2 instance with its state and show the IDs in a dialog.

    Each instance found in the `describe_instances` response is printed to
    stdout as "Instance ID: ..., State: ..."; afterwards all instance IDs are
    shown, one per line, in an info message box.
    """
    reservations = ec2_client.describe_instances()['Reservations']
    found_ids = []
    # Walk the reservations and their instances as one flat stream.
    for machine in (m for group in reservations for m in group['Instances']):
        machine_id = machine['InstanceId']
        machine_state = machine['State']['Name']
        print(f"Instance ID: {machine_id}, State: {machine_state}")
        found_ids.append(machine_id)

    listing = '\n'.join(found_ids)
    messagebox.showinfo("EC2 Instance IDs", f"Instance IDs:\n{listing}")
def stop_ec2_instance():
    """Ask which running EC2 instance to stop and stop it.

    Running instances are collected from every reservation and offered as a
    1-based numbered list. Cancelling the dialog does nothing; an
    out-of-range number shows an error box.
    """
    response = ec2_client.describe_instances(Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
    # A reservation can hold several instances, so flatten them; indexing the
    # reservations directly made all but the first instance of each unreachable.
    running_ids = [
        instance['InstanceId']
        for reservation in response['Reservations']
        for instance in reservation['Instances']
    ]
    if not running_ids:
        messagebox.showinfo("No Running Instances", "There are no running instances to stop.")
        return

    listing = '\n'.join(f"{number}. {instance_id}" for number, instance_id in enumerate(running_ids, start=1))
    choice = simpledialog.askinteger(
        "Select Instance",
        f"Enter the number of the instance to stop (1-{len(running_ids)}):\n{listing}",
    )
    if choice is None:
        # Dialog cancelled. The previous code subtracted 1 before this check
        # and crashed with a TypeError.
        return
    if not 1 <= choice <= len(running_ids):
        messagebox.showerror("Invalid Selection", "Please select a valid instance number.")
        return

    selected_instance_id = running_ids[choice - 1]
    ec2_client.stop_instances(InstanceIds=[selected_instance_id])
    messagebox.showinfo("Instance Stopped", f"Stopping instance {selected_instance_id}")
def launch_instance():
    """Launch a single EC2 instance and print its ID to stdout."""
    launch_options = {
        'ImageId': 'ami-09298640a92b2d12c',  # Replace with your desired AMI ID
        'InstanceType': 't2.micro',          # Replace with your desired instance type
        'MinCount': 1,
        'MaxCount': 1,
    }
    launched = ec2_client.run_instances(**launch_options)
    # Exactly one instance was requested, so read the first entry.
    new_id = launched['Instances'][0]['InstanceId']
    print(f"New instance launched with ID: {new_id}")
def open_url():
    """Open the project's fixed URL through the `webbrowser` module."""
    target_url = "http://3.109.103.64:10001/"
    webbrowser.open(target_url)
def upload_to_s3():
    """Upload a user-selected file to the project bucket under a chosen key.

    Asks for the local file, then for the object key. Cancelling either
    dialog, or leaving the key empty, aborts without uploading. The outcome
    is reported in a message box.
    """
    file_path = filedialog.askopenfilename()
    if not file_path:
        return

    key = simpledialog.askstring("Key", "Enter the key (name) for the file in the bucket:")
    if not key:
        # Cancelled (None) or empty: previously this was passed on as the
        # object key and surfaced as an upload failure.
        return

    try:
        s3_client.upload_file(file_path, "dossttpprojectbucket", key)
        messagebox.showinfo("Success", "File uploaded successfully!")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to upload file: {str(e)}")
def download_from_s3():
    """Download an object from the project bucket to a user-chosen location.

    Asks for the object key, then for the destination path (pre-filled with
    the last path segment of the key). Nothing happens if either dialog is
    cancelled. The outcome is reported in a message box.
    """
    object_key = simpledialog.askstring("Key", "Enter the key (name) of the file to download from the bucket:")
    if not object_key:
        return

    suggested_name = object_key.rsplit('/', 1)[-1]
    destination = filedialog.asksaveasfilename(initialfile=suggested_name)
    if not destination:
        return

    try:
        s3_client.download_file("dossttpprojectbucket", object_key, destination)
        messagebox.showinfo("Success", "File downloaded successfully!")
    except Exception as err:
        messagebox.showerror("Error", f"Failed to download file: {str(err)}")
def delete_from_s3():
    """Delete an object, named by the user, from the project bucket.

    Nothing happens if the key dialog is cancelled or left empty. The outcome
    is reported in a message box.
    """
    object_key = simpledialog.askstring("Key", "Enter the key (name) of the file to delete from the bucket:")
    if not object_key:
        return

    try:
        s3_client.delete_object(Bucket="dossttpprojectbucket", Key=object_key)
        messagebox.showinfo("Success", "File deleted successfully!")
    except Exception as err:
        messagebox.showerror("Error", f"Failed to delete file: {str(err)}")
def search_serpapi(query):
    """Run a Google search through SerpAPI and return the decoded JSON.

    Args:
        query: Search text, sent as the `q` parameter.

    Returns:
        The parsed JSON body of the SerpAPI response.

    Raises:
        requests.exceptions.Timeout: If the service does not answer within
            the timeout.
    """
    params = {
        "engine": "google",
        "q": query,
        "api_key": SERPAPI_API_KEY,
    }
    # requests waits indefinitely by default; a timeout stops a stalled
    # request from blocking the caller forever.
    response = requests.get("https://serpapi.com/search", params=params, timeout=30)
    return response.json()
def generate_text_with_cohere(prompt):
    """Return the first Cohere generation for `prompt`, whitespace-stripped.

    Args:
        prompt: Text handed to the model as the prompt.

    Returns:
        str: Text of the first generation with leading and trailing
        whitespace removed.
    """
    generation_settings = {
        'model': 'command-light-nightly',
        'prompt': prompt,
        'max_tokens': 50,
        'temperature': 0.75,
    }
    result = cohere_client.generate(**generation_settings)
    first_generation = result.generations[0]
    return first_generation.text.strip()
def search_and_generate(query):
    """Search for `query` and summarise the result snippets with Cohere.

    Args:
        query: Search text passed to `search_serpapi`.

    Returns:
        The summary produced by `generate_text_with_cohere` from the snippets
        of the organic results, joined one per line.
    """
    organic_hits = search_serpapi(query).get('organic_results', [])
    # Hits without a snippet contribute an empty line, as before.
    snippet_block = "\n".join(hit.get('snippet', '') for hit in organic_hits)
    return generate_text_with_cohere(
        f"Based on the following information, write a summary:\n\n{snippet_block}"
    )
def chatbot():
    """Open a chat window that answers queries via `search_and_generate`.

    The window holds a prompt label, a one-line entry, a scrolling transcript
    and a "Get Response" button. Each click appends a "Q: ... / A: ..." pair
    to the transcript and clears the entry.
    """
    window = tk.Toplevel(root)
    window.title("Chatbot")
    window.configure(bg='lightblue')

    # Widgets are packed in the order they are created here.
    tk.Label(window, text="Enter your query:", bg='lightblue').pack(pady=5)

    entry = tk.Entry(window, width=50)
    entry.pack(pady=5)

    transcript = scrolledtext.ScrolledText(window, wrap=tk.WORD, width=60, height=20)
    transcript.pack(pady=5)

    def get_response():
        """Answer the current entry text and append the exchange."""
        question = entry.get()
        answer = search_and_generate(question)
        transcript.insert(tk.END, f"Q: {question}\nA: {answer}\n\n")
        entry.delete(0, tk.END)

    tk.Button(window, text="Get Response", command=get_response, bg='blue', fg='white').pack(pady=5)
# Main Window
root = tk.Tk()
root.title("Main Window")
root.configure(bg='lightgrey')

# (label, callback) pairs; one button is created per entry, in this order.
buttons = [
    ("Send Email", send_email),
    ("Get Location Info", get_location_info),
    ("Capture Hand Gestures", capture_hand_gestures),
    ("List EC2 Instances", list_ec2_instances),
    ("Stop EC2 Instance", stop_ec2_instance),
    ("Launch EC2 Instance", launch_instance),
    ("Open URL", open_url),
    ("Upload to S3", upload_to_s3),
    ("Download from S3", download_from_s3),
    ("Delete from S3", delete_from_s3),
    ("Chatbot", chatbot),
    ("Image Detection", select_image),  # Add the image detection button
]

# Text widget that displays the results of image detection; it is packed
# before any of the buttons.
result_text = tk.Text(root, width=60, height=20, wrap=tk.WORD)
result_text.pack(pady=5)

for label, handler in buttons:
    tk.Button(root, text=label, command=handler, bg='blue', fg='white').pack(pady=5, padx=10)

# Blocks here until the main window is closed.
root.mainloop()