Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/Providers/Anthropic/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ protected function processStream(Response $response, Request $request, int $dept
}
}

// Handle tool calls if present
if ($this->state->hasToolCalls()) {
yield from $this->handleToolCalls($request, $depth);

return;
}

yield $this->emitStreamEndEvent();
}

/**
Expand Down Expand Up @@ -116,10 +119,9 @@ protected function handleMessageStart(array $event): StreamStartEvent
$message = $event['message'] ?? [];
$this->state->withMessageId($message['id'] ?? EventID::generate());

// Capture initial usage data
$usageData = $message['usage'] ?? [];
if (! empty($usageData)) {
$this->state->withUsage(new Usage(
$this->state->addUsage(new Usage(
promptTokens: $usageData['input_tokens'] ?? 0,
completionTokens: $usageData['output_tokens'] ?? 0,
cacheWriteInputTokens: $usageData['cache_creation_input_tokens'] ?? null,
Expand Down Expand Up @@ -230,12 +232,21 @@ protected function handleMessageDelta(array $event): null
/**
* @param array<string, mixed> $event
*/
protected function handleMessageStop(array $event): StreamEndEvent
protected function handleMessageStop(array $event): ?StreamEndEvent
{
if (! $this->state->finishReason() instanceof \Prism\Prism\Enums\FinishReason) {
$this->state->withFinishReason(FinishReason::Stop);
}

return null;
}

protected function emitStreamEndEvent(): StreamEndEvent
{
return new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: FinishReason::Stop, // Default, will be updated by message_delta
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage(),
citations: $this->state->citations() !== [] ? $this->state->citations() : null,
additionalContent: [
Expand Down
21 changes: 14 additions & 7 deletions src/Providers/DeepSeek/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,27 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

$usage = $this->extractUsage($data);
$this->state->withFinishReason($finishReason);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason,
usage: $usage
);
$usage = $this->extractUsage($data);
if ($usage instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->addUsage($usage);
}
}
}

if ($toolCalls !== []) {
yield from $this->handleToolCalls($request, $text, $toolCalls, $depth);

return;
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage()
);
}

/**
Expand Down
49 changes: 29 additions & 20 deletions src/Providers/Gemini/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,9 @@ protected function processStream(Response $response, Request $request, int $dept
}
}

// Handle completion
$finishReason = $this->mapFinishReason($data);

if ($finishReason !== FinishReason::Unknown) {
// Emit thinking complete if we had thinking
if ($this->state->reasoningId() !== '') {
yield new ThinkingCompleteEvent(
id: EventID::generate(),
Expand All @@ -196,7 +194,6 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Emit text complete if we had text
if ($this->state->hasTextStarted()) {
yield new TextCompleteEvent(
id: EventID::generate(),
Expand All @@ -205,27 +202,29 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Extract grounding metadata if available
$groundingMetadata = $this->extractGroundingMetadata($data);

// Emit stream end event
yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason,
usage: $this->state->usage(),
additionalContent: Arr::whereNotNull([
'grounding_metadata' => $groundingMetadata,
'thoughtSummaries' => $this->state->thinkingSummaries() === [] ? null : $this->state->thinkingSummaries(),
])
);
$this->state->withFinishReason($finishReason);
$this->state->withMetadata([
'grounding_metadata' => $this->extractGroundingMetadata($data),
]);
}
}

// Handle tool calls if present and not already handled
if ($this->state->hasToolCalls() && $this->mapFinishReason([]) === FinishReason::Unknown) {
if ($this->state->hasToolCalls()) {
yield from $this->handleToolCalls($request, $depth);

return;
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage(),
additionalContent: Arr::whereNotNull([
'grounding_metadata' => $this->state->metadata()['grounding_metadata'] ?? null,
'thoughtSummaries' => $this->state->thinkingSummaries() === [] ? null : $this->state->thinkingSummaries(),
])
);
}

/**
Expand Down Expand Up @@ -333,16 +332,26 @@ protected function handleToolCalls(
}
}

// Add messages for next turn and continue streaming
if ($toolResults !== []) {
$request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

$depth++;
if ($depth < $request->maxSteps()) {
$previousUsage = $this->state->usage();
$this->state->reset();
$nextResponse = $this->sendRequest($request);
yield from $this->processStream($nextResponse, $request, $depth);

if ($previousUsage instanceof \Prism\Prism\ValueObjects\Usage && $this->state->usage() instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->withUsage(new Usage(
promptTokens: $previousUsage->promptTokens + $this->state->usage()->promptTokens,
completionTokens: $previousUsage->completionTokens + $this->state->usage()->completionTokens,
cacheWriteInputTokens: ($previousUsage->cacheWriteInputTokens ?? 0) + ($this->state->usage()->cacheWriteInputTokens ?? 0),
cacheReadInputTokens: ($previousUsage->cacheReadInputTokens ?? 0) + ($this->state->usage()->cacheReadInputTokens ?? 0),
thoughtTokens: ($previousUsage->thoughtTokens ?? 0) + ($this->state->usage()->thoughtTokens ?? 0)
));
}
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/Providers/Groq/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ protected function processStream(Response $response, Request $request, int $dept
if ($rawFinishReason !== null) {
$finishReason = $this->mapFinishReason($data);

// Complete text if we have any
if ($this->state->hasTextStarted() && $text !== '') {
yield new TextCompleteEvent(
id: EventID::generate(),
Expand All @@ -159,17 +158,21 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Extract usage information from the final chunk
$usage = $this->extractUsage($data);
$this->state->withFinishReason($finishReason);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason,
usage: $usage
);
$usage = $this->extractUsage($data);
if ($usage instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->addUsage($usage);
}
}
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage()
);
}

/**
Expand Down
19 changes: 12 additions & 7 deletions src/Providers/Mistral/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,21 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

$usage = $this->extractUsage($data);
$this->state->withFinishReason($finishReason);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason,
usage: $usage
);
$usage = $this->extractUsage($data);
if ($usage instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->addUsage($usage);
}
}
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage()
);
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/Providers/Ollama/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ protected function processStream(Response $response, Request $request, int $dept
throw new PrismException('Maximum tool call chain depth exceeded');
}

$this->state->reset();
if ($depth === 0) {
$this->state->reset();
}

$text = '';

while (! $response->getBody()->eof()) {
Expand Down Expand Up @@ -280,6 +283,7 @@ protected function handleToolCalls(
// Continue streaming if within step limit
$depth++;
if ($depth < $request->maxSteps()) {
$this->state->reset();
$nextResponse = $this->sendRequest($request);
yield from $this->processStream($nextResponse, $request, $depth);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Providers/Ollama/ValueObjects/OllamaStreamState.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public function completionTokens(): int
public function reset(): self
{
parent::reset();
$this->promptTokens = 0;
$this->completionTokens = 0;
// Note: Token counts are intentionally NOT reset here.
// They accumulate across tool-call turns to provide total usage.

return $this;
}
Expand Down
36 changes: 21 additions & 15 deletions src/Providers/OpenAI/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -243,27 +243,33 @@ protected function processStream(Response $response, Request $request, int $dept
}

if (data_get($data, 'type') === 'response.completed') {
yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->mapFinishReason($data),
usage: new Usage(
promptTokens: data_get($data, 'response.usage.input_tokens'),
completionTokens: data_get($data, 'response.usage.output_tokens'),
cacheReadInputTokens: data_get($data, 'response.usage.input_tokens_details.cached_tokens'),
thoughtTokens: data_get($data, 'response.usage.output_tokens_details.reasoning_tokens')
),
additionalContent: Arr::whereNotNull([
'response_id' => data_get($data, 'response.id'),
'reasoningSummaries' => $this->state->thinkingSummaries() === [] ? null : $this->state->thinkingSummaries(),
])
);
$this->state->withFinishReason($this->mapFinishReason($data));
$this->state->addUsage(new Usage(
promptTokens: data_get($data, 'response.usage.input_tokens'),
completionTokens: data_get($data, 'response.usage.output_tokens'),
cacheReadInputTokens: data_get($data, 'response.usage.input_tokens_details.cached_tokens'),
thoughtTokens: data_get($data, 'response.usage.output_tokens_details.reasoning_tokens')
));
$this->state->withMetadata(['response_id' => data_get($data, 'response.id')]);
}
}

if ($this->state->hasToolCalls()) {
yield from $this->handleToolCalls($request, $depth);

return;
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage() ?? new Usage(0, 0),
additionalContent: Arr::whereNotNull([
'response_id' => $this->state->metadata()['response_id'] ?? null,
'reasoningSummaries' => $this->state->thinkingSummaries() === [] ? null : $this->state->thinkingSummaries(),
])
);
}

/**
Expand Down
19 changes: 12 additions & 7 deletions src/Providers/OpenRouter/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,21 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

$usage = $this->extractUsage($data);
$this->state->withFinishReason($finishReason);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason,
usage: $usage
);
$usage = $this->extractUsage($data);
if ($usage instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->addUsage($usage);
}
}
}

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage()
);
}

/**
Expand Down
11 changes: 9 additions & 2 deletions src/Providers/XAI/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,14 @@ protected function processStream(Response $response, Request $request, int $dept
$rawFinishReason = data_get($data, 'choices.0.finish_reason');
if ($rawFinishReason !== null) {
$finishReason = $this->extractFinishReason($data);
if ($finishReason instanceof \Prism\Prism\Enums\FinishReason) {
$this->state->withFinishReason($finishReason);
}

$usage = $this->extractUsage($data);
if ($usage instanceof \Prism\Prism\ValueObjects\Usage) {
$this->state->addUsage($usage);
}
}
}

Expand Down Expand Up @@ -194,8 +201,8 @@ protected function processStream(Response $response, Request $request, int $dept
yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
finishReason: $finishReason ?? FinishReason::Stop,
usage: $usage
finishReason: $this->state->finishReason() ?? FinishReason::Stop,
usage: $this->state->usage()
);
}

Expand Down
Loading