Commit 012d86e

feat(batch): add in-memory batching support and improve error handling (#1746)

Co-authored-by: Jean-Gabriel Young <jean.gabriel.young@gmail.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Claude <noreply@anthropic.com>

1 parent 46115ee, commit 012d86e

File tree: 13 files changed (+1253, −74 lines)

docs/concepts/batch.md (177 additions, 6 deletions)
The `BatchProcessor` provides a complete interface for batch processing, including job submission, status monitoring, and result retrieval.

### File-based Batch Processing (Traditional)

```python
from instructor.batch import BatchProcessor
from pydantic import BaseModel

# ... (unchanged setup lines elided in the diff) ...

# Create batch file
processor.create_batch_from_messages(
    file_path="batch_requests.jsonl",  # Specify file path for disk-based processing
    messages_list=messages_list,
    max_tokens=200,
    temperature=0.1
)

# ... (unchanged lines elided in the diff) ...

if status['status'] in ['completed', 'ended', 'JOB_STATE_SUCCEEDED']:
    # ... (result-handling lines elided in the diff) ...
    print(f"Name: {user.name}, Age: {user.age}")
```
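In file-based mode, the batch file is plain JSONL that the caller must clean up afterwards. The stdlib-only sketch below shows that write-then-remove pattern; the record shape here is a simplified assumption, not the exact schema `create_batch_from_messages()` writes.

```python
import json
import os
import tempfile

# Hypothetical, simplified request records; the real schema written by
# create_batch_from_messages() is provider-specific.
requests = [
    {"custom_id": f"request-{i}", "body": {"max_tokens": 200, "temperature": 0.1}}
    for i in range(2)
]

# Write one JSON object per line (JSONL), then clean up after submission.
fd, path = tempfile.mkstemp(suffix=".jsonl")
try:
    with os.fdopen(fd, "w") as f:
        for record in requests:
            f.write(json.dumps(record) + "\n")
    with open(path) as f:
        lines = f.read().splitlines()
    print(f"Wrote {len(lines)} requests to {path}")
finally:
    os.remove(path)  # file-based mode requires explicit cleanup
```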

### In-Memory Batch Processing (Serverless-Friendly)

For serverless deployments and applications that prefer memory-only operations:

```python
import time

from instructor.batch import BatchProcessor
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

# Create processor
processor = BatchProcessor("openai/gpt-4o-mini", User)

# Prepare your message conversations
messages_list = [
    [
        {"role": "system", "content": "Extract user information from the text."},
        {"role": "user", "content": "John Doe is 25 years old and his email is john@example.com"}
    ],
    [
        {"role": "system", "content": "Extract user information from the text."},
        {"role": "user", "content": "Jane Smith, age 30, can be reached at jane.smith@company.com"}
    ]
]

# Create batch in memory (no file_path = in-memory mode)
batch_buffer = processor.create_batch_from_messages(
    messages_list,
    file_path=None,  # This enables in-memory mode
    max_tokens=150,
    temperature=0.1,
)

print(f"Created batch buffer: {type(batch_buffer)}")
print(f"Buffer size: {len(batch_buffer.getvalue())} bytes")

# Submit the batch using the in-memory buffer
batch_id = processor.submit_batch(
    batch_buffer,
    metadata={"description": "In-memory batch example"}
)

print(f"Batch submitted successfully! ID: {batch_id}")

# Poll for completion
while True:
    status = processor.get_batch_status(batch_id)
    current_status = status.get("status", "unknown")
    print(f"Status: {current_status}")

    if current_status in ["completed", "failed", "cancelled", "expired"]:
        break
    time.sleep(10)

# Retrieve results
if status.get("status") == "completed":
    results = processor.get_results(batch_id)

    successful_results = [r for r in results if hasattr(r, "result")]
    error_results = [r for r in results if hasattr(r, "error_message")]

    print(f"Successful: {len(successful_results)}")
    print(f"Errors: {len(error_results)}")

    for result in successful_results:
        user = result.result
        print(f"- {user.name}, {user.age} years old")
```
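The object returned in in-memory mode is a standard `io.BytesIO` holding JSONL bytes, which is what `len(batch_buffer.getvalue())` measures above. The stdlib-only sketch below hand-builds such a buffer; the record shape is an assumed simplification, not the exact schema instructor emits.

```python
import io
import json

# Simplified stand-in for the JSONL payload create_batch_from_messages()
# would build; the real record schema is provider-specific.
records = [
    {"custom_id": "request-0", "body": {"messages": [{"role": "user", "content": "John Doe is 25"}]}},
    {"custom_id": "request-1", "body": {"messages": [{"role": "user", "content": "Jane Smith, age 30"}]}},
]

buffer = io.BytesIO()
for record in records:
    buffer.write((json.dumps(record) + "\n").encode("utf-8"))
buffer.seek(0)  # rewind so a consumer can read from the start

payload = buffer.getvalue()
print(f"Buffer size: {len(payload)} bytes")
```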

## In-Memory vs File-Based Processing

### When to Use In-Memory Processing

**✅ Ideal for:**

- **Serverless deployments** (AWS Lambda, Google Cloud Functions, Azure Functions)
- **Containerized applications** where disk I/O should be minimized
- **Security-sensitive environments** where temporary files on disk are not desired
- **High-performance applications** that want to avoid file system overhead

**Key Benefits:**

- **No disk I/O** - Perfect for serverless environments with read-only file systems
- **Faster processing** - No file system overhead
- **Better security** - No temporary files left on disk
- **Cleaner code** - No file cleanup required
- **Memory efficient** - BytesIO buffers are automatically garbage collected

### When to Use File-Based Processing

**✅ Ideal for:**

- **Large batch jobs** where memory usage is a concern
- **Long-running applications** with persistent storage
- **Debugging scenarios** where you want to inspect the batch file
- **Audit requirements** where batch requests need to be saved

### Comparison Example

```python
from instructor.batch import BatchProcessor
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

processor = BatchProcessor("openai/gpt-4o-mini", User)
messages_list = [
    [{"role": "user", "content": "Extract: John, 25, john@example.com"}],
    [{"role": "user", "content": "Extract: Jane, 30, jane@example.com"}],
]

# File-based approach (traditional)
file_path = processor.create_batch_from_messages(
    messages_list,
    file_path="temp_batch.jsonl",  # Creates file on disk
)
batch_id = processor.submit_batch(file_path)
# Remember to clean up: os.remove(file_path)

# In-memory approach (new)
buffer = processor.create_batch_from_messages(
    messages_list,
    file_path=None,  # Returns BytesIO buffer
)
batch_id = processor.submit_batch(buffer)
# No cleanup required - buffer is garbage collected
```
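A function that accepts either a path or a buffer, as `submit_batch()` does, can dispatch on the argument's type. The helper below is hypothetical (not part of instructor) and only illustrates that dispatch idea:

```python
import io
import json
import os
import tempfile

def read_batch_payload(file_path_or_buffer):
    """Hypothetical helper: return raw JSONL bytes from a path or a file-like buffer."""
    if isinstance(file_path_or_buffer, (str, os.PathLike)):
        with open(file_path_or_buffer, "rb") as f:
            return f.read()
    # Assume a file-like object (e.g. io.BytesIO); read from the current position.
    return file_path_or_buffer.read()

line = json.dumps({"custom_id": "request-0"}) + "\n"

# In-memory input
from_buffer = read_batch_payload(io.BytesIO(line.encode()))

# File-based input
fd, path = tempfile.mkstemp(suffix=".jsonl")
with os.fdopen(fd, "w") as f:
    f.write(line)
from_file = read_batch_payload(path)
os.remove(path)

print(from_buffer == from_file)  # both inputs yield the same payload
```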

### BytesIO Lifecycle Management

When using in-memory batch processing, the BytesIO buffer lifecycle is managed as follows:

1. **Creation**: The `create_batch_from_messages()` method creates and returns a BytesIO buffer
2. **Ownership**: The caller owns the buffer and is responsible for its lifecycle
3. **Submission**: The `submit_batch()` method reads from the buffer but doesn't close it
4. **Cleanup**: Python's garbage collector automatically cleans up the buffer when it goes out of scope

**Best Practices:**

- The buffer is automatically cleaned up when no longer referenced
- No explicit `.close()` call is needed for BytesIO objects
- If you need to reuse the buffer, call `.seek(0)` to reset its position
- For very large batches, consider monitoring memory usage

```python
# Example: reusing a buffer for two submissions
buffer = processor.create_batch_from_messages(messages, file_path=None)
batch_id_1 = processor.submit_batch(buffer)

# Reset buffer position to reuse
buffer.seek(0)
batch_id_2 = processor.submit_batch(buffer)
```
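The `.seek(0)` best practice follows from plain `io.BytesIO` semantics: reading moves the stream position to the end, so a second read returns nothing until the position is rewound. A stdlib-only demonstration:

```python
import io

buffer = io.BytesIO(b'{"custom_id": "request-0"}\n')

first_read = buffer.read()   # consumes the stream; position is now at the end
second_read = buffer.read()  # returns b"" because nothing remains at this position

buffer.seek(0)               # rewind to the start
third_read = buffer.read()   # the full payload is readable again

print(len(first_read), len(second_read), len(third_read))
```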

## Provider-Specific Setup

### OpenAI Setup

### Core Methods

- **`create_batch_from_messages(messages_list, file_path=None, max_tokens=1000, temperature=0.1)`**:
    - Generate batch request file from message conversations
    - **Parameters:**
        - `messages_list`: List of message conversations
        - `file_path`: Path to save batch file. If `None`, returns BytesIO buffer (in-memory mode)
        - `max_tokens`: Maximum tokens per request
        - `temperature`: Temperature for generation
    - **Returns:** File path (str) or BytesIO buffer

- **`submit_batch(file_path_or_buffer, metadata=None, **kwargs)`**:
    - Submit batch job to the provider and return job ID
    - **Parameters:**
        - `file_path_or_buffer`: File path (str) or BytesIO buffer
        - `metadata`: Optional metadata dict
    - **Returns:** Batch job ID (str)

- **`get_batch_status(batch_id)`**: Get current status of a batch job
- **`retrieve_results(batch_id)`**: Download and parse batch results
- **`parse_results(results_content)`**: Parse raw batch results into structured objects
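The status-polling loop shown earlier can be factored into a small helper around `get_batch_status()`. The sketch below is illustrative, not part of instructor; the fetcher and sleeper are injected so the loop is easy to test without a live provider.

```python
import time
from typing import Callable

# Terminal states as used in the polling example above.
TERMINAL_STATES = {"completed", "failed", "cancelled", "expired"}

def wait_for_batch(
    fetch_status: Callable[[], dict],
    poll_interval: float = 10.0,
    max_polls: int = 360,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll fetch_status() until a terminal state, or raise after max_polls."""
    status: dict = {}
    for _ in range(max_polls):
        status = fetch_status()
        if status.get("status", "unknown") in TERMINAL_STATES:
            return status
        sleep(poll_interval)
    raise TimeoutError(f"batch still {status.get('status', 'unknown')!r} after {max_polls} polls")

# Usage with a stubbed status source; in real code the fetcher would be
# lambda: processor.get_batch_status(batch_id)
responses = iter([{"status": "in_progress"}, {"status": "in_progress"}, {"status": "completed"}])
final = wait_for_batch(lambda: next(responses), sleep=lambda _: None)
print(final["status"])
```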

## CLI Usage
