Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stop request forward #126

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
147 changes: 123 additions & 24 deletions MediaPushPlugin/src/main/java/io/antmedia/plugin/MediaPushPlugin.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package io.antmedia.plugin;

import com.google.gson.Gson;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -15,7 +15,21 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;

import org.apache.http.impl.client.LaxRedirectStrategy;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import org.apache.http.entity.StringEntity;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -42,14 +56,15 @@
import org.springframework.stereotype.Component;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.filter.JWTFilter;
import io.antmedia.RecordType;
import io.antmedia.datastore.db.DataStore;
import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.filter.TokenFilterManager;
import io.antmedia.model.Endpoint;
import io.antmedia.plugin.api.IStreamListener;
import io.antmedia.rest.model.Result;
import io.github.bonigarcia.wdm.WebDriverManager;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;


@Component(value=IMediaPushPlugin.BEAN_NAME)
Expand All @@ -68,7 +83,7 @@ public class MediaPushPlugin implements ApplicationContextAware, IStreamListener


private boolean initialized = false;
private ApplicationContext applicationContext;
public ApplicationContext applicationContext;


public Map<String, RemoteWebDriver> getDrivers() {
Expand All @@ -82,7 +97,7 @@ public Map<String, RemoteWebDriver> getDrivers() {
* Extra
* https://peter.sh/experiments/chromium-command-line-switches/
*/
private static final List<String> CHROME_DEFAULT_SWITHES =
private static final List<String> CHROME_DEFAULT_SWITHES =
Arrays.asList(
"--remote-allow-origins=*",
"--enable-usermedia-screen-capturing",
Expand Down Expand Up @@ -208,6 +223,8 @@ public static boolean isValidURL(String urlString) {
return false;
}
}



@Override
public Result startMediaPush(String streamIdPar, String websocketUrl, int width, int height, String url, String token, String recordTypeString)
Expand All @@ -218,6 +235,10 @@ public Result startMediaPush(String streamIdPar, String websocketUrl, int width,
return startMediaPush(streamIdPar, websocketUrl, endpoint);
}

public WebDriverWait createWebDriverWait(WebDriver driver, int timeoutSeconds) {
return new WebDriverWait(driver, Duration.ofSeconds(timeoutSeconds));
}

@SuppressWarnings("javasecurity:S5334")
@Override
public Result startMediaPush(String streamIdPar, String websocketUrl, Endpoint endpoint)
Expand Down Expand Up @@ -261,10 +282,9 @@ public Result startMediaPush(String streamIdPar, String websocketUrl, Endpoint e

String publisherUrl = getPublisherHTMLURL(websocketUrl);


driver = openDriver(width, height, recordTypeString, extraChromeSwitchList, streamId, publisherUrl, url);

WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(TIMEOUT_IN_SECONDS));
WebDriverWait wait = createWebDriverWait(driver, TIMEOUT_IN_SECONDS);

//there are three methods in javascript side
//window.startBroadcasting = startBroadcasting -> gets message json parameter
Expand All @@ -273,9 +293,12 @@ public Result startMediaPush(String streamIdPar, String websocketUrl, Endpoint e

wait.until(ExpectedConditions.jsReturnsValue("return (typeof window.startBroadcasting != 'undefined')"));

String driverIp = getApplication().getServerSettings().getHostAddress();
driverIp = "{\"driverIp\":\"" + driverIp + "\"}";

String startBroadcastingCommand = String.format("window.startBroadcasting({websocketURL:'%s',streamId:'%s',width:%d,height:%d,token:'%s',driverIp:'%s'});",
websocketUrl, streamId, width, height, StringUtils.isNotBlank(token) ? token : "",driverIp);

String startBroadcastingCommand = String.format("window.startBroadcasting({websocketURL:'%s',streamId:'%s',width:%d,height:%d,token:'%s'});",
websocketUrl, streamId, width, height, StringUtils.isNotBlank(token) ? token : "");
driver.executeScript(startBroadcastingCommand);


Expand Down Expand Up @@ -328,7 +351,7 @@ public String checkAndGetStreamId(String streamId) {
return streamId;
}

private String clearAndQuit(String streamId, RemoteWebDriver driver, Exception e) {
public String clearAndQuit(String streamId, RemoteWebDriver driver, Exception e) {

logger.error(ExceptionUtils.getStackTrace(e));
if (driver != null)
Expand All @@ -347,7 +370,6 @@ private String clearAndQuit(String streamId, RemoteWebDriver driver, Exception e
recordingMap.remove(streamId);
return "Error message is " + e.getMessage();
}

public String getPublisherHTMLURL(String websocketUrl) throws InvalidArgumentException, URISyntaxException
{

Expand Down Expand Up @@ -375,13 +397,71 @@ public String getPublisherHTMLURL(String websocketUrl) throws InvalidArgumentExc
return publisherHtmlURL;
}

public boolean isValidIP(String ip) {
if (ip == null)
return false;
return ip.matches("^((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)\\.){3}(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)$");
}

public boolean forwardMediaPushStopRequest(String streamId, String ip) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a MUST to have integration test for this code. We may create another issue or increase the story point of this issue


AntMediaApplicationAdapter appAdapter = getApplication();
logger.info("forwarding media push stop request to {}", ip);
try {
CloseableHttpClient client = HttpClients.custom().setRedirectStrategy(new LaxRedirectStrategy()).build();

String url = "http://" + ip + ":" + appAdapter.getServerSettings().getDefaultHttpPort()
+ applicationContext.getApplicationName() + "/rest/v1/media-push/stop/" + streamId;
logger.info(url);

String jwtToken = JWTFilter.generateJwtToken(appAdapter.getAppSettings().getClusterCommunicationKey(),
System.currentTimeMillis() + 5000);
HttpUriRequest post = RequestBuilder.post().setUri(url)
.setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.setHeader(TokenFilterManager.TOKEN_HEADER_FOR_NODE_COMMUNICATION, jwtToken)
.setEntity(new StringEntity("{}", StandardCharsets.UTF_8))
.build();

CloseableHttpResponse response = client.execute(post);

if (response.getStatusLine().getStatusCode() == 404) {
return false;
} else if (response.getStatusLine().getStatusCode() == 200) {
return true;
}
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
return false;
}

@Override
public Result stopMediaPush(String streamId) {
Result result = new Result(false);
if (!drivers.containsKey(streamId))
{
logger.warn("Driver does not exist for stream id: {}", streamId);
result.setMessage("Driver does not exist for stream id: " + streamId);
if (!drivers.containsKey(streamId)) {
Broadcast broadcast = getBroadcast(streamId);

if (broadcast == null) {
result.setMessage("Driver does not exist for stream id: " + streamId);
return result;
}

String metaData = broadcast.getMetaData();
try {
if (metaData != null && !metaData.equals("null") && !metaData.isEmpty()) {
JsonObject jsonObject = JsonParser.parseString(metaData).getAsJsonObject();
String driverIp = jsonObject.get("driverIp").getAsString();

if (isValidIP(driverIp) && forwardMediaPushStopRequest(streamId, driverIp)) {
result.setSuccess(true);
return result;
}
}
result.setMessage("Driver does not exist for stream id: " + streamId);
return result;
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
return result;
}

Expand All @@ -390,22 +470,21 @@ public Result stopMediaPush(String streamId) {

WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(TIMEOUT_IN_SECONDS));

try
{
try {
driver.executeScript("window.stopBroadcasting({"
+ "streamId:'" + streamId + "',"
+ "});");

wait.until(ExpectedConditions.jsReturnsValue("return !window.isConnected('"+streamId+"')"));
wait.until(ExpectedConditions.jsReturnsValue("return !window.isConnected('" + streamId + "')"));
result.setSuccess(true);

}
catch(TimeoutException e) {
} catch (TimeoutException e) {
logger.error(ExceptionUtils.getStackTrace(e));
result.setMessage("Timeoutexception occured in stopping the stream. Fortunately, it'll quit the session to stop completely. Error message is " + e.getMessage());
result.setMessage(
"Timeoutexception occured in stopping the stream. Fortunately, it'll quit the session to stop completely. Error message is "
+ e.getMessage());

}
finally {
} finally {
driver.quit();
}
return result;
Expand Down Expand Up @@ -461,6 +540,26 @@ public RemoteWebDriver createDriver(int width, int height, String streamId, List
return new ChromeDriver(options);
}

public static StringBuffer readResponse(HttpResponse response) throws IOException {
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));

StringBuffer result = new StringBuffer();
String line = "";
while ((line = rd.readLine()) != null) {
result.append(line);
}
return result;
}

public Broadcast getBroadcast(String streamId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to call getBroadcast through REST method. Just get it from dataStore

AntMediaApplicationAdapter app = getApplication();
DataStore dataStore = app.getDataStore();

if(dataStore == null)
return null;

return dataStore.get(streamId);
}


public AntMediaApplicationAdapter getApplication() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ String getApplicationName() {
return app.getName();
}
}

4 changes: 2 additions & 2 deletions MediaPushPlugin/src/main/js/src/media-push-publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async function startBroadcasting(message) {
callback : (info, obj) => {
if (info == "initialized") {
console.log("WebRTC adaptor initialized");
webRTCAdaptorMediaPush.publish(message.streamId, token, "", "", "", "");
webRTCAdaptorMediaPush.publish(message.streamId, token, "", "", "", "", message.driverIp ,"");
} else if (info == "publish_started") {
console.log("mediapush_publish_started");
publishStarted = true;
Expand Down Expand Up @@ -176,4 +176,4 @@ function stopBroadcasting(message) {

window.startBroadcasting = startBroadcasting
window.stopBroadcasting = stopBroadcasting
window.isConnected = isConnected
window.isConnected = isConnected
1 change: 1 addition & 0 deletions MediaPushPlugin/src/main/resources/e4e4da7454e7cf2e6ffa.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion MediaPushPlugin/src/main/resources/media-push-publisher.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ public void testExtraChromeSwitches() {
assertEquals("switches", endpoint.getExtraChromeSwitches());

}
}
}
Loading
Loading