Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public enum OP {
LIST_NOTE_JOBS, // [c-s] get note job management information
LIST_UPDATE_NOTE_JOBS, // [c-s] get job management information for until unixtime
UNSUBSCRIBE_UPDATE_NOTE_JOBS, // [c-s] unsubscribe job information for job management
JOB_MANAGER_DISABLED, // [s-c] send when job manager is disabled
// @param unixTime
GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.zeppelin.service.JobManagerService;
import org.apache.zeppelin.service.NotebookService;
import org.apache.zeppelin.service.ServiceContext;
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
import org.apache.zeppelin.socket.NotebookServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.quartz.CronExpression;
Expand Down Expand Up @@ -221,6 +222,14 @@ private void checkIfParagraphIsNotNull(Paragraph paragraph, String paragraphId)
}
}

private void checkIfJobManagerIsEnabled() {
try {
jobManagerService.checkIfJobManagerIsEnabled();
} catch (JobManagerForbiddenException e) {
throw new ForbiddenException(e.getMessage());
}
}

/**
* Get notebook capabilities.
*/
Expand Down Expand Up @@ -1159,6 +1168,7 @@ public Response getCronJob(@PathParam("noteId") String noteId)
@ZeppelinApi
public Response getJobListforNote() throws IOException, IllegalArgumentException {
LOGGER.info("Get note jobs for job manager");
checkIfJobManagerIsEnabled();
List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
.getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
Expand All @@ -1182,6 +1192,7 @@ public Response getJobListforNote() throws IOException, IllegalArgumentException
public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime)
throws IOException, IllegalArgumentException {
LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
checkIfJobManagerIsEnabled();
List<JobManagerService.NoteJobInfo> noteJobs =
jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
new RestServiceCallback<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.zeppelin.service;

import jakarta.inject.Inject;
import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.AuthorizationService;
Expand All @@ -26,6 +27,7 @@
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,12 +57,26 @@ public JobManagerService(Notebook notebook,
this.zConf = zConf;
}

public void checkIfJobManagerIsEnabled() throws JobManagerForbiddenException {
if (!zConf.isJobManagerEnabled()) {
throw new JobManagerForbiddenException();
}
}

private boolean isJobManagerDisabled(ServiceContext context, ServiceCallback<?> callback) throws IOException {
if (!zConf.isJobManagerEnabled()) {
callback.onFailure(new JobManagerForbiddenException(), context);
return true;
}
return false;
}

public List<NoteJobInfo> getNoteJobInfo(String noteId,
ServiceContext context,
ServiceCallback<List<NoteJobInfo>> callback)
throws IOException {
if (!zConf.isJobManagerEnabled()) {
return new ArrayList<>();
if (isJobManagerDisabled(context, callback)) {
return Collections.emptyList();
}

return notebook.processNote(noteId,
Expand All @@ -83,8 +99,8 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
ServiceContext context,
ServiceCallback<List<NoteJobInfo>> callback)
throws IOException {
if (!zConf.isJobManagerEnabled()) {
return new ArrayList<>();
if (isJobManagerDisabled(context, callback)) {
return Collections.emptyList();
}

List<NoteJobInfo> notesJobInfo = new LinkedList<>();
Expand All @@ -103,7 +119,7 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
public void removeNoteJobInfo(String noteId,
ServiceContext context,
ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
if (!zConf.isJobManagerEnabled()) {
if (isJobManagerDisabled(context, callback)) {
return;
}
List<NoteJobInfo> notesJobInfo = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.service.exception;

/**
* Runtime exception thrown when the job manager is disabled.
*/
public class JobManagerForbiddenException extends Exception {

private static final long serialVersionUID = -8872599278254399427L;
public static final String MESSAGE = "Job Manager is disabled in the current configuration.";

public JobManagerForbiddenException() {
super(MESSAGE);
}

public JobManagerForbiddenException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.zeppelin.service.NotebookService;
import org.apache.zeppelin.service.ServiceContext;
import org.apache.zeppelin.service.SimpleServiceCallback;
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
Expand Down Expand Up @@ -562,7 +563,13 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,

@Override
public void onFailure(Exception ex, ServiceContext context) throws IOException {
LOGGER.warn(ex.getMessage());
if (ex instanceof JobManagerForbiddenException) {
LOGGER.info("Job Manager is disabled. Rejecting request from user: {}",
context.getAutheInfo().getUser());
conn.send(serializeMessage(new Message(OP.JOB_MANAGER_DISABLED).put("errorMessage", ex.getMessage())));
} else {
LOGGER.warn(ex.getMessage());
}
}
});
}
Expand All @@ -584,7 +591,11 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,

@Override
public void onFailure(Exception ex, ServiceContext context) throws IOException {
LOGGER.warn(ex.getMessage());
if (ex instanceof JobManagerForbiddenException) {
LOGGER.debug(ex.getMessage());
} else {
LOGGER.warn(ex.getMessage());
}
}
});
}
Expand Down Expand Up @@ -1931,6 +1942,15 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
}

@Override
public void onFailure(Exception ex, ServiceContext context) throws IOException {
if (ex instanceof JobManagerForbiddenException) {
LOGGER.debug(ex.getMessage());
} else {
super.onFailure(ex, context);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import jakarta.ws.rs.core.Response;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.notebook.Notebook;
Expand Down Expand Up @@ -1187,4 +1189,47 @@ void testRunWithServerRestart() throws Exception {
}
}
}

@Test
void testGetJobList_whenJobManagerDisabled() throws IOException {
assertJobManagerDisabledResponse("/notebook/jobmanager/");
}

@Test
void testGetUpdatedJobList_whenJobManagerDisabled() throws IOException {
assertJobManagerDisabledResponse("/notebook/jobmanager/12345/");
}

private void assertJobManagerDisabledResponse(String url) throws IOException {
boolean originalFlag = disableJobManagerAndBackupFlag();
String expectedErrorMessage = "Job Manager is disabled in the current configuration.";

try (CloseableHttpResponse response = httpGet(url)) {
assertEquals(Response.Status.FORBIDDEN.getStatusCode(),
response.getStatusLine().getStatusCode(),
"Response status should be 403 Forbidden");

String jsonResponse = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
Map<String, Object> parsedResponse = gson.fromJson(jsonResponse,
new TypeToken<Map<String, Object>>() {}.getType());

assertEquals("FORBIDDEN", parsedResponse.get("status"));
assertEquals(expectedErrorMessage, parsedResponse.get("message"));
} finally {
restoreJobManagerFlag(originalFlag);
}
}

private boolean disableJobManagerAndBackupFlag() {
ZeppelinConfiguration zConf = zepServer.getZeppelinConfiguration();
boolean originalFlag = zConf.isJobManagerEnabled();
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_JOBMANAGER_ENABLE.getVarName(), "false");
return originalFlag;
}

private void restoreJobManagerFlag(boolean originalFlag) {
ZeppelinConfiguration zConf = zepServer.getZeppelinConfiguration();
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_JOBMANAGER_ENABLE.getVarName(),
String.valueOf(originalFlag));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.service;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.AuthorizationService;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class JobManagerServiceTest {

private ZeppelinConfiguration zConf;
private Notebook mockNotebook;
private AuthorizationService mockAuthorizationService;
private JobManagerService jobManagerService;
private ServiceContext serviceContext;

@BeforeEach
public void setUp() {
zConf = mock(ZeppelinConfiguration.class);
mockNotebook = mock(Notebook.class);
mockAuthorizationService = mock(AuthorizationService.class);
jobManagerService = new JobManagerService(mockNotebook, mockAuthorizationService, zConf);
serviceContext = new ServiceContext(new AuthenticationInfo("test-user"), null);
}

@Test
void shouldThrowForbiddenException_whenJobManagerIsDisabled() {
when(zConf.isJobManagerEnabled()).thenReturn(false);

assertThrows(JobManagerForbiddenException.class, () -> {
jobManagerService.getNoteJobInfo("some_note_id", serviceContext, new SimpleServiceCallback<>());
});

assertThrows(JobManagerForbiddenException.class, () -> {
jobManagerService.getNoteJobInfoByUnixTime(0, serviceContext, new SimpleServiceCallback<>());
});

assertThrows(JobManagerForbiddenException.class, () -> {
jobManagerService.removeNoteJobInfo("some_note_id", serviceContext, new SimpleServiceCallback<>());
});
}

}
Loading
Loading