Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python-client/pypegasus/base/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,20 @@ def write(self, oprot):

def validate(self):
    """Validation hook; performs no checks and returns ``None``."""
    return

def raw(self):
    """Return the payload in the form it was originally supplied.

    If the instance was flagged as built from a ``str`` (``_is_str`` is
    true), the stored UTF-8 bytes are decoded back to ``str``; otherwise
    ``self.data`` is returned unchanged.
    """
    if self._is_str:
        return self.data.decode('UTF-8')
    return self.data

def __init__(self, data=None):
    """Store ``data``, remembering whether it arrived as ``str``.

    :param data: payload; a ``str`` is stored as its UTF-8 encoding, any
        other value (including ``None``) is stored as given.
    """
    # `_is_str` records the input type so the original form can be
    # reconstructed later from the stored bytes.
    if isinstance(data, str):
        self._is_str = True
        self.data = data.encode('UTF-8')
    else:
        self._is_str = False
        self.data = data

def __hash__(self):
value = 17
Expand Down
73 changes: 60 additions & 13 deletions python-client/pypegasus/pgclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from pypegasus.replication.ttypes import query_cfg_request
from pypegasus.rrdb import *
from pypegasus.rrdb.ttypes import scan_request, get_scanner_request, update_request, key_value, multi_put_request, \
multi_get_request, multi_remove_request
multi_get_request, multi_remove_request, filter_type
from pypegasus.transport.protocol import *
from pypegasus.utils.tools import restore_key, get_ttl, bytes_cmp, ScanOptions

Expand Down Expand Up @@ -496,6 +496,10 @@ def start_scan(self):
request.stop_inclusive = self._scan_options.stop_inclusive
request.batch_size = self._scan_options.batch_size
request.need_check_hash = self._check_hash
request.sort_key_filter_type = self._scan_options.sortkey_filter_type
request.sort_key_filter_pattern = blob(self._scan_options.sortkey_filter_pattern)
request.hash_key_filter_type = self._scan_options.hashkey_filter_type
request.hash_key_filter_pattern = blob(self._scan_options.hashkey_filter_pattern)

op = RrdbGetScannerOperator(self._gpid, request, self._partition_hash)
session = self._table.get_session(self._gpid)
Expand Down Expand Up @@ -597,6 +601,9 @@ def generate_key(cls, hash_key, sort_key):
hash_key_len = len(hash_key)
sort_key_len = len(sort_key)

if hash_key_len >= 0xFFFF:
raise ValueError("hash_key length must be less than 65535")

if sort_key_len > 0:
values = (hash_key_len, hash_key, sort_key)
s = struct.Struct('>H'+str(hash_key_len)+'s'+str(sort_key_len)+'s')
Expand All @@ -611,24 +618,50 @@ def generate_key(cls, hash_key, sort_key):

@classmethod
def generate_next_bytes(cls, buff):
    """Increment the last non-0xFF byte of ``buff``.

    If every byte is 0xFF (or ``buff`` is empty), a single 0x00 byte is
    appended instead.

    :param buff: ``str``, ``bytearray`` or another bytes-like value.
        A ``str`` is treated as 'latin-1' so characters map 1:1 to bytes;
        characters outside the 0-255 range raise ``UnicodeEncodeError``.
    :return: the result in the same type family as the input: ``str`` for
        ``str``, the same ``bytearray`` object (modified in place) for
        ``bytearray``, and a new ``bytes`` object otherwise.
    """
    is_str = isinstance(buff, str)
    is_ba = isinstance(buff, bytearray)

    if is_str:
        arr = bytearray(buff.encode('latin-1'))
    elif is_ba:
        # No copy: the caller's bytearray is updated in place.
        arr = buff
    else:
        arr = bytearray(buff)

    # NOTE(review): bytes after the incremented position are kept as-is
    # rather than truncated -- confirm this is the intended upper bound.
    for pos in range(len(arr) - 1, -1, -1):
        if arr[pos] != 0xFF:
            arr[pos] += 1
            break
    else:
        # Nothing could be incremented without overflow.
        arr += b'\x00'

    if is_str:
        return arr.decode('latin-1')
    if is_ba:
        return arr
    # Hand back an immutable value so the working buffer is not exposed.
    return bytes(arr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it converted to bytes here?

Copy link
Contributor Author

@WJSGDBZ WJSGDBZ Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid unexpected mutation: the bytearray is only a mutable working buffer for the in-place increment, so the result is converted back to immutable bytes before it is returned.


@classmethod
def generate_next_key(cls, hash_key, stop_sort_key):
    """Build the key for (hash_key, stop_sort_key) and return its successor.

    The key produced by ``generate_key`` is read back through ``raw()``,
    advanced with ``generate_next_bytes`` and wrapped in a ``blob``.
    """
    key = cls.generate_key(hash_key, stop_sort_key)
    return blob(cls.generate_next_bytes(key.raw()))

@classmethod
def generate_stop_key(cls, hash_key, stop_sort_key):
    """Return ``(stop_key, stop_inclusive)`` for a scan.

    With a non-empty ``stop_sort_key`` the stop key is the key built by
    ``generate_key`` and the flag is ``True``. Otherwise the stop key is
    ``generate_next_bytes(hash_key)`` wrapped in a ``blob`` and the flag is
    ``False``.
    """
    if stop_sort_key:
        return cls.generate_key(hash_key, stop_sort_key), True
    # Wrapped in a blob so both branches return an object exposing `.data`.
    return blob(cls.generate_next_bytes(hash_key)), False

def __init__(self, meta_addrs=None, table_name='',
timeout=DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -1004,6 +1037,24 @@ def get_scanner(self, hash_key,
stop_key, stop_inclusive = self.generate_stop_key(hash_key, stop_sort_key)
if not stop_inclusive:
scan_options.stop_inclusive = stop_inclusive

# limit key range by prefix filter
if scan_options.sortkey_filter_type == filter_type.FT_MATCH_PREFIX and \
len(scan_options.sortkey_filter_pattern) > 0:
prefix_start = self.generate_key(hash_key, scan_options.sortkey_filter_pattern)
# If the prefix start is after the current start_key, move the scan start to the prefix.
if bytes_cmp(prefix_start.data, start_key.data) > 0:
start_key = prefix_start
scan_options.start_inclusive = True

prefix_stop = self.generate_next_key(hash_key, scan_options.sortkey_filter_pattern)
# If the prefix stop is before or equal to the current stop_key, move the scan stop to the prefix stop.
# The prefix stop represents the next key after hash_key and sortkey_filter_pattern,
# so stop_inclusive should be False.
if bytes_cmp(prefix_stop.data, stop_key.data) <= 0:
stop_key = prefix_stop
scan_options.stop_inclusive = False

gpid_list = []
hash_list = []
r = bytes_cmp(start_key.data, stop_key.data)
Expand Down Expand Up @@ -1033,10 +1084,6 @@ def get_unordered_scanners(self, max_split_count, scan_options):
size = count // split
more = count % split

opt = ScanOptions()
opt.timeout_millis = scan_options.timeout_millis
opt.batch_size = scan_options.batch_size
opt.snapshot = scan_options.snapshot
scanner_list = []
for i in range(split):
gpid_list = []
Expand All @@ -1048,6 +1095,6 @@ def get_unordered_scanners(self, max_split_count, scan_options):
gpid_list.append(all_gpid_list[count])
hash_list.append(int(count))

scanner_list.append(PegasusScanner(self.table, gpid_list, opt, hash_list, True))
scanner_list.append(PegasusScanner(self.table, gpid_list, scan_options, hash_list, True))

return scanner_list
12 changes: 10 additions & 2 deletions python-client/pypegasus/utils/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ def __init__(self):
self.start_inclusive = True
self.stop_inclusive = False
self.snapshot = None # for future use

self.sortkey_filter_type = filter_type.FT_NO_FILTER
self.sortkey_filter_pattern = ""
self.hashkey_filter_type = filter_type.FT_NO_FILTER
self.hashkey_filter_pattern = ""

def __repr__(self):
lst = ['%s=%r' % (key, value)
for key, value in self.__dict__.items()]
Expand Down Expand Up @@ -104,11 +108,15 @@ def restore_key(merge_key):

return hash_key, sort_key

# Indexing `bytes`/`bytearray` yields an int while indexing `str` yields a
# one-character string; normalise both so byte-like values of either
# representation can be compared element by element.
def bval(ch):
    """Return ``ch`` as an int: unchanged if already an int, else ``ord(ch)``."""
    return ch if isinstance(ch, int) else ord(ch)

def bytes_cmp(left, right):
min_len = min(len(left), len(right))
for i in range(min_len):
r = ord(left[i]) - ord(right[i])
r = bval(left[i]) - bval(right[i])
if r != 0:
return r

Expand Down
Loading