Skip to content

Commit 201f137

Browse files
committed
fix: accept GeoJSON strings for Edm.GeographyPoint in AzureSearchWriter (#2420)
Azure AI Search expects spatial values as GeoJSON objects, but when users supplied a StringType column the writer JSON-escaped the entire string and the service rejected the request with HTTP 400. Convert string GeographyPoint columns into the canonical struct<type, coordinates> shape via from_json before serialization, mirroring the existing Edm.DateTimeOffset handling. Existing struct-based input is unchanged.
1 parent d0bbeae commit 201f137

2 files changed

Lines changed: 83 additions & 4 deletions

File tree

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import org.apache.spark.ml.util._
2020
import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel}
2121
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
2222
import org.apache.spark.ml.functions.vector_to_array
23-
import org.apache.spark.sql.functions.{col, expr, struct, to_json, to_utc_timestamp, date_format, when}
23+
import org.apache.spark.sql.functions.{col, expr, from_json, struct, to_json, to_utc_timestamp,
24+
date_format, when}
2425
import org.apache.spark.sql.streaming.DataStreamWriter
2526
import org.apache.spark.sql.types._
2627
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -249,6 +250,50 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging
249250
}
250251
}
251252

253+
/**
254+
* Converts string columns containing GeoJSON to the proper struct shape required for
255+
* Azure Search `Edm.GeographyPoint` fields.
256+
*
257+
* Azure AI Search expects spatial values to be sent as a GeoJSON object
258+
* (e.g. `{"type":"Point","coordinates":[lon, lat]}`), not as a JSON-encoded string.
259+
* Users frequently have their GeoJSON readily available as a string column, and
260+
* passing it as a `StringType` previously caused a `400 Bad Request`
261+
* (see issue #2420) because the writer JSON-escaped the entire string.
262+
*
263+
* For each field declared as `Edm.GeographyPoint` in the index, if the corresponding
264+
* DataFrame column is a `StringType`, parse it into the canonical
265+
* `struct<type:string, coordinates:array<double>>` so that downstream `to_json`
266+
* emits a proper GeoJSON object. Columns that are already structured are left as-is.
267+
*
268+
* @param df DataFrame with potential GeographyPoint columns
269+
* @param indexJson JSON string containing the index schema
270+
* @return DataFrame with string GeographyPoint columns converted to GeoJSON structs
271+
*/
272+
private def convertGeographyPointToStruct(df: DataFrame, indexJson: String): DataFrame = {
273+
val geoStructType = StructType(Seq(
274+
StructField("type", StringType),
275+
StructField("coordinates", ArrayType(DoubleType))
276+
))
277+
val geoFields = parseIndexJson(indexJson).fields
278+
.filter(_.`type` == "Edm.GeographyPoint")
279+
.map(_.name)
280+
geoFields.foldLeft(df) { (currentDF, fieldName) =>
281+
if (currentDF.columns.contains(fieldName)) {
282+
currentDF.schema(fieldName).dataType match {
283+
case StringType =>
284+
currentDF.withColumn(fieldName,
285+
when(col(fieldName).isNotNull, from_json(col(fieldName), geoStructType))
286+
)
287+
case _ =>
288+
// Already a struct (or otherwise compatible) — let checkSchemaParity validate it.
289+
currentDF
290+
}
291+
} else {
292+
currentDF
293+
}
294+
}
295+
}
296+
252297
private def dfToIndexJson(schema: StructType,
253298
indexName: String,
254299
keyCol: String,
@@ -328,17 +373,18 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging
328373

329374
SearchIndex.createIfNoneExists(subscriptionKey, serviceName, indexJson, apiVersion)
330375
val dateConvertedDF = convertDateTimeToISO8601(preppedDF, indexJson)
376+
val geoConvertedDF = convertGeographyPointToStruct(dateConvertedDF, indexJson)
331377

332378
logInfo("checking schema parity")
333-
checkSchemaParity(dateConvertedDF.schema, indexJson, actionCol)
379+
checkSchemaParity(geoConvertedDF.schema, indexJson, actionCol)
334380

335381
val df1 = if (filterNulls) {
336382
val collectionColumns = parseIndexJson(indexJson).fields
337383
.filter(_.`type`.startsWith("Collection"))
338384
.map(_.name)
339-
collectionColumns.foldLeft(dateConvertedDF) { (ndf, c) => filterOutNulls(ndf, c) }
385+
collectionColumns.foldLeft(geoConvertedDF) { (ndf, c) => filterOutNulls(ndf, c) }
340386
} else {
341-
dateConvertedDF
387+
geoConvertedDF
342388
}
343389

344390
// Convert date/timestamp columns to ISO8601 strings for Azure Search

cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,37 @@ class SearchWriterSuite extends SearchWriterSuiteUtilities {
171171

172172
}
173173

174+
test("Handle GeoJSON GeographyPoint fields supplied as strings") {
175+
176+
val in = generateIndexName()
177+
val df = spark.createDataFrame(Seq(
178+
("upload", "0", """{"type":"Point","coordinates":[-122.3493, 47.6205]}"""),
179+
("upload", "1", """{"type":"Point","coordinates":[-122.3351, 47.6080]}""")
180+
)).toDF("searchAction", "id", "location")
181+
182+
val indexJson =
183+
s"""
184+
|{
185+
| "name": "$in",
186+
| "fields": [
187+
| { "name": "id", "type": "Edm.String", "key": true, "searchable": true, "retrievable": true },
188+
| { "name": "location", "type": "Edm.GeographyPoint", "searchable": false,
189+
| "filterable": true, "retrievable": true, "sortable": true }
190+
| ]
191+
|}
192+
|""".stripMargin
193+
194+
AzureSearchWriter.write(df,
195+
Map(
196+
"subscriptionKey" -> azureSearchKey,
197+
"actionCol" -> "searchAction",
198+
"serviceName" -> testServiceName,
199+
"indexJson" -> indexJson
200+
)
201+
)
202+
203+
retryWithBackoff(assertSize(in, 2))
204+
205+
}
206+
174207
}

0 commit comments

Comments
 (0)