Skip to content

Commit 9bad982

Browse files
authored
Merge pull request #559 from aurb9/python-implementation
Expose Parquet operator to Python API
2 parents 1788e1b + 00e9ad8 commit 9bad982

38 files changed

Lines changed: 397 additions & 251 deletions

File tree

python/.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
# Build files
22
dist
3-
3+
__pycache__
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from typing import Any, List
19+
20+
21+
class Record:
    """
    A Type that represents a record with a schema.

    A record is a thin wrapper around a mutable list of field values that
    are addressed by position.
    """
    values: List[Any]

    def __init__(self, values: List[Any]) -> None:
        """Wrap ``values``; the list is stored as-is, not copied."""
        self.values = values

    def copy(self) -> 'Record':
        """Return a new record holding a shallow copy of the field list."""
        return Record(self.values.copy())

    def equals(self, o: Any) -> bool:
        """Return True if ``o`` has exactly this type and equal field values."""
        if o is None or type(self) is not type(o):
            return False

        return self.values == o.values

    def hash_code(self) -> int:
        """
        Return a hash computed from the current field values.

        ``self.values`` is a list, and lists are unhashable, so the values
        are snapshotted into a tuple first. Records for which ``equals``
        is True therefore produce the same hash code.

        :raises TypeError: if any individual field value is itself unhashable.
        """
        return hash(tuple(self.values))

    def get_field(self, index: int) -> Any:
        """Return the raw value stored at position ``index``."""
        return self.values[index]

    def get_double(self, index: int) -> float:
        """Return the value at ``index`` converted with ``float()``."""
        return float(self.values[index])

    def get_int(self, index: int) -> int:
        """Return the value at ``index`` converted with ``int()``."""
        return int(self.values[index])

    def get_string(self, index: int) -> str:
        """Return the value at ``index`` converted with ``str()``."""
        return str(self.values[index])

    def set_field(self, index: int, field: Any) -> None:
        """Overwrite the value at the existing position ``index``."""
        self.values[index] = field

    def add_field(self, field: Any) -> None:
        """Append ``field`` as a new last value."""
        self.values.append(field)

    def size(self) -> int:
        """Return the number of fields in the record."""
        return len(self.values)

    def __str__(self):
        return "Record" + str(self.values)

    def __repr__(self):
        return self.__str__()

python/src/pywy/core/core.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
# Licensed to the Apache Software Foundation (ASF) under one or more
32
# contributor license agreements. See the NOTICE file distributed with
43
# this work for additional information regarding copyright ownership.
@@ -17,18 +16,13 @@
1716

1817
from typing import Set, Iterable, Dict
1918
import json
20-
import base64
21-
import cloudpickle
2219
import requests
23-
import subprocess
24-
import time
25-
import os
2620

2721
from pywy.core.platform import Platform
2822
from pywy.core.serializer import JSONSerializer
2923
from pywy.graph.graph import WayangGraph
30-
from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec
31-
from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator
24+
from pywy.graph.types import WGraphOfVec
25+
from pywy.operators import SinkOperator, UnaryToUnaryOperator
3226

3327

3428
class Plugin:
@@ -122,7 +116,6 @@ def execute(self):
122116

123117
for node in nodes:
124118
operator = node.current[0]
125-
126119
if isinstance(operator, UnaryToUnaryOperator):
127120
pipeline.append(operator)
128121
else:

python/src/pywy/core/serializer.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from pywy.types import get_java_type, NDimArray, ndim_from_type
3131
from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator
3232

33+
3334
class JSONSerializer:
3435
id_table: Iterable[int]
3536

@@ -53,39 +54,42 @@ def serialize(self, operator):
5354

5455
json_operator["data"] = {}
5556

56-
if operator.json_name != "join":
57-
if hasattr(operator, "input_type"):
58-
if operator.input_type is not None:
59-
json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json()
60-
if hasattr(operator, "output_type"):
61-
if operator.output_type is not None:
62-
json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json()
57+
if hasattr(operator, "input_type") and operator.input_type is not None:
58+
json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json()
59+
60+
if hasattr(operator, "output_type") and operator.output_type is not None:
61+
json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json()
6362

6463
if operator.json_name == "filter":
6564
json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.use_predicate)).decode('utf-8')
6665

67-
return json_operator
68-
if operator.json_name == "reduceBy":
66+
elif operator.json_name == "reduceBy":
6967
json_operator["data"]["keyUdf"] = base64.b64encode(cloudpickle.dumps(operator.key_function)).decode('utf-8')
7068
json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.reduce_function)).decode('utf-8')
7169

72-
return json_operator
7370
elif operator.json_name == "join":
7471
json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.this_key_function)).decode('utf-8')
7572
json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.that_key_function)).decode('utf-8')
7673

77-
return json_operator
74+
elif operator.json_name == "cartesian":
75+
del json_operator["data"]
76+
7877
elif operator.json_name == "dlTraining":
7978
json_operator["data"]["model"] = {"modelType": "DLModel", "op": operator.model.get_out().to_dict()}
8079
json_operator["data"]["option"] = operator.option.to_dict()
8180

82-
return json_operator
8381
else:
8482
if hasattr(operator, "get_udf"):
8583
json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.get_udf)).decode('utf-8')
8684

8785
if hasattr(operator, "path"):
88-
json_operator["data"]["filename"] = operator.path
86+
json_operator["data"]["filename"] = operator.path
87+
88+
if hasattr(operator, "projection"):
89+
json_operator["data"]["projection"] = operator.projection
90+
91+
if hasattr(operator, "column_names"):
92+
json_operator["data"]["column_names"] = operator.column_names
8993

9094
return json_operator
9195

@@ -98,7 +102,7 @@ def serialize_pipeline(self, pipeline):
98102
json_operator["cat"] = "unary"
99103
json_operator["data"] = {}
100104
json_operator["input"] = list(map(lambda x: self.id_table[x], pipeline[0].inputOperator))
101-
json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[len(pipeline) - 1].outputOperator))
105+
json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[-1].outputOperator))
102106

103107
if len(pipeline) == 1:
104108
return self.serialize(pipeline[0])

python/src/pywy/dataquanta.py

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
# limitations under the License.
1616
#
1717

18-
from typing import Dict, Set, List, cast
18+
from typing import Dict, Set, List, Optional, cast
1919

2020
from pywy.core.core import Plugin, PywyPlan
2121
from pywy.operators.base import PO_T
2222
from pywy.types import (GenericTco, Predicate, Function, BiFunction, FlatmapFunction, IterableOut, T, In, Out)
2323
from pywy.operators import *
24-
from pywy.basic.model.ops import Op
24+
from pywy.basic.data.record import Record
2525
from pywy.basic.model.option import Option
2626
from pywy.basic.model.models import Model
2727

@@ -50,7 +50,6 @@ def __init__(self, configuration: Configuration = Configuration()):
5050
"""
5151
add a :class:`Plugin` to the :class:`Context`
5252
"""
53-
5453
def register(self, *plugins: Plugin):
5554
for p in plugins:
5655
self.plugins.update(p)
@@ -59,15 +58,19 @@ def register(self, *plugins: Plugin):
5958
"""
6059
remove a :class:`Plugin` from the :class:`Context`
6160
"""
62-
6361
def unregister(self, *plugins: Plugin):
6462
for p in plugins:
6563
self.plugins.remove(p)
6664
return self
6765

68-
def textfile(self, file_path: str) -> 'DataQuanta[str]':
66+
def textfile(self, file_path: str) -> "DataQuanta[str]":
6967
return DataQuanta(self, TextFileSource(file_path))
7068

69+
def parquet(
    self,
    file_path: str,
    projection: Optional[List[str]] = None,
    column_names: Optional[List[str]] = None,
) -> "DataQuanta[Record]":
    """
    Start a plan from a Parquet source.

    Builds a :class:`ParquetSource` from the given arguments and wraps it,
    together with this context, in a :class:`DataQuanta`.

    :param file_path: location of the Parquet input, passed through unchanged
    :param projection: optional list forwarded to the source
        (presumably the columns to read -- confirm against ParquetSource)
    :param column_names: optional list forwarded to the source
    """
    source = ParquetSource(file_path, projection, column_names)
    return DataQuanta(self, source)
73+
7174
def __str__(self):
7275
return "Plugins: {}".format(str(self.plugins))
7376

@@ -88,25 +91,31 @@ def __init__(self, context: WayangContext, operator: PywyOperator):
8891
def filter(self: "DataQuanta[T]", p: Predicate, input_type: GenericTco = None) -> "DataQuanta[T]":
8992
return DataQuanta(self.context, self._connect(FilterOperator(p, input_type)))
9093

91-
def map(self: "DataQuanta[In]", f: Function, input_type: GenericTco = None, output_type: GenericTco = None) -> "DataQuanta[Out]":
94+
def map(
95+
self: "DataQuanta[In]",
96+
f: Function,
97+
input_type: GenericTco = None,
98+
output_type: GenericTco = None
99+
) -> "DataQuanta[Out]":
92100
return DataQuanta(self.context, self._connect(MapOperator(f, input_type, output_type)))
93101

94-
def flatmap(self: "DataQuanta[In]", f: FlatmapFunction, input_type: GenericTco = None, output_type: GenericTco = None) -> "DataQuanta[IterableOut]":
102+
def flatmap(
103+
self: "DataQuanta[In]",
104+
f: FlatmapFunction,
105+
input_type: GenericTco = None,
106+
output_type: GenericTco = None
107+
) -> "DataQuanta[IterableOut]":
95108
return DataQuanta(self.context, self._connect(FlatmapOperator(f, input_type, output_type)))
96109

97-
def reduce_by_key(self: "DataQuanta[In]",
98-
key_f: Function,
99-
f: BiFunction,
100-
input_type: GenericTco = None
101-
) -> "DataQuanta[IterableOut]":
102-
110+
def reduce_by_key(
111+
self: "DataQuanta[In]",
112+
key_f: Function,
113+
f: BiFunction,
114+
input_type: GenericTco = None
115+
) -> "DataQuanta[IterableOut]":
103116
return DataQuanta(self.context, self._connect(ReduceByKeyOperator(key_f, f, input_type)))
104117

105-
def sort(self: "DataQuanta[In]",
106-
key_f: Function,
107-
input_type: GenericTco = None
108-
) -> "DataQuanta[IterableOut]":
109-
118+
def sort(self: "DataQuanta[In]", key_f: Function, input_type: GenericTco = None) -> "DataQuanta[IterableOut]":
110119
return DataQuanta(self.context, self._connect(SortOperator(key_f, input_type)))
111120

112121
def join(
@@ -115,21 +124,34 @@ def join(
115124
that: "DataQuanta[In]",
116125
that_key_f: Function,
117126
input_type: GenericTco = None,
118-
output_type: GenericTco = None
119-
) -> "DataQuanta[Out]":
120-
127+
) -> "DataQuanta[Out]":
121128
op = JoinOperator(
122129
this_key_f,
123130
that,
124131
that_key_f,
125-
input_type,
126-
output_type
132+
input_type
127133
)
128134

129135
self._connect(op),
130136
return DataQuanta(
131137
self.context,
132-
that._connect(op,1)
138+
that._connect(op, 1)
139+
)
140+
141+
def cartesian(
    self: "DataQuanta[In]",
    that: "DataQuanta[In]",
    input_type: GenericTco = None,
) -> "DataQuanta[Out]":
    """
    Combine this :class:`DataQuanta` with ``that`` through a
    :class:`CartesianOperator`.

    :param that: the second input of the operator
    :param input_type: optional type hint forwarded to the operator
    :return: a new :class:`DataQuanta` in the same context, wrapping the
        operator returned by ``that._connect(op, 1)``
    """
    op = CartesianOperator(that, input_type)

    # Both inputs are connected to the same operator: ``self`` on the
    # default port (0) and ``that`` on port 1. The return value of the
    # first connection is intentionally unused.
    self._connect(op)
    return DataQuanta(self.context, that._connect(op, 1))
134156

135157
def dlTraining(
@@ -140,7 +162,6 @@ def dlTraining(
140162
input_type: GenericTco,
141163
output_type: GenericTco
142164
) -> "DataQuanta[Out]":
143-
144165
op = DLTrainingOperator(
145166
model,
146167
option,
@@ -151,7 +172,7 @@ def dlTraining(
151172

152173
return DataQuanta(
153174
self.context,
154-
that._connect(op,1)
175+
that._connect(op, 1)
155176
)
156177

157178
def predict(
@@ -169,10 +190,10 @@ def predict(
169190

170191
return DataQuanta(
171192
self.context,
172-
that._connect(op,1)
193+
that._connect(op, 1)
173194
)
174195

175-
def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = None):
196+
def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = None) -> None:
176197
last: List[SinkOperator] = [
177198
cast(
178199
SinkOperator,
@@ -184,7 +205,6 @@ def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = N
184205
)
185206
)
186207
]
187-
#print(PywyPlan(self.context.plugins, last))
188208
PywyPlan(self.context.plugins, self.context.configuration.entries, last).execute()
189209

190210
def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:

python/src/pywy/execution/util.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,3 @@ class SpecialLengths(object):
2323
END_OF_STREAM = -4
2424
NULL = -5
2525
START_ARROW_STREAM = -6
26-

0 commit comments

Comments
 (0)