Skip to content

Commit a8a2617

Browse files
authored
Merge pull request #1 from ruanwenjun/dev_wenjun_cpTransportMultipleLineParam
Support parse task output params under multiple log
2 parents 83ac81c + 8bc53e1 commit a8a2617

File tree

11 files changed

+179
-64
lines changed

11 files changed

+179
-64
lines changed

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

+11-35
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
2525
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
26+
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
2627
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
2728
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
2829

@@ -35,16 +36,16 @@
3536
import java.io.InputStreamReader;
3637
import java.lang.reflect.Field;
3738
import java.util.Collections;
39+
import java.util.HashMap;
3840
import java.util.LinkedList;
3941
import java.util.List;
42+
import java.util.Map;
4043
import java.util.concurrent.ExecutorService;
4144
import java.util.concurrent.Executors;
4245
import java.util.concurrent.LinkedBlockingQueue;
4346
import java.util.concurrent.ThreadFactory;
4447
import java.util.concurrent.TimeUnit;
4548
import java.util.function.Consumer;
46-
import java.util.regex.Matcher;
47-
import java.util.regex.Pattern;
4849

4950
import org.slf4j.Logger;
5051

@@ -55,12 +56,8 @@
5556
*/
5657
public abstract class AbstractCommandExecutor {
5758

58-
/**
59-
* rules for extracting Var Pool
60-
*/
61-
protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
59+
protected volatile Map<String, String> taskOutputParams = new HashMap<>();
6260

63-
protected StringBuilder varPool = new StringBuilder();
6461
/**
6562
* process
6663
*/
@@ -83,11 +80,6 @@ public abstract class AbstractCommandExecutor {
8380

8481
protected boolean logOutputIsSuccess = false;
8582

86-
/*
87-
* SHELL result string
88-
*/
89-
protected String taskResultString;
90-
9183
/**
9284
* taskRequest
9385
*/
@@ -149,6 +141,7 @@ private void buildProcess(String commandFile) throws IOException {
149141
/**
150142
* generate systemd command.
151143
* eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryMax=200M --uid=root
144+
*
152145
* @param command command
153146
*/
154147
private void generateCgroupCommand(List<String> command) {
@@ -245,8 +238,8 @@ public TaskResponse run(String execCommand) throws IOException, InterruptedExcep
245238

246239
}
247240

248-
public String getVarPool() {
249-
return varPool.toString();
241+
public Map<String, String> getTaskOutputParams() {
242+
return taskOutputParams;
250243
}
251244

252245
/**
@@ -347,22 +340,19 @@ private void parseProcessOutput(Process process) {
347340
String threadLoggerInfoName = taskRequest.getTaskLogName();
348341
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName);
349342
getOutputLogService.submit(() -> {
343+
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
350344
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
351345
String line;
352346
while ((line = inReader.readLine()) != null) {
353-
if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
354-
varPool.append(findVarPool(line));
355-
varPool.append("$VarPool$");
356-
} else {
357-
logBuffer.add(line);
358-
taskResultString = line;
359-
}
347+
logBuffer.add(line);
348+
taskOutputParameterParser.appendParseLog(line);
360349
}
361350
logOutputIsSuccess = true;
362351
} catch (Exception e) {
363352
logger.error(e.getMessage(), e);
364353
logOutputIsSuccess = true;
365354
}
355+
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
366356
});
367357

368358
getOutputLogService.shutdown();
@@ -388,20 +378,6 @@ private void parseProcessOutput(Process process) {
388378
parseProcessOutputExecutorService.shutdown();
389379
}
390380

391-
/**
392-
* find var pool
393-
*
394-
* @param line
395-
* @return
396-
*/
397-
private String findVarPool(String line) {
398-
Matcher matcher = SETVALUE_REGEX.matcher(line);
399-
if (matcher.find()) {
400-
return matcher.group(1);
401-
}
402-
return null;
403-
}
404-
405381
/**
406382
* get remain time(s)
407383
*

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
2525

2626
import org.apache.commons.collections4.CollectionUtils;
27+
import org.apache.commons.collections4.MapUtils;
2728
import org.apache.commons.lang3.StringUtils;
2829

2930
import java.util.ArrayList;
@@ -33,13 +34,17 @@
3334
import java.util.Map;
3435
import java.util.Objects;
3536

37+
import lombok.extern.slf4j.Slf4j;
38+
3639
import com.fasterxml.jackson.databind.JsonNode;
3740
import com.fasterxml.jackson.databind.node.ArrayNode;
3841

3942
/**
4043
* job params related class
4144
*/
45+
@Slf4j
4246
public abstract class AbstractParameters implements IParameters {
47+
4348
@Override
4449
public abstract boolean checkParameters();
4550

@@ -79,7 +84,7 @@ public Map<String, Property> getLocalParametersMap() {
7984
Map<String, Property> localParametersMaps = new LinkedHashMap<>();
8085
if (localParams != null) {
8186
for (Property property : localParams) {
82-
localParametersMaps.put(property.getProp(),property);
87+
localParametersMaps.put(property.getProp(), property);
8388
}
8489
}
8590
return localParametersMaps;
@@ -129,27 +134,30 @@ public void setVarPool(String varPool) {
129134
}
130135
}
131136

132-
public void dealOutParam(String result) {
137+
public void dealOutParam(Map<String, String> taskOutputParams) {
133138
if (CollectionUtils.isEmpty(localParams)) {
134139
return;
135140
}
136141
List<Property> outProperty = getOutProperty(localParams);
137142
if (CollectionUtils.isEmpty(outProperty)) {
138143
return;
139144
}
140-
if (StringUtils.isEmpty(result)) {
145+
if (MapUtils.isEmpty(taskOutputParams)) {
141146
outProperty.forEach(this::addPropertyToValPool);
142147
return;
143148
}
144-
Map<String, String> taskResult = getMapByString(result);
145-
if (taskResult.size() == 0) {
146-
return;
147-
}
149+
148150
for (Property info : outProperty) {
149-
String propValue = taskResult.get(info.getProp());
151+
String propValue = taskOutputParams.get(info.getProp());
150152
if (StringUtils.isNotEmpty(propValue)) {
151153
info.setValue(propValue);
152154
addPropertyToValPool(info);
155+
continue;
156+
}
157+
addPropertyToValPool(info);
158+
if (StringUtils.isEmpty(info.getValue())) {
159+
log.warn("The output parameter {} value is empty and cannot find the out parameter from task output",
160+
info);
153161
}
154162
}
155163
}

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java

-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,6 @@ public List<ResourceInfo> getResourceFilesList() {
253253
return new ArrayList<>();
254254
}
255255

256-
@Override
257256
public void dealOutParam(String result) {
258257
if (CollectionUtils.isEmpty(localParams)) {
259258
return;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dolphinscheduler.plugin.task.api.parser;
18+
19+
import org.apache.commons.lang3.StringUtils;
20+
import org.apache.commons.lang3.tuple.ImmutablePair;
21+
import org.apache.commons.lang3.tuple.Pair;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import javax.annotation.concurrent.NotThreadSafe;
29+
30+
import lombok.extern.slf4j.Slf4j;
31+
32+
/**
33+
* Used to parse ${setValue()} and #{setValue()} from given lines.
34+
*/
35+
@Slf4j
36+
@NotThreadSafe
37+
public class TaskOutputParameterParser {
38+
39+
// Used to avoid '${setValue(' which loss the end of ')}'
40+
private final int maxOneParameterRows;
41+
42+
// Used to avoid '${setValue(' which length is too long, this may case OOM
43+
private final int maxOneParameterLength;
44+
45+
private final Map<String, String> taskOutputParams;
46+
47+
private List<String> currentTaskOutputParam;
48+
49+
private long currentTaskOutputParamLength;
50+
51+
public TaskOutputParameterParser() {
52+
// the default max rows of one parameter is 1024, this should be enough
53+
this(1024, Integer.MAX_VALUE);
54+
}
55+
56+
public TaskOutputParameterParser(int maxOneParameterRows, int maxOneParameterLength) {
57+
this.maxOneParameterRows = maxOneParameterRows;
58+
this.maxOneParameterLength = maxOneParameterLength;
59+
this.taskOutputParams = new HashMap<>();
60+
this.currentTaskOutputParam = null;
61+
this.currentTaskOutputParamLength = 0;
62+
}
63+
64+
public void appendParseLog(String logLine) {
65+
if (logLine == null) {
66+
return;
67+
}
68+
69+
if (currentTaskOutputParam != null) {
70+
if (currentTaskOutputParam.size() > maxOneParameterRows
71+
|| currentTaskOutputParamLength > maxOneParameterLength) {
72+
log.warn(
73+
"The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
74+
String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
75+
currentTaskOutputParam = null;
76+
currentTaskOutputParamLength = 0;
77+
return;
78+
}
79+
// continue to parse the rest of line
80+
int i = logLine.indexOf(")}");
81+
if (i == -1) {
82+
// the end of var pool not found
83+
currentTaskOutputParam.add(logLine);
84+
currentTaskOutputParamLength += logLine.length();
85+
} else {
86+
// the end of var pool found
87+
currentTaskOutputParam.add(logLine.substring(0, i + 2));
88+
Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
89+
if (keyValue.getKey() != null && keyValue.getValue() != null) {
90+
taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
91+
}
92+
currentTaskOutputParam = null;
93+
currentTaskOutputParamLength = 0;
94+
// continue to parse the rest of line
95+
if (i + 2 != logLine.length()) {
96+
appendParseLog(logLine.substring(i + 2));
97+
}
98+
}
99+
return;
100+
}
101+
102+
int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
103+
if (indexOfVarPoolBegin == -1) {
104+
indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
105+
}
106+
if (indexOfVarPoolBegin == -1) {
107+
return;
108+
}
109+
currentTaskOutputParam = new ArrayList<>();
110+
appendParseLog(logLine.substring(indexOfVarPoolBegin));
111+
}
112+
113+
public Map<String, String> getTaskOutputParams() {
114+
return taskOutputParams;
115+
}
116+
117+
// #{setValue(xx=xx)}
118+
protected Pair<String, String> parseOutputParam(String outputParam) {
119+
if (StringUtils.isEmpty(outputParam)) {
120+
log.info("The task output param is empty");
121+
return ImmutablePair.nullPair();
122+
}
123+
if ((!outputParam.startsWith("${setValue(") && !outputParam.startsWith("#{setValue("))
124+
|| !outputParam.endsWith(")}")) {
125+
log.info("The task output param {} should start with '${setValue(' or '#{setValue(' and end with ')}'",
126+
outputParam);
127+
return ImmutablePair.nullPair();
128+
}
129+
String keyValueExpression = outputParam.substring(11, outputParam.length() - 2);
130+
if (!keyValueExpression.contains("=")) {
131+
log.warn("The task output param {} should composite with key=value", outputParam);
132+
return ImmutablePair.nullPair();
133+
}
134+
135+
String[] keyValue = keyValueExpression.split("=", 2);
136+
return ImmutablePair.of(keyValue[0], keyValue[1]);
137+
}
138+
139+
}

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
8282
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
8383
setExitStatusCode(commandExecuteResult.getExitStatusCode());
8484
setProcessId(commandExecuteResult.getProcessId());
85-
parameters.dealOutParam(shellCommandExecutor.getVarPool());
85+
parameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
8686
} catch (InterruptedException e) {
8787
Thread.currentThread().interrupt();
8888
logger.error("The current DvcTask has been interrupted", e);

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
8080
setExitStatusCode(taskResponse.getExitStatusCode());
8181
setAppIds(taskResponse.getAppIds());
8282
setProcessId(taskResponse.getProcessId());
83-
setVarPool(shellCommandExecutor.getVarPool());
83+
hiveCliParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
8484
} catch (InterruptedException e) {
8585
Thread.currentThread().interrupt();
8686
logger.error("The current HiveCLI Task has been interrupted", e);

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
2626
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
2727
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
28-
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2928
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
3029
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
3130
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -126,7 +125,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
126125
}
127126
setExitStatusCode(exitCode);
128127
setProcessId(commandExecuteResult.getProcessId());
129-
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
128+
mlflowParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
130129
} catch (InterruptedException e) {
131130
Thread.currentThread().interrupt();
132131
logger.error("The current Mlflow task has been interrupted", e);

Diff for: dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
113113
TaskResponse taskResponse = shellCommandExecutor.run(command);
114114
setExitStatusCode(taskResponse.getExitStatusCode());
115115
setProcessId(taskResponse.getProcessId());
116-
setVarPool(shellCommandExecutor.getVarPool());
117-
pythonParameters.dealOutParam(shellCommandExecutor.getVarPool());
116+
pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
118117
} catch (Exception e) {
119118
logger.error("python task failure", e);
120119
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);

0 commit comments

Comments
 (0)