Commit 0f25c64

Releases v0.11.6.3 (#246)
1 parent be261ff commit 0f25c64

File tree

10 files changed: +143 lines, -55 lines


odps/_version.py

Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version_info = (0, 11, 6, 2)
+version_info = (0, 11, 6, 3)
 _num_index = max(idx if isinstance(v, int) else 0
                  for idx, v in enumerate(version_info))
 __version__ = '.'.join(map(str, version_info[:_num_index + 1])) + \

odps/apis/storage_api/tests/test_storage_api.py

Lines changed: 14 additions & 19 deletions

@@ -38,8 +38,7 @@ def test_storage_api(storage_api_client):
 
     assert resp.status == Status.OK
     if resp.status != Status.OK:
-        logger.info("Create write session failed")
-        return
+        raise IOError("Create write session failed")
 
     req = SessionRequest(session_id=resp.session_id)
 
@@ -49,7 +48,7 @@ def test_storage_api(storage_api_client):
     assert resp.status == Status.OK
 
     if resp.status != Status.OK:
-        logger.info("Get write session failed")
+        raise IOError("Get write session failed")
         return
 
     if resp.session_status != SessionStatus.NORMAL and resp.session_status != SessionStatus.COMMITTED:
@@ -78,20 +77,21 @@ def test_storage_api(storage_api_client):
            if i == 0:
                suc = writer.write(record_batch.schema.serialize().to_pybytes())
                if not suc:
-                   logger.info("write arrow schema failed")
-                   break
+                   raise IOError("write arrow schema failed")
 
            suc = writer.write(record_batch.serialize().to_pybytes())
            if not suc:
-               logger.info("write arrow record batch failed")
-               break
+               raise IOError("write arrow record batch failed")
 
+       # write EOS given https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
+       suc = writer.write(b'\xff\xff\xff\xff\x00\x00\x00\x00')
+       if not suc:
+           raise IOError("write EOS failed")
        commit_message, suc = writer.finish()
 
        assert suc is True
        if not suc:
-           logger.info("Write rows failed")
-           return
+           raise IOError("Write rows failed")
        else:
            end = time.time()
            logger.info("Write rows cost: " + str(end - start) + "s")
@@ -103,8 +103,7 @@ def test_storage_api(storage_api_client):
    resp = storage_api_client.commit_write_session(req, commit_messages)
 
    if resp.status != Status.OK and resp.status != Status.WAIT:
-       logger.info("Fail to commit write session")
-       return
+       raise IOError("Fail to commit write session")
 
    if resp.status == Status.WAIT:
        req = SessionRequest(session_id=resp.session_id)
@@ -114,8 +113,7 @@ def test_storage_api(storage_api_client):
        assert resp.status == Status.OK
 
        if resp.status != Status.OK:
-           logger.info("Get write session failed")
-           return
+           raise IOError("Get write session failed")
 
        if resp.session_status != SessionStatus.NORMAL and resp.session_status != SessionStatus.COMMITTED:
            logger.info("Wait...")
@@ -131,17 +129,15 @@ def test_storage_api(storage_api_client):
    resp = storage_api_client.create_read_session(req)
 
    if resp.status != Status.OK and resp.status != Status.WAIT:
-       logger.info("create read session failed")
-       return
+       raise IOError("create read session failed")
 
    req = SessionRequest(session_id=resp.session_id)
 
    while True:
        resp = storage_api_client.get_read_session(req)
 
        if resp.status != Status.OK:
-           logger.info("get read session failed")
-           return
+           raise IOError("get read session failed")
 
        if resp.session_status == SessionStatus.INIT:
            logger.info("Wait...")
@@ -168,8 +164,7 @@ def test_storage_api(storage_api_client):
 
        reader.close()
        if reader.get_status() != Status.OK:
-           logger.info("Read rows failed")
-           return
+           raise IOError("Read rows failed")
 
        end = time.time()
        logger.info("Read rows cost (index " + str(i) + "): " + str(end - start) + "s")

odps/core.py

Lines changed: 43 additions & 2 deletions

@@ -14,6 +14,7 @@
 
 import copy
 import functools
+import hashlib
 import json # noqa: F401
 import os
 import random
@@ -2357,6 +2358,18 @@ def _create_mcqa_session(
             task=task, session_project=project, session_name=session_name
         )
 
+    def _get_mcqa_session_file(self):
+        try:
+            dir_name = utils.build_pyodps_dir()
+            if not os.path.exists(dir_name):
+                os.makedirs(dir_name)
+            access_id_digest = hashlib.md5(utils.to_binary(self.account.access_id)).hexdigest()
+            return os.path.join(
+                dir_name, "mcqa-session-" + access_id_digest
+            )
+        except:
+            return None
+
     def run_sql_interactive(self, sql, hints=None, **kwargs):
         """
         Run SQL query in interactive mode (a.k.a MaxCompute QueryAcceleration).
@@ -2370,10 +2383,29 @@ def run_sql_interactive(self, sql, hints=None, **kwargs):
         task_name = kwargs.pop('task_name', None)
         service_startup_timeout = kwargs.pop('service_startup_timeout', 60)
         force_reattach = kwargs.pop('force_reattach', False)
-        if self._default_session != None:
+
+        session_file_name = self._get_mcqa_session_file()
+        if (
+            self._default_session is None
+            and session_file_name
+            and os.path.exists(session_file_name)
+        ):
+            try:
+                with open(session_file_name, "r") as session_file:
+                    session_info = json.loads(session_file.read())
+                instance_obj = self.get_instance(session_info.pop("id"))
+                session_project = self.get_project(session_info.pop("session_project_name"))
+                self._default_session_name = session_info["session_name"]
+                self._default_session = models.SessionInstance.from_instance(
+                    instance_obj, session_project=session_project, **session_info
+                )
+            except:
+                pass
+
+        if self._default_session is not None:
             try:
                 cached_is_running = self._default_session.is_running()
-            except BaseException:
+            except:
                 pass
         if (
             force_reattach
@@ -2385,6 +2417,15 @@ def run_sql_interactive(self, sql, hints=None, **kwargs):
             self._default_session = self._attach_mcqa_session(service_name, task_name=task_name)
             self._default_session.wait_for_startup(0.1, service_startup_timeout, max_interval=1)
             self._default_session_name = service_name
+
+            if session_file_name:
+                try:
+                    with open(session_file_name, "w") as session_file:
+                        session_file.write(
+                            json.dumps(self._default_session._extract_json_info())
+                        )
+                except:
+                    pass
         return self._default_session.run_sql(sql, hints, **kwargs)
 
     @utils.deprecated(
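Note on the session cache: run_sql_interactive() now persists the attached MCQA session to a per-account file, so a fresh ODPS entry object (or a new process) can re-attach instead of creating another session. A rough sketch of the naming scheme used by _get_mcqa_session_file(); the base directory below is illustrative, the real code resolves it via utils.build_pyodps_dir():

import hashlib
import os

# One cache file per account, keyed by the MD5 digest of the access id.
def mcqa_session_file(access_id, base_dir=os.path.expanduser("~/.pyodps")):
    os.makedirs(base_dir, exist_ok=True)
    digest = hashlib.md5(access_id.encode("utf-8")).hexdigest()
    return os.path.join(base_dir, "mcqa-session-" + digest)

print(mcqa_session_file("my-access-id"))  # ~/.pyodps/mcqa-session-<md5 hex digest>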

odps/models/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -25,6 +25,7 @@
 from .function import Function
 from .resources import Resources
 from .resource import *
+from .session import InSessionInstance, SessionInstance
 from .tenant import Tenant
 from .volumes import *
 from .volume_parted import PartedVolume, VolumePartition

odps/models/session.py

Lines changed: 14 additions & 0 deletions

@@ -147,6 +147,20 @@ def __init__(self, **kw):
         self._session_name = kw.pop("session_name", "")
         super(SessionInstance, self).__init__(**kw)
 
+    @classmethod
+    def from_instance(cls, instance, **kw):
+        return SessionInstance(
+            name=instance.id, parent=instance.parent, client=instance._client, **kw
+        )
+
+    def _extract_json_info(self):
+        return {
+            "id": self.id,
+            "session_project_name": self._project.name,
+            "session_task_name": self._task_name,
+            "session_name": self._session_name,
+        }
+
     def wait_for_startup(self, interval=1, timeout=-1, retry=True, max_interval=None):
         """
         Wait for the session to startup(status changed to RUNNING).
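Note on the new helpers: _extract_json_info() and from_instance() are the serialize/re-attach pair used by odps/core.py above. A condensed sketch of that round trip (the cache path and the live `odps` entry object are placeholders; the real flow lives inside run_sql_interactive()):

import json
from odps.models import SessionInstance

def save_session(session, path):
    # Persist the fields from _extract_json_info() (instance id, project,
    # task and session names) so another process can re-attach later.
    with open(path, "w") as f:
        f.write(json.dumps(session._extract_json_info()))

def load_session(odps, path):
    with open(path, "r") as f:
        info = json.loads(f.read())
    instance_obj = odps.get_instance(info.pop("id"))
    session_project = odps.get_project(info.pop("session_project_name"))
    return SessionInstance.from_instance(
        instance_obj, session_project=session_project, **info
    )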

odps/models/tests/test_session.py

Lines changed: 8 additions & 0 deletions

@@ -358,3 +358,11 @@ def test_fallback_policy():
     assert default_policy.get_mode_from_exception(
         errors.SQAQueryTimedout("QueryTimedout")
     ) is FallbackMode.OFFLINE
+
+
+def test_reuse_session(odps):
+    inst = odps.run_sql_interactive("select * from dual", force_reattach=True)
+    inst.wait_for_completion()
+    new_odps = ODPS(account=odps.account, project=odps.project, endpoint=odps.endpoint)
+    new_inst = new_odps.run_sql_interactive("select * from dual")
+    assert inst.id == new_inst.id

odps/tunnel/io/reader.py

Lines changed: 1 addition & 1 deletion

@@ -508,7 +508,7 @@ def to_pandas(self):
 
         if not batches:
             return self._arrow_schema.empty_table().to_pandas()
-        return pd.concat(batches, axis=0)
+        return pd.concat(batches, axis=0, ignore_index=True)
 
     def __enter__(self):
         return self
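Note on ignore_index=True: each record batch converts to a DataFrame with its own zero-based index, so a plain concat yields duplicate index labels in the combined result. A tiny illustration:

import pandas as pd

# Two batches, each with index [0, 1].
batches = [pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"a": [3, 4]})]
print(list(pd.concat(batches, axis=0).index))                     # [0, 1, 0, 1]
print(list(pd.concat(batches, axis=0, ignore_index=True).index))  # [0, 1, 2, 3]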

odps/tunnel/io/writer.py

Lines changed: 8 additions & 24 deletions

@@ -438,15 +438,7 @@ def _reset_writer(self, write_response):
         self._crc.reset()
 
     def _send_buffer(self):
-        def gen(): # synchronize chunk upload
-            data = self._buffer.getvalue()
-            chunk_size = options.chunk_size
-            while data:
-                to_send = data[:chunk_size]
-                data = data[chunk_size:]
-                yield to_send
-
-        return self._request_callback(self._block_id, gen())
+        return self._request_callback(self._block_id, self._buffer.getvalue())
 
     def _flush(self):
         self._write_finish_tags()
@@ -543,20 +535,20 @@ def _re_init(self, output):
         self._output = output
 
     def _write_chunk_size(self):
-        self._write_unint32(self._chunk_size)
+        self._write_uint32(self._chunk_size)
 
-    def _write_unint32(self, val):
+    def _write_uint32(self, val):
         data = struct.pack("!I", utils.long_to_uint(val))
         self._output.write(data)
 
     def _write_chunk(self, buf):
         self._output.write(buf)
+        self._crc.update(buf)
         self._crccrc.update(buf)
         self._cur_chunk_size += len(buf)
         if self._cur_chunk_size >= self._chunk_size:
-            self._crc.update(buf)
             checksum = self._crc.getvalue()
-            self._write_unint32(checksum)
+            self._write_uint32(checksum)
             self._crc.reset()
             self._cur_chunk_size = 0
 
@@ -602,7 +594,7 @@ def write(self, data):
 
         if tp == pa.timestamp("ms") or tp == pa.timestamp("ns"):
             if self._schema[name].type == types.timestamp_ntz:
-                column_dict[name] = self._localize_timezone(column_dict, "UTC")
+                column_dict[name] = self._localize_timezone(column_dict[name], "UTC")
             else:
                 column_dict[name] = self._localize_timezone(column_dict[name])
 
@@ -633,7 +625,7 @@ def write(self, data):
 
     def _write_finish_tags(self):
         checksum = self._crccrc.getvalue()
-        self._write_unint32(checksum)
+        self._write_uint32(checksum)
         self._crccrc.reset()
 
     def flush(self):
@@ -723,15 +715,7 @@ def _reset_writer(self):
         self._crc.reset()
 
     def _send_buffer(self):
-        def gen(): # synchronize chunk upload
-            data = self._buffer.getvalue()
-            chunk_size = options.chunk_size
-            while data:
-                to_send = data[:chunk_size]
-                data = data[chunk_size:]
-                yield to_send
-
-        return self._request_callback(self._block_id, gen())
+        return self._request_callback(self._block_id, self._buffer.getvalue())
 
     def _flush(self):
         self._write_finish_tags()
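Note on the _write_chunk() change: the per-chunk CRC is now updated on every write rather than only on the write that crosses the chunk boundary, so bytes written between boundaries are no longer missing from the emitted checksum. A standalone sketch of the corrected flow, with zlib.crc32 standing in for the tunnel's own Checksum class:

import io
import struct
import zlib

class ChunkWriter:
    # Illustrative stand-in for the buffered writer's chunk/checksum logic.
    def __init__(self, output, chunk_size):
        self._output = output
        self._chunk_size = chunk_size
        self._cur_chunk_size = 0
        self._crc = 0

    def write_chunk(self, buf):
        self._output.write(buf)
        self._crc = zlib.crc32(buf, self._crc)   # update on every write
        self._cur_chunk_size += len(buf)
        if self._cur_chunk_size >= self._chunk_size:
            # Emit the checksum of everything since the last boundary, then reset.
            self._output.write(struct.pack("!I", self._crc))
            self._crc = 0
            self._cur_chunk_size = 0

out = io.BytesIO()
writer = ChunkWriter(out, chunk_size=4)
writer.write_chunk(b"ab")   # previously this write was left out of the chunk CRC
writer.write_chunk(b"cd")   # boundary reached: checksum now covers b"abcd"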

odps/tunnel/tabletunnel.py

Lines changed: 17 additions & 3 deletions

@@ -355,6 +355,14 @@ def _init(self):
     def reload(self):
         self._create_or_reload_session(reload=True)
 
+    @classmethod
+    def _iter_data_in_batches(cls, data):
+        pos = 0
+        chunk_size = options.chunk_size
+        while pos < len(data):
+            yield data[pos: pos + chunk_size]
+            pos += chunk_size
+
     def _open_writer(
         self,
         block_id=None,
@@ -387,9 +395,15 @@ def _open_writer(
        @_wrap_upload_call(self.id)
        def upload_block(blockid, data):
            params['blockid'] = blockid
-            return utils.call_with_retry(
-                self._client.put, url, data=data, params=params, headers=headers
-            )
+
+            def upload_func():
+                if isinstance(data, (bytes, bytearray)):
+                    to_upload = self._iter_data_in_batches(data)
+                else:
+                    to_upload = data
+                return self._client.put(url, data=to_upload, params=params, headers=headers)
+
+            return utils.call_with_retry(upload_func)
 
        if writer_cls is ArrowWriter:
            writer_cls = BufferedArrowWriter
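Note on upload_func: rebuilding the chunk iterator inside the retried callable matters because a generator created outside utils.call_with_retry is exhausted by the first failed attempt, leaving later retries with an empty request body. A standalone sketch of that pitfall and the fix (all names below are illustrative, not pyodps APIs):

def flaky_put(body, failures=[1]):
    # Consumes the body, then fails once to simulate a transient network error.
    received = b"".join(body)
    if failures[0]:
        failures[0] -= 1
        raise IOError("transient network error")
    return received

def iter_chunks(data, chunk_size=4):
    for pos in range(0, len(data), chunk_size):
        yield data[pos:pos + chunk_size]

def call_with_retry(func, attempts=3):
    last_exc = None
    for _ in range(attempts):
        try:
            return func()
        except IOError as exc:
            last_exc = exc
    raise last_exc

data = b"0123456789"
# A fresh generator is built on every attempt, so the retry sees the full payload.
result = call_with_retry(lambda: flaky_put(iter_chunks(data)))
assert result == data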
