|
| 1 | +import logging |
| 2 | +import os |
| 3 | +import subprocess |
| 4 | +from typing import Optional |
| 5 | + |
| 6 | +from hamilton.function_modifiers import extract_fields |
| 7 | + |
| 8 | +logger = logging.getLogger(__name__) |
| 9 | + |
| 10 | +from hamilton import contrib |
| 11 | + |
| 12 | +with contrib.catch_import_errors(__name__, __file__, logger): |
| 13 | + import openai |
| 14 | + |
| 15 | + |
| 16 | +def llm_client(api_key: Optional[str] = None) -> openai.OpenAI: |
| 17 | + """Create an OpenAI client.""" |
| 18 | + if api_key is None: |
| 19 | + api_key = os.environ.get("OPENAI_API_KEY") |
| 20 | + |
| 21 | + return openai.OpenAI(api_key=api_key) |
| 22 | + |
| 23 | + |
| 24 | +def prompt_template_to_generate_code() -> str: |
| 25 | + """Prompt template to generate code. |
| 26 | +
|
| 27 | + It must include the fields `code_language` and `query`. |
| 28 | + """ |
| 29 | + return """Write some {code_language} code to solve the user's problem. |
| 30 | +
|
| 31 | +Return only python code in Markdown format, e.g.: |
| 32 | +
|
| 33 | +```{code_language} |
| 34 | +.... |
| 35 | +``` |
| 36 | +
|
| 37 | +user problem |
| 38 | +{query} |
| 39 | +
|
| 40 | +{code_language} code |
| 41 | +""" |
| 42 | + |
| 43 | + |
| 44 | +def prompt_to_generate_code( |
| 45 | + prompt_template_to_generate_code: str, query: str, code_language: str = "python" |
| 46 | +) -> str: |
| 47 | + """Fill the prompt template with the code language and the user query.""" |
| 48 | + return prompt_template_to_generate_code.format( |
| 49 | + query=query, |
| 50 | + code_language=code_language, |
| 51 | + ) |
| 52 | + |
| 53 | + |
| 54 | +def response_generated_code(llm_client: openai.OpenAI, prompt_to_generate_code: str) -> str: |
| 55 | + """Call the OpenAI API completion endpoint with the prompt to generate code.""" |
| 56 | + response = llm_client.completions.create( |
| 57 | + model="gpt-3.5-turbo-instruct", |
| 58 | + prompt=prompt_to_generate_code, |
| 59 | + ) |
| 60 | + return response.choices[0].text |
| 61 | + |
| 62 | + |
| 63 | +def parsed_generated_code(response_generated_code: str, code_language: str = "python") -> str: |
| 64 | + """Retrieve the code section from the generated text.""" |
| 65 | + _, _, lower_part = response_generated_code.partition(f"```{code_language}") |
| 66 | + code_part, _, _ = lower_part.partition("```") |
| 67 | + return code_part |
| 68 | + |
| 69 | + |
| 70 | +def code_prepared_for_execution(parsed_generated_code: str, code_language: str = "python") -> str: |
| 71 | + """If code is Python, append to it statements prepare it to be run in a subprocess. |
| 72 | +
|
| 73 | + We collect all local variables in a directory and filter out Python builtins to keep |
| 74 | + only the variables from the generated code. print() is used to send string data from |
| 75 | + the subprocess back to the parent proceess via its `stdout`. |
| 76 | + """ |
| 77 | + |
| 78 | + if code_language != "python": |
| 79 | + raise ValueError("Can only execute the generated code if `code_language` = 'python'") |
| 80 | + |
| 81 | + code_to_get_vars = ( |
| 82 | + "excluded_vars = { 'excluded_vars', '__builtins__', '__annotations__'} | set(dir(__builtins__))\n" |
| 83 | + "local_vars = {k:v for k,v in locals().items() if k not in excluded_vars}\n" |
| 84 | + "print(local_vars)" |
| 85 | + ) |
| 86 | + |
| 87 | + return parsed_generated_code + code_to_get_vars |
| 88 | + |
| 89 | + |
| 90 | +@extract_fields( |
| 91 | + dict( |
| 92 | + execution_output=str, |
| 93 | + execution_error=str, |
| 94 | + ) |
| 95 | +) |
| 96 | +def executed_output(code_prepared_for_execution: str) -> dict: |
| 97 | + """Execute the generated Python code + appended utilities in a subprocess. |
| 98 | +
|
| 99 | + The output and errors from the code are collected as strings. Executing |
| 100 | + the code in a subprocess provides isolation, but isn't a security guarantee. |
| 101 | + """ |
| 102 | + process = subprocess.Popen( |
| 103 | + ["python", "-c", code_prepared_for_execution], |
| 104 | + stdout=subprocess.PIPE, |
| 105 | + stderr=subprocess.PIPE, |
| 106 | + universal_newlines=True, |
| 107 | + ) |
| 108 | + output, errors = process.communicate() |
| 109 | + return dict(execution_output=output, execution_error=errors) |
| 110 | + |
| 111 | + |
| 112 | +# run as a script to test dataflow |
| 113 | +if __name__ == "__main__": |
| 114 | + import __init__ as llm_generate_code |
| 115 | + |
| 116 | + from hamilton import driver |
| 117 | + |
| 118 | + dr = driver.Builder().with_modules(llm_generate_code).build() |
| 119 | + |
| 120 | + dr.display_all_functions("dag.png", orient="TB") |
| 121 | + |
| 122 | + res = dr.execute( |
| 123 | + ["execution_output", "execution_error"], |
| 124 | + overrides=dict(generated_code="s = 'hello world'"), |
| 125 | + ) |
| 126 | + |
| 127 | + print(res) |
0 commit comments