Skip to content

Commit c75be6f

Browse files
committed
update bigquery code
1 parent e35ca62 commit c75be6f

File tree

4 files changed

+46
-7
lines changed

4 files changed

+46
-7
lines changed

elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import pekko.stream.connectors.elasticsearch.{ impl, _ }
2121
import pekko.stream.javadsl.Source
2222
import pekko.stream.{ Attributes, Materializer }
2323
import pekko.util.ccompat.JavaConverters._
24-
import com.fasterxml.jackson.core.{ JsonFactory, StreamReadConstraints, StreamWriteConstraints }
24+
import com.fasterxml.jackson.core.{ JsonFactory, JsonFactoryBuilder, StreamReadConstraints, StreamWriteConstraints }
2525
import com.fasterxml.jackson.databind.ObjectMapper
2626
import com.fasterxml.jackson.databind.json.JsonMapper
2727
import com.fasterxml.jackson.databind.node.{ ArrayNode, NumericNode }
@@ -180,12 +180,11 @@ object ElasticsearchSource {
180180
val streamWriteConstraints = StreamWriteConstraints.builder
181181
.maxNestingDepth(config.getInt("write.max-nesting-depth"))
182182
.build
183-
val jsonFactory = JsonFactory.builder
183+
val jsonFactory = JsonFactory.builder.asInstanceOf[JsonFactoryBuilder]
184184
.streamReadConstraints(streamReadConstraints)
185185
.streamWriteConstraints(streamWriteConstraints)
186186
.build
187187
new JsonMapper(jsonFactory)
188-
189188
}
190189

191190
private final class JacksonReader[T](mapper: ObjectMapper, clazz: Class[T]) extends impl.MessageReader[T] {

google-cloud-bigquery/src/main/resources/reference.conf

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,23 @@ pekko.connectors.google {
77
# BigQuery has a hard limit of 1,500 load jobs per table per day (just over 1 job per minute)
88
# This sets the rate limit when loading data via BigQuery.insertAllAsync
99
load-job-per-table-quota = 1 minute
10+
11+
jackson {
12+
read {
13+
# see https://www.javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.16.0/com/fasterxml/jackson/core/StreamReadConstraints.html
14+
# these defaults are the same as the defaults in `StreamReadConstraints`
15+
max-nesting-depth = 1000
16+
max-number-length = 1000
17+
max-string-length = 20000000
18+
max-name-length = 50000
19+
# max-document-length of -1 means unlimited
20+
max-document-length = -1
21+
}
22+
write {
23+
# see https://www.javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.16.0/com/fasterxml/jackson/core/StreamWriteConstraints.html
24+
# these defaults are the same as the defaults in `StreamWriteConstraints`
25+
max-nesting-depth = 1000
26+
}
27+
}
1028
}
1129
}

google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/jackson/BigQueryMarshallers.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import pekko.http.javadsl.unmarshalling.Unmarshaller
2121
import pekko.stream.connectors.googlecloud.bigquery.model.QueryResponse
2222
import pekko.stream.connectors.googlecloud.bigquery.model.{ TableDataInsertAllRequest, TableDataListResponse }
2323
import com.fasterxml.jackson.databind.{ JavaType, MapperFeature, ObjectMapper }
24+
import com.fasterxml.jackson.core.{ JsonFactory, JsonFactoryBuilder, StreamReadConstraints, StreamWriteConstraints }
25+
import com.fasterxml.jackson.databind.json.JsonMapper
26+
import com.typesafe.config.ConfigFactory
2427

2528
import java.io.IOException
2629

2730
object BigQueryMarshallers {
2831

29-
private val defaultObjectMapper = new ObjectMapper().enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
32+
private val defaultObjectMapper = createObjectMapper().enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
3033

3134
/**
3235
* [[pekko.http.javadsl.unmarshalling.Unmarshaller]] for [[pekko.stream.connectors.googlecloud.bigquery.model.TableDataListResponse]]
@@ -99,4 +102,23 @@ object BigQueryMarshallers {
99102
case e: IOException =>
100103
throw new IllegalArgumentException("Cannot unmarshal JSON as " + expectedType.getTypeName, e)
101104
}
105+
106+
private def createObjectMapper(): ObjectMapper = {
107+
val config = ConfigFactory.load.getConfig("pekko.connectors.google.bigquery.jackson")
108+
val streamReadConstraints = StreamReadConstraints.builder
109+
.maxNestingDepth(config.getInt("read.max-nesting-depth"))
110+
.maxNumberLength(config.getInt("read.max-number-length"))
111+
.maxStringLength(config.getInt("read.max-string-length"))
112+
.maxNameLength(config.getInt("read.max-name-length"))
113+
.maxDocumentLength(config.getLong("read.max-document-length"))
114+
.build
115+
val streamWriteConstraints = StreamWriteConstraints.builder
116+
.maxNestingDepth(config.getInt("write.max-nesting-depth"))
117+
.build
118+
val jsonFactory = JsonFactory.builder.asInstanceOf[JsonFactoryBuilder]
119+
.streamReadConstraints(streamReadConstraints)
120+
.streamWriteConstraints(streamWriteConstraints)
121+
.build
122+
new JsonMapper(jsonFactory)
123+
}
102124
}

project/Dependencies.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,9 @@ object Dependencies {
214214
"org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion % Provided,
215215
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
216216
"io.spray" %% "spray-json" % "1.3.6",
217-
"com.fasterxml.jackson.core" % "jackson-annotations" % JacksonDatabindVersion,
218-
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % JacksonDatabindVersion % Test,
219-
"io.specto" % "hoverfly-java" % hoverflyVersion % Test) ++ Mockito)
217+
"com.fasterxml.jackson.core" % "jackson-annotations" % JacksonDatabindVersion216,
218+
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % JacksonDatabindVersion216 % Test,
219+
"io.specto" % "hoverfly-java" % hoverflyVersion % Test) ++ JacksonDatabindDependencies216 ++ Mockito)
220220
val GoogleBigQueryStorage = Seq(
221221
// see Pekko gRPC version in plugins.sbt
222222
libraryDependencies ++= Seq(

0 commit comments

Comments
 (0)