Skip to content

Commit eeefcbf

Browse files
committed
[ZEPPELIN-6216] Show clear message when job manager is disabled instead of infinite loading
1 parent 6dd03ab commit eeefcbf

17 files changed

Lines changed: 407 additions & 47 deletions

File tree

zeppelin-common/src/main/java/org/apache/zeppelin/common/Message.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public enum OP {
176176
LIST_NOTE_JOBS, // [c-s] get note job management information
177177
LIST_UPDATE_NOTE_JOBS, // [c-s] get job management information for until unixtime
178178
UNSUBSCRIBE_UPDATE_NOTE_JOBS, // [c-s] unsubscribe job information for job management
179+
JOB_MANAGER_DISABLED, // [s-c] send when job manager is disabled
179180
// @param unixTime
180181
GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
181182
SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings

zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.zeppelin.service.JobManagerService;
6060
import org.apache.zeppelin.service.NotebookService;
6161
import org.apache.zeppelin.service.ServiceContext;
62+
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
6263
import org.apache.zeppelin.socket.NotebookServer;
6364
import org.apache.zeppelin.user.AuthenticationInfo;
6465
import org.quartz.CronExpression;
@@ -221,6 +222,14 @@ private void checkIfParagraphIsNotNull(Paragraph paragraph, String paragraphId)
221222
}
222223
}
223224

225+
private void checkIfJobManagerIsEnabled() {
226+
try {
227+
jobManagerService.checkIfJobManagerIsEnabled();
228+
} catch (JobManagerForbiddenException e) {
229+
throw new ForbiddenException(e.getMessage());
230+
}
231+
}
232+
224233
/**
225234
* Get notebook capabilities.
226235
*/
@@ -1159,6 +1168,7 @@ public Response getCronJob(@PathParam("noteId") String noteId)
11591168
@ZeppelinApi
11601169
public Response getJobListforNote() throws IOException, IllegalArgumentException {
11611170
LOGGER.info("Get note jobs for job manager");
1171+
checkIfJobManagerIsEnabled();
11621172
List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
11631173
.getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
11641174
Map<String, Object> response = new HashMap<>();
@@ -1182,6 +1192,7 @@ public Response getJobListforNote() throws IOException, IllegalArgumentException
11821192
public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime)
11831193
throws IOException, IllegalArgumentException {
11841194
LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
1195+
checkIfJobManagerIsEnabled();
11851196
List<JobManagerService.NoteJobInfo> noteJobs =
11861197
jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
11871198
new RestServiceCallback<>());

zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.zeppelin.service;
1919

2020
import jakarta.inject.Inject;
21+
import java.util.Collections;
2122
import org.apache.commons.lang3.StringUtils;
2223
import org.apache.zeppelin.conf.ZeppelinConfiguration;
2324
import org.apache.zeppelin.notebook.AuthorizationService;
@@ -26,6 +27,7 @@
2627
import org.apache.zeppelin.notebook.Notebook;
2728
import org.apache.zeppelin.notebook.Paragraph;
2829
import org.apache.zeppelin.scheduler.Job;
30+
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
2931
import org.slf4j.Logger;
3032
import org.slf4j.LoggerFactory;
3133

@@ -55,12 +57,26 @@ public JobManagerService(Notebook notebook,
5557
this.zConf = zConf;
5658
}
5759

60+
public void checkIfJobManagerIsEnabled() throws JobManagerForbiddenException {
61+
if (!zConf.isJobManagerEnabled()) {
62+
throw new JobManagerForbiddenException();
63+
}
64+
}
65+
66+
private boolean isJobManagerDisabled(ServiceContext context, ServiceCallback<?> callback) throws IOException {
67+
if (!zConf.isJobManagerEnabled()) {
68+
callback.onFailure(new JobManagerForbiddenException(), context);
69+
return true;
70+
}
71+
return false;
72+
}
73+
5874
public List<NoteJobInfo> getNoteJobInfo(String noteId,
5975
ServiceContext context,
6076
ServiceCallback<List<NoteJobInfo>> callback)
6177
throws IOException {
62-
if (!zConf.isJobManagerEnabled()) {
63-
return new ArrayList<>();
78+
if (isJobManagerDisabled(context, callback)) {
79+
return Collections.emptyList();
6480
}
6581

6682
return notebook.processNote(noteId,
@@ -83,8 +99,8 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
8399
ServiceContext context,
84100
ServiceCallback<List<NoteJobInfo>> callback)
85101
throws IOException {
86-
if (!zConf.isJobManagerEnabled()) {
87-
return new ArrayList<>();
102+
if (isJobManagerDisabled(context, callback)) {
103+
return Collections.emptyList();
88104
}
89105

90106
List<NoteJobInfo> notesJobInfo = new LinkedList<>();
@@ -103,7 +119,7 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
103119
public void removeNoteJobInfo(String noteId,
104120
ServiceContext context,
105121
ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
106-
if (!zConf.isJobManagerEnabled()) {
122+
if (isJobManagerDisabled(context, callback)) {
107123
return;
108124
}
109125
List<NoteJobInfo> notesJobInfo = new ArrayList<>();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.service.exception;
19+
20+
/**
21+
* Runtime exception thrown when the job manager is disabled.
22+
*/
23+
public class JobManagerForbiddenException extends Exception {
24+
25+
private static final long serialVersionUID = -8872599278254399427L;
26+
public static final String MESSAGE = "Job Manager is disabled in the current configuration.";
27+
28+
public JobManagerForbiddenException() {
29+
super(MESSAGE);
30+
}
31+
32+
public JobManagerForbiddenException(String message) {
33+
super(message);
34+
}
35+
}

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.zeppelin.service.NotebookService;
9191
import org.apache.zeppelin.service.ServiceContext;
9292
import org.apache.zeppelin.service.SimpleServiceCallback;
93+
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
9394
import org.apache.zeppelin.ticket.TicketContainer;
9495
import org.apache.zeppelin.types.InterpreterSettingsList;
9596
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -562,7 +563,13 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
562563

563564
@Override
564565
public void onFailure(Exception ex, ServiceContext context) throws IOException {
565-
LOGGER.warn(ex.getMessage());
566+
if (ex instanceof JobManagerForbiddenException) {
567+
LOGGER.info("Job Manager is disabled. Rejecting request from user: {}",
568+
context.getAutheInfo().getUser());
569+
conn.send(serializeMessage(new Message(OP.JOB_MANAGER_DISABLED).put("errorMessage", ex.getMessage())));
570+
} else {
571+
LOGGER.warn(ex.getMessage());
572+
}
566573
}
567574
});
568575
}
@@ -584,7 +591,11 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
584591

585592
@Override
586593
public void onFailure(Exception ex, ServiceContext context) throws IOException {
587-
LOGGER.warn(ex.getMessage());
594+
if (ex instanceof JobManagerForbiddenException) {
595+
LOGGER.debug(ex.getMessage());
596+
} else {
597+
LOGGER.warn(ex.getMessage());
598+
}
588599
}
589600
});
590601
}
@@ -1931,6 +1942,15 @@ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
19311942
connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
19321943
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
19331944
}
1945+
1946+
@Override
1947+
public void onFailure(Exception ex, ServiceContext context) throws IOException {
1948+
if (ex instanceof JobManagerForbiddenException) {
1949+
LOGGER.debug(ex.getMessage());
1950+
} else {
1951+
super.onFailure(ex, context);
1952+
}
1953+
}
19341954
}
19351955

19361956
@Override

zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.gson.Gson;
2020
import com.google.gson.reflect.TypeToken;
2121

22+
import jakarta.ws.rs.core.Response;
23+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
2224
import org.apache.zeppelin.interpreter.InterpreterSetting;
2325
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
2426
import org.apache.zeppelin.notebook.Notebook;
@@ -1187,4 +1189,47 @@ void testRunWithServerRestart() throws Exception {
11871189
}
11881190
}
11891191
}
1192+
1193+
@Test
1194+
void testGetJobList_whenJobManagerDisabled() throws IOException {
1195+
assertJobManagerDisabledResponse("/notebook/jobmanager/");
1196+
}
1197+
1198+
@Test
1199+
void testGetUpdatedJobList_whenJobManagerDisabled() throws IOException {
1200+
assertJobManagerDisabledResponse("/notebook/jobmanager/12345/");
1201+
}
1202+
1203+
private void assertJobManagerDisabledResponse(String url) throws IOException {
1204+
boolean originalFlag = disableJobManagerAndBackupFlag();
1205+
String expectedErrorMessage = "Job Manager is disabled in the current configuration.";
1206+
1207+
try (CloseableHttpResponse response = httpGet(url)) {
1208+
assertEquals(Response.Status.FORBIDDEN.getStatusCode(),
1209+
response.getStatusLine().getStatusCode(),
1210+
"Response status should be 403 Forbidden");
1211+
1212+
String jsonResponse = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
1213+
Map<String, Object> parsedResponse = gson.fromJson(jsonResponse,
1214+
new TypeToken<Map<String, Object>>() {}.getType());
1215+
1216+
assertEquals("FORBIDDEN", parsedResponse.get("status"));
1217+
assertEquals(expectedErrorMessage, parsedResponse.get("message"));
1218+
} finally {
1219+
restoreJobManagerFlag(originalFlag);
1220+
}
1221+
}
1222+
1223+
private boolean disableJobManagerAndBackupFlag() {
1224+
ZeppelinConfiguration zConf = zepServer.getZeppelinConfiguration();
1225+
boolean originalFlag = zConf.isJobManagerEnabled();
1226+
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_JOBMANAGER_ENABLE.getVarName(), "false");
1227+
return originalFlag;
1228+
}
1229+
1230+
private void restoreJobManagerFlag(boolean originalFlag) {
1231+
ZeppelinConfiguration zConf = zepServer.getZeppelinConfiguration();
1232+
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_JOBMANAGER_ENABLE.getVarName(),
1233+
String.valueOf(originalFlag));
1234+
}
11901235
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.service;
19+
20+
import static org.junit.jupiter.api.Assertions.assertThrows;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.when;
23+
24+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
25+
import org.apache.zeppelin.notebook.AuthorizationService;
26+
import org.apache.zeppelin.notebook.Notebook;
27+
import org.apache.zeppelin.service.exception.JobManagerForbiddenException;
28+
import org.apache.zeppelin.user.AuthenticationInfo;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
public class JobManagerServiceTest {
33+
34+
private ZeppelinConfiguration zConf;
35+
private Notebook mockNotebook;
36+
private AuthorizationService mockAuthorizationService;
37+
private JobManagerService jobManagerService;
38+
private ServiceContext serviceContext;
39+
40+
@BeforeEach
41+
public void setUp() {
42+
zConf = mock(ZeppelinConfiguration.class);
43+
mockNotebook = mock(Notebook.class);
44+
mockAuthorizationService = mock(AuthorizationService.class);
45+
jobManagerService = new JobManagerService(mockNotebook, mockAuthorizationService, zConf);
46+
serviceContext = new ServiceContext(new AuthenticationInfo("test-user"), null);
47+
}
48+
49+
@Test
50+
void shouldThrowForbiddenException_whenJobManagerIsDisabled() {
51+
when(zConf.isJobManagerEnabled()).thenReturn(false);
52+
53+
assertThrows(JobManagerForbiddenException.class, () -> {
54+
jobManagerService.getNoteJobInfo("some_note_id", serviceContext, new SimpleServiceCallback<>());
55+
});
56+
57+
assertThrows(JobManagerForbiddenException.class, () -> {
58+
jobManagerService.getNoteJobInfoByUnixTime(0, serviceContext, new SimpleServiceCallback<>());
59+
});
60+
61+
assertThrows(JobManagerForbiddenException.class, () -> {
62+
jobManagerService.removeNoteJobInfo("some_note_id", serviceContext, new SimpleServiceCallback<>());
63+
});
64+
}
65+
66+
}

0 commit comments

Comments
 (0)