Skip to content

Commit 182f2e6

Browse files
authored
Merge pull request #286 from alexott/readstream-option-override
Allow to override Redis options for spark.readStream
2 parents c9e53ac + c2f3533 commit 182f2e6

File tree

5 files changed

+44
-22
lines changed

5 files changed

+44
-22
lines changed

doc/dataframe.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,11 @@ root
341341
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
342342
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
343343
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
344-
| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | `localhost` |
345-
| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `6379` |
346-
| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
347-
| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `0` |
348-
| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `2000` |
344+
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
345+
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
346+
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
347+
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
348+
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
349349

350350

351351
## Known limitations

doc/structured-streaming.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ there will be the output `keys output:*`:
8484
4) "30.5"
8585
```
8686

87-
Please refer to [DataFrame docs](dataframe.md) for different options(such as specifying key name) available for writing .
87+
Please refer to [DataFrame docs](dataframe.md) for different options (such as specifying key name) available for writing.
8888

8989
### Stream Offset
9090

@@ -142,6 +142,19 @@ Please note, item ordering will be preserved only within a particular Redis key
142142

143143
With the second approach you can read data from a single Redis key with multiple consumers in parallel, e.g. `option("stream.parallelism", 4)`. Each consumer will be mapped to a Spark partition. There are no ordering guarantees in this case.
144144

145+
### Connection options
146+
147+
Similarly to the DataFrame API, we can override connection options at the individual stream level, using the following options passed to `spark.readStream`:
148+
149+
| Name | Description | Type | Default |
150+
| -----------| -------------------------------------------------------------| ---------- | ----------- |
151+
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
152+
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
153+
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
154+
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
155+
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
156+
157+
145158
### Other configuration
146159

147160
Spark-Redis automatically creates a consumer group with name `spark-source` if it doesn't exist. You can customize the consumer group name with

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
4343
)
4444
}
4545

46+
47+
/**
48+
* Constructor from spark config and parameters.
49+
*
50+
* @param conf spark context config
51+
* @param parameters source specific parameters
52+
*/
53+
def this(conf: SparkConf, parameters: Map[String, String]) {
54+
this(
55+
parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)),
56+
parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt,
57+
parameters.getOrElse("auth", conf.get("spark.redis.auth", null)),
58+
parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt,
59+
parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt,
60+
parameters.getOrElse("ssl", conf.get("spark.redis.ssl", false.toString)).toBoolean)
61+
}
62+
63+
4664
/**
4765
* Constructor with Jedis URI
4866
*
@@ -127,6 +145,10 @@ object RedisConfig {
127145
def fromSparkConf(conf: SparkConf): RedisConfig = {
128146
new RedisConfig(new RedisEndpoint(conf))
129147
}
148+
149+
def fromSparkConfAndParameters(conf: SparkConf, parameters: Map[String, String]): RedisConfig = {
150+
new RedisConfig(new RedisEndpoint(conf, parameters))
151+
}
130152
}
131153

132154
/**

src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext,
3030
with Serializable
3131
with Logging {
3232

33-
private implicit val redisConfig: RedisConfig = {
34-
new RedisConfig(
35-
if ((parameters.keySet & Set("host", "port", "auth", "dbNum", "timeout")).isEmpty) {
36-
new RedisEndpoint(sqlContext.sparkContext.getConf)
37-
} else {
38-
val host = parameters.getOrElse("host", Protocol.DEFAULT_HOST)
39-
val port = parameters.get("port").map(_.toInt).getOrElse(Protocol.DEFAULT_PORT)
40-
val auth = parameters.getOrElse("auth", null)
41-
val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE)
42-
val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT)
43-
val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false)
44-
RedisEndpoint(host, port, auth, dbNum, timeout, ssl)
45-
}
46-
)
47-
}
33+
private implicit val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters(
34+
sqlContext.sparkContext.getConf, parameters)
4835

4936
implicit private val readWriteConfig: ReadWriteConfig = {
5037
val global = ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf)

src/main/scala/org/apache/spark/sql/redis/stream/RedisSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class RedisSource(sqlContext: SQLContext, metadataPath: String,
2525

2626
private val sc = sqlContext.sparkContext
2727

28-
implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf)
28+
implicit private val redisConfig: RedisConfig = RedisConfig.fromSparkConfAndParameters(sc.getConf, parameters)
2929

3030
private val sourceConfig = RedisSourceConfig.fromMap(parameters)
3131

0 commit comments

Comments
 (0)