@@ -2,9 +2,11 @@ package io.airbyte.cdk.output
2
2
3
3
import com.fasterxml.jackson.annotation.JsonInclude
4
4
import com.fasterxml.jackson.core.JsonGenerator
5
+ import com.fasterxml.jackson.core.util.MinimalPrettyPrinter
5
6
import com.fasterxml.jackson.databind.DeserializationFeature
6
7
import com.fasterxml.jackson.databind.JsonNode
7
8
import com.fasterxml.jackson.databind.ObjectMapper
9
+ import com.fasterxml.jackson.databind.ObjectWriter
8
10
import com.fasterxml.jackson.databind.SequenceWriter
9
11
import com.fasterxml.jackson.databind.node.ObjectNode
10
12
import com.fasterxml.jackson.dataformat.smile.SmileFactory
@@ -19,13 +21,15 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage
19
21
import io.github.oshai.kotlinlogging.KotlinLogging
20
22
import io.micronaut.context.annotation.Value
21
23
import jakarta.inject.Singleton
24
+ import java.io.BufferedOutputStream
22
25
import java.io.ByteArrayOutputStream
23
26
import java.io.File
24
27
import java.io.PrintStream
25
28
import java.net.StandardProtocolFamily
26
29
import java.net.UnixDomainSocketAddress
27
30
import java.nio.ByteBuffer
28
31
import java.nio.ByteOrder
32
+ import java.nio.channels.Channels
29
33
import java.nio.channels.ServerSocketChannel
30
34
import java.nio.channels.SocketChannel
31
35
import java.time.Clock
@@ -44,10 +48,12 @@ class UnixDomainSocketOutputConsumer(
44
48
) : StdoutOutputConsumer(stdout, clock, bufferByteSizeThresholdForFlush) {
45
49
private var socketNum: Int = - 1
46
50
var sc: SocketChannel ? = null
51
+ var bufferedOutputStream: BufferedOutputStream ? = null
47
52
lateinit var ll: List <UnixDomainSocketOutputConsumer >
48
53
public val SMILE_MAPPER : ObjectMapper = initSmileMapper();
49
54
private val smileGenerator: JsonGenerator = SMILE_MAPPER .createGenerator(buffer)
50
- // private val smileSequenceWriter: SequenceWriter = SMILE_MAPPER.writer().writeValues(smileGenerator)
55
+ private val smileWriter: ObjectWriter ? = SMILE_MAPPER .writerFor(AirbyteMessage ::class .java).with (
56
+ MinimalPrettyPrinter (System .lineSeparator()))
51
57
// private lateinit var templateRecord: JsonNode
52
58
53
59
@@ -69,72 +75,8 @@ class UnixDomainSocketOutputConsumer(
69
75
socketNum = num
70
76
}
71
77
72
- override fun accept (record : AirbyteRecordMessage ) {
73
- // The serialization of RECORD messages can become a performance bottleneck for source
74
- // connectors because they can come in much higher volumes than other message types.
75
- // Specifically, with jackson, the bottleneck is in the object mapping logic.
76
- // As it turns out, this object mapping logic is not particularly useful for RECORD messages
77
- // because within a given stream the only variations occur in the "data" and the "meta"
78
- // fields:
79
- // - the "data" field is already an ObjectNode and is cheap to serialize,
80
- // - the "meta" field is often unset.
81
- // For this reason, this method builds and reuses a JSON template for each stream.
82
- // Then, for each record, it serializes just "data" and "meta" to populate the template.
83
- // if (::templateRecord.isInitialized.not()) {
84
- val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
85
-
86
- val tmplt = String (template.prefix + " {}" .toByteArray() + template.suffix)
87
-
88
- val templateRecord = Jsons .readTree(tmplt)
89
- // }
90
- val rec = templateRecord.get(" record" ) as ObjectNode
91
- rec.set<ObjectNode >(" data" , record.data)
92
-
93
- // val tb = ByteArrayOutputStream()
94
- // val gen = SMILE_MAPPER.createGenerator(tb)
95
- // Jsons.writeTree(gen, tt)
96
-
97
- // val rr = SMILE_MAPPER.readTree(tb.toByteArray())
98
- // val rr = SMILE_MAPPER.readerFor(AirbyteMessage::class.java).readTree(tb.toByteArray())
99
- // logger.info { rr }
100
- // assert(rr == tt)
101
- // synchronized(this) {
102
- // Write a newline character to the buffer if it's not empty.
103
- // withLockMaybeWriteNewline()
104
- // Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
105
- // buffer.write(template.prefix)
106
- // Serialize the record data ObjectNode to JSON, writing it to the buffer.
107
- // Jsons.writeTree(smileGenerator, record.data)
108
-
109
- // Jsons.writeTree(smileGenerator, templateRecord)
110
- try {
111
- smileGenerator.writeTree(templateRecord)
112
- smileGenerator.flush()
113
- } catch (e: Exception ) {
114
- logger.error(e) { " Error serializing $templateRecord " }
115
- throw e
116
- }
117
- // If the record has a AirbyteRecordMessageMeta instance set,
118
- // write ',"meta":' followed by the serialized meta.
119
- /* val meta: AirbyteRecordMessageMeta? = record.meta
120
- if (meta != null) {
121
- buffer.write(metaPrefixBytes)
122
- smileSequenceWriter.write(meta)
123
- smileSequenceWriter.flush()
124
- }*/
125
- // Write ',"emitted_at":...}}'.
126
- // buffer.write(template.suffix)
127
- // Flush the buffer to stdout only once it has reached a certain size.
128
- // Flushing to stdout incurs some overhead (mutex, syscall, etc.)
129
- // which otherwise becomes very apparent when lots of tiny records are involved.
130
- if (buffer.size() >= bufferByteSizeThresholdForFlush) {
131
- withLockFlushRecord()
132
- }
133
- // }
134
-
135
- }
136
- override fun withLockFlushRecord () {
137
- synchronized(this ) {
78
+ override fun accept (airbyteMessage : AirbyteMessage ) {
79
+ if (airbyteMessage.type == AirbyteMessage .Type .RECORD ) {
138
80
sc ? : let {
139
81
val socketPath = String .format(SOCKET_FULL_PATH , socketNum)
140
82
logger.info { " Using socket..." }
@@ -149,8 +91,18 @@ class UnixDomainSocketOutputConsumer(
149
91
serverSocketChannel.bind(address)
150
92
logger.info { " Source : Server socket bound at ${socketFile.absolutePath} " }
151
93
sc = serverSocketChannel.accept()
94
+
95
+ bufferedOutputStream = Channels .newOutputStream(sc).buffered()
152
96
}
97
+ val seqWriter = smileWriter!! .writeValues(bufferedOutputStream)
98
+ seqWriter.write(airbyteMessage)
99
+ bufferedOutputStream!! .flush()
100
+
101
+ } else {
102
+ super .accept(airbyteMessage)
153
103
}
104
+ }
105
+ override fun withLockFlushRecord () {
154
106
if (buffer.size() > 0 ) {
155
107
val array: ByteArray = buffer.toByteArray()
156
108
sc?.write(ByteBuffer .wrap(array).order(ByteOrder .LITTLE_ENDIAN ))
0 commit comments