diff --git a/src/main/java/io/antmedia/webrtc/WebRTCMuxer.java b/src/main/java/io/antmedia/webrtc/WebRTCMuxer.java index befd68f30..99e85ec90 100644 --- a/src/main/java/io/antmedia/webrtc/WebRTCMuxer.java +++ b/src/main/java/io/antmedia/webrtc/WebRTCMuxer.java @@ -159,8 +159,12 @@ public void registerWebRTCClient(IWebRTCClient webRTCClient) { @Override public boolean unRegisterWebRTCClient(IWebRTCClient webRTCClient) { - clientCount.decrementAndGet(); - return webRTCClientList.remove(webRTCClient); + + boolean result = webRTCClientList.remove(webRTCClient); + if (result) { + clientCount.decrementAndGet(); + } + return result; } @Override @@ -449,18 +453,10 @@ public void setStreamId(String streamId) { } - public int getWidth() { - return width; - } - public void setWidth(int width) { this.width = width; } - public int getHeight() { - return height; - } - public void setHeight(int height) { this.height = height; } @@ -504,5 +500,8 @@ public int getClientCount() { return clientCount.intValue(); } + public void setVideoConf(byte[] videoConf) { + this.videoConf = videoConf; + } } diff --git a/src/main/java/io/antmedia/webrtc/adaptor/Adaptor.java b/src/main/java/io/antmedia/webrtc/adaptor/Adaptor.java index cdac8dd1f..7c020803b 100644 --- a/src/main/java/io/antmedia/webrtc/adaptor/Adaptor.java +++ b/src/main/java/io/antmedia/webrtc/adaptor/Adaptor.java @@ -27,25 +27,25 @@ public abstract class Adaptor implements Observer, SdpObserver private MediaConstraints sdpMediaConstraints; protected PeerConnectionFactory peerConnectionFactory; protected WebSocketCommunityHandler webSocketCommunityHandler; - + private String streamId; - + private Session session; - + protected static final Logger log = Red5LoggerFactory.getLogger(Adaptor.class); public Adaptor(WebSocketCommunityHandler websocketCommunityHandler) { this.webSocketCommunityHandler = websocketCommunityHandler; } - + public abstract void start(); - + public abstract void stop(); public void setPeerConnection(PeerConnection peerConnection) { this.peerConnection = peerConnection; } - + public PeerConnection getPeerConnection() { return peerConnection; } @@ -73,22 +73,22 @@ public void onIceConnectionReceivingChange(boolean receiving) { public void onIceGatheringChange(IceGatheringState newState) { } - + @Override public void onIceCandidate(IceCandidate candidate) { log.warn("onIceCandidate"); - + webSocketCommunityHandler - .sendTakeCandidateMessage(candidate.sdpMLineIndex, candidate.sdpMid, candidate.sdp, streamId, session); + .sendTakeCandidateMessage(candidate.sdpMLineIndex, candidate.sdpMid, candidate.sdp, streamId, session); } - + @Override public void onIceCandidatesRemoved(IceCandidate[] candidates) { } - + public void onAddStream(MediaStream stream) {} - + @Override public void onRemoveStream(MediaStream stream) { @@ -107,34 +107,36 @@ public void onRenegotiationNeeded() { @Override public void onAddTrack(RtpReceiver receiver, MediaStream[] mediaStreams) { } - + @Override public void onCreateSuccess(SessionDescription sdp) { - log.warn("onCreate Success"); - peerConnection.setLocalDescription(new SdpObserver() { - - @Override - public void onSetSuccess() { - - } - - @Override - public void onSetFailure(String error) { - - } - - @Override - public void onCreateSuccess(SessionDescription sdp) { - - } - - @Override - public void onCreateFailure(String error) { + log.warn("onCreate Success for stream: {}", streamId); + if (peerConnection != null) { + peerConnection.setLocalDescription(new SdpObserver() { + + @Override + public void onSetSuccess() { + log.info("set localdescription on set success for {}", streamId); + } + + @Override + public void onSetFailure(String error) { + log.info("set localdescription onSetFailure for {}", streamId); + } + + @Override + public void onCreateSuccess(SessionDescription sdp) { + log.info("set localdescription onCreateSuccess for {}", streamId); + } + + @Override + public void onCreateFailure(String error) { + log.info("set localdescription onCreateSuccess for {}", streamId); + } + }, sdp); + } - } - }, sdp); - String type; if (sdp.type == Type.ANSWER) { type = "answer"; @@ -142,11 +144,11 @@ public void onCreateFailure(String error) { else { type = "offer"; } - + webSocketCommunityHandler.sendSDPConfiguration(sdp.description, type, streamId, session); } - + @Override public void onSetSuccess() { log.warn("on setSuccess"); @@ -173,12 +175,12 @@ public void setSdpMediaConstraints(MediaConstraints sdpMediaConstraints) { public void setPeerConnectionFactory(PeerConnectionFactory peerConnectionFactory) { this.peerConnectionFactory = peerConnectionFactory; } - + public void setSession(Session session) { this.session = session; } - + public Session getSession() { return session; } diff --git a/src/test/java/io/antmedia/test/rest/RestServiceUnitTest.java b/src/test/java/io/antmedia/test/rest/RestServiceUnitTest.java index 3fac79d81..f8be2ee13 100644 --- a/src/test/java/io/antmedia/test/rest/RestServiceUnitTest.java +++ b/src/test/java/io/antmedia/test/rest/RestServiceUnitTest.java @@ -209,8 +209,19 @@ public void testDeleteVoD() { when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(app); restService.setAppCtx(context); + + VoD voD = restService.getVoD(vodId); + assertEquals(vodId, voD.getVodId()); + assertEquals(streamVod.getStreamId(), voD.getStreamId()); + assertEquals(streamVod.getVodName(), voD.getVodName()); + assertEquals(streamVod.getFilePath(), voD.getFilePath()); + + assertEquals(1, restService.getVodList(0, 50).size()); + restService.deleteVoD(vodId); + assertEquals(0, restService.getVodList(0, 50).size()); + assertNull(datastore.getVoD(vodId)); } diff --git a/src/test/java/io/antmedia/test/webrtc/adaptor/RTMPAdaptorTest.java b/src/test/java/io/antmedia/test/webrtc/adaptor/RTMPAdaptorTest.java index e272d7144..1623d014b 100644 --- a/src/test/java/io/antmedia/test/webrtc/adaptor/RTMPAdaptorTest.java +++ b/src/test/java/io/antmedia/test/webrtc/adaptor/RTMPAdaptorTest.java @@ -1,23 +1,32 @@ package io.antmedia.test.webrtc.adaptor; +import org.apache.commons.lang3.RandomStringUtils; import org.awaitility.Awaitility; +import org.json.simple.JSONObject; import org.junit.Before; import org.junit.Test; import org.red5.net.websocket.WebSocketConnection; import org.springframework.context.ApplicationContext; +import org.webrtc.IceCandidate; import org.webrtc.MediaStream; +import org.webrtc.PeerConnection; import org.webrtc.PeerConnectionFactory; +import org.webrtc.SessionDescription; +import org.webrtc.SessionDescription.Type; import io.antmedia.recorder.FFmpegFrameRecorder; import io.antmedia.webrtc.adaptor.RTMPAdaptor; import io.antmedia.websocket.WebSocketCommunityHandler; +import io.antmedia.websocket.WebSocketConstants; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.concurrent.TimeUnit; +import javax.websocket.RemoteEndpoint; import javax.websocket.Session; public class RTMPAdaptorTest { @@ -27,10 +36,10 @@ public void setup() { } - + @Test public void testOnAddStream() { - + FFmpegFrameRecorder recorder = mock(FFmpegFrameRecorder.class); WebSocketCommunityHandler webSocketHandlerReal = new WebSocketCommunityHandler() { @@ -50,13 +59,61 @@ public ApplicationContext getAppContext() { rtmpAdaptor.setStreamId(streamId); Session session = mock(Session.class); rtmpAdaptor.setSession(session); - + MediaStream stream = mock(MediaStream.class); rtmpAdaptor.onAddStream(stream); - + verify(webSocketHandler).sendPublishStartedMessage(streamId, session); } + @Test + public void testCandidate() { + FFmpegFrameRecorder recorder = mock(FFmpegFrameRecorder.class); + + WebSocketCommunityHandler webSocketHandlerReal = new WebSocketCommunityHandler() { + + @Override + public ApplicationContext getAppContext() { + return null; + } + }; + + WebSocketCommunityHandler webSocketHandler = spy(webSocketHandlerReal); + + RTMPAdaptor adaptorReal = new RTMPAdaptor(recorder, webSocketHandler); + RTMPAdaptor rtmpAdaptor = spy(adaptorReal); + + String streamId = "stramId" + (int)(Math.random()*10000); + rtmpAdaptor.setStreamId(streamId); + Session session = mock(Session.class); + RemoteEndpoint.Basic basicRemote = mock(RemoteEndpoint.Basic .class); + when(session.getBasicRemote()).thenReturn(basicRemote); + when(session.isOpen()).thenReturn(true); + rtmpAdaptor.setSession(session); + + IceCandidate iceCandidate = new IceCandidate(RandomStringUtils.randomAlphanumeric(6), 5, RandomStringUtils.randomAlphanumeric(6)); + rtmpAdaptor.onIceCandidate(iceCandidate); + + + verify(webSocketHandler).sendTakeCandidateMessage(iceCandidate.sdpMLineIndex, iceCandidate.sdpMid, iceCandidate.sdp, streamId, session); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put(WebSocketConstants.COMMAND, WebSocketConstants.TAKE_CANDIDATE_COMMAND); + jsonObject.put(WebSocketConstants.CANDIDATE_LABEL, iceCandidate.sdpMLineIndex); + jsonObject.put(WebSocketConstants.CANDIDATE_ID, iceCandidate.sdpMid); + jsonObject.put(WebSocketConstants.CANDIDATE_SDP, iceCandidate.sdp); + jsonObject.put(WebSocketConstants.STREAM_ID, streamId); + + try { + verify(basicRemote).sendText(jsonObject.toJSONString()); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + + } + @Test public void testStartandStop() { @@ -78,27 +135,59 @@ public ApplicationContext getAppContext() { String streamId = "stramId" + (int)(Math.random()*10000); rtmpAdaptor.setStreamId(streamId); Session session = mock(Session.class); + RemoteEndpoint.Basic basicRemote = mock(RemoteEndpoint.Basic .class); + when(session.getBasicRemote()).thenReturn(basicRemote); + when(session.isOpen()).thenReturn(true); rtmpAdaptor.setSession(session); - doReturn(mock(PeerConnectionFactory.class)).when(rtmpAdaptor).createPeerConnectionFactory(); + PeerConnectionFactory peerConnectionFactory = mock(PeerConnectionFactory.class); + + doReturn(peerConnectionFactory).when(rtmpAdaptor).createPeerConnectionFactory(); rtmpAdaptor.start(); Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> - rtmpAdaptor.isStarted() - ); + rtmpAdaptor.isStarted() + ); verify(webSocketHandler).sendStartMessage(streamId, session); + SessionDescription sdp = new SessionDescription(Type.OFFER, RandomStringUtils.randomAlphanumeric(6)); + + rtmpAdaptor.onCreateSuccess(sdp); + + verify(webSocketHandler).sendSDPConfiguration(sdp.description, "offer", streamId, session); + JSONObject jsonResponseObject = new JSONObject(); + jsonResponseObject.put(WebSocketConstants.COMMAND, WebSocketConstants.TAKE_CONFIGURATION_COMMAND); + jsonResponseObject.put(WebSocketConstants.SDP, sdp.description); + jsonResponseObject.put(WebSocketConstants.TYPE, "offer"); + jsonResponseObject.put(WebSocketConstants.STREAM_ID, streamId); + try { + verify(basicRemote).sendText(jsonResponseObject.toJSONString()); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } rtmpAdaptor.stop(); Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> - rtmpAdaptor.getSignallingExecutor().isShutdown() - ); - + rtmpAdaptor.getSignallingExecutor().isShutdown() + ); + verify(webSocketHandler).sendPublishFinishedMessage(streamId, session); + JSONObject jsonObj = new JSONObject(); + jsonObj.put(WebSocketConstants.COMMAND, WebSocketConstants.NOTIFICATION_COMMAND); + jsonObj.put(WebSocketConstants.DEFINITION, WebSocketConstants.PUBLISH_FINISHED); + jsonObj.put(WebSocketConstants.STREAM_ID, streamId); + try { + verify(basicRemote).sendText(jsonObj.toJSONString()); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } diff --git a/src/test/java/io/antmedia/test/webrtc/adaptor/WebRTCMuxerTest.java b/src/test/java/io/antmedia/test/webrtc/adaptor/WebRTCMuxerTest.java new file mode 100644 index 000000000..08e9ec4b0 --- /dev/null +++ b/src/test/java/io/antmedia/test/webrtc/adaptor/WebRTCMuxerTest.java @@ -0,0 +1,103 @@ +package io.antmedia.test.webrtc.adaptor; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; + +import org.red5.server.api.scope.IScope; + +import io.antmedia.webrtc.WebRTCMuxer; +import io.antmedia.webrtc.api.IWebRTCAdaptor; +import io.antmedia.webrtc.api.IWebRTCClient; + +public class WebRTCMuxerTest { + + @Test + public void testWebRTCMuxerRegister() { + + IWebRTCAdaptor webRTCAdaptor = mock(IWebRTCAdaptor.class); + WebRTCMuxer muxer = new WebRTCMuxer(null, webRTCAdaptor); + + IScope scope = mock(IScope.class); + String streamId = "steram" + (int)(Math.random()*100000); + muxer.init(scope, streamId, 480); + + assertEquals(streamId, muxer.getStreamId()); + + muxer.registerToAdaptor(); + + verify(webRTCAdaptor).registerMuxer(streamId, muxer); + + muxer.writeTrailer(); + + verify(webRTCAdaptor).unRegisterMuxer(streamId, muxer); + + } + + @Test + public void testRegisterWebRTCClientAndSendData() { + IWebRTCAdaptor webRTCAdaptor = mock(IWebRTCAdaptor.class); + WebRTCMuxer muxer = new WebRTCMuxer(null, webRTCAdaptor); + + int clientCount = (int)(Math.random()*999); + List clientList = new ArrayList<>(); + for (int i = 0; i< clientCount; i++) { + IWebRTCClient client = mock(IWebRTCClient.class); + clientList.add(client); + muxer.registerWebRTCClient(client); + } + + assertEquals(clientCount, muxer.getClientCount()); + assertEquals(clientCount, muxer.getClientList().size()); + + muxer.unRegisterWebRTCClient(mock(IWebRTCClient.class)); + assertEquals(clientCount, muxer.getClientCount()); + + + byte[] videoPacket = "this is a video packet".getBytes(); + long timestamp = System.currentTimeMillis(); + byte[] videoConf = "this is a videoc conf packet".getBytes(); + + muxer.setVideoConf(videoConf); + muxer.sendVideoConfPacket(videoPacket, timestamp); + + for (IWebRTCClient iWebRTCClient : clientList) { + verify(iWebRTCClient).sendVideoConfPacket(videoConf, videoPacket, timestamp); + } + + boolean isKeyFrame = false; + videoPacket = "this is another video packet".getBytes(); + timestamp = System.currentTimeMillis(); + muxer.sendVideoPacket(videoPacket, isKeyFrame, timestamp); + + for (IWebRTCClient iWebRTCClient : clientList) { + verify(iWebRTCClient).sendVideoPacket(videoPacket, isKeyFrame, timestamp); + } + + byte[] audioPacket = "this is a audio packet".getBytes(); + timestamp = System.currentTimeMillis(); + muxer.sendAudioPacket(audioPacket, timestamp); + + for (IWebRTCClient iWebRTCClient : clientList) { + verify(iWebRTCClient).sendAudioPacket(audioPacket, timestamp); + } + + muxer.writeTrailer(); + + for (IWebRTCClient iWebRTCClient : clientList) { + verify(iWebRTCClient).stop(); + } + + + + + } + + + +}