Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# River

⚠️ Not production ready, while Replit is using parts of River in production, we are still going through rapid breaking changes. First production ready version will be `1.x.x` ⚠️
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pog


River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.

## Long-lived streaming remote procedure calls
Expand Down
17 changes: 17 additions & 0 deletions __tests__/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,23 @@ describe('Writable unit', () => {
expect(closeCb).toHaveBeenCalledOnce();
});

it('should support closing with value', () => {
const closeCb = vi.fn();
const writeCb = vi.fn();
const writable = new WritableImpl<number>({
writeCb,
closeCb,
});

expect(writable.isWritable()).toBeTruthy();

writable.close(42);
expect(writable.isWritable()).toBeFalsy();
expect(closeCb).toHaveBeenCalledOnce();
expect(writeCb).toHaveBeenCalledOnce();
expect(writeCb).toHaveBeenCalledWith(42);
});

it('should allow calling close multiple times', () => {
const closeCb = vi.fn();
const writable = new WritableImpl<number>({
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.209.6",
"version": "0.209.7",
"type": "module",
"exports": {
".": {
Expand Down
12 changes: 9 additions & 3 deletions router/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ export interface Writable<T> {
/**
* {@link close} signals the closure of the {@link Writeable}, informing the {@link Readable} end that
* all data has been transmitted and we've cleanly closed.
* Optionally a final value can be passed to {@link close}, which will be the last value
* to write before it closes.
*
* Calling {@link close} multiple times is a no-op.
*/
close(): undefined;
close(value?: T): undefined;
/**
* {@link isWritable} returns true if it's safe to call {@link write}, which
* means that the {@link Writable} hasn't been closed due to {@link close} being called
Expand Down Expand Up @@ -362,7 +364,7 @@ export class WritableImpl<T> implements Writable<T> {
/**
* Passed via constructor to pass on calls to {@link close}
*/
private closeCb: () => void;
private closeCb: (value?: T) => void;
/**
* Whether {@link close} was called, and {@link Writable} is not writable anymore.
*/
Expand All @@ -385,11 +387,15 @@ export class WritableImpl<T> implements Writable<T> {
return !this.closed;
}

public close(): undefined {
public close(value?: T): undefined {
if (this.closed) {
return;
}

if (value !== undefined) {
this.writeCb(value);
}

this.closed = true;
this.writeCb = () => undefined;
this.closeCb();
Expand Down
Loading