Skip to content

Commit 950ed10

Browse files
committed
generic sse stream wrapper with curl
1 parent 7feea37 commit 950ed10

File tree

3 files changed

+302
-127
lines changed

3 files changed

+302
-127
lines changed

classes/Ai/Chunk.php

Lines changed: 100 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,131 @@
77
*/
88
final class Chunk
99
{
10+
public const string TYPE_STREAM_START = 'stream-start';
11+
public const string TYPE_STREAM_END = 'stream-end';
12+
public const string TYPE_TEXT_START = 'text-start';
1013
public const string TYPE_TEXT_DELTA = 'text-delta';
11-
public const string TYPE_DONE = 'done';
14+
public const string TYPE_TEXT_COMPLETE = 'text-complete';
15+
public const string TYPE_THINKING_START = 'thinking-start';
16+
public const string TYPE_THINKING_DELTA = 'thinking-delta';
17+
public const string TYPE_THINKING_COMPLETE = 'thinking-complete';
18+
public const string TYPE_TOOL_CALL = 'tool-call';
19+
public const string TYPE_TOOL_RESULT = 'tool-result';
1220
public const string TYPE_ERROR = 'error';
1321

1422
private function __construct(
1523
public readonly string $type,
16-
public readonly ?string $text,
17-
public readonly mixed $payload
24+
public readonly mixed $payload = null,
25+
public readonly ?string $text = null
1826
) {
1927
}
2028

21-
public static function textDelta(string $text): self
29+
public static function streamStart(array $payload = []): self
2230
{
23-
return new self(self::TYPE_TEXT_DELTA, text: $text, payload: null);
31+
return new self(self::TYPE_STREAM_START, $payload);
2432
}
2533

26-
public static function done(mixed $payload = null): self
34+
public static function streamEnd(array $payload = []): self
2735
{
28-
return new self(self::TYPE_DONE, text: null, payload: $payload);
36+
return new self(self::TYPE_STREAM_END, $payload);
2937
}
3038

31-
public static function error(string $message, mixed $payload = null): self
39+
public static function textStart(array $payload = []): self
3240
{
33-
return new self(self::TYPE_ERROR, text: null, payload: [
41+
return new self(self::TYPE_TEXT_START, $payload);
42+
}
43+
44+
public static function textDelta(string $text, array $payload = []): self
45+
{
46+
return new self(self::TYPE_TEXT_DELTA, $payload, $text);
47+
}
48+
49+
public static function textComplete(array $payload = []): self
50+
{
51+
return new self(self::TYPE_TEXT_COMPLETE, $payload);
52+
}
53+
54+
public static function thinkingStart(array $payload = []): self
55+
{
56+
return new self(self::TYPE_THINKING_START, $payload);
57+
}
58+
59+
public static function thinkingDelta(string $text, array $payload = []): self
60+
{
61+
return new self(self::TYPE_THINKING_DELTA, $payload, $text);
62+
}
63+
64+
public static function thinkingComplete(array $payload = []): self
65+
{
66+
return new self(self::TYPE_THINKING_COMPLETE, $payload);
67+
}
68+
69+
public static function toolCall(array $payload = []): self
70+
{
71+
return new self(self::TYPE_TOOL_CALL, $payload);
72+
}
73+
74+
public static function toolResult(array $payload = []): self
75+
{
76+
return new self(self::TYPE_TOOL_RESULT, $payload);
77+
}
78+
79+
public static function error(string $message, array $payload = []): self
80+
{
81+
return new self(self::TYPE_ERROR, [
3482
'message' => $message,
35-
'data' => $payload,
83+
'data' => $payload,
3684
]);
3785
}
3886

87+
public function isStreamStart(): bool
88+
{
89+
return $this->type === self::TYPE_STREAM_START;
90+
}
91+
92+
public function isStreamEnd(): bool
93+
{
94+
return $this->type === self::TYPE_STREAM_END;
95+
}
96+
97+
public function isTextStart(): bool
98+
{
99+
return $this->type === self::TYPE_TEXT_START;
100+
}
101+
39102
public function isTextDelta(): bool
40103
{
41104
return $this->type === self::TYPE_TEXT_DELTA;
42105
}
43106

44-
public function isDone(): bool
107+
public function isTextComplete(): bool
108+
{
109+
return $this->type === self::TYPE_TEXT_COMPLETE;
110+
}
111+
112+
public function isThinkingStart(): bool
113+
{
114+
return $this->type === self::TYPE_THINKING_START;
115+
}
116+
117+
public function isThinkingDelta(): bool
118+
{
119+
return $this->type === self::TYPE_THINKING_DELTA;
120+
}
121+
122+
public function isThinkingComplete(): bool
123+
{
124+
return $this->type === self::TYPE_THINKING_COMPLETE;
125+
}
126+
127+
public function isToolCall(): bool
128+
{
129+
return $this->type === self::TYPE_TOOL_CALL;
130+
}
131+
132+
public function isToolResult(): bool
45133
{
46-
return $this->type === self::TYPE_DONE;
134+
return $this->type === self::TYPE_TOOL_RESULT;
47135
}
48136

49137
public function isError(): bool

classes/Ai/Drivers/OpenAi.php

Lines changed: 35 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
namespace tobimori\Seo\Ai\Drivers;
44

55
use Generator;
6-
use Kirby\Exception\Exception as KirbyException;
76
use tobimori\Seo\Ai\Chunk;
87
use tobimori\Seo\Ai\Driver;
8+
use tobimori\Seo\Ai\SseStream;
99

1010
class OpenAi extends Driver
1111
{
@@ -36,10 +36,9 @@ public function stream(string $prompt, array $context = []): Generator
3636
'stream' => true,
3737
];
3838

39-
// Responses API accepts strings, arrays of content blocks, or message lists.
40-
if (isset($context['input']) === true) {
39+
if (isset($context['input'])) {
4140
$payload['input'] = $context['input'];
42-
} elseif (isset($context['messages']) === true) {
41+
} elseif (isset($context['messages'])) {
4342
$payload['input'] = $context['messages'];
4443
} else {
4544
$payload['input'] = $input;
@@ -53,126 +52,47 @@ public function stream(string $prompt, array $context = []): Generator
5352
$payload['metadata'] = $context['metadata'];
5453
}
5554

56-
$options = [
57-
'http' => [
58-
'method' => 'POST',
59-
'header' => implode("\r\n", $headers),
60-
'content' => json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES),
61-
'ignore_errors' => true,
62-
'protocol_version' => 1.1,
63-
]
64-
];
65-
66-
$contextResource = stream_context_create($options);
67-
$handle = @fopen($endpoint, 'rb', false, $contextResource);
68-
69-
if ($handle === false) {
70-
throw new KirbyException('Failed to establish OpenAI stream.');
71-
}
55+
$stream = new SseStream($endpoint, $headers, $payload, (int)$this->config('timeout', 120));
56+
yield from $stream->stream(function (array $event): Generator {
57+
$type = $event['type'] ?? null;
7258

73-
try {
74-
$meta = stream_get_meta_data($handle);
75-
$headers = $meta['wrapper_data'] ?? [];
76-
$status = $this->extractStatusCode($headers);
77-
78-
if ($status !== null && $status >= 400) {
79-
$body = stream_get_contents($handle) ?: '';
80-
throw new KirbyException(sprintf(
81-
'OpenAI request failed (%d): %s',
82-
$status,
83-
$this->summarizeBody($body)
84-
));
59+
if ($type === 'response.created') {
60+
yield Chunk::streamStart($event);
61+
return;
8562
}
8663

87-
stream_set_blocking($handle, true);
88-
stream_set_timeout($handle, 60);
89-
90-
while (!feof($handle)) {
91-
$line = fgets($handle);
92-
93-
if ($line === false) {
94-
$meta = stream_get_meta_data($handle);
95-
if (($meta['timed_out'] ?? false) === true) {
96-
throw new KirbyException('OpenAI stream timed out.');
97-
}
98-
99-
break;
100-
}
101-
102-
$line = trim($line);
103-
104-
// skip keep-alive newlines and unrelated prefixes
105-
if ($line === '' || str_starts_with($line, ':')) {
106-
continue;
107-
}
108-
109-
if (str_starts_with($line, 'data:') === false) {
110-
continue;
111-
}
112-
113-
$payload = trim(substr($line, 5));
114-
115-
if ($payload === '' || $payload === '[DONE]') {
116-
yield Chunk::done();
117-
break;
118-
}
119-
120-
$event = json_decode($payload, true);
121-
122-
if (json_last_error() !== JSON_ERROR_NONE || $event === null) {
123-
continue;
124-
}
125-
126-
$type = $event['type'] ?? null;
127-
128-
if ($type === 'response.error') {
129-
$message = $event['error']['message'] ?? 'Unknown OpenAI streaming error.';
130-
throw new KirbyException($message);
131-
}
132-
133-
if ($type === 'response.output_text.delta') {
134-
$delta = $event['delta'] ?? '';
135-
136-
if ($delta !== '') {
137-
yield Chunk::textDelta($delta);
138-
}
139-
140-
continue;
141-
}
64+
if ($type === 'response.in_progress') {
65+
yield Chunk::textStart($event);
66+
return;
67+
}
14268

143-
if ($type === 'response.completed') {
144-
yield Chunk::done($event['response'] ?? null);
145-
break;
69+
if ($type === 'response.output_text.delta') {
70+
$delta = $event['delta'] ?? '';
71+
if ($delta !== '') {
72+
yield Chunk::textDelta($delta, $event);
14673
}
74+
return;
14775
}
148-
} finally {
149-
fclose($handle);
150-
}
151-
}
15276

153-
private function extractStatusCode(array $headers): int|null
154-
{
155-
$statusLine = $headers[0] ?? null;
156-
157-
if ($statusLine === null) {
158-
return null;
159-
}
160-
161-
if (preg_match('/HTTP\/\d(?:\.\d)?\s+(\d{3})/', $statusLine, $matches) === 1) {
162-
return (int)$matches[1];
163-
}
164-
165-
return null;
166-
}
77+
if ($type === 'response.output_text.done') {
78+
yield Chunk::textComplete($event);
79+
return;
80+
}
16781

168-
private function summarizeBody(string $body, int $limit = 200): string
169-
{
170-
$body = trim($body);
82+
if ($type === 'response.completed') {
83+
yield Chunk::streamEnd($event);
84+
return;
85+
}
17186

172-
if (strlen($body) <= $limit) {
173-
return $body;
174-
}
87+
if ($type === 'response.output_item.added' && ($event['item']['type'] ?? null) === 'reasoning') {
88+
yield Chunk::thinkingStart($event);
89+
return;
90+
}
17591

176-
return substr($body, 0, $limit - 3) . '...';
92+
if ($type === 'response.error') {
93+
$message = $event['error']['message'] ?? 'Unknown OpenAI streaming error.';
94+
yield Chunk::error($message, $event);
95+
}
96+
});
17797
}
17898
}

0 commit comments

Comments
 (0)