Changes from 2 commits
8 changes: 4 additions & 4 deletions config/spark2kafka.properties
@@ -1,9 +1,9 @@
#Parameters you must configure
#==============================================================
-#The data source path should be a directory.
+#The data source should be a directory.
source.path =

-#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean...
+#The csv col names.For example:sA=string,sB=integer,sC=boolean...
column.names =

#Primary keys.For example:sA=string,sB=integer,sC=boolean...
@@ -15,8 +15,8 @@ kafka.bootstrap.servers =
#Set your topic name.
topic.name =

-#Spark checkpoint path
-checkpoint =
+#Spark checkpoint
+checkpoint.path =
Contributor

Why was `.path` added? If the goal is consistent naming, the variable name used in Metadata needs to be changed as well.

Collaborator Author
@wycccccc, Feb 22, 2023

Mainly because when the shell script searches for `checkpoint`, it also matches the comment line above it, so I simply renamed the key to follow one consistent naming scheme.
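
The collision described in this reply can be reproduced with a small sketch. It assumes the key is looked up with an unanchored `grep`-style pattern; the actual lookup code in `start_etl.sh` is not part of this diff, and the temp files and the `/tmp/ckpt` value are made up for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical reproduction: assumes keys are looked up with an unanchored grep.
# The value /tmp/ckpt and the temp files are illustrative only.
old="$(mktemp)"; mixed="$(mktemp)"; new="$(mktemp)"
printf '%s\n' '#Spark checkpoint path' 'checkpoint = /tmp/ckpt'      > "$old"
printf '%s\n' '#Spark checkpoint path' 'checkpoint.path = /tmp/ckpt' > "$mixed"
printf '%s\n' '#Spark checkpoint'      'checkpoint.path = /tmp/ckpt' > "$new"

# Old key: the comment line matches as well as the property line.
old_hits="$(grep -c 'checkpoint' "$old")"            # 2

# Renaming only the key is not enough: "." in a regex also matches the space
# in "checkpoint path", so the old comment would still collide.
mixed_hits="$(grep -c 'checkpoint.path' "$mixed")"   # 2

# New key plus new comment: only the property line matches.
new_hits="$(grep -c 'checkpoint.path' "$new")"       # 1

# An anchored pattern avoids the collision regardless of the comment text.
anchored_hits="$(grep -c -E '^checkpoint\.path[[:space:]]*=' "$mixed")"   # 1

echo "old=$old_hits mixed=$mixed_hits new=$new_hits anchored=$anchored_hits"
rm -f "$old" "$mixed" "$new"
```

Under that assumption, this would also explain why the comment text changes together with the key in this PR. Anchoring the pattern to the start of the line is a possible alternative that would not depend on how the comments are worded.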


#Parameters that can be selected for configuration
#==============================================================
4 changes: 2 additions & 2 deletions docker/start_etl.sh
@@ -24,7 +24,7 @@ declare -r SPARK_VERSION=${SPARK_VERSION:-3.3.1}
declare -r LOCAL_PATH=$(cd -- "$(dirname -- "${DOCKER_FOLDER}")" &>/dev/null && pwd)
# ===============================[properties keys]=================================
declare -r SOURCE_KEY="source.path"
-declare -r CHECKPOINT_KEY="checkpoint"
+declare -r CHECKPOINT_KEY="checkpoint.path"
# ===============================[spark driver/executor resource]==================
declare -r RESOURCES_CONFIGS="${RESOURCES_CONFIGS:-"--conf spark.driver.memory=4g --conf spark.executor.memory=4g"}"
# ===================================[functions]===================================
@@ -89,7 +89,7 @@ function runContainer() {

if [[ "$master" == "spark:"* ]] || [[ "$master" == "local"* ]]; then
docker run -d --init \
---name "csv-kafka-${source_name}" \
+--name "csv-kafka${source_name}" \
Contributor

Was removing the `-` here intentional?

Collaborator Author

No, I deleted it by accident while investigating the bug above. It has been restored.

$network_config \
-v "$propertiesPath":"$propertiesPath":ro \
-v "$jar_path":/tmp/astraea-etl.jar:ro \
13 changes: 8 additions & 5 deletions etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala
@@ -70,7 +70,14 @@ class DataFrameProcessor(dataFrame: DataFrame) {
.withColumn(
"value",
defaultConverter(
-map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*)
+map(
+cols.flatMap(c =>
+List(
+lit(c.name),
+when(col(c.name).isNotNull, col(c.name)).otherwise(lit(null))
Contributor

Perhaps we could drop the null fields entirely. A null means the value is absent, so filtering those fields out might also improve performance slightly.

Collaborator Author

This is the only way I could think of to drop the null fields. It does not look very elegant, but I could not find an alternative. I will revise it if a more elegant approach turns up.

+)
+): _*
+)
)
)
.withColumn(
@@ -171,10 +178,6 @@ object DataFrameProcessor {

private def schema(columns: Seq[DataColumn]): StructType =
StructType(columns.map { col =>
-if (col.dataType != DataType.StringType)
Contributor

Are non-string types supported now?

Collaborator Author
@wycccccc, Feb 22, 2023

Yes, in my testing they are now supported. Nulls can be handled while the data is still a Column, but once the values are converted back into certain Scala types inside a UDF, null handling is no longer supported.

-throw new IllegalArgumentException(
-"Sorry, only string type is currently supported.Because a problem(astraea #1286) has led to the need to wrap the non-nullable type."
-)
StructField(col.name, col.dataType.sparkType)
})
}
2 changes: 1 addition & 1 deletion etl/src/main/scala/org/astraea/etl/Metadata.scala
@@ -71,7 +71,7 @@ object Metadata {

private[etl] val DEFAULT_PARTITIONS = 15
private[etl] val DEFAULT_REPLICAS = 1.toShort
-private[etl] val DEFAULT_RECURSIVE = "ture"
+private[etl] val DEFAULT_RECURSIVE = "true"
private[etl] val DEFAULT_CLEAN_SOURCE = "delete"

// Parameters needed to configure ETL.