From 1b949a9e63decadf31bb4c1c293bcc1dad7b03a7 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Sun, 3 Nov 2024 02:19:00 +0100 Subject: [PATCH 1/5] Update start.sh Add option to run in ENV=development mode --- start.sh | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/start.sh b/start.sh index e2ddd8b4..0671cbce 100755 --- a/start.sh +++ b/start.sh @@ -130,4 +130,9 @@ fi # Start the server -uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' +if [ "$ENV" = "production" ] || [ -z "$ENV" ]; then + uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' +else + echo "Running in development mode" + uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' --reload +fi From 3282d513ba656d7d7636d544732f2d9fb4039f8f Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Sun, 3 Nov 2024 02:40:43 +0100 Subject: [PATCH 2/5] Update start.sh --- start.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/start.sh b/start.sh index 0671cbce..1dabbda2 100755 --- a/start.sh +++ b/start.sh @@ -130,9 +130,9 @@ fi # Start the server -if [ "$ENV" = "production" ] || [ -z "$ENV" ]; then +if [ "$PIPELINES_ENV" = "production" ] || [ -z "$PIPELINES_ENV" ]; then uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' else - echo "Running in development mode" + echo "INFO: Running in development mode" uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' --reload fi From d05797a1192e0b47b7f6c876be14bcbc7d80019d Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Sat, 9 Nov 2024 00:41:38 +0100 Subject: [PATCH 3/5] Update start.sh add support for env vars: PIPELINES_VENV PIPELINES_VENV_AUTOUPGRADE PIPELINES_VENV_PATH + PIPELINES_WORKERS --- start.sh | 53 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/start.sh b/start.sh index 1dabbda2..9537eed1 100755 --- a/start.sh +++ b/start.sh @@ -1,8 +1,29 @@ 
#!/usr/bin/env bash PORT="${PORT:-9099}" HOST="${HOST:-0.0.0.0}" -# Default value for PIPELINES_DIR +# Default values for the variables PIPELINES_DIR=${PIPELINES_DIR:-./pipelines} +PIPELINES_VENV=${PIPELINES_VENV:-False} +PIPELINES_VENV_AUTOUPGRADE=${PIPELINES_VENV_AUTOUPGRADE:-False} +PIPELINES_VENV_PATH="${PIPELINES_VENV_PATH:-${PIPELINES_DIR}/venv}" + +# Actiavte venv if needed +if [ "$PIPELINES_VENV" = True ]; then + echo "PIPELINES_VENV is ACTIVE" + echo "PIPELINES_VENV_PATH: $PIPELINES_VENV_PATH" + echo "PIPELINES_VENV_AUTOUPGRADE: $PIPELINES_VENV_AUTOUPGRADE" + + # Init venv if not installed + if [ ! -d "$PIPELINES_VENV_PATH" ]; then + echo "Creating virtual environment at $PIPELINES_VENV_PATH" + python3 -m venv "$PIPELINES_VENV_PATH" + fi + + # Activate venv + echo "Activated virtual environment at $PIPELINES_VENV_PATH" + source "$PIPELINES_VENV_PATH"/bin/activate +fi + # Function to reset pipelines reset_pipelines_dir() { @@ -33,6 +54,16 @@ reset_pipelines_dir install_requirements() { if [[ -f "$1" ]]; then echo "requirements.txt found at $1. Installing requirements..." + + # Upgrade pip if PIPELINES_VENV_AUTOUPGRADE is True and venv is active + if [ "$PIPELINES_VENV" = True ] && [ "$PIPELINES_VENV_AUTOUPGRADE" = True ]; then + echo "Upgrading pip..." + pip install --upgrade pip + + echo "Checking for outdated packages." + pip install --upgrade -r $1 + fi + pip install -r "$1" else echo "requirements.txt not found at $1. Skipping installation of requirements." @@ -118,7 +149,7 @@ if [[ -n "$PIPELINES_URLS" ]]; then download_pipelines "$path" "$PIPELINES_DIR" done - for file in "$pipelines_dir"/*; do + for file in "$PIPELINES_DIR"/*; do if [[ -f "$file" ]]; then install_frontmatter_requirements "$file" fi @@ -127,12 +158,22 @@ else echo "PIPELINES_URLS not specified. Skipping pipelines download and installation." 
fi - - -# Start the server if [ "$PIPELINES_ENV" = "production" ] || [ -z "$PIPELINES_ENV" ]; then - uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' + if [ "$PIPELINES_WORKERS" = "auto" ]; then + echo "INFO: Calculate workers based on avaible CPU cores" + CPU_COUNT=$(nproc) + WORKERS=$((2 * CPU_COUNT + 1)) + WORKER_CPU_TEXT="(workers: 2*"$CPU_COUNT"xCPU+1)" + else + WORKERS="${PIPELINES_WORKERS:-1}" + WORKER_CPU_TEXT="" + fi + WORKER_TEXT=$([ "$WORKERS" -eq 1 ] && echo "worker" || echo "workers") + + echo "INFO: Running in production mode with $WORKERS $WORKER_TEXT $WORKER_CPU_TEXT" + uvicorn main:app --host "$HOST" --port "$PORT" --workers "$WORKERS" --forwarded-allow-ips '*' else echo "INFO: Running in development mode" + # "workers" flag will be ignored when reloading is enabled. uvicorn main:app --host "$HOST" --port "$PORT" --forwarded-allow-ips '*' --reload fi From 3b74fd9369e3e79e89018830475a529210d73b2b Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Wed, 13 Nov 2024 13:39:37 +0100 Subject: [PATCH 4/5] Create langflow_pipeline.py add langflow pipeline example --- .../integrations/langflow_pipeline.py | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 examples/pipelines/integrations/langflow_pipeline.py diff --git a/examples/pipelines/integrations/langflow_pipeline.py b/examples/pipelines/integrations/langflow_pipeline.py new file mode 100644 index 00000000..8bf05358 --- /dev/null +++ b/examples/pipelines/integrations/langflow_pipeline.py @@ -0,0 +1,159 @@ +import argparse +import logging +import time +from typing import Union, Generator, Iterator, Optional +from pprint import pprint +import requests, json, warnings + +# Langflow API Docs: +# https://docs.langflow.org/workspace-api + +# Disable SSL verification warnings +warnings.filterwarnings('ignore', message='Unverified HTTPS request') + +def is_healthy(url, verify_ssl=True): + """Check if the Langflow server is healthy.""" + try: + response = 
requests.get(url, verify=verify_ssl) + return response.status_code == 200 + except Exception as e: + print(f"Error checking health: {str(e)}") + return False + +def get_flow_components(url, verify_ssl=True): + """Get the flow components IDs from the Langflow server.""" + try: + response = requests.get(url, headers={"Content-Type": "application/json"}, verify=verify_ssl) + response.raise_for_status() + data = response.json() + + # Create dictionary of component IDs with empty objects + components = {} + for node in data.get('data', {}).get('nodes', []): + component_id = node.get('data', {}).get('id') + if component_id: + components[component_id] = {} + + return components + except Exception as e: + print(f"Error getting flow components: {str(e)}") + return {} + +class Pipeline: + def __init__(self): + self.name = "Langflow Chat" + self.langflow_host = "http://langflow.host" + self.flow_id = "28eeaa04-...-...-...-9a5f257dd17c" + self.api_url_run = self.langflow_host+"/api/v1/run" + self.api_url_flow = self.langflow_host+"/api/v1/flows" + self.api_health = self.langflow_host+"/health" + self.api_request_stream = True + self.verify_ssl = False + self.debug = False + + async def on_startup(self): + print(f"on_startup: {__name__}") + # Wait for Langflow server to be healthy + healthy = is_healthy(self.api_health, self.verify_ssl) + while not healthy: + time.sleep(5) + healthy = is_healthy(self.api_health) + print("Langflow server is healthy") + + async def on_shutdown(self): + print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. 
+ print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe(self, user_message: str, model_id: str, messages: list, body: dict) -> Union[str, Generator, Iterator]: + """Process the user message through the Langflow pipeline.""" + print(f"Processing message with {self.name}") + + if self.debug is True: + print(f"User message: {user_message}") + print(f"Model ID: {model_id}") + print(f"Messages: {messages}") + print(f"Body: {body}") + + session_id = body.get("chat_id") + + # If you need to tune components in flow / use `get_flow_components` to collect IDs + TWEAKS_COMPONENTS = get_flow_components(url=self.api_url_flow+"/"+self.flow_id, verify_ssl=self.verify_ssl) + + data = { + "input_value": user_message, + "output_type": "chat", + "input_type": "chat", + "session_id": session_id, + "tweaks": TWEAKS_COMPONENTS + } + try: + # Make the initial request to run flow via Langflow API ChatInput box + url = f"{self.api_url_run}/{self.flow_id}?stream={str(self.api_request_stream).lower()}" + headers = {"Content-Type": "application/json"} + response = requests.post(url, json=data, headers=headers, verify=self.verify_ssl) + response.raise_for_status() + + if response.status_code == 200: + init_data = response.json() + if self.debug is True: + print(f"pipe: {__name__} - langflow init response: "+str(init_data)) + + # Check for stream URL in the response + outputs = init_data.get("outputs", [{}])[0].get("outputs", [{}])[0] + stream_url = outputs.get("artifacts", 
{}).get("stream_url") + + if not stream_url: + message = outputs.get("messages", [])[0].get("message") + if message is not None: + yield message + return + logging.error("No stream URL returned") + yield "Error: No stream URL available" + return + + # Stream the response + stream_url = f"{self.langflow_host}{stream_url}" + params = {"session_id": session_id} + print(f"Stream the response session_id: {session_id} - {stream_url}") + + with requests.get(stream_url, headers=headers, params=params, stream=True, verify=self.verify_ssl) as stream: + for line in stream.iter_lines(decode_unicode=True): + if line.startswith('data: '): + try: + # Remove 'data: ' prefix and parse JSON + json_data = json.loads(line.replace('data: ', '')) + # Extract chunk + if 'chunk' in json_data: + yield json_data['chunk'] + if 'message' in json_data: + if json_data['message'] == 'Stream closed': + print(f"Stream session {session_id} closed") + return + except json.JSONDecodeError: + print(f"Failed to parse JSON: {line}") + else: + yield f"Error: Request failed with status code {response.status_code}" + + except Exception as e: + logging.error(f"Error in pipe: {str(e)}") + yield f"Error: {str(e)}" From b8f312d6f80918a88c056d5b406594fc6cfc7e0b Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Wed, 13 Nov 2024 16:35:00 +0100 Subject: [PATCH 5/5] Delete examples/pipelines/integrations/langflow_pipeline.py incorrect repo name / not in main --- .../integrations/langflow_pipeline.py | 159 ------------------ 1 file changed, 159 deletions(-) delete mode 100644 examples/pipelines/integrations/langflow_pipeline.py diff --git a/examples/pipelines/integrations/langflow_pipeline.py b/examples/pipelines/integrations/langflow_pipeline.py deleted file mode 100644 index 8bf05358..00000000 --- a/examples/pipelines/integrations/langflow_pipeline.py +++ /dev/null @@ -1,159 +0,0 @@ -import argparse -import logging -import time -from typing import Union, Generator, Iterator, Optional -from pprint import pprint 
-import requests, json, warnings - -# Langflow API Docs: -# https://docs.langflow.org/workspace-api - -# Disable SSL verification warnings -warnings.filterwarnings('ignore', message='Unverified HTTPS request') - -def is_healthy(url, verify_ssl=True): - """Check if the Langflow server is healthy.""" - try: - response = requests.get(url, verify=verify_ssl) - return response.status_code == 200 - except Exception as e: - print(f"Error checking health: {str(e)}") - return False - -def get_flow_components(url, verify_ssl=True): - """Get the flow components IDs from the Langflow server.""" - try: - response = requests.get(url, headers={"Content-Type": "application/json"}, verify=verify_ssl) - response.raise_for_status() - data = response.json() - - # Create dictionary of component IDs with empty objects - components = {} - for node in data.get('data', {}).get('nodes', []): - component_id = node.get('data', {}).get('id') - if component_id: - components[component_id] = {} - - return components - except Exception as e: - print(f"Error getting flow components: {str(e)}") - return {} - -class Pipeline: - def __init__(self): - self.name = "Langflow Chat" - self.langflow_host = "http://langflow.host" - self.flow_id = "28eeaa04-...-...-...-9a5f257dd17c" - self.api_url_run = self.langflow_host+"/api/v1/run" - self.api_url_flow = self.langflow_host+"/api/v1/flows" - self.api_health = self.langflow_host+"/health" - self.api_request_stream = True - self.verify_ssl = False - self.debug = False - - async def on_startup(self): - print(f"on_startup: {__name__}") - # Wait for Langflow server to be healthy - healthy = is_healthy(self.api_health, self.verify_ssl) - while not healthy: - time.sleep(5) - healthy = is_healthy(self.api_health) - print("Langflow server is healthy") - - async def on_shutdown(self): - print(f"on_shutdown: {__name__}") - pass - - async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: - # This function is called before the OpenAI API request is made. 
You can modify the form data before it is sent to the OpenAI API. - print(f"inlet: {__name__}") - if self.debug: - print(f"inlet: {__name__} - body:") - pprint(body) - print(f"inlet: {__name__} - user:") - pprint(user) - return body - - async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: - # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. - print(f"outlet: {__name__}") - if self.debug: - print(f"outlet: {__name__} - body:") - pprint(body) - print(f"outlet: {__name__} - user:") - pprint(user) - return body - - def pipe(self, user_message: str, model_id: str, messages: list, body: dict) -> Union[str, Generator, Iterator]: - """Process the user message through the Langflow pipeline.""" - print(f"Processing message with {self.name}") - - if self.debug is True: - print(f"User message: {user_message}") - print(f"Model ID: {model_id}") - print(f"Messages: {messages}") - print(f"Body: {body}") - - session_id = body.get("chat_id") - - # If you need to tune components in flow / use `get_flow_components` to collect IDs - TWEAKS_COMPONENTS = get_flow_components(url=self.api_url_flow+"/"+self.flow_id, verify_ssl=self.verify_ssl) - - data = { - "input_value": user_message, - "output_type": "chat", - "input_type": "chat", - "session_id": session_id, - "tweaks": TWEAKS_COMPONENTS - } - try: - # Make the initial request to run flow via Langflow API ChatInput box - url = f"{self.api_url_run}/{self.flow_id}?stream={str(self.api_request_stream).lower()}" - headers = {"Content-Type": "application/json"} - response = requests.post(url, json=data, headers=headers, verify=self.verify_ssl) - response.raise_for_status() - - if response.status_code == 200: - init_data = response.json() - if self.debug is True: - print(f"pipe: {__name__} - langflow init response: "+str(init_data)) - - # Check for stream URL in the response - outputs = init_data.get("outputs", 
[{}])[0].get("outputs", [{}])[0] - stream_url = outputs.get("artifacts", {}).get("stream_url") - - if not stream_url: - message = outputs.get("messages", [])[0].get("message") - if message is not None: - yield message - return - logging.error("No stream URL returned") - yield "Error: No stream URL available" - return - - # Stream the response - stream_url = f"{self.langflow_host}{stream_url}" - params = {"session_id": session_id} - print(f"Stream the response session_id: {session_id} - {stream_url}") - - with requests.get(stream_url, headers=headers, params=params, stream=True, verify=self.verify_ssl) as stream: - for line in stream.iter_lines(decode_unicode=True): - if line.startswith('data: '): - try: - # Remove 'data: ' prefix and parse JSON - json_data = json.loads(line.replace('data: ', '')) - # Extract chunk - if 'chunk' in json_data: - yield json_data['chunk'] - if 'message' in json_data: - if json_data['message'] == 'Stream closed': - print(f"Stream session {session_id} closed") - return - except json.JSONDecodeError: - print(f"Failed to parse JSON: {line}") - else: - yield f"Error: Request failed with status code {response.status_code}" - - except Exception as e: - logging.error(f"Error in pipe: {str(e)}") - yield f"Error: {str(e)}"