Skip to content

Commit 5eb1025

Browse files
authored
Release v0.11.6.4 (#247)
1 parent 0f25c64 commit 5eb1025

File tree

8 files changed

+87
-11
lines changed

8 files changed

+87
-11
lines changed

odps/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
version_info = (0, 11, 6, 3)
15+
version_info = (0, 11, 6, 4)
1616
_num_index = max(idx if isinstance(v, int) else 0
1717
for idx, v in enumerate(version_info))
1818
__version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \

odps/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
contextvars = None
3131

3232

33+
DEFAULT_BLOCK_BUFFER_SIZE = 20 * 1024 ** 2
3334
DEFAULT_CHUNK_SIZE = 65536
3435
DEFAULT_CONNECT_RETRY_TIMES = 4
3536
DEFAULT_CONNECT_TIMEOUT = 120
@@ -493,7 +494,7 @@ def emit(self, record):
493494
default_options.register_option(
494495
'tunnel.quota_name', None, validator=any_validator(is_null, is_string)
495496
)
496-
default_options.register_option('tunnel.block_buffer_size', 20 * 1024 ** 2, validator=is_integer)
497+
default_options.register_option('tunnel.block_buffer_size', DEFAULT_BLOCK_BUFFER_SIZE, validator=is_integer)
497498
default_options.register_option('tunnel.use_block_writer_by_default', False, validator=is_bool)
498499

499500
default_options.redirect_option('tunnel_endpoint', 'tunnel.endpoint')

odps/models/tasks.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ class MaxFrameTask(Task):
298298
class CommandType(enum.Enum):
299299
CREATE_SESSION = "CREATE_SESSION"
300300
PYTHON_PACK = "PYTHON_PACK"
301+
RAY_CLUSTER_INIT = "RAY_CLUSTER_INIT"
302+
RAY_CLUSTER_FREE = "RAY_CLUSTER_FREE"
301303

302304
command = serializers.XMLNodeField(
303305
"Command",

odps/models/tests/test_tasks.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,31 @@
109109
</MaxFrame>
110110
'''
111111

112+
mf_ray_cluster_init_template = '''<?xml version="1.0" encoding="utf-8"?>
113+
<MaxFrame>
114+
<Name>AnonymousMaxFrameTask</Name>
115+
<Config>
116+
<Property>
117+
<Name>settings</Name>
118+
<Value>{"odps.maxframe.output_format": "maxframe_v1"}</Value>
119+
</Property>
120+
<Property>
121+
<Name>aliyunId</Name>
122+
<Value>%(aliyun_id)s</Value>
123+
</Property>
124+
<Property>
125+
<Name>regionId</Name>
126+
<Value>%(region_id)s</Value>
127+
</Property>
128+
<Property>
129+
<Name>quotaNick</Name>
130+
<Value>%(quota_nick)s</Value>
131+
</Property>
132+
</Config>
133+
<Command>RAY_CLUSTER_INIT</Command>
134+
</MaxFrame>
135+
'''
136+
112137

113138
def test_task_class_type():
114139
typed = Task(type='SQL', query='select * from dual')
@@ -280,3 +305,25 @@ def test_maxframe_task_to_xml(odps):
280305
task = Task.parse(None, to_xml)
281306
assert isinstance(task, MaxFrameTask)
282307
assert task.command == MaxFrameTask.CommandType.CREATE_SESSION
308+
309+
310+
def test_ray_cluster_init(odps):
311+
task = MaxFrameTask(command=MaxFrameTask.CommandType.RAY_CLUSTER_INIT)
312+
aliyun_id = "test-aliyunid"
313+
region_id = "test-regionId"
314+
quota_nick = "test-quotaNick"
315+
task.update_settings({"odps.maxframe.output_format": "maxframe_v1"})
316+
task.set_property("aliyunId", aliyun_id)
317+
task.set_property("regionId", region_id)
318+
task.set_property("quotaNick", quota_nick)
319+
to_xml = task.serialize()
320+
right_xml = mf_ray_cluster_init_template % {
321+
"aliyun_id": aliyun_id,
322+
"region_id": region_id,
323+
"quota_nick": quota_nick,
324+
}
325+
assert to_text(to_xml) == to_text(right_xml)
326+
task = Task.parse(None, to_xml)
327+
assert isinstance(task, MaxFrameTask)
328+
assert task.command == MaxFrameTask.CommandType.RAY_CLUSTER_INIT
329+

odps/tunnel/io/writer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,6 @@ def __init__(
396396
block_id=None,
397397
block_id_gen=None,
398398
):
399-
self._buffer_size = buffer_size or options.tunnel.block_buffer_size
400399
self._request_callback = request_callback
401400
self._block_id = block_id or 0
402401
self._blocks_written = []
@@ -408,6 +407,9 @@ def __init__(
408407
out = get_compress_stream(self._buffer, compress_option)
409408
super(BufferedRecordWriter, self).__init__(schema, out, encoding=encoding)
410409

410+
# assign the block buffer size after super().__init__() so it is not overridden there (presumably by the base writer's initializer)
411+
self._buffer_size = buffer_size or options.tunnel.block_buffer_size
412+
411413
@property
412414
def cur_block_id(self):
413415
return self._block_id
@@ -529,10 +531,12 @@ def __init__(self, schema, out=None, chunk_size=None):
529531
self._cur_chunk_size = 0
530532

531533
self._output = out
532-
self._write_chunk_size()
534+
self._chunk_size_written = False
533535

534536
def _re_init(self, output):
535537
self._output = output
538+
self._chunk_size_written = False
539+
self._cur_chunk_size = 0
536540

537541
def _write_chunk_size(self):
538542
self._write_uint32(self._chunk_size)
@@ -542,6 +546,9 @@ def _write_uint32(self, val):
542546
self._output.write(data)
543547

544548
def _write_chunk(self, buf):
549+
if not self._chunk_size_written:
550+
self._write_chunk_size()
551+
self._chunk_size_written = True
545552
self._output.write(buf)
546553
self._crc.update(buf)
547554
self._crccrc.update(buf)

odps/tunnel/tabletunnel.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,11 +443,13 @@ def open_record_writer(
443443
)
444444

445445
def open_arrow_writer(
446-
self, block_id=None, compress=False, initial_block_id=None, block_id_gen=None
446+
self, block_id=None, compress=False, buffer_size=None, initial_block_id=None,
447+
block_id_gen=None
447448
):
448449
return self._open_writer(
449-
block_id=block_id, compress=compress, initial_block_id=initial_block_id,
450-
block_id_gen=block_id_gen, writer_cls=ArrowWriter
450+
block_id=block_id, compress=compress, buffer_size=buffer_size,
451+
initial_block_id=initial_block_id, block_id_gen=block_id_gen,
452+
writer_cls=ArrowWriter
451453
)
452454

453455
if np is not None:

odps/tunnel/tests/test_arrow_tabletunnel.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,21 @@ def upload_data(test_table, data, compress=False, **kw):
7070
writer.close()
7171
upload_ss.commit([0, ])
7272

73-
def buffered_upload_data(test_table, data, compress=False, **kw):
73+
def buffered_upload_data(test_table, data, buffer_size=None, compress=False, **kw):
7474
upload_ss = tunnel.create_upload_session(test_table, **kw)
75-
writer = upload_ss.open_arrow_writer(compress=compress)
75+
writer = upload_ss.open_arrow_writer(compress=compress, buffer_size=buffer_size)
7676

7777
pd_data = data.to_pandas()
7878
part1 = pd_data.iloc[:len(pd_data) // 2]
7979
writer.write(part1)
8080
part2 = pd_data.iloc[len(pd_data) // 2:]
8181
writer.write(part2)
8282
writer.close()
83+
84+
if buffer_size is None:
85+
assert len(writer.get_blocks_written()) == 1
86+
else:
87+
assert len(writer.get_blocks_written()) > 1
8388
upload_ss.commit(writer.get_blocks_written())
8489

8590
def download_data(test_table, columns=None, compress=False, **kw):
@@ -140,6 +145,7 @@ def delete_table(table_name):
140145
"create_table, create_partitioned_table, delete_table"
141146
)
142147
raw_chunk_size = options.chunk_size
148+
raw_buffer_size = options.tunnel.block_buffer_size
143149
try:
144150
options.sql.use_odps2_extension = True
145151
yield nt(
@@ -149,6 +155,7 @@ def delete_table(table_name):
149155
finally:
150156
options.sql.use_odps2_extension = None
151157
options.chunk_size = raw_chunk_size
158+
options.tunnel.block_buffer_size = raw_buffer_size
152159

153160

154161
def _assert_frame_equal(left, right):
@@ -212,8 +219,8 @@ def test_buffered_upload_and_download_by_raw_tunnel(odps, setup):
212219
assert len(pd_df) == 0
213220

214221
# test upload and download without errors
215-
data = setup.gen_data()
216-
setup.buffered_upload_data(test_table_name, data)
222+
data = setup.gen_data(1024)
223+
setup.buffered_upload_data(test_table_name, data, buffer_size=4096)
217224

218225
pd_df = setup.download_data(test_table_name)
219226
_assert_frame_equal(data, pd_df)

odps/tunnel/tests/test_tabletunnel.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,11 @@ def buffered_upload_data(self, test_table, records, buffer_size=None, compress=F
269269
record[i] = it
270270
writer.write(record)
271271
writer.close()
272+
273+
if buffer_size is None:
274+
assert len(writer.get_blocks_written()) == 1
275+
else:
276+
assert len(writer.get_blocks_written()) > 1
272277
upload_ss.commit(writer.get_blocks_written())
273278

274279
def download_data(self, test_table, compress=False, columns=None, **kw):
@@ -345,12 +350,17 @@ def _reloader():
345350
@pytest.fixture
346351
def setup(odps, tunnel):
347352
random.seed(0)
353+
raw_chunk_size = options.chunk_size
354+
raw_buffer_size = options.tunnel.block_buffer_size
355+
348356
util = TunnelTestUtil(odps, tunnel)
349357
try:
350358
yield util
351359
finally:
352360
if util.last_table:
353361
util.last_table.drop(async_=True)
362+
options.chunk_size = raw_chunk_size
363+
options.tunnel.block_buffer_size = raw_buffer_size
354364

355365

356366
def test_malicious_request_detection(setup):

0 commit comments

Comments
 (0)