Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/silly-peaches-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"miniflare": minor
"wrangler": minor
---

Add local Stream binding support in Miniflare and Wrangler dev

Miniflare now implements a local Stream binding with direct upload flows and Stream-specific handler routing so Stream APIs can be exercised during local development. Wrangler dev forwards Stream binding configuration to Miniflare, including remote/local behavior, so local mode works with the same binding shape used in deployed workers.
71 changes: 71 additions & 0 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import {
SharedOptions,
SOCKET_ENTRY,
SOCKET_ENTRY_LOCAL,
STREAM_PLUGIN_NAME,
WorkerOptions,
WrappedBindingNames,
} from "./plugins";
Expand Down Expand Up @@ -150,6 +151,7 @@ import type {
KVNamespaceListKey,
Queue,
R2Bucket,
StreamBinding,
} from "@cloudflare/workers-types/experimental";
import type { Process } from "@puppeteer/browsers";

Expand Down Expand Up @@ -2749,6 +2751,75 @@ export class Miniflare {
): Promise<ReplaceWorkersTypes<ImagesBinding>> {
return this.#getProxy(IMAGES_PLUGIN_NAME, bindingName, workerName);
}
#resolveStreamNamespace<T>(
target: Record<string, unknown>,
methodName: string,
propertyName: string
): T {
const maybeMethod = target[methodName];
if (typeof maybeMethod === "function") {
return (maybeMethod as () => T)();
}
return target[propertyName] as T;
}
#wrapStreamVideoHandle(videoHandle: object): object {
return new Proxy(videoHandle, {
get: (target, key, receiver) => {
if (key === "captions") {
return this.#resolveStreamNamespace(
target as Record<string, unknown>,
"__miniflareCaptions",
"captions"
);
}
if (key === "downloads") {
return this.#resolveStreamNamespace(
target as Record<string, unknown>,
"__miniflareDownloads",
"downloads"
);
}
return Reflect.get(target, key, receiver);
},
});
}
getStreamBinding(
bindingName: string,
workerName?: string
): Promise<ReplaceWorkersTypes<StreamBinding>> {
return this.#getProxy(STREAM_PLUGIN_NAME, bindingName, workerName).then(
(streamBinding) => {
return new Proxy(streamBinding as object, {
get: (target, key, receiver) => {
if (key === "videos") {
return this.#resolveStreamNamespace(
target as Record<string, unknown>,
"__miniflareVideos",
"videos"
);
}
if (key === "watermarks") {
return this.#resolveStreamNamespace(
target as Record<string, unknown>,
"__miniflareWatermarks",
"watermarks"
);
}
if (key === "video") {
const videoFactory = Reflect.get(target, key, receiver);
if (typeof videoFactory !== "function") {
return videoFactory;
}
return (...args: unknown[]) => {
return this.#wrapStreamVideoHandle(videoFactory(...args));
};
}
return Reflect.get(target, key, receiver);
},
}) as ReplaceWorkersTypes<StreamBinding>;
}
);
}
getHelloWorldBinding(
bindingName: string,
workerName?: string
Expand Down
32 changes: 32 additions & 0 deletions packages/miniflare/src/plugins/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,28 @@ export function getGlobalServices({
// Collect list of workers we could route to, then parse and sort all routes
const workerNames = [...allWorkerRoutes.keys()];
const routes = parseRoutes(allWorkerRoutes);
const streamBindings = proxyBindings.flatMap((binding) => {
if (
!("service" in binding) ||
!binding.service?.name?.startsWith("stream:")
) {
return [];
}
assert(binding.name !== undefined);

const nameParts = binding.name.split(":");
const publicBindingName = nameParts[nameParts.length - 1];
if (!publicBindingName) {
return [];
}

return [
{
publicBindingName,
serviceName: binding.service.name,
},
];
});

// Define core/shared services.
const serviceEntryBindings: Worker_Binding[] = [
Expand All @@ -1025,6 +1047,16 @@ export function getGlobalServices({
},
{ name: CoreBindings.JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) },
{ name: CoreBindings.JSON_LOG_LEVEL, json: JSON.stringify(log.level) },
{
name: CoreBindings.JSON_STREAM_BINDINGS,
json: JSON.stringify(
streamBindings.map(({ publicBindingName }) => publicBindingName)
),
},
...streamBindings.map(({ publicBindingName, serviceName }) => ({
name: publicBindingName,
service: { name: serviceName },
})),
{
name: CoreBindings.SERVICE_USER_FALLBACK,
service: { name: fallbackWorkerName },
Expand Down
8 changes: 8 additions & 0 deletions packages/miniflare/src/plugins/core/proxy/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ class ProxyStubHandler<T extends object>
assert(!isClientError(res.status));

const typeHeader = res.headers.get(CoreHeaders.OP_RESULT_TYPE);
if (typeHeader === null) {
const result = parseWithReadableStreams(
NODE_PLATFORM_IMPL,
{ value: await res.text() },
this.revivers
);
return this.#maybeThrow(res, result, this.#parseAsyncResponse);
}
if (typeHeader === "Promise, ReadableStream") return res.body;
assert(typeHeader === "Promise"); // Must be async

Expand Down
7 changes: 6 additions & 1 deletion packages/miniflare/src/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { QUEUES_PLUGIN, QUEUES_PLUGIN_NAME } from "./queues";
import { R2_PLUGIN, R2_PLUGIN_NAME } from "./r2";
import { RATELIMIT_PLUGIN, RATELIMIT_PLUGIN_NAME } from "./ratelimit";
import { SECRET_STORE_PLUGIN, SECRET_STORE_PLUGIN_NAME } from "./secret-store";
import { STREAM_PLUGIN, STREAM_PLUGIN_NAME } from "./stream";
import { VECTORIZE_PLUGIN, VECTORIZE_PLUGIN_NAME } from "./vectorize";
import {
VERSION_METADATA_PLUGIN,
Expand Down Expand Up @@ -69,6 +70,7 @@ export const PLUGINS = {
[HELLO_WORLD_PLUGIN_NAME]: HELLO_WORLD_PLUGIN,
[WORKER_LOADER_PLUGIN_NAME]: WORKER_LOADER_PLUGIN,
[MEDIA_PLUGIN_NAME]: MEDIA_PLUGIN,
[STREAM_PLUGIN_NAME]: STREAM_PLUGIN,
[VERSION_METADATA_PLUGIN_NAME]: VERSION_METADATA_PLUGIN,
};
export type Plugins = typeof PLUGINS;
Expand Down Expand Up @@ -133,6 +135,7 @@ export type WorkerOptions = z.input<typeof CORE_PLUGIN.options> &
z.input<typeof HELLO_WORLD_PLUGIN.options> &
z.input<typeof WORKER_LOADER_PLUGIN.options> &
z.input<typeof MEDIA_PLUGIN.options> &
z.input<typeof STREAM_PLUGIN.options> &
z.input<typeof VERSION_METADATA_PLUGIN.options>;

export type SharedOptions = z.input<typeof CORE_PLUGIN.sharedOptions> &
Expand All @@ -145,7 +148,8 @@ export type SharedOptions = z.input<typeof CORE_PLUGIN.sharedOptions> &
z.input<typeof SECRET_STORE_PLUGIN.sharedOptions> &
z.input<typeof ANALYTICS_ENGINE_PLUGIN.sharedOptions> &
z.input<typeof IMAGES_PLUGIN.sharedOptions> &
z.input<typeof HELLO_WORLD_PLUGIN.sharedOptions>;
z.input<typeof HELLO_WORLD_PLUGIN.sharedOptions> &
z.input<typeof STREAM_PLUGIN.sharedOptions>;

export const PLUGIN_ENTRIES = Object.entries(PLUGINS) as [
keyof Plugins,
Expand Down Expand Up @@ -212,4 +216,5 @@ export * from "./mtls";
export * from "./hello-world";
export * from "./worker-loader";
export * from "./media";
export * from "./stream";
export * from "./version-metadata";
Loading
Loading