chat_completions.py
import config
import re
from utils import to_clipboard


class CompletionManager:
    def __init__(self, TTS_client, parent_client):
        """Initialize the CompletionManager with the TTS client and the parent client."""
        self.client = None
        self.setup_client()
        self.model = None
        self.TTS_client = TTS_client
        self.parent_client = parent_client
        self.full_response = ''
    def setup_client(self):
        """Instantiate the appropriate AI client based on the configuration file."""
        if config.COMPLETIONS_API == "openai":
            from llm_apis.openai_api import OpenAIClient
            self.client = OpenAIClient()
        elif config.COMPLETIONS_API == "together":
            from llm_apis.togetherai_api import TogetherAIClient
            self.client = TogetherAIClient()
        elif config.COMPLETIONS_API == "anthropic":
            from llm_apis.anthropic_api import AnthropicClient
            self.client = AnthropicClient()
        elif config.COMPLETIONS_API == "lm_studio":
            from llm_apis.lm_studio import LM_StudioClient
            if hasattr(config, 'LM_STUDIO_API_BASE_URL'):
                self.client = LM_StudioClient(base_url=config.LM_STUDIO_API_BASE_URL)
            else:
                print("No LM_STUDIO_API_BASE_URL found in config.py, using default")
                self.client = LM_StudioClient()
        elif config.COMPLETIONS_API == "ollama":
            from llm_apis.ollama_api import OllamaClient
            if hasattr(config, 'OLLAMA_API_BASE_URL'):
                self.client = OllamaClient(base_url=config.OLLAMA_API_BASE_URL)
            else:
                print("No OLLAMA_API_BASE_URL found in config.py, using default")
                self.client = OllamaClient()
        else:
            raise ValueError("Unsupported completion API service configured")
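    # Note: setup_client expects config.py to define COMPLETIONS_API (one of "openai",
    # "together", "anthropic", "lm_studio", or "ollama") and, for the local backends,
    # an optional base URL. A minimal sketch of the relevant entries; the URL values
    # below are illustrative assumptions, not values taken from this repository:
    #
    #   COMPLETIONS_API = "lm_studio"
    #   LM_STUDIO_API_BASE_URL = "http://localhost:1234/v1"
    #   OLLAMA_API_BASE_URL = "http://localhost:11434"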
    def get_completion(self, messages, model, **kwargs):
        """Get a completion from the selected AI client and stream sentences into the TTS client.

        Args:
            messages (list): List of messages.
            model (str): Model to use for the completion.
            **kwargs: Additional keyword arguments passed to the client.

        Returns:
            str: Response generated by the AI client, or None if an error occurs.
        """
        try:
            stream = self.client.stream_completion(messages, model, **kwargs)
            for content_type, content in self.stream_sentences_from_chunks(stream, clip_start_marker=config.START_SEQ, clip_end_marker=config.END_SEQ):
                # If the parent client has requested that the response be stopped, break out early.
                if self.parent_client.stop_response:
                    break
                if content_type == "sentence":
                    self.TTS_client.run_tts(content)
                elif content_type == "clipboard_text":
                    to_clipboard(content)
            return self.full_response
        except Exception as e:
            print(f"An error occurred while getting completion: {e}")
            return None
    def stream_sentences_from_chunks(self, chunks_stream, clip_start_marker="-CLIPSTART-", clip_end_marker="-CLIPEND-"):
        """Take a stream of text chunks and yield complete sentences or clipboard-bound sections, while accumulating the full unmodified response in self.full_response.

        Args:
            chunks_stream: Stream of text chunks.
            clip_start_marker (str): Start marker for clipboard text.
            clip_end_marker (str): End marker for clipboard text.

        Yields:
            tuple: Type of content ("sentence" or "clipboard_text") and the content itself.
        """
        buffer = ''
        self.full_response = ''
        sentence_endings = re.compile(r'(?<=[.!?])\s+|(?<=\n)')
        in_marker = False

        for chunk in chunks_stream:
            # If the parent client has requested that the response be stopped, break out early.
            if self.parent_client.stop_response:
                break

            buffer += chunk
            self.full_response += chunk

            # Check for clip_start_marker while not already inside a marked section
            if clip_start_marker in buffer and not in_marker:
                pre, _, post = buffer.partition(clip_start_marker)
                if pre.strip():
                    # If there is text before the marker that starts a clipboard segment,
                    # yield that text as a sentence.
                    yield "sentence", pre.strip()
                buffer = post
                in_marker = True

            # Check for clip_end_marker while inside a marked section
            if clip_end_marker in buffer and in_marker:
                marked_section, _, post_end = buffer.partition(clip_end_marker)
                yield "clipboard_text", marked_section.strip()
                buffer = post_end  # Remaining text after the end marker
                in_marker = False  # Reset the marker flag

            # Process sentences outside of marked sections
            if not in_marker:
                while True:
                    match = sentence_endings.search(buffer)
                    if match:
                        sentence = buffer[:match.end()]
                        buffer = buffer[match.end():]
                        if sentence.strip():
                            yield "sentence", sentence.strip()
                    else:
                        break

        # Yield any remaining content in the buffer as a sentence
        if buffer.strip() and not in_marker:
            yield "sentence", buffer.strip()
        elif buffer.strip() and in_marker:  # Handle any remaining marked text
            yield "clipboard_text", buffer.strip()