Skip to content

Commit b27afb7

Browse files
committed
Remove all locks in rkllama. Every request has his own queue to wait for the response. Fix #152
1 parent f00bd9c commit b27afb7

8 files changed

Lines changed: 616 additions & 657 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# RKLLama: LLM Server and Client for Rockchip 3588/3576
22

3-
### [Version: 0.0.67](#New-Version)
3+
### [Version: 0.0.68](#New-Version)
44

55
Video demo ( version 0.0.1 ):
66

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "rkllama"
3-
version = "0.0.67"
3+
version = "0.0.68"
44
authors = [
55
{ name="NotPunchnox", email="punchnoxpro@gmail.com" },
66
{ name="TomJacobsUK", email="tom@tomjacobs.co.uk" },

src/rkllama/api/format_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ def openai_to_ollama_generate_request(openai_payload: dict) -> dict:
373373

374374
model = openai_payload.get("model", "llama3")
375375
stream = openai_payload.get("stream", False)
376-
images = images.get("images", [])
376+
images = openai_payload.get("images", [])
377377

378378
# Base Ollama payload
379379
ollama_payload = {

src/rkllama/api/process.py

Lines changed: 323 additions & 331 deletions
Large diffs are not rendered by default.

src/rkllama/api/server_utils.py

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -305,13 +305,14 @@ def handle_streaming(cls, model_name, final_prompt, format_spec, tools, enable_t
305305
# Check if multimodal or text only
306306
if not images:
307307
# Send the task of inference to the model
308-
variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
308+
parent_pipe = variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
309309
else:
310310
# Send the task of multimodal inference to the model
311-
variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
311+
parent_pipe = variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
312312

313-
# Wait for result pipe
314-
manager_pipe = variables.worker_manager_rkllm.get_result(model_name)
313+
# Get timeout
314+
timeout = int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))
315+
315316

316317
def generate():
317318

@@ -339,17 +340,17 @@ def generate():
339340

340341

341342
while not thread_finished or not final_sent:
342-
if manager_pipe.poll(int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))): # Timeout in seconds
343-
token = manager_pipe.recv()
343+
if parent_pipe.poll(timeout): # Timeout in seconds
344+
token = parent_pipe.recv()
344345
else:
345346
# Abort the current inference
346347
variables.worker_manager_rkllm.workers[model_name].abort_flag.value = True
347348

348349
# Raise Exception
349-
logger.error(f"No response received by the Worker of the model {model_name} in {int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))} seconds.")
350+
logger.error(f"No response received by the Worker of the model {model_name} in {timeout} seconds.")
350351

351352
# Send message to the user
352-
token=f"Aborted inference by Timeout ({int(rkllama.config.get("model","max_seconds_waiting_worker_response"))} seconds). Try again."
353+
token=f"Aborted inference by Timeout ({timeout} seconds). Try again."
353354

354355
# Set finished state of the thread inference
355356
thread_finished = True
@@ -359,6 +360,9 @@ def generate():
359360
thread_finished = True
360361
# Get the stats from the inference
361362
_, prompt_token_count, token_count, prompt_eval, eval = token
363+
364+
# CLose the parent pipe
365+
parent_pipe.close()
362366

363367
if not thread_finished:
364368
count += 1
@@ -471,27 +475,28 @@ def handle_complete(cls, model_name, final_prompt, format_spec, tools, enable_th
471475
# Check if multimodal or text only
472476
if not images:
473477
# Send the task of inference to the model
474-
variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
478+
parent_pipe = variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
475479

476480
else:
477481
# Send the task of multimodal inference to the model
478-
variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
482+
parent_pipe = variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
479483

480-
# Wait for result pipe
481-
manager_pipe = variables.worker_manager_rkllm.get_result(model_name)
484+
# Get timeout
485+
timeout = int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))
482486

483487
while not thread_finished:
484-
if manager_pipe.poll(int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))): # Timeout in seconds
485-
token = manager_pipe.recv()
488+
if parent_pipe.poll(timeout): # Timeout in seconds
489+
token = parent_pipe.recv()
486490
else:
491+
487492
# Abort the current inference
488493
variables.worker_manager_rkllm.workers[model_name].abort_flag.value = True
489494

490495
# Raise Exception
491-
logger.error(f"No response received by the Worker of the model {model_name} in {int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))} seconds.")
496+
logger.error(f"No response received by the Worker of the model {model_name} in {timeout} seconds.")
492497

493498
# Send message to the user
494-
token=f"Aborted inference by Timeout ({int(rkllama.config.get("model","max_seconds_waiting_worker_response"))} seconds). Try again."
499+
token=f"Aborted inference by Timeout ({timeout} seconds). Try again."
495500

496501
# Set finished state of the thread inference
497502
thread_finished = True
@@ -502,6 +507,9 @@ def handle_complete(cls, model_name, final_prompt, format_spec, tools, enable_th
502507
# Get the stats from the inference
503508
_, prompt_token_count, token_count, prompt_eval, eval = token
504509

510+
# Close the parent pipe
511+
parent_pipe.close()
512+
505513
# Exit the loop
506514
continue
507515

@@ -671,13 +679,13 @@ def handle_streaming(cls, model_name, final_prompt, format_spec, enable_thinking
671679
# Check if multimodal or text only
672680
if not images:
673681
# Send the task of inference to the model
674-
variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
682+
parent_pipe = variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
675683
else:
676684
# Send the task of multimodal inference to the model
677-
variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
685+
parent_pipe = variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
678686

679-
# Wait for result pipe
680-
manager_pipe = variables.worker_manager_rkllm.get_result(model_name)
687+
# Get Timeout
688+
timeout = int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))
681689

682690

683691
def generate():
@@ -693,19 +701,19 @@ def generate():
693701
eval = None
694702

695703
thread_finished = False
696-
704+
697705
while not thread_finished or not final_sent:
698-
if manager_pipe.poll(int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))): # Timeout in seconds
699-
token = manager_pipe.recv()
706+
if parent_pipe.poll(timeout): # Timeout in seconds
707+
token = parent_pipe.recv()
700708
else:
701709
# Abort the current inference
702710
variables.worker_manager_rkllm.workers[model_name].abort_flag.value = True
703711

704712
# Raise Exception
705-
logger.error(f"No response received by the Worker of the model {model_name} in {int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))} seconds.")
713+
logger.error(f"No response received by the Worker of the model {model_name} in {timeout} seconds.")
706714

707715
# Send message to the user
708-
token=f"Aborted inference by Timeout ({int(rkllama.config.get("model","max_seconds_waiting_worker_response"))} seconds). Try again."
716+
token=f"Aborted inference by Timeout ({timeout} seconds). Try again."
709717

710718
# Set finished state of the thread inference
711719
thread_finished = True
@@ -715,6 +723,9 @@ def generate():
715723
thread_finished = True
716724
# Get the stats from the inference
717725
_, prompt_token_count, token_count, prompt_eval, eval = token
726+
727+
# Close the parent pipe
728+
parent_pipe.close()
718729

719730
if not thread_finished:
720731
count += 1
@@ -780,26 +791,26 @@ def handle_complete(cls, model_name, final_prompt, format_spec, enable_thinking,
780791
# Check if multimodal or text only
781792
if not images:
782793
# Send the task of inference to the model
783-
variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
794+
parent_pipe = variables.worker_manager_rkllm.inference(model_name, final_prompt, prompt_cache_file)
784795
else:
785796
# Send the task of multimodal inference to the model
786-
variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
797+
parent_pipe = variables.worker_manager_rkllm.multimodal(model_name, final_prompt, images, prompt_cache_file)
787798

788-
# Wait for result pipe
789-
manager_pipe = variables.worker_manager_rkllm.get_result(model_name)
799+
# Get timeout
800+
timeout = int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))
790801

791802
while not thread_finished:
792-
if manager_pipe.poll(int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))): # Timeout in seconds
793-
token = manager_pipe.recv()
803+
if parent_pipe.poll(timeout): # Timeout in seconds
804+
token = parent_pipe.recv()
794805
else:
795806
# Abort the current inference
796807
variables.worker_manager_rkllm.workers[model_name].abort_flag.value = True
797808

798809
# Raise Exception
799-
logger.error(f"No response received by the Worker of the model {model_name} in {int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))} seconds.")
810+
logger.error(f"No response received by the Worker of the model {model_name} in {timeout} seconds.")
800811

801812
# Send message to the user
802-
token=f"Aborted inference by Timeout ({int(rkllama.config.get("model","max_seconds_waiting_worker_response"))} seconds). Try again."
813+
token=f"Aborted inference by Timeout ({timeout} seconds). Try again."
803814

804815
# Set finished state of the thread inference
805816
thread_finished = True
@@ -809,6 +820,9 @@ def handle_complete(cls, model_name, final_prompt, format_spec, enable_thinking,
809820
thread_finished = True
810821
# Get the stats from the inference
811822
_, prompt_token_count, token_count, prompt_eval, eval = token
823+
824+
# Close the parent pipe
825+
parent_pipe.close()
812826

813827
# Exit the loop
814828
continue
@@ -971,26 +985,29 @@ def handle_complete(cls, model_name, input_text):
971985
for input in all_inputs:
972986

973987
# Send the task of embedding to the model
974-
variables.worker_manager_rkllm.embedding(model_name, input)
988+
parent_pipe = variables.worker_manager_rkllm.embedding(model_name, input)
975989

976-
# Get the result from the input
977-
manager_pipe = variables.worker_manager_rkllm.get_result(model_name)
990+
# Get timeout
991+
timeout = int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))
978992

979993
# Wait for the last_embedding hidden layer return
980-
if manager_pipe.poll(int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))): # Timeout in seconds
981-
last_embeddings = manager_pipe.recv()
994+
if parent_pipe.poll(timeout): # Timeout in seconds
995+
last_embeddings = parent_pipe.recv()
982996
else:
983997
# Abort the current inference
984998
variables.worker_manager_rkllm.workers[model_name].abort_flag.value = True
985999
# Raise Exception
986-
logger.error(f"No response received by the Worker of the model {model_name} in {int(rkllama.config.get("model", "max_seconds_waiting_worker_response"))} seconds.")
1000+
logger.error(f"No response received by the Worker of the model {model_name} in {timeout} seconds.")
9871001
# Send empty embedding
9881002
last_embeddings = embeddings = {
9891003
'embedding': [],
9901004
'embd_size': 0,
9911005
'num_tokens': 0
9921006
}
9931007

1008+
# Close the parent pipe
1009+
parent_pipe.close()
1010+
9941011
# Add the embedding to the list of result
9951012
all_embeddings.append(last_embeddings["embedding"].tolist())
9961013

src/rkllama/api/variables.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
1-
import threading
21
from rkllama.config import is_debug_mode
32
from rkllama.api.worker import WorkerManager
43

5-
isLocked = False
6-
74
# Worker variables
85
worker_manager_rkllm = WorkerManager()
96

10-
11-
verrou = threading.Lock()
12-
137
model_id = ""
148
system = "Tu es un assistant artificiel."
159
model_config = {} # For storing model-specific configuration

0 commit comments

Comments
 (0)