diff --git a/index.js b/index.js index 7a15f22..4ce3f0f 100644 --- a/index.js +++ b/index.js @@ -3,7 +3,6 @@ var extend = require('extend-object'); var BaseSession = require('jingle-session'); var RTCPeerConnection = require('rtcpeerconnection'); - function filterContentSources(content, stream) { if (content.application.applicationType !== 'rtp') { return; @@ -64,6 +63,8 @@ function MediaSession(opts) { } this._ringing = false; + + this._actions = []; } @@ -95,54 +96,82 @@ Object.defineProperties(MediaSession.prototype, { MediaSession.prototype = extend(MediaSession.prototype, { + // ---------------------------------------------------------------- + // "Queue" for serializing async actions + // ---------------------------------------------------------------- + + _queue: function(action){ + var self = this; + self._actions.push(action); + if(self._actions.length > 1){ + return; + } + self._actNext(); + }, + + _actNext: function(){ + var self = this; + var next = self._actions[0]; + if(!next){ + return; + } + next(function(){ + self._actions.shift(); + self._actNext(); + }); + }, + // ---------------------------------------------------------------- // Session control methods // ---------------------------------------------------------------- start: function (offerOptions, next) { - var self = this; - this.state = 'pending'; - - next = next || function () {}; - - this.pc.isInitiator = true; - this.pc.offer(offerOptions, function (err, offer) { - if (err) { - self._log('error', 'Could not create WebRTC offer', err); - return self.end('failed-application', true); - } - - // a workaround for missing a=sendonly - // https://code.google.com/p/webrtc/issues/detail?id=1553 - if (offerOptions && offerOptions.mandatory) { - offer.jingle.contents.forEach(function (content) { - var mediaType = content.application.media; - - if (!content.description || content.application.applicationType !== 'rtp') { - return; - } - - if (!offerOptions.mandatory.OfferToReceiveAudio && mediaType === 'audio') { - content.senders = 'initiator'; - } - - if (!offerOptions.mandatory.OfferToReceiveVideo && mediaType === 'video') { - content.senders = 'initiator'; - } - }); - } - - offer.jingle.contents.forEach(filterUnusedLabels); - - self.send('session-initiate', offer.jingle); + this._queue(this._start.bind(this, offerOptions, next)); + }, - next(); - }); + _start: function (offerOptions, next, done) { + var self = this; + self.state = 'pending'; + + next = next || function () {}; + + self.pc.isInitiator = true; + self.pc.offer(offerOptions, function (err, offer) { + if (err) { + self._log('error', 'Could not create WebRTC offer', err); + self.end('failed-application', true); + return done(); + } + + // a workaround for missing a=sendonly + // https://code.google.com/p/webrtc/issues/detail?id=1553 + if (offerOptions && offerOptions.mandatory) { + offer.jingle.contents.forEach(function (content) { + var mediaType = content.application.media; + + if (!content.description || content.application.applicationType !== 'rtp') { + return; + } + + if (!offerOptions.mandatory.OfferToReceiveAudio && mediaType === 'audio') { + content.senders = 'initiator'; + } + + if (!offerOptions.mandatory.OfferToReceiveVideo && mediaType === 'video') { + content.senders = 'initiator'; + } + }); + } + + offer.jingle.contents.forEach(filterUnusedLabels); + + self.send('session-initiate', offer.jingle); + next(); + done(); + }); }, accept: function (opts, next) { - var self = this; - // support calling with accept(next) or accept(opts, next) if (arguments.length === 1 && typeof opts === 'function') { next = opts; @@ -151,6 +180,11 @@ MediaSession.prototype = extend(MediaSession.prototype, { next = next || function () {}; opts = opts || {}; + this._queue(this._accept.bind(this, opts, next)); + }, + + _accept: function (opts, next, done) { + var self = this; self.constraints = opts.constraints || { mandatory: { OfferToReceiveAudio: true, @@ -158,14 +192,15 @@ MediaSession.prototype = extend(MediaSession.prototype, { } }; - this._log('info', 'Accepted incoming session'); + self._log('info', 'Accepted incoming session'); - this.state = 'active'; + self.state = 'active'; - this.pc.answer(self.constraints, function (err, answer) { + self.pc.answer(self.constraints, function (err, answer) { if (err) { self._log('error', 'Could not create WebRTC answer', err); - return self.end('failed-application'); + self.end('failed-application'); + return done(); } answer.jingle.contents.forEach(filterUnusedLabels); @@ -173,84 +208,126 @@ MediaSession.prototype = extend(MediaSession.prototype, { self.send('session-accept', answer.jingle); next(); + done(); }); }, end: function (reason, silent) { - var self = this; - this.streams.forEach(function (stream) { - self.onRemoveStream({stream: stream}); - }); - this.pc.close(); - BaseSession.prototype.end.call(this, reason, silent); + this._queue(this._end.bind(this, reason, silent)); + }, + + _end: function (reason, silent, done) { + var self = this; + self.streams.forEach(function (stream) { + self.onRemoveStream({stream: stream}); + }); + self.pc.close(); + BaseSession.prototype.end.call(self, reason, silent); + done(); }, ring: function () { - this._log('info', 'Ringing on incoming session'); - this.ringing = true; - this.send('session-info', {ringing: true}); + this._queue(this._ring.bind(this)); + }, + + _ring: function (done) { + var self = this; + self._log('info', 'Ringing on incoming session'); + self.ringing = true; + self.send('session-info', {ringing: true}); + done(); }, mute: function (creator, name) { - this._log('info', 'Muting', name); + this._queue(this._mute.bind(this, creator, name)); + }, + + _mute: function (creator, name, done) { + var self = this; + self._log('info', 'Muting', name); - this.send('session-info', { + self.send('session-info', { mute: { creator: creator, name: name } }); + done(); }, unmute: function (creator, name) { - this._log('info', 'Unmuting', name); - this.send('session-info', { + this._queue(this._unmute.bind(this, creator, name)); + }, + + _unmute: function (creator, name, done) { + var self = this; + self._log('info', 'Unmuting', name); + self.send('session-info', { unmute: { creator: creator, name: name } }); + done(); }, hold: function () { - this._log('info', 'Placing on hold'); - this.send('session-info', {hold: true}); + this._queue(this._hold.bind(this)); + }, + + _hold: function (done) { + var self = this; + self._log('info', 'Placing on hold'); + self.send('session-info', {hold: true}); + done(); }, resume: function () { - this._log('info', 'Resuming from hold'); - this.send('session-info', {active: true}); + this._queue(this._resume.bind(this)); }, + _resume: function (done) { + var self = this; + self._log('info', 'Resuming from hold'); + self.send('session-info', {active: true}); + done(); + }, + + // ---------------------------------------------------------------- // Stream control methods // ---------------------------------------------------------------- addStream: function (stream, renegotiate, cb) { - var self = this; + this._queue(this._addStream.bind(this, stream, renegotiate, cb)); + }, + _addStream: function (stream, renegotiate, cb, done) { + var self = this; cb = cb || function () {}; - this.pc.addStream(stream); + self.pc.addStream(stream); if (!renegotiate) { - return; + return done(); } else if (typeof renegotiate === 'object') { self.constraints = renegotiate; } - this.pc.handleOffer({ + self.pc.handleOffer({ type: 'offer', - jingle: this.pc.remoteDescription + jingle: self.pc.remoteDescription }, function (err) { if (err) { self._log('error', 'Could not create offer for adding new stream'); - return cb(err); + cb(err); + return done(); } self.pc.answer(self.constraints, function (err, answer) { if (err) { self._log('error', 'Could not create answer for adding new stream'); - return cb(err); + cb(err); + return done(); } answer.jingle.contents.forEach(function (content) { filterContentSources(content, stream); @@ -262,6 +339,7 @@ MediaSession.prototype = extend(MediaSession.prototype, { self.send('source-add', answer.jingle); cb(); + done(); }); }); }, @@ -271,18 +349,21 @@ MediaSession.prototype = extend(MediaSession.prototype, { }, removeStream: function (stream, renegotiate, cb) { - var self = this; + this._queue(this._removeStream.bind(this, stream, renegotiate, cb)); + }, + _removeStream: function (stream, renegotiate, cb, done) { + var self = this; cb = cb || function () {}; if (!renegotiate) { - this.pc.removeStream(stream); - return; + self.pc.removeStream(stream); + return done(); } else if (typeof renegotiate === 'object') { self.constraints = renegotiate; } - var desc = this.pc.localDescription; + var desc = self.pc.localDescription; desc.contents.forEach(function (content) { filterContentSources(content, stream); }); @@ -291,23 +372,26 @@ MediaSession.prototype = extend(MediaSession.prototype, { }); delete desc.groups; - this.send('source-remove', desc); - this.pc.removeStream(stream); + self.send('source-remove', desc); + self.pc.removeStream(stream); - this.pc.handleOffer({ + self.pc.handleOffer({ type: 'offer', - jingle: this.pc.remoteDescription + jingle: self.pc.remoteDescription }, function (err) { if (err) { self._log('error', 'Could not process offer for removing stream'); - return cb(err); + cb(err); + return done(); } self.pc.answer(self.constraints, function (err) { if (err) { self._log('error', 'Could not process answer for removing stream'); - return cb(err); + cb(err); + return done(); } cb(); + done(); }); }); }, @@ -317,32 +401,37 @@ MediaSession.prototype = extend(MediaSession.prototype, { }, switchStream: function (oldStream, newStream, cb) { - var self = this; + this._queue(this._switchStream.bind(this, oldStream, newStream, cb)); + }, + _switchStream: function (oldStream, newStream, cb, done) { + var self = this; cb = cb || function () {}; - var desc = this.pc.localDescription; + var desc = self.pc.localDescription; desc.contents.forEach(function (content) { delete content.transport; delete content.application.payloads; }); - this.pc.removeStream(oldStream); - this.send('source-remove', desc); + self.pc.removeStream(oldStream); + self.send('source-remove', desc); - this.pc.addStream(newStream); - this.pc.handleOffer({ + self.pc.addStream(newStream); + self.pc.handleOffer({ type: 'offer', - jingle: this.pc.remoteDescription + jingle: self.pc.remoteDescription }, function (err) { if (err) { self._log('error', 'Could not process offer for switching streams'); - return cb(err); + cb(err); + return done(); } self.pc.answer(self.constraints, function (err, answer) { if (err) { self._log('error', 'Could not process answer for switching streams'); - return cb(err); + cb(err); + return done(); } answer.jingle.contents.forEach(function (content) { delete content.transport; @@ -350,6 +439,7 @@ MediaSession.prototype = extend(MediaSession.prototype, { }); self.send('source-add', answer.jingle); cb(); + done(); }); }); }, @@ -424,102 +514,137 @@ MediaSession.prototype = extend(MediaSession.prototype, { // ---------------------------------------------------------------- onSessionInitiate: function (changes, cb) { - var self = this; + this._queue(this._onSessionInitiate.bind(this, changes, cb)); + }, - this._log('info', 'Initiating incoming session'); + _onSessionInitiate: function (changes, cb, done) { + var self = this; + self._log('info', 'Initiating incoming session'); - this.state = 'pending'; + self.state = 'pending'; - this.pc.isInitiator = false; - this.pc.handleOffer({ + self.pc.isInitiator = false; + self.pc.handleOffer({ type: 'offer', jingle: changes }, function (err) { if (err) { self._log('error', 'Could not create WebRTC answer'); - return cb({condition: 'general-error'}); + cb({condition: 'general-error'}); + return done(); } cb(); + done(); }); }, onSessionAccept: function (changes, cb) { - var self = this; + this._queue(this._onSessionAccept.bind(this, changes, cb)); + }, - this.state = 'active'; - this.pc.handleAnswer({ + _onSessionAccept: function (changes, cb, done) { + var self = this; + self.state = 'active'; + self.pc.handleAnswer({ type: 'answer', jingle: changes }, function (err) { if (err) { self._log('error', 'Could not process WebRTC answer'); - return cb({condition: 'general-error'}); + cb({condition: 'general-error'}); + return done(); } self.emit('accepted', self); cb(); + done(); }); }, onSessionTerminate: function (changes, cb) { - var self = this; + this._queue(this._onSessionTerminate.bind(this, changes, cb)); + }, - this._log('info', 'Terminating session'); - this.streams.forEach(function (stream) { + _onSessionTerminate: function (changes, cb, done) { + var self = this; + self._log('info', 'Terminating session'); + self.streams.forEach(function (stream) { self.onRemoveStream({stream: stream}); }); - this.pc.close(); - BaseSession.prototype.end.call(this, changes.reason, true); + self.pc.close(); + BaseSession.prototype.end.call(self, changes.reason, true); cb(); + done(); }, onSessionInfo: function (info, cb) { + this._queue(this._onSessionInfo.bind(this, info, cb)); + }, + + _onSessionInfo: function (info, cb, done) { + var self = this; if (info.ringing) { - this._log('info', 'Outgoing session is ringing'); - this.ringing = true; - this.emit('ringing', this); - return cb(); + self._log('info', 'Outgoing session is ringing'); + self.ringing = true; + self.emit('ringing', self); + cb(); + return done(); } if (info.hold) { - this._log('info', 'On hold'); - this.emit('hold', this); - return cb(); + self._log('info', 'On hold'); + self.emit('hold', self); + cb(); + return done(); } if (info.active) { - this._log('info', 'Resuming from hold'); - this.emit('resumed', this); - return cb(); + self._log('info', 'Resuming from hold'); + self.emit('resumed', self); + cb(); + return done(); } if (info.mute) { - this._log('info', 'Muting', info.mute); - this.emit('mute', this, info.mute); - return cb(); + self._log('info', 'Muting', info.mute); + self.emit('mute', self, info.mute); + cb(); + return done(); } if (info.unmute) { - this._log('info', 'Unmuting', info.unmute); - this.emit('unmute', this, info.unmute); - return cb(); + self._log('info', 'Unmuting', info.unmute); + self.emit('unmute', self, info.unmute); + cb(); + return done(); } cb(); + done(); }, onTransportInfo: function (changes, cb) { - this.pc.processIce(changes, function () { - cb(); - }); + this._queue(this._onTransportInfo.bind(this, changes, cb)); + }, + + _onTransportInfo: function (changes, cb, done) { + var self = this; + self.pc.processIce(changes, function () { + cb(); + done(); + }); }, onSourceAdd: function (changes, cb) { + this._queue(this._onSourceAdd.bind(this, changes, cb)); + }, + + _onSourceAdd: function (changes, cb, done) { var self = this; - this._log('info', 'Adding new stream source'); + self._log('info', 'Adding new stream source'); - var newDesc = this.pc.remoteDescription; - this.pc.remoteDescription.contents.forEach(function (content, idx) { + var newDesc = self.pc.remoteDescription; + self.pc.remoteDescription.contents.forEach(function (content, idx) { var desc = content.application; var ssrcs = desc.sources || []; var groups = desc.sourceGroups || []; @@ -541,35 +666,42 @@ MediaSession.prototype = extend(MediaSession.prototype, { }); }); - this.pc.handleOffer({ + self.pc.handleOffer({ type: 'offer', jingle: newDesc }, function (err) { if (err) { self._log('error', 'Error adding new stream source'); - return cb({ + cb({ condition: 'general-error' }); + return done(); } self.pc.answer(self.constraints, function (err) { if (err) { self._log('error', 'Error adding new stream source'); - return cb({ + cb({ condition: 'general-error' }); + return done(); } cb(); + done(); }); }); }, onSourceRemove: function (changes, cb) { + this._queue(this._onSourceRemove.bind(this, changes, cb)); + }, + + _onSourceRemove: function (changes, cb, done) { var self = this; - this._log('info', 'Removing stream source'); + self._log('info', 'Removing stream source'); - var newDesc = this.pc.remoteDescription; - this.pc.remoteDescription.contents.forEach(function (content, idx) { + var newDesc = self.pc.remoteDescription; + self.pc.remoteDescription.contents.forEach(function (content, idx) { var desc = content.application; var ssrcs = desc.sources || []; var groups = desc.sourceGroups || []; @@ -627,24 +759,27 @@ MediaSession.prototype = extend(MediaSession.prototype, { }); }); - this.pc.handleOffer({ + self.pc.handleOffer({ type: 'offer', jingle: newDesc }, function (err) { if (err) { self._log('error', 'Error removing stream source'); - return cb({ + cb({ condition: 'general-error' }); + return done(); } self.pc.answer(self.constraints, function (err) { if (err) { self._log('error', 'Error removing stream source'); - return cb({ + cb({ condition: 'general-error' }); + return done(); } cb(); + done(); }); }); },