diff --git a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts index 96735cfedb..0fe7b629dc 100644 --- a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts +++ b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts @@ -200,6 +200,15 @@ export default (common: TestSetup): void => { expect(inboundStream).to.have.property('writeStatus', 'writable', 'inbound stream writeStatus was incorrect') expect(inboundStream).to.have.property('readStatus', 'readable', 'inbound stream readStatus was incorrect') }) + + it('closes read only', async () => { + expect(outboundStream).to.not.have.nested.property('timeline.close') + + await outboundStream.closeRead() + + expect(outboundStream).to.have.property('writeStatus', 'writable') + expect(outboundStream).to.have.property('readStatus', 'closed') + }) it('aborts', async () => { const eventPromises = Promise.all([ @@ -237,6 +246,19 @@ export default (common: TestSetup): void => { expect(inboundEvent).to.have.nested.property('error.name', 'StreamResetError') }) + it('resets when remote aborts', async () => { + expect(outboundStream).to.not.have.nested.property('timeline.close') + + const closePromise = pEvent(outboundStream, 'close') + inboundStream.abort(new Error('Urk!')) + + await closePromise + + expect(outboundStream).to.have.property('status', 'reset') + expect(isValidTick(outboundStream.timeline.close)).to.equal(true) + expect(outboundStream.timeline.close).to.be.greaterThanOrEqual(outboundStream.timeline.open) + }) + it('does not send close read when remote closes write', async () => { // @ts-expect-error internal method of AbstractMessageStream const sendCloseReadSpy = Sinon.spy(outboundStream, 'sendCloseRead') diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index fc4388c83f..ce9efd1dee 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -1,18 +1,11 @@ import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' -import delay from 'delay' import * as lengthPrefixed from 'it-length-prefixed' -import { bytes } from 'multiformats' -import { pEvent } from 'p-event' import { stubInterface } from 'sinon-ts' import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.js' import { Message } from '../src/private-to-public/pb/message.js' import { createStream } from '../src/stream.js' import { isFirefox } from '../src/util.ts' -import { RTCPeerConnection } from '../src/webrtc/index.js' -import { receiveFinAck, receiveRemoteCloseWrite } from './util.js' -import type { WebRTCStream } from '../src/stream.js' -import type { Stream } from '@libp2p/interface' describe('Max message size', () => { it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { @@ -84,181 +77,3 @@ describe('Max message size', () => { } }) }) - -const TEST_MESSAGE = 'test_message' - -async function setup (): Promise<{ peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream }> { - const peerConnection = new RTCPeerConnection() - const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) - - await pEvent(dataChannel, 'open', { - rejectionEvents: [ - 'close', - 'error' - ] - }) - - const stream = createStream({ - channel: dataChannel, - direction: 'outbound', - closeTimeout: 1, - log: defaultLogger().forComponent('test') - }) - - return { peerConnection, dataChannel, stream } -} - -function generatePbByFlag (flag?: Message.Flag): Uint8Array { - const buf = Message.encode({ - flag, - message: bytes.fromString(TEST_MESSAGE) - }) - - return lengthPrefixed.encode.single(buf).subarray() -} - -// TODO: move to transport interface compliance suite -describe.skip('Stream Stats', () => { - let stream: WebRTCStream - let peerConnection: RTCPeerConnection - let dataChannel: RTCDataChannel - - beforeEach(async () => { - ({ stream, peerConnection, dataChannel } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('can construct', () => { - expect(stream.timeline.close).to.not.exist() - }) - - it('close marks it closed', async () => { - expect(stream.timeline.close).to.not.exist() - expect(stream.writeStatus).to.equal('writable') - - receiveFinAck(dataChannel) - receiveRemoteCloseWrite(dataChannel) - - await Promise.all([ - pEvent(stream, 'close'), - stream.close() - ]) - - expect(stream.timeline.close).to.be.a('number') - expect(stream.writeStatus).to.equal('closed') - }) - - it('closeRead marks it read-closed only', async () => { - expect(stream.timeline.close).to.not.exist() - await stream.closeRead() - - expect(stream).to.have.property('writeStatus', 'writable') - expect(stream).to.have.property('readStatus', 'closed') - }) - - it('closeWrite marks it write-closed only', async () => { - expect(stream.timeline.close).to.not.exist() - - receiveFinAck(dataChannel) - await stream.close() - - expect(stream).to.have.property('writeStatus', 'closed') - expect(stream).to.have.property('readStatus', 'readable') - }) - - it('abort = close', () => { - expect(stream.timeline.close).to.not.exist() - stream.abort(new Error('Oh no!')) - expect(stream.timeline.close).to.be.a('number') - }) - - it('reset = close', () => { - expect(stream.timeline.close).to.not.exist() - stream.onRemoteReset() // only resets the write side - expect(stream.timeline.close).to.be.a('number') - expect(stream.timeline.close).to.be.greaterThanOrEqual(stream.timeline.open) - }) -}) - -// TODO: move to transport interface compliance suite -describe.skip('Stream Read Stats Transition By Incoming Flag', () => { - let dataChannel: RTCDataChannel - let stream: Stream - let peerConnection: RTCPeerConnection - - beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('no flag, no transition', () => { - expect(stream.timeline.close).to.not.exist() - const data = generatePbByFlag() - dataChannel.onmessage?.(new MessageEvent('message', { data })) - - expect(stream.timeline.close).to.not.exist() - }) - - it('open to read-close by flag:FIN', async () => { - const data = generatePbByFlag(Message.Flag.FIN) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.readStatus).to.equal('closed') - }) - - it('read-close to close by flag:STOP_SENDING', async () => { - const data = generatePbByFlag(Message.Flag.STOP_SENDING) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteReadStatus).to.equal('closed') - }) -}) - -// TODO: move to transport interface compliance suite -describe.skip('Stream Write Stats Transition By Incoming Flag', () => { - let dataChannel: RTCDataChannel - let stream: Stream - let peerConnection: RTCPeerConnection - - beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('open to write-close by flag:STOP_SENDING', async () => { - const data = generatePbByFlag(Message.Flag.STOP_SENDING) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteReadStatus).to.equal('closed') - }) - - it('write-close to close by flag:FIN', async () => { - const data = generatePbByFlag(Message.Flag.FIN) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteWriteStatus).to.equal('closed') - }) -})