Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 90 additions & 31 deletions docs/en/connector-v2/source/Http.md

Large diffs are not rendered by default.

123 changes: 91 additions & 32 deletions docs/zh/connector-v2/source/Http.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-janino</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ public class HttpSourceOptions extends HttpCommonOptions {
.booleanType()
.defaultValue(false)
.withDescription("When the json field is missing, return null");

public static final Option<String> CUSTOM_SIGNATURE_CODE =
Options.key("custom_signature_code")
.stringType()
.noDefaultValue()
.withDescription(
"This parameter is the Java code for the user-implemented custom signature algorithm of the http request.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import java.util.Collections;
Expand All @@ -60,6 +61,8 @@ public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

protected CatalogTable catalogTable;
protected static Object httpSignatureClass = null;
protected static Boolean needHttpSignature = false;

public HttpSource(ReadonlyConfig pluginConfig) {
this.httpParameter.buildWithConfig(pluginConfig);
Expand Down Expand Up @@ -125,6 +128,13 @@ private void buildPagingWithConfig(ReadonlyConfig config) {
HttpSourceOptions.USE_PLACEHOLDER_REPLACEMENT.defaultValue());
}
}

if (pluginConfig.hasPath(HttpSourceOptions.CUSTOM_SIGNATURE_CODE.key())) {
httpSignatureClass =
AuthorizationUtil.getHttpSignatureClass(
pluginConfig.getString(HttpSourceOptions.CUSTOM_SIGNATURE_CODE.key()));
needHttpSignature = true;
}
}

protected void buildSchemaWithConfig(ReadonlyConfig pluginConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.connectors.seatunnel.http.util.AuthorizationUtil;
import org.apache.seatunnel.connectors.seatunnel.http.util.JsonPathProcessorFactory;
import org.apache.seatunnel.connectors.seatunnel.http.util.JsonPathUtils;

Expand All @@ -58,11 +59,14 @@
import java.util.Objects;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource.httpSignatureClass;
import static org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource.needHttpSignature;

@Slf4j
@Setter
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
protected HttpParameter httpParameter;
protected HttpClientProvider httpClient;
private final DeserializationCollector deserializationCollector;
private static final Option[] DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -124,6 +128,14 @@ public void close() throws IOException {
}

public void pollAndCollectData(Collector<SeaTunnelRow> output) throws Exception {

// before request should process http signature
if (needHttpSignature) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HttpSourceReader and HttpSource may in different node. So we can't direct use field in HttpSource.

this.httpParameter =
AuthorizationUtil.getSignatureHttpParameter(
httpSignatureClass, this.httpParameter);
}

HttpResponse response =
httpClient.execute(
this.httpParameter.getUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

package org.apache.seatunnel.connectors.seatunnel.http.util;

import org.apache.seatunnel.shade.org.codehaus.commons.compiler.CompileException;
import org.apache.seatunnel.shade.org.codehaus.janino.ClassBodyEvaluator;

import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;

import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;

Expand All @@ -30,4 +35,29 @@ public static String getTokenByBasicAuth(String username, String password) {
HttpConfig.BASIC + " " + encodeBase64URLSafeString(accountMessage.getBytes());
return accessToken;
}

public static Object getHttpSignatureClass(String SourceCode) {
try {
ClassBodyEvaluator cbe = new ClassBodyEvaluator();

cbe.cook(SourceCode);

return cbe.getClazz().newInstance();

} catch (CompileException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

public static HttpParameter getSignatureHttpParameter(
Object signatureClass, HttpParameter httpParameter) {
Object result;
try {
result = ReflectionUtils.invoke(signatureClass, "HttpSignature", httpParameter);
} catch (Exception e) {
throw new IllegalArgumentException(
"CompileCode error, please check signature algorithms code: " + e.getMessage());
}
return (HttpParameter) result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.http.util;

import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import static org.junit.Assert.assertThrows;

public class AuthorizationUtilTest {

@Test
@DisplayName("Test getHttpSignatureClass with valid source code")
public void testGetHttpSignatureClass() {
// Given
String sourceCode =
"public String HttpSignature(String vs) {\n" + " return vs;\n" + "}\n";

// When
Object signatureClass = AuthorizationUtil.getHttpSignatureClass(sourceCode);

// Then
Assertions.assertNotNull(signatureClass);
}

@Test
@DisplayName("Test getHttpSignatureClass with invalid source code")
public void testGetHttpSignatureClassWithInvalidCode() {
// Given
String invalidSourceCode = "invalid java code";

// When & Then
assertThrows(
RuntimeException.class,
() -> {
AuthorizationUtil.getHttpSignatureClass(invalidSourceCode);
});
}

@Test
@DisplayName("Test getSignatureHttpParameter with valid signature class")
public void testGetSignatureHttpParameter() {
// Given
String sourceCode =
"import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;"
+ " import java.util.Map;"
+ " import java.util.HashMap;"
+ " public HttpParameter HttpSignature( HttpParameter httpParameter) {"
+ " Map<String, String> params = new HashMap();"
+ " String signature = \"custom_sign_token\";"
+ " params.put(\"token\", signature);"
+ " httpParameter.setParams(params);"
+ " return httpParameter;"
+ " }"
+ " ";

Object signatureClass = AuthorizationUtil.getHttpSignatureClass(sourceCode);
HttpParameter httpParameter = new HttpParameter();

// When
HttpParameter result =
AuthorizationUtil.getSignatureHttpParameter(signatureClass, httpParameter);

// Then
Assertions.assertNotNull(result);
Assertions.assertEquals("custom_sign_token", result.getParams().get("token"));
}

@Test
@DisplayName("Test getSignatureHttpParameter with invalid method")
public void testGetSignatureHttpParameterWithInvalidMethod() {
// Given
String sourceCode =
"import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;"
+ " public HttpParameter InvalidMethodName(\n"
+ " HttpParameter httpParameter) {\n"
+ " return httpParameter;\n"
+ " }\n";

Object signatureClass = AuthorizationUtil.getHttpSignatureClass(sourceCode);
HttpParameter httpParameter = new HttpParameter();

// When & Then
assertThrows(
IllegalArgumentException.class,
() -> {
AuthorizationUtil.getSignatureHttpParameter(signatureClass, httpParameter);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ public void testSourceToAssertSink(TestContainer container)

Container.ExecResult execResult21 =
container.executeJob("/http_page_cursor_num_assert.conf");
Assertions.assertEquals(0, execResult21.getExitCode());

Container.ExecResult execResult22 =
container.executeJob("/http_json_signature_assert.conf");
Assertions.assertEquals(0, execResult22.getExitCode());
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
plugin_output = "http"
url = "http://mockserver:1080/query/custom_sign"
method = "GET"
format = "json"
json_field = {
name = "$.data[*].name"
age = "$.data[*].age"
}
schema = {
fields {
name = string
age = int
}
}

custom_signature_code = """
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import java.util.Map;
import java.util.HashMap;

public HttpParameter HttpSignature( HttpParameter httpParameter) {
Map<String, String> params = new HashMap();
//signature logic
String signature = "custom_sign_token";
params.put("token", signature);
httpParameter.setParams(params);
return httpParameter;
}
"""

}
}

sink {
Assert {
plugin_input = "http"
rules {
row_rules = [
{
rule_type = MIN_ROW
rule_value = 2
},
{
rule_type = MAX_ROW
rule_value = 2
}
]
field_rules = [
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = age
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4828,5 +4828,30 @@
]
}
}
},
{
"httpRequest": {
"method": "GET",
"path": "/query/custom_sign",
"queryStringParameters": {
"token": "custom_sign_token"
}
},
"httpResponse": {
"body": {
"status": null,
"msg": null,
"data": [
{
"name": "name3",
"age": 45
},
{
"name": "name4",
"age": 32
}
]
}
}
}
]