@@ -86,8 +86,23 @@ rssFullText = false
8686 - トランスフォーメーション
8787 - 遅延評価されるため、変換処理は即座に計算されるわけではなく、リネージとして記録
8888 - リネージの記録を利用して、協調処理や処理の最適化を実現
89- - ナロートランスフォーメーション(単一のパーティションで処理が完結)とワイドトランスフォーメーション(複数のパーティションを考慮した処理が必要)の2種類
9089 - 例:orderBy(), groupBy(), filter(), select(), join()
90+ - ナロートランスフォーメーション(単一のパーティションで処理が完結)とワイドトランスフォーメーション(複数のパーティションを考慮したシャッフルやデータ交換処理が必要)の2種類
91+ - ナロートランスフォーメーション
92+ - select
93+ - filter
94+ - cast
95+ - union
96+ - ワイドトランスフォーメーション
97+ - distinct
98+ - groupBy
99+ - sort
100+ - join
101+ ![ ワイドトランスフォーメーション] ( wide_transformation.png " ワイドトランスフォーメーション ")
102+ - 例)groupBy.count()の処理フロー
103+ - 1つのパーティションでgroupBy.count()した結果を key-value ペアの情報として書き出し (Shuffle Write)
104+ - 全パーティションの出力を集計 (Shuffle Read) し、データ全体に対するグルーピング処理結果を出力
105+ - 実際のデータサイズに比べて、Shuffle操作で読み書きするデータサイズは小さい
91106 - アクション
92107 - 記録されたトランスフォーメーションの評価を開始するトリガー
93108 - 例:show(), take(), count(), collect(), save()
@@ -162,8 +177,7 @@ rssFullText = false
162177> - [ pyspark.sql.DataFrameの公式ドキュメント] ( https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html )
163178> - [ Pyspark Vs Pandas Cheatsheet] ( https://www.scribd.com/document/423024301/pyspark-vs-pandas-cheatsheet )
164179> - DataFrameは実際にはScalaのDataset[ Row]
165- > - 頻繁または繰り返しクエリを実行する予定の大規模なDataFrameの場合、Cacheを利用するとよい
166- > - DataFrame.cache()
180+ > - 頻繁または繰り返しクエリを実行する予定の大規模なDataFrameの場合、Cacheを利用するとよい (DataFrame.cache()、cache()は遅延操作のためActionの実行時に有効となる)
167181- 列名の取得
168182 ``` python
169183 DataFrame.columns
@@ -336,18 +350,31 @@ rssFullText = false
336350 | ** データの結合** | ` pd.concat([df1,df2]) ` <br >` df1.append(df2) ` <br >` df1.join(df2) ` | ` df1.union(df2) ` <br >` df1.join(df2) ` |
337351 | ** 直積** | ` df1['key'] = 1 ` <br >` df2['key'] = 1 ` <br >` df1.merge(df2, how='outer', on='key') ` | ` df1.crossJoin(df2) ` |
338352 | ** データのソート** | ` df.sort_values() ` <br >` df.sort_index() ` | ` df.sort() ` <br >` df.orderBy() ` |
353+
339354### SparkSQL
340355![ Spark SQL architecture and interface] ( spark_sql_architecture.png " Spark SQL architecture and interface ")
341356- SparkSQLのCatalystOptimizerは、計算クエリを受け取り、実行計画に変換する
342357 ``` python
343358 # 実行計画の確認
344359 DataFrame.explain(True )
360+ DataFrame.explain(mode = " CODEGEN" ) # バイトコードの表示
345361 ```
346362- クエリ最適化の4つのフェーズ
347363 - 分析:SQLまたはDataFrameクエリの抽象構文木(AST)を生成
348364 - 論理的最適化:コストベース最適化
349365 - 物理的最適化:論理計画に適した物理計画を生成
350366 - コード生成:効率的なJavaバイトコードを生成
367+ ![ クエリの最適化の詳細] ( query_optimization.png " クエリの最適化の詳細 ")
368+ - Adaptive Query Optimizer (AQE)
369+ - 設定された Shuffle Partition の数ではなく、適切なパーティション分割を実行
370+ - 小さすぎるパーティション分割を回避
371+ - パーティション関連のコマンド
372+ - ` spark.sparkContext.defaultParallelism ` : クラスタのコア数の取得
373+ - ` df.rdd.getNumPartitions() ` : パーティション数の取得
374+ - ` df.repartition({num_partition}) ` : {num_partition}にパーティション数を変更
375+ - ` spark.conf.get("spark.sql.shuffle.partitions") ` : シャッフルパーティション数の取得
376+ - ` spark.conf.set("spark.sql.shuffle.partitions", {num_partition}) ` : シャッフルパーティション数の設定
377+ - ` spark.conf.get("spark.sql.adaptive.enabled") ` : AQE設定の取得
351378
352379## 4章
353380### TempViewの作成とクエリの実行
@@ -453,6 +480,7 @@ for content in binary_row:
453480
454481## 5章
455482### ユーザー定義関数 (UDF)
483+ ![ UDFの内部処理] ( udf_process.png " UDFの内部処理 ")
456484- UDFはセッションごとに動作し、メタストアに永続化されない
457485``` python
458486from pyspark.sql import SparkSession
@@ -483,7 +511,8 @@ df = spark.range(1, 10001)
483511def squared (n ):
484512 return n** 2
485513
486- spark.udf.register(" squared" , squared, LongType()) # UDFの登録
514+ spark.udf.register(" squared" , squared, LongType()) # UDFの登録
515+ squared = udf(lambda n : squared(n), LongType()) # UDFの別の登録方法
487516
488517pyspark_udf_elapse_list = []
489518for iter in range (6 ):
@@ -877,13 +906,18 @@ conf.set({key}: {value})
877906 - 投影オペレーション:select, explode, map , flatmap, etc
878907 - 選択オペレーション:filter , where, etc
879908 - サポートモード:append, update, output
909+ - append : 新しいストリームデータのみを保持
910+ - update : 集約処理で変更があった部分を更新(集約処理がない場合、updateはappendと同じ)
880911 - completeモードがサポートされていないのは、増加し続ける結果データの保持コストが高いため
881912 - ステートフル・オペレーション
882913 - 常に DataFrame.groupBy() or DataFrame.groupByKey() を使用する必要がある
883914 - ウィンドウによる集計
884- -> `streamDF.groupBy(" {column_name} " , window(" {timestamp} " , " 10 minutes" , " 5 minutes" )).count()`
885- - watermarkによる遅延保障:指定時間内のデータは削除しない
886- -> `streamDF.withWatermark(" {timestamp} " , " 10 minutes" )groupBy(" {column_name} " , window(" {timestamp} " , " 10 minutes" , " 5 minutes" )).count()`
915+ - Tumbling Windows : 処理対象データが一つのWindowでだけ処理される
916+ -> `streamDF.groupBy(" {column_name} " , window(" {timestamp} " , " 5 minutes" )).count()`
917+ - Sliding Windows : 処理対象データが複数のWindowで処理される
918+ -> `streamDF.groupBy(" {column_name} " , window(" {timestamp} " , " 10 minutes" , " 5 minutes" )).count()`
919+ - watermarkによる遅延保障:指定時間内のデータは処理対象から除外しない(ネットワークえらーなどの理由で、データ内に列として記録されている時刻に比べて、遅れて到着してきたデータに対して、その時刻に到着したデータとして再度処理を行い更新)
920+ -> `streamDF.withWatermark(" {timestamp} " , " 10 minutes" ).groupBy(" {column_name} " , window(" {timestamp} " , " 10 minutes" , " 5 minutes" )).count()`
887921- ストリームデータの結合
888922 - Stream- Static:全ストリームデータに対して結合可能、1 つのストリームデータを静的情報でエンリッチ化する際に使用
889923 - inner- join, left- join (StreamDFが左), right- join (StreamDFが右) をサポート
@@ -1523,16 +1557,41 @@ print("execute 'mlflow ui' in another terminal and access http://127.0.0.1:5000"
15231557 result_df = df.withColumn(" value_doubled" , col(" value" )* 2 )
15241558 result_df.show()
15251559 ```
1560+ - Python UDF
1561+ - シリアライズ・デシリアライズをPickleで実施
1562+ - UDF を1 行ずつ適用するため処理が遅い
1563+ ```python
1564+ # Vanilla UDF
1565+ def say_hello(name):
1566+ return f " Hello { name} "
1567+
1568+ # Decorator UDF
1569+ @ udf(returnType = StringType())
1570+ def say_hello(name):
1571+ return f " Hello { name} "
1572+
1573+ df = df.withColumn(" greet" , say_hello(" name" ))
1574+ ```
15261575- Pandas UDF
1527- - pandasがデータを操作する際にApache Arrowを使用
1528- - pandas UDF ではベクトル化操作が可能で、一度に1 行ずつ処理する Python UDF と比較してパフォーマンスが良い
1576+ - pandasがデータを操作する際にApache Arrowを使用するため、シリアライズコストがない
1577+ - pandas UDF ではベクトル化操作(列に対する一斉処理)が可能で、一度に1 行ずつ処理する Python UDF と比較してパフォーマンスが良い
1578+ ```python
1579+ import pandas as pd
1580+ from pyspark.sql.functions import pandas_udf
1581+
1582+ @ pandas_udf(returnType = StringType())
1583+ def vector_udf(email:pd.Series)
1584+ return email.split(" @" )[0 ]
1585+
1586+ df.select(vector_udf(" email" ))
1587+ ```
15291588 ```python
15301589 data = [(1 , 10.0 ), (2 , 20.0 ), (3 , 30.0 )]
15311590 columns = [" id" , " value" ]
15321591 df = spark.createDataFrame(data, columns)
15331592
15341593 # pandas_udfの定義
1535- @ pandas_udf(DoubleType())
1594+ @ pandas_udf(returnType = DoubleType())
15361595 def multiply_by_two(values):
15371596 return values * 2
15381597
0 commit comments