Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4343f21

Browse files
AFFogarty authored and imback82 committed Oct 22, 2019
Support for new Delta v0.4.0 APIs (#297)
1 parent 6d875c1 commit 4343f21

File tree

2 files changed

+216
-20
lines changed

2 files changed

+216
-20
lines changed
 

‎src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaTableTests.cs

Lines changed: 102 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
using System.Collections.Generic;
77
using System.IO;
88
using System.Linq;
9-
using System.Threading;
109
using Microsoft.Spark.E2ETest.Utils;
1110
using Microsoft.Spark.Extensions.Delta.Tables;
1211
using Microsoft.Spark.Sql;
@@ -41,7 +40,7 @@ public void TestTutorialScenario()
4140
data.Write().Format("delta").Save(path);
4241

4342
// Validate that data contains the sequence [0 ... 4].
44-
ValidateDataFrame(Enumerable.Range(0, 5), data);
43+
ValidateRangeDataFrame(Enumerable.Range(0, 5), data);
4544

4645
// Create a second iteration of the table.
4746
data = _spark.Range(5, 10);
@@ -51,7 +50,7 @@ public void TestTutorialScenario()
5150
var deltaTable = DeltaTable.ForPath(path);
5251

5352
// Validate that deltaTable contains the sequence [5 ... 9].
54-
ValidateDataFrame(Enumerable.Range(5, 5), deltaTable.ToDF());
53+
ValidateRangeDataFrame(Enumerable.Range(5, 5), deltaTable.ToDF());
5554

5655
// Update every even value by adding 100 to it.
5756
deltaTable.Update(
@@ -70,7 +69,7 @@ public void TestTutorialScenario()
7069
// |106|
7170
// |108|
7271
// +---+
73-
ValidateDataFrame(
72+
ValidateRangeDataFrame(
7473
new List<int>() { 5, 7, 9, 106, 108 },
7574
deltaTable.ToDF());
7675

@@ -85,7 +84,7 @@ public void TestTutorialScenario()
8584
// | 7|
8685
// | 9|
8786
// +---+
88-
ValidateDataFrame(new List<int>() { 5, 7, 9 }, deltaTable.ToDF());
87+
ValidateRangeDataFrame(new List<int>() { 5, 7, 9 }, deltaTable.ToDF());
8988

9089
// Upsert (merge) new data.
9190
DataFrame newData = _spark.Range(0, 20).As("newData").ToDF();
@@ -100,7 +99,7 @@ public void TestTutorialScenario()
10099
.Execute();
101100

102101
// Validate that the resultTable contains the sequence [0 ... 19].
103-
ValidateDataFrame(Enumerable.Range(0, 20), deltaTable.ToDF());
102+
ValidateRangeDataFrame(Enumerable.Range(0, 20), deltaTable.ToDF());
104103
}
105104
}
106105

@@ -134,17 +133,90 @@ public void TestStreamingScenario()
134133

135134
// Now read the sink DeltaTable and validate its content.
136135
DeltaTable sink = DeltaTable.ForPath(sinkPath);
137-
ValidateDataFrame(Enumerable.Range(0, 5), sink.ToDF());
136+
ValidateRangeDataFrame(Enumerable.Range(0, 5), sink.ToDF());
138137

139138
// Write [5,6,7,8,9] to the source and trigger another stream batch.
140139
_spark.Range(5, 10).Write().Format("delta").Mode("append").Save(sourcePath);
141140
dataStreamWriter.Trigger(Trigger.Once()).Start(sinkPath).AwaitTermination();
142141

143142
// Finally, validate that the new data made its way to the sink.
144-
ValidateDataFrame(Enumerable.Range(0, 10), sink.ToDF());
143+
ValidateRangeDataFrame(Enumerable.Range(0, 10), sink.ToDF());
145144
}
146145
}
147146

147+
/// <summary>
148+
/// Test <c>DeltaTable.IsDeltaTable()</c> API.
149+
/// </summary>
150+
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
151+
public void TestIsDeltaTable()
152+
{
153+
using (var tempDirectory = new TemporaryDirectory())
154+
{
155+
// Save the same data to a DeltaTable and to Parquet.
156+
DataFrame data = _spark.Range(0, 5);
157+
string parquetPath = Path.Combine(tempDirectory.Path, "parquet-data");
158+
data.Write().Parquet(parquetPath);
159+
string deltaTablePath = Path.Combine(tempDirectory.Path, "delta-table");
160+
data.Write().Format("delta").Save(deltaTablePath);
161+
162+
Assert.False(DeltaTable.IsDeltaTable(parquetPath));
163+
Assert.False(DeltaTable.IsDeltaTable(_spark, parquetPath));
164+
165+
Assert.True(DeltaTable.IsDeltaTable(deltaTablePath));
166+
Assert.True(DeltaTable.IsDeltaTable(_spark, deltaTablePath));
167+
}
168+
}
169+
170+
/// <summary>
171+
/// Test <c>DeltaTable.ConvertToDelta()</c> API.
172+
/// </summary>
173+
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
174+
public void TestConvertToDelta()
175+
{
176+
string partitionColumnName = "id_plus_one";
177+
DataFrame data = _spark.Range(0, 5).Select(
178+
Functions.Col("id"),
179+
Functions.Expr($"(`id` + 1) AS `{partitionColumnName}`"));
180+
181+
// Run the same test on the different overloads of DeltaTable.ConvertToDelta().
182+
void testWrapper(
183+
DataFrame dataFrame,
184+
Func<string, DeltaTable> convertToDelta,
185+
string partitionColumn = null)
186+
{
187+
using (var tempDirectory = new TemporaryDirectory())
188+
{
189+
string path = Path.Combine(tempDirectory.Path, "parquet-data");
190+
DataFrameWriter dataWriter = dataFrame.Write();
191+
192+
if (!string.IsNullOrEmpty(partitionColumn))
193+
{
194+
dataWriter = dataWriter.PartitionBy(partitionColumn);
195+
}
196+
197+
dataWriter.Parquet(path);
198+
199+
Assert.False(DeltaTable.IsDeltaTable(path));
200+
201+
string identifier = $"parquet.`{path}`";
202+
DeltaTable convertedDeltaTable = convertToDelta(identifier);
203+
204+
ValidateRangeDataFrame(Enumerable.Range(0, 5), convertedDeltaTable.ToDF());
205+
Assert.True(DeltaTable.IsDeltaTable(path));
206+
}
207+
}
208+
209+
testWrapper(data, identifier => DeltaTable.ConvertToDelta(_spark, identifier));
210+
testWrapper(
211+
data.Repartition(Functions.Col(partitionColumnName)),
212+
identifier => DeltaTable.ConvertToDelta(
213+
_spark,
214+
identifier,
215+
$"{partitionColumnName} bigint"),
216+
partitionColumnName);
217+
// TODO: Test with StructType partition schema once StructType is supported.
218+
}
219+
148220
/// <summary>
149221
/// Test that methods return the expected signature.
150222
/// </summary>
@@ -161,7 +233,11 @@ public void TestSignatures()
161233
DeltaTable table = Assert.IsType<DeltaTable>(DeltaTable.ForPath(path));
162234
table = Assert.IsType<DeltaTable>(DeltaTable.ForPath(_spark, path));
163235

236+
Assert.IsType<bool>(DeltaTable.IsDeltaTable(_spark, path));
237+
Assert.IsType<bool>(DeltaTable.IsDeltaTable(path));
238+
164239
Assert.IsType<DeltaTable>(table.As("oldTable"));
240+
Assert.IsType<DeltaTable>(table.Alias("oldTable"));
165241
Assert.IsType<DataFrame>(table.History());
166242
Assert.IsType<DataFrame>(table.History(200));
167243
Assert.IsType<DataFrame>(table.ToDF());
@@ -221,17 +297,31 @@ public void TestSignatures()
221297
.Option("path", path)
222298
.Load());
223299
Assert.IsType<DataFrame>(_spark.ReadStream().Format("delta").Load(path));
300+
301+
// Create Parquet data and convert it to DeltaTables.
302+
string parquetIdentifier = $"parquet.`{path}`";
303+
rangeRate.Write().Mode(SaveMode.Overwrite).Parquet(path);
304+
Assert.IsType<DeltaTable>(DeltaTable.ConvertToDelta(_spark, parquetIdentifier));
305+
rangeRate
306+
.Select(Functions.Col("id"), Functions.Expr($"(`id` + 1) AS `id_plus_one`"))
307+
.Write()
308+
.PartitionBy("id")
309+
.Mode(SaveMode.Overwrite)
310+
.Parquet(path);
311+
Assert.IsType<DeltaTable>(DeltaTable.ConvertToDelta(
312+
_spark,
313+
parquetIdentifier,
314+
"id bigint"));
315+
// TODO: Test with StructType partition schema once StructType is supported.
224316
}
225317
}
226318

227319
/// <summary>
228-
/// Validate that a tutorial DataFrame contains only the expected values.
320+
/// Validate that a range DataFrame contains only the expected values.
229321
/// </summary>
230322
/// <param name="expectedValues"></param>
231323
/// <param name="dataFrame"></param>
232-
private void ValidateDataFrame(
233-
IEnumerable<int> expectedValues,
234-
DataFrame dataFrame)
324+
private void ValidateRangeDataFrame(IEnumerable<int> expectedValues, DataFrame dataFrame)
235325
{
236326
Assert.Equal(expectedValues.Count(), dataFrame.Count());
237327

‎src/csharp/Extensions/Microsoft.Spark.Extensions.Delta/Tables/DeltaTable.cs

Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Spark.Interop;
77
using Microsoft.Spark.Interop.Ipc;
88
using Microsoft.Spark.Sql;
9+
using Microsoft.Spark.Sql.Types;
910

1011
namespace Microsoft.Spark.Extensions.Delta.Tables
1112
{
@@ -21,13 +22,69 @@ public class DeltaTable : IJvmObjectReferenceProvider
2122
{
2223
private readonly JvmObjectReference _jvmObject;
2324

25+
private static readonly string s_deltaTableClassName = "io.delta.tables.DeltaTable";
26+
2427
internal DeltaTable(JvmObjectReference jvmObject)
2528
{
2629
_jvmObject = jvmObject;
2730
}
2831

2932
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
3033

34+
/// <summary>
35+
/// Create a DeltaTable from the given parquet table and partition schema.
36+
/// Takes an existing parquet table and constructs a delta transaction log in the base path
37+
/// of that table.
38+
///
39+
/// Note: Any changes to the table during the conversion process may not result in a
40+
/// consistent state at the end of the conversion. Users should stop any changes to the
41+
/// table before the conversion is started.
42+
///
43+
/// An example usage would be
44+
/// <code>
45+
/// DeltaTable.ConvertToDelta(spark, "parquet.`/path`", "key1 long, key2 string")
46+
/// </code>
47+
/// </summary>
48+
/// <param name="spark">The relevant session.</param>
49+
/// <param name="identifier">String used to identify the parquet table.</param>
50+
/// <param name="partitionSchema">String representing the partition schema.</param>
51+
/// <returns>The converted DeltaTable.</returns>
52+
public static DeltaTable ConvertToDelta(
53+
SparkSession spark,
54+
string identifier,
55+
string partitionSchema) =>
56+
new DeltaTable(
57+
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
58+
s_deltaTableClassName,
59+
"convertToDelta",
60+
spark,
61+
identifier,
62+
partitionSchema));
63+
64+
/// <summary>
65+
/// Create a DeltaTable from the given parquet table. Takes an existing parquet table and
66+
/// constructs a delta transaction log in the base path of the table.
67+
///
68+
/// Note: Any changes to the table during the conversion process may not result in a
69+
/// consistent state at the end of the conversion. Users should stop any changes to the
70+
/// table before the conversion is started.
71+
///
72+
/// An example would be
73+
/// <code>
74+
/// DeltaTable.ConvertToDelta(spark, "parquet.`/path`")
75+
/// </code>
76+
/// </summary>
77+
/// <param name="spark">The relevant session.</param>
78+
/// <param name="identifier">String used to identify the parquet table.</param>
79+
/// <returns>The converted DeltaTable.</returns>
80+
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier) =>
81+
new DeltaTable(
82+
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
83+
s_deltaTableClassName,
84+
"convertToDelta",
85+
spark,
86+
identifier));
87+
3188
/// <summary>
3289
/// Create a DeltaTable for the data at the given <c>path</c>.
3390
///
@@ -40,7 +97,7 @@ internal DeltaTable(JvmObjectReference jvmObject)
4097
public static DeltaTable ForPath(string path) =>
4198
new DeltaTable(
4299
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
43-
"io.delta.tables.DeltaTable",
100+
s_deltaTableClassName,
44101
"forPath",
45102
path));
46103

@@ -54,11 +111,51 @@ public static DeltaTable ForPath(string path) =>
54111
public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
55112
new DeltaTable(
56113
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
57-
"io.delta.tables.DeltaTable",
114+
s_deltaTableClassName,
58115
"forPath",
59116
sparkSession,
60117
path));
61118

119+
/// <summary>
120+
/// Check if the provided <c>identifier</c> string, in this case a file path,
121+
/// is the root of a Delta table using the given SparkSession.
122+
///
123+
/// An example would be
124+
/// <code>
125+
/// DeltaTable.IsDeltaTable(spark, "path/to/table")
126+
/// </code>
127+
/// </summary>
128+
/// <param name="sparkSession">The relevant session.</param>
129+
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
130+
/// <returns>True if the table is a DeltaTable.</returns>
131+
public static bool IsDeltaTable(SparkSession sparkSession, string identifier) =>
132+
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
133+
s_deltaTableClassName,
134+
"isDeltaTable",
135+
sparkSession,
136+
identifier);
137+
138+
/// <summary>
139+
/// Check if the provided <c>identifier</c> string, in this case a file path,
140+
/// is the root of a Delta table.
141+
///
142+
/// Note: This uses the active SparkSession in the current thread to search for the table.
143+
/// Hence, this throws error if active SparkSession has not been set, that is,
144+
/// <c>SparkSession.GetActiveSession()</c> is empty.
145+
///
146+
/// An example would be
147+
/// <code>
148+
/// DeltaTable.IsDeltaTable(spark, "/path/to/table")
149+
/// </code>
150+
/// </summary>
151+
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
152+
/// <returns>True if the table is a DeltaTable.</returns>
153+
public static bool IsDeltaTable(string identifier) =>
154+
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
155+
s_deltaTableClassName,
156+
"isDeltaTable",
157+
identifier);
158+
62159
/// <summary>
63160
/// Apply an alias to the DeltaTable. This is similar to <c>Dataset.As(alias)</c> or SQL
64161
/// <c>tableName AS alias</c>.
@@ -68,6 +165,15 @@ public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
68165
public DeltaTable As(string alias) =>
69166
new DeltaTable((JvmObjectReference)_jvmObject.Invoke("as", alias));
70167

168+
/// <summary>
169+
/// Apply an alias to the DeltaTable. This is similar to <c>Dataset.as(alias)</c>
170+
/// or SQL <c>tableName AS alias</c>.
171+
/// </summary>
172+
/// <param name="alias">The table alias.</param>
173+
/// <returns>Aliased DeltaTable.</returns>
174+
public DeltaTable Alias(string alias) =>
175+
new DeltaTable((JvmObjectReference)_jvmObject.Invoke("alias", alias));
176+
71177
/// <summary>
72178
/// Get a DataFrame (that is, Dataset[Row]) representation of this Delta table.
73179
/// </summary>
@@ -91,7 +197,7 @@ public DataFrame Vacuum(double retentionHours) =>
91197
/// for maintaining older versions up to the given retention threshold. This method will
92198
/// return an empty DataFrame on successful completion.
93199
///
94-
/// Note: This will use the default retention period of 7 hours.
200+
/// Note: This will use the default retention period of 7 days.
95201
/// </summary>
96202
/// <returns>Vacuumed DataFrame.</returns>
97203
public DataFrame Vacuum() =>
@@ -207,16 +313,16 @@ public void UpdateExpr(string condition, Dictionary<string, string> set) =>
207313

208314
/// <summary>
209315
/// Merge data from the <c>source</c> DataFrame based on the given merge <c>condition</c>.
210-
/// This class returns a <c>DeltaMergeBuilder</c> object that can be used to specify the
316+
/// This returns a <c>DeltaMergeBuilder</c> object that can be used to specify the
211317
/// update, delete, or insert actions to be performed on rows based on whether the rows
212318
/// matched the condition or not.
213319
///
214320
/// See the <see cref="DeltaMergeBuilder"/> for a full description of this operation and
215-
/// what combination update, delete and insert operations are allowed.
321+
/// what combinations of update, delete and insert operations are allowed.
216322
/// </summary>
217323
/// <example>
218-
/// See the <c>DeltaMergeBuilder</c> for a full description of this operation and what combination
219-
/// update, delete and insert operations are allowed.
324+
/// See the <c>DeltaMergeBuilder</c> for a full description of this operation and what
325+
/// combinations of update, delete and insert operations are allowed.
220326
///
221327
/// Example to update a key-value Delta table with new key-values from a source DataFrame:
222328
/// <code>
@@ -256,7 +362,7 @@ public DeltaMergeBuilder Merge(DataFrame source, string condition) =>
256362
/// matched the condition or not.
257363
///
258364
/// See the <see cref="DeltaMergeBuilder"/> for a full description of this operation and
259-
/// what combination update, delete and insert operations are allowed.
365+
/// what combinations of update, delete and insert operations are allowed.
260366
/// </summary>
261367
/// <example>
262368
/// Example to update a key-value Delta table with new key-values from a source DataFrame:

0 commit comments

Comments
 (0)
Please sign in to comment.