
[BUG] PFClient.run using flow with AsyncIterator raises TypeError: cannot pickle '_thread.lock' object #3413

Open

@bwilliams2

Description

Describe the bug
Batch runs cannot complete successfully if the flow call produces an AsyncIterator that ends up wrapped by promptflow.tracing.TracedAsyncIterator. Each run from the data file fails with TypeError: cannot pickle '_thread.lock' object.

How To Reproduce the bug
The code below reproduces the error consistently.

import tempfile
from typing import AsyncIterator

from openai import AzureOpenAI
from promptflow.tracing import trace
from promptflow.core import AzureOpenAIModelConfiguration, Prompty
from promptflow.client import PFClient

class ChatFlow:
    def __init__(
        self, model_config: AzureOpenAIModelConfiguration
    ):
        self.model_config = model_config

    @trace
    async def __call__(
        self,
        topic: str,
    ) -> AsyncIterator[str]:
        """Flow entry function."""

        client = AzureOpenAI(
            azure_endpoint=self.model_config.azure_endpoint,
            api_key=self.model_config.api_key,
            api_version=self.model_config.api_version,
        )
        
        response = client.chat.completions.create(
            model=self.model_config.azure_deployment,
            messages=[
                {"role": "system", "content": "Create a story about the topic provided by the user"},
                {"role": "user", "content": f"Tell me a story about {topic}"},
            ],
            max_tokens=150,
            stream=True,
        )

        for chunk in response:
            if len(chunk.choices) > 0 and (delta := chunk.choices[0].delta):
                content = delta.content
                if content:
                    yield content + "\n"

def main():
    f = tempfile.NamedTemporaryFile(suffix=".csv", mode="w+t")
    try:
        f.write("topic\nlittle league\n")
        f.seek(0)
        config = AzureOpenAIModelConfiguration(
            connection="aoai_connection", azure_deployment="gpt-35-turbo"
        )
        chat_flow = ChatFlow(model_config=config)
        result = PFClient().run(chat_flow, data=f.name)
    finally:
        f.close()  # NamedTemporaryFile deletes the file on close

if __name__ == "__main__":
    main()

The bug can be traced to the point where run_info is submitted to the multiprocessing queue.

For the example code, the results property within run_info contains an instance of promptflow.tracing.TracedAsyncIterator, which cannot be pickled when submitted to the multiprocessing queue and raises the error above.

The error file from the batch run is attached:
error.json

Expected behavior
Successful execution of batch run
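Until that works, one possible workaround is to drain the stream inside the flow so the batch run only ever sees a plain, picklable string. This is a hypothetical sketch (stream_story is a stand-in for the generator in the repro, not a promptflow API):

```python
import asyncio
from typing import AsyncIterator

# Hypothetical stand-in for the streaming flow body above.
async def stream_story(topic: str) -> AsyncIterator[str]:
    for part in (f"A story about {topic}.", "The end."):
        yield part + "\n"

# Collect the AsyncIterator into a str before returning it, so the
# value placed in run_info is picklable by the multiprocessing queue.
async def call_collected(topic: str) -> str:
    chunks = []
    async for chunk in stream_story(topic):
        chunks.append(chunk)
    return "".join(chunks)

print(asyncio.run(call_collected("little league")))
```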

Running Information (please complete the following information):

  • Promptflow Package Version using pf -v: 1.12.0
  • Operating System: macOS Sonoma 14.5
  • Python Version using python --version: 3.12.2
