Skip to content

fix(Bun.write) add support to new Response(readable) #15631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bun.js/bindings/ZigGlobalObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4371,6 +4371,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onResolveRequestStream;
} else if (handler == Bun__FetchTasklet__onRejectRequestStream) {
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onRejectRequestStream;
} else if (handler == Bun__BlobToFileSink__onResolveStream) {
return GlobalObject::PromiseFunctions::Bun__BlobToFileSink__onResolveStream;
} else if (handler == Bun__BlobToFileSink__onRejectStream) {
return GlobalObject::PromiseFunctions::Bun__BlobToFileSink__onRejectStream;
} else {
RELEASE_ASSERT_NOT_REACHED();
}
Expand Down
4 changes: 3 additions & 1 deletion src/bun.js/bindings/ZigGlobalObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,10 @@ class GlobalObject : public Bun::GlobalScope {
Bun__onRejectEntryPointResult,
Bun__FetchTasklet__onRejectRequestStream,
Bun__FetchTasklet__onResolveRequestStream,
Bun__BlobToFileSink__onResolveStream,
Bun__BlobToFileSink__onRejectStream,
};
static constexpr size_t promiseFunctionsSize = 26;
static constexpr size_t promiseFunctionsSize = 28;

static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));

Expand Down
2 changes: 2 additions & 0 deletions src/bun.js/bindings/exports.zig
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,8 @@ comptime {
BodyValueBuffererContext.shim.ref();

_ = Bun__LoadLibraryBunString;

JSC.WebCore.BlobToFileSink.shim.ref();
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/bun.js/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

194 changes: 192 additions & 2 deletions src/bun.js/webcore/blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const NewReadFileHandler = @import("./blob/ReadFile.zig").NewReadFileHandler;
const WriteFile = @import("./blob/WriteFile.zig").WriteFile;
const ReadFile = @import("./blob/ReadFile.zig").ReadFile;
const WriteFileWindows = @import("./blob/WriteFile.zig").WriteFileWindows;

const FileSink = JSC.WebCore.FileSink;
pub const Blob = struct {
const bloblog = Output.scoped(.Blob, false);

Expand Down Expand Up @@ -1141,6 +1141,27 @@ pub const Blob = struct {
return JSC.JSPromise.rejectedPromiseValue(globalThis, err_ref.toJS(globalThis));
},
.Locked => {
if ((response.body.value == .Locked and (response.body.value.Locked.action != .none or response.body.value.Locked.isDisturbed(Response, globalThis, data)))) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}

if (response.body.value.Locked.readable.get()) |stream| {
if (stream.isDisturbed(globalThis)) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}

if (Environment.isWindows) {
// TODO: make FileSink work on every case on Windows
if (destination_blob.store.?.data.file.pathlike == .path) {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
} else {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
}
// TODO: removing this using toReadableStream in a followup, fixing HTMLRewriter.transform will be needed.
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
Expand Down Expand Up @@ -1172,6 +1193,25 @@ pub const Blob = struct {
return JSC.JSPromise.rejectedPromiseValue(globalThis, err_ref.toJS(globalThis));
},
.Locked => {
if ((request.body.value == .Locked and (request.body.value.Locked.action != .none or request.body.value.Locked.isDisturbed(Request, globalThis, data)))) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Request body already used", .{}).toJS());
}
if (request.body.value.Locked.readable.get()) |stream| {
if (stream.isDisturbed(globalThis)) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}

if (Environment.isWindows) {
// TODO: make FileSink work on every case on Windows
if (destination_blob.store.?.data.file.pathlike == .path) {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
} else {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
}
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
Expand All @@ -1181,7 +1221,6 @@ pub const Blob = struct {

request.body.value.Locked.task = task;
request.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;

return task.promise.value();
},
}
Expand Down Expand Up @@ -4771,6 +4810,157 @@ pub const Blob = struct {
}
};

pub const BlobToFileSink = struct {
blob: Blob,
sink: FileSink.JSSink,
stream: JSC.WebCore.ReadableStream.Strong,
pub usingnamespace bun.New(BlobToFileSink);

pub fn deinit(this: *@This()) void {
this.sink.sink.finalize();
this.blob.detach();
this.stream.deinit();
this.destroy();
}

pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
var args = callframe.arguments_old(2);
var this: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());

this.deinit();
return JSValue.jsUndefined();
}

pub fn onRejectStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(@This());
this.deinit();
return JSValue.jsUndefined();
}

pub const shim = JSC.Shimmer("Bun", "BlobToFileSink", @This());

pub const Export = shim.exportFunctions(.{
.onResolveStream = onResolveStream,
.onRejectStream = onRejectStream,
});
comptime {
const jsonResolveRequestStream = JSC.toJSHostFunction(onResolveStream);
@export(jsonResolveRequestStream, .{ .name = Export[0].symbol_name });
const jsonRejectRequestStream = JSC.toJSHostFunction(onRejectStream);
@export(jsonRejectRequestStream, .{ .name = Export[1].symbol_name });
}

pub fn consume(globalThis: *JSGlobalObject, destination_blob: Blob, stream: JSC.WebCore.ReadableStream) JSValue {
bun.assert(destination_blob.store != null);
if (destination_blob.store.?.data != .file) {
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createInvalidArgs("Blob is read-only", .{}));
}

// lets do something similar to ReadableStream piping in a Bun.file(filename).writer()
var this = BlobToFileSink.new(.{
.blob = destination_blob,
.sink = .{
.sink = .{
.event_loop_handle = JSC.EventLoopHandle.init(JSC.VirtualMachine.get().eventLoop()),
.fd = bun.invalid_fd,
},
},
.stream = .{},
});
this.sink.sink.writer.setParent(&this.sink.sink);

const path = destination_blob.store.?.data.file.pathlike;

const input_path: JSC.WebCore.PathOrFileDescriptor = brk: {
if (path == .fd) {
break :brk .{ .fd = path.fd };
} else {
break :brk .{
.path = ZigString.Slice.fromUTF8NeverFree(
path.path.slice(),
).clone(
globalThis.allocator(),
) catch bun.outOfMemory(),
};
}
};
defer input_path.deinit();

switch (this.sink.sink.start(.{
.FileSink = .{
.input_path = input_path,
},
})) {
.err => |err| {
this.deinit();

return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
},
else => {},
}

var signal = &this.sink.sink.signal;

signal.* = FileSink.JSSink.SinkSignal.init(JSValue.zero);

// explicitly set it to a dead pointer
// we use this memory address to disable signals being sent
signal.clear();
bun.assert(signal.isDead());

// We are already corked!
const assignment_result: JSValue = FileSink.JSSink.assignToStream(
globalThis,
stream.value,
&this.sink,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);

assignment_result.ensureStillAlive();
// assert that it was updated
bun.assert(!signal.isDead());

if (assignment_result.toError()) |err_value| {
this.deinit();

return JSC.JSPromise.rejectedPromiseValue(globalThis, err_value);
}

if (!assignment_result.isEmptyOrUndefinedOrNull()) {
globalThis.bunVM().drainMicrotasks();

assignment_result.ensureStillAlive();
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.pending => {
this.stream = JSC.WebCore.ReadableStream.Strong.init(stream, globalThis);
assignment_result.then(
globalThis,
this,
onResolveStream,
onRejectStream,
);
return assignment_result;
},
.fulfilled, .rejected => {
this.deinit();

return assignment_result;
},
}
} else {
// if is not a promise we treat it as Error
this.deinit();
return JSC.JSPromise.rejectedPromiseValue(globalThis, assignment_result);
}
}
this.deinit();

return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("body already used", .{}).toJS());
}
};
pub const AnyBlob = union(enum) {
Blob: Blob,
// InlineBlob: InlineBlob,
Expand Down
38 changes: 23 additions & 15 deletions src/io/PipeWriter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1230,11 +1230,11 @@ pub fn WindowsStreamingWriter(
onWrite(this.parent, written, if (done) .drained else .pending);

// process pending outgoing data if any
this.processSend();

// TODO: should we report writable?
if (onWritable) |onWritableFn| {
onWritableFn(this.parent);
if (!done and this.processSend()) {
// TODO: should we report writable?
if (onWritable) |onWritableFn| {
onWritableFn(this.parent);
}
}
}

Expand All @@ -1257,27 +1257,28 @@ pub fn WindowsStreamingWriter(
}

/// this tries to send more data returning if we are writable or not after this
fn processSend(this: *WindowsWriter) void {
/// returns true if not closed, is unsafe to access this after this returns false
fn processSend(this: *WindowsWriter) bool {
log("processSend", .{});
if (this.current_payload.isNotEmpty()) {
// we have some pending async request, the next outgoing data will be processed after this finish
this.last_write_result = .{ .pending = 0 };
return;
return true;
}

const bytes = this.outgoing.slice();
// nothing todo (we assume we are writable until we try to write something)
if (bytes.len == 0) {
this.last_write_result = .{ .wrote = 0 };
return;
return true;
}

var pipe = this.source orelse {
const err = bun.sys.Error.fromCode(bun.C.E.PIPE, .pipe);
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
};

// current payload is empty we can just swap with outgoing
Expand All @@ -1297,7 +1298,7 @@ pub fn WindowsStreamingWriter(
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
}
},
else => {
Expand All @@ -1307,11 +1308,12 @@ pub fn WindowsStreamingWriter(
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
}
},
}
this.last_write_result = .{ .pending = 0 };
return true;
}

const WindowsWriter = @This();
Expand Down Expand Up @@ -1372,8 +1374,11 @@ pub fn WindowsStreamingWriter(
if (had_buffered_data) {
return .{ .pending = 0 };
}
this.processSend();
return this.last_write_result;
if (this.processSend()) {
return this.last_write_result;
} else {
return .{ .done = 0 };
}
}

pub fn writeUTF16(this: *WindowsWriter, buf: []const u16) WriteResult {
Expand All @@ -1396,8 +1401,11 @@ pub fn WindowsStreamingWriter(
return .{ .wrote = 0 };
}

this.processSend();
return this.last_write_result;
if (this.processSend()) {
return this.last_write_result;
} else {
return .{ .wrote = 0 };
}
}

pub fn end(this: *WindowsWriter) void {
Expand Down
Loading
Loading