1
1
/**
2
- * The MIT License (MIT)
3
- * Copyright (c) 2016 Microsoft Corporation
4
- *
5
- * Permission is hereby granted, free of charge, to any person obtaining a copy
6
- * of this software and associated documentation files (the "Software"), to deal
7
- * in the Software without restriction, including without limitation the rights
8
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
- * copies of the Software, and to permit persons to whom the Software is
10
- * furnished to do so, subject to the following conditions:
11
- *
12
- * The above copyright notice and this permission notice shall be included in all
13
- * copies or substantial portions of the Software.
14
- *
15
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
- * SOFTWARE.
22
- */
2
+ * The MIT License (MIT)
3
+ * Copyright (c) 2016 Microsoft Corporation
4
+ *
5
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
6
+ * of this software and associated documentation files (the "Software"), to deal
7
+ * in the Software without restriction, including without limitation the rights
8
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
+ * copies of the Software, and to permit persons to whom the Software is
10
+ * furnished to do so, subject to the following conditions:
11
+ *
12
+ * The above copyright notice and this permission notice shall be included in all
13
+ * copies or substantial portions of the Software.
14
+ *
15
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
+ * SOFTWARE.
22
+ */
23
23
package org .apache .spark .sql .cosmosdb .util
24
24
25
25
import java .time .temporal .ChronoUnit
@@ -31,8 +31,7 @@ import com.microsoft.azure.cosmosdb.{Document, RequestOptions, ResourceResponse}
31
31
import com .microsoft .azure .cosmosdb .spark .config .{Config , CosmosDBConfig }
32
32
import com .microsoft .azure .cosmosdb .spark .schema .CosmosDBRowConverter
33
33
import com .microsoft .azure .cosmosdb .spark .streaming .CosmosDBWriteStreamRetryPolicy
34
- import org .apache .spark .sql .catalyst .InternalRow
35
- import org .apache .spark .sql .catalyst .encoders .RowEncoder
34
+ import org .apache .spark .sql .catalyst .{CatalystTypeConverters , InternalRow }
36
35
import org .apache .spark .sql .catalyst .expressions .Attribute
37
36
import org .apache .spark .sql .types .StructType
38
37
import org .apache .spark .sql .{DataFrame , SQLContext }
@@ -44,21 +43,19 @@ object StreamingUtils extends Serializable {
44
43
45
44
/**
  * Re-wraps the rows of `df` as a streaming DataFrame.
  *
  * Every row of `df.rdd` is passed through the Catalyst converter built for
  * `schema`, and the converted rows are handed to
  * `sqlContext.internalCreateDataFrame` with `isStreaming = true`.
  *
  * @param df         source DataFrame whose rows are converted
  * @param schema     schema used to build the converter and passed on for the
  *                   resulting DataFrame; presumably it matches the schema of
  *                   `df` -- verify against callers
  * @param sqlContext context used to create the resulting DataFrame
  * @return the DataFrame produced by `internalCreateDataFrame`
  */
def createDataFrameStreaming(df: DataFrame, schema: StructType, sqlContext: SQLContext): DataFrame = {
  // Build the converter once, outside the per-row closure.
  val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
  // The converter's static result type is not InternalRow, hence the explicit cast.
  val internalRows = df.rdd.map(row => toCatalyst(row).asInstanceOf[InternalRow])
  sqlContext.internalCreateDataFrame(internalRows, schema, isStreaming = true)
}
53
50
}
54
51
55
52
class StreamingWriteTask extends Serializable with CosmosDBLoggingTrait {
56
53
57
54
def importStreamingData [D : ClassTag ](
58
- iter : Iterator [D ],
59
- schemaOutput : Seq [Attribute ],
60
- config : Config ,
61
- retryPolicy : CosmosDBWriteStreamRetryPolicy ): Unit = {
55
+ iter : Iterator [D ],
56
+ schemaOutput : Seq [Attribute ],
57
+ config : Config ,
58
+ retryPolicy : CosmosDBWriteStreamRetryPolicy ): Unit = {
62
59
63
60
val upsert : Boolean = config
64
61
.getOrElse(CosmosDBConfig .Upsert , String .valueOf(CosmosDBConfig .DefaultUpsert ))
@@ -89,4 +86,4 @@ class StreamingWriteTask extends Serializable with CosmosDBLoggingTrait {
89
86
val latency = Math .abs(ChronoUnit .MILLIS .between(LocalDateTime .now(), startTime))
90
87
logInfo(s " Batch of $count records written with latency $latency milliseconds " )
91
88
}
92
- }
89
+ }
0 commit comments