Skip to content

Commit 6aaee5f

Browse files
Merge branch '8.0.x' into master by rayokota
2 parents ae022c5 + d7e9e1d commit 6aaee5f

File tree

7 files changed

+102
-208
lines changed

7 files changed

+102
-208
lines changed

bin/ksql-run-class

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ if [ -z "$KSQL_LOG4J_OPTS" ]; then
5353
elif [ -e "/etc/ksqldb/log4j2.yaml" ]; then # Normal install layout
5454
KSQL_LOG4J_OPTS="-Dlog4j2.configurationFile=file:/etc/ksqldb/log4j2.yaml"
5555
fi
56+
else
57+
if echo "$KSQL_LOG4J_OPTS" | grep -E "log4j\.[^[:space:]]+(\.properties|\.xml)$"; then
58+
# Enable Log4j 1.x configuration compatibility mode for Log4j 2
59+
export LOG4J_COMPATIBILITY=true
60+
echo "DEPRECATED: A Log4j 1.x configuration file has been detected, which is no longer recommended." >&2
61+
echo "To use a Log4j 2.x configuration, please see https://logging.apache.org/log4j/2.x/migrate-from-log4j1.html#Log4j2ConfigurationFormat for details about Log4j configuration file migration." >&2
62+
echo "Since ksql processing log functionality uses kafka log4j appender, which was deprecated from 8.0.x, it will not be supported when using log4j1." >&2
63+
echo "To continue using this functionality please switch to log4j2." >&2
64+
fi
5665
fi
5766

5867
KSQL_LOG4J_OPTS="-Dksql.log.dir=$LOG_DIR ${KSQL_LOG4J_OPTS}"

ksqldb-engine/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>ksqldb-engine</artifactId>
2929

3030
<dependencies>
31+
<dependency>
32+
<groupId>com.jayway.jsonpath</groupId>
33+
<artifactId>json-path</artifactId>
34+
<version>2.9.0</version>
35+
</dependency>
3136
<dependency>
3237
<groupId>io.confluent.ksql</groupId>
3338
<artifactId>ksqldb-common</artifactId>

ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Optional<ParsedSchema> parseSchema(
7878
final String schemaType,
7979
final String schemaString,
8080
final List<SchemaReference> references) {
81-
throw new UnsupportedOperationException();
81+
return sandboxCacheClient.parseSchema(schemaType, schemaString, references);
8282
}
8383

8484
@Override
@@ -286,6 +286,21 @@ public int getId(final String subject, final ParsedSchema parsedSchema)
286286
}
287287
}
288288

289+
@Override
290+
public RegisterSchemaResponse getIdWithResponse(
291+
final String subject, final ParsedSchema schema, final boolean normalize)
292+
throws IOException, RestClientException {
293+
try {
294+
return srClient.getIdWithResponse(subject, schema, normalize);
295+
} catch (final RestClientException e) {
296+
// if we don't find the schema in SR, we try to get it from the sandbox cache
297+
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
298+
return sandboxCacheClient.getIdWithResponse(subject, schema, normalize);
299+
}
300+
throw e;
301+
}
302+
}
303+
289304
@Override
290305
public void reset() {
291306
throw new UnsupportedOperationException();

ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.notNullValue;
2121
import static org.junit.Assert.assertThrows;
2222
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.ArgumentMatchers.anyInt;
2425
import static org.mockito.ArgumentMatchers.anyString;
2526
import static org.mockito.Mockito.mock;
@@ -31,13 +32,18 @@
3132
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
3233
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
3334
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
35+
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
36+
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
3437
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
3538
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
3639
import io.confluent.ksql.test.util.TestMethods;
3740
import io.confluent.ksql.test.util.TestMethods.TestCase;
3841
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.List;
3944
import java.util.Objects;
4045

46+
import java.util.Optional;
4147
import org.apache.avro.Schema;
4248
import org.apache.hc.core5.http.HttpStatus;
4349
import org.junit.Before;
@@ -78,8 +84,12 @@ public static Collection<TestCase<SchemaRegistryClient>> getMethodsToTest() {
7884
.ignore("getId", String.class, ParsedSchema.class)
7985
.ignore("getId", String.class, ParsedSchema.class, boolean.class)
8086
.ignore("getId", String.class, Schema.class)
87+
.ignore("getIdWithResponse", String.class, ParsedSchema.class, boolean.class)
8188
.ignore("getVersion", String.class, ParsedSchema.class)
8289
.ignore("getSchemaById", int.class)
90+
.ignore("parseSchema", io.confluent.kafka.schemaregistry.client.rest.entities.Schema.class)
91+
.ignore("parseSchema", String.class, String.class, List.class)
92+
.ignore("parseSchema", String.class, String.class, List.class, Metadata.class, RuleSet.class)
8393
.build();
8494
}
8595

@@ -251,7 +261,52 @@ public void shouldGetIdFromCache() throws Exception {
251261

252262
// Then:
253263
assertThat(id, is(newId));
264+
}
265+
266+
@Test
267+
public void shouldGetIdWithResponse() throws Exception {
268+
// Given:
269+
when(delegate.getIdWithResponse(anyString(), any(ParsedSchema.class), anyBoolean()))
270+
.thenReturn(new RegisterSchemaResponse(123))
271+
.thenReturn(new RegisterSchemaResponse(124))
272+
.thenReturn(new RegisterSchemaResponse(125));
254273

274+
// When:
275+
final int id = sandboxedClient.getIdWithResponse("some subject", schema, false).getId();
276+
final int id1 = sandboxedClient.getIdWithResponse("some subject", parsedSchema, false).getId();
277+
final int id2 = sandboxedClient.getIdWithResponse("some subject", parsedSchema, true).getId();
278+
279+
// Then:
280+
assertThat(id, is(123));
281+
assertThat(id1, is(124));
282+
assertThat(id2, is(125));
283+
}
284+
285+
@Test
286+
public void shouldGetIdWithResponseFromCache() throws Exception {
287+
// Given:
288+
final RestClientException exception = mock(RestClientException.class);
289+
when(exception.getStatus()).thenReturn(HttpStatus.SC_NOT_FOUND);
290+
when(delegate.getIdWithResponse(anyString(), any(ParsedSchema.class), anyBoolean()))
291+
.thenThrow(exception);
292+
293+
final int newId = sandboxedClient.register("newSubject", parsedSchema);
294+
295+
// When:
296+
final int id = sandboxedClient.getIdWithResponse("newSubject", parsedSchema, false).getId();
297+
298+
// Then:
299+
assertThat(id, is(newId));
300+
}
301+
302+
@Test
303+
public void shouldParseSchema() throws Exception {
304+
// Given:
305+
final Optional<ParsedSchema> schema =
306+
sandboxedClient.parseSchema("AVRO", "\"string\"", Collections.emptyList());
307+
308+
// Then:
309+
assertThat(schema.get(), is(new AvroSchema("\"string\"")));
255310
}
256311
}
257312
}

ksqldb-rest-app/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999

100100
<dependency>
101101
<groupId>io.confluent</groupId>
102-
<artifactId>confluent-log4j-extensions</artifactId>
102+
<artifactId>confluent-log4j2-extensions</artifactId>
103103
<version>${io.confluent.common.version}</version>
104104
</dependency>
105105

licenses/LICENSE-zookeeper-3.8.4.txt

Lines changed: 0 additions & 202 deletions
This file was deleted.

pom.xml

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,12 @@
483483
<groupId>io.confluent</groupId>
484484
<artifactId>broker-plugins</artifactId>
485485
<version>${kafka.version}</version>
486+
<exclusions>
487+
<exclusion>
488+
<groupId>ch.qos.reload4j</groupId>
489+
<artifactId>reload4j</artifactId>
490+
</exclusion>
491+
</exclusions>
486492
</dependency>
487493

488494
<!-- End Confluent dependencies -->
@@ -788,10 +794,16 @@
788794
<groupId>io.confluent</groupId>
789795
<artifactId>logredactor</artifactId>
790796
</dependency>
791-
<dependency>
792-
<groupId>io.confluent</groupId>
793-
<artifactId>broker-plugins</artifactId>
794-
</dependency>
797+
<dependency>
798+
<groupId>io.confluent</groupId>
799+
<artifactId>broker-plugins</artifactId>
800+
<exclusions>
801+
<exclusion>
802+
<groupId>ch.qos.reload4j</groupId>
803+
<artifactId>reload4j</artifactId>
804+
</exclusion>
805+
</exclusions>
806+
</dependency>
795807
</dependencies>
796808

797809
<build>

0 commit comments

Comments
 (0)