-
About the Kafka streaming word count example: I want to save the wordCounts result somewhere, such as MySQL, or push it back to Kafka. Can anybody help me? Thanks a lot.
-
You can try something like the following (the Kafka sink also requires a checkpoint location):
df.SelectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.WriteStream()
.Format("kafka")
.Option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.Option("topic", "topic1")
.Option("checkpointLocation", "/path/to/checkpoint")
.Start();
-
I'll actually have to solve a similar issue soon: saving data from Kafka to HBase. Based on the Spark documentation on sinks, I guess I'll go with the foreach sink and bring in additional client SDKs to communicate with HBase. Just to be sure, @imback82, could you confirm that it's possible to write to a relational DB or HBase (i.e. non-native sinks) via ForeachWriter? Does .NET for Spark support bringing in the additional driver libraries that would normally be required (in Java/Scala) to integrate with those engines?
-
Yes, if you already have the logic to write each row to the sink in C#. That said, there is a Spark connector for HBase, which is much easier to work with.
Yes. You can also write a thin wrapper that calls into those libraries, similar to how we do it for Delta: https://github.com/dotnet/spark/blob/master/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta/Tables/DeltaTable.cs
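For the per-row approach, a custom sink in .NET for Spark implements `IForeachWriter` (`Open`, `Process`, `Close`) and is passed to `DataStreamWriter.Foreach`. Below is a minimal, untested sketch that upserts word counts into MySQL. It assumes the MySqlConnector NuGet package is deployed with the .NET worker, and the class name `MySqlWordCountWriter`, the table `word_counts` (with `word` as its primary key and a `cnt` column) and the connection string are hypothetical.

```csharp
using System;
using Microsoft.Spark.Sql;
using MySqlConnector; // assumption: MySqlConnector NuGet package is available on the workers

// Hypothetical sink that upserts (word, count) rows into a MySQL table named word_counts.
// The writer object is serialized and shipped to the executors, hence [Serializable].
[Serializable]
public class MySqlWordCountWriter : IForeachWriter
{
    private readonly string _connectionString;

    // Opened on the executor inside Open(), so it must not be serialized with the writer.
    [NonSerialized]
    private MySqlConnection _connection;

    public MySqlWordCountWriter(string connectionString)
    {
        _connectionString = connectionString;
    }

    public bool Open(long partitionId, long epochId)
    {
        _connection = new MySqlConnection(_connectionString);
        _connection.Open();
        return true; // true: process the rows of this partition/epoch
    }

    public void Process(Row value)
    {
        using var cmd = new MySqlCommand(
            "INSERT INTO word_counts (word, cnt) VALUES (@word, @cnt) " +
            "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)",
            _connection);
        cmd.Parameters.AddWithValue("@word", value.GetAs<string>("word"));
        // Convert.ToInt64 tolerates the count arriving as either int or long.
        cmd.Parameters.AddWithValue("@cnt", Convert.ToInt64(value.Get("count")));
        cmd.ExecuteNonQuery();
    }

    public void Close(Exception errorOrNull)
    {
        // Release the connection whether or not processing failed.
        _connection?.Dispose();
    }
}
```

It would be attached with something like `wordCounts.WriteStream().OutputMode(OutputMode.Complete).Foreach(new MySqlWordCountWriter(connectionString)).Start();`. Opening the connection in `Open` rather than in the constructor keeps it off the serialized writer and gives each partition its own connection.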
-
Actually, @imback82, is it just me, or is ForeachBatch not available in .NET for Spark? If no wrapper method is available, can I somehow invoke the underlying Scala API?
-
@mwysocki87
-
@mwysocki87
-
@andy99ge Were you able to resolve this issue since ForeachBatch is now supported?
-
My issue has been resolved as shown below, without using ForeachBatch:
Microsoft.Spark.Sql.Streaming.StreamingQuery query = windowedCounts.SelectExpr(
"CAST(word AS STRING) key",
"CAST(count AS STRING) value"
)
.WriteStream()
.OutputMode(OutputMode.Complete)
.Format("kafka")
.Option("kafka.bootstrap.servers", bootstrapServers)
.Option("topic", outputKafka)
.Option("checkpointLocation", "./ck1")
.Start();
But I will try ForeachBatch.
On 10/14/2020 15:56, Niharika Dutta <[email protected]> wrote:
@andy99ge Were you able to resolve this issue since ForeachBatch is now supported?
-
@mwysocki87 ForeachBatch is now supported as part of release 0.12.0.
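With ForeachBatch (0.12.0 and later), each micro-batch arrives as an ordinary DataFrame, so Spark's built-in JDBC batch writer can save the word counts to MySQL, which was the original question. The following is a minimal, untested sketch: the Kafka servers and topic, JDBC URL, credentials and table name are placeholders, and it assumes the spark-sql-kafka package and the MySQL Connector/J JAR are on the Spark classpath (for example via `--packages` or `--jars`).

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;

public static class Program
{
    public static void Main(string[] args)
    {
        // Placeholder settings (hypothetical values): replace with your own.
        string bootstrapServers = "host1:port1";
        string inputTopic = "input-topic";
        string jdbcUrl = "jdbc:mysql://mysql-host:3306/mydb";

        SparkSession spark = SparkSession
            .Builder()
            .AppName("KafkaWordCountToMySql")
            .GetOrCreate();

        // Read lines from Kafka and cast the binary value column to a string.
        DataFrame lines = spark
            .ReadStream()
            .Format("kafka")
            .Option("kafka.bootstrap.servers", bootstrapServers)
            .Option("subscribe", inputTopic)
            .Load()
            .SelectExpr("CAST(value AS STRING) AS line");

        // Split each line on spaces and count occurrences of each word.
        DataFrame wordCounts = lines
            .Select(Explode(Split(Col("line"), " ")).Alias("word"))
            .GroupBy("word")
            .Count();

        StreamingQuery query = wordCounts
            .WriteStream()
            .OutputMode(OutputMode.Complete)
            .Option("checkpointLocation", "./ck-mysql")
            .ForeachBatch((batchDf, batchId) =>
            {
                // In Complete mode every micro-batch holds the full result table,
                // so overwrite the target; "truncate" keeps the existing table schema.
                batchDf
                    .Write()
                    .Format("jdbc")
                    .Option("url", jdbcUrl)
                    .Option("dbtable", "word_counts")
                    .Option("user", "spark")
                    .Option("password", "secret")
                    .Option("truncate", "true")
                    .Mode(SaveMode.Overwrite)
                    .Save();
            })
            .Start();

        query.AwaitTermination();
    }
}
```

Overwriting on every batch is the simplest choice for Complete mode; with Update mode you would instead need an upsert, which the JDBC writer does not do on its own.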