Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions ci/ray_ci/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,14 @@ def main(

if bisect_run_test_target:
test_targets = [bisect_run_test_target]
print(f"[Kunchd] Got bisect run test target: {bisect_run_test_target}")
else:
get_high_impact_tests = (
run_high_impact_tests or os.environ.get("RAYCI_MICROCHECK_RUN") == "1"
)
print(f"[Kunchd] Got high impact tests: {get_high_impact_tests}")
lookup_test_database = os.environ.get("RAYCI_DISABLE_TEST_DB") != "1"
print(f"[Kunchd] Got lookup test database: {lookup_test_database}")
test_targets = _get_test_targets(
container,
targets,
Expand All @@ -282,7 +285,12 @@ def main(
)
if not test_targets:
print("--- No tests to run", file=sys.stderr)
sys.exit(0)
# sys.exit(0)
print(
"[Kunchd] Did not find any test targets to run defaulting to: //python/ray/tests:test_object_spilling",
file=sys.stderr,
)
test_targets = ["//python/ray/tests:test_object_spilling"]

print(f"+++ Running {len(test_targets)} tests", file=sys.stderr)
success = container.run_tests(
Expand Down Expand Up @@ -419,6 +427,7 @@ def _get_test_targets(
Get test targets that are owned by a particular team
"""
query = _get_all_test_query(targets, team, except_tags, only_tags)
print(f"[Kunchd] Got test query: {query}")
if not workspace_dir:
workspace_dir = bazel_workspace_dir
test_targets = {
Expand All @@ -432,6 +441,7 @@ def _get_test_targets(
.split(os.linesep)
if target
}
print(f"[Kunchd] Got test targets: {test_targets}")
flaky_tests = set(
_get_flaky_test_targets(
team,
Expand All @@ -440,7 +450,7 @@ def _get_test_targets(
lookup_test_database=lookup_test_database,
)
)

print(f"[Kunchd] Got flaky tests: {flaky_tests}")
if get_flaky_tests:
# run flaky test cases, so we include flaky tests in the list of targets
# provided by users
Expand All @@ -463,7 +473,7 @@ def _get_test_targets(
team=team,
).union(_get_new_tests(prefix, workspace_dir))
final_targets = high_impact_tests.intersection(final_targets)

print(f"[Kunchd] Got final targets: {final_targets}")
return sorted(final_targets)


Expand Down
63 changes: 61 additions & 2 deletions python/ray/_private/external_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ def _write_multiple_objects(
The order of returned keys are equivalent to the one
with given object_refs.
"""
logger.info(
f"[Kunchd] ExternalStorage._write_multiple_objects: Writing "
f"num_objects={len(object_refs)} to url={url}"
)
keys = []
offset = 0
ray_object_pairs = self._get_objects_from_store(object_refs)
Expand Down Expand Up @@ -181,6 +185,10 @@ def _write_multiple_objects(
offset += written_bytes
# Necessary because pyarrow.io.NativeFile does not flush() on close().
f.flush()
logger.info(
f"[Kunchd] ExternalStorage._write_multiple_objects: Finished writing "
f"num_keys={len(keys)}, total_bytes_written={offset}"
)
return keys

def _size_check(self, address_len, metadata_len, buffer_len, obtained_data_size):
Expand Down Expand Up @@ -307,21 +315,40 @@ def __init__(
# Create directories.
for path in directory_path:
full_dir_path = os.path.join(path, f"{DEFAULT_OBJECT_PREFIX}_{node_id}")
logger.info(
f"[Kunchd] FileSystemStorage.__init__: Creating spill directory "
f"full_dir_path={full_dir_path}, base_path={path}, node_id={node_id}"
)
os.makedirs(full_dir_path, exist_ok=True)
if not os.path.exists(full_dir_path):
raise ValueError(
"The given directory path to store objects, "
f"{full_dir_path}, could not be created."
)
logger.info(
f"[Kunchd] FileSystemStorage.__init__: Successfully created/verified "
f"spill directory full_dir_path={full_dir_path}, exists={os.path.exists(full_dir_path)}"
)
self._directory_paths.append(full_dir_path)
assert len(self._directory_paths) == len(directory_path)
logger.info(
f"[Kunchd] FileSystemStorage.__init__: Initialized with "
f"directory_paths={self._directory_paths}"
)
# Choose the current directory.
# It chooses a random index to maximize multiple directories that are
# mounted at different point.
self._current_directory_index = random.randrange(0, len(self._directory_paths))

def spill_objects(self, object_refs, owner_addresses) -> List[str]:
logger.info(
f"[Kunchd] FileSystemStorage.spill_objects: Called with "
f"num_object_refs={len(object_refs)}, directory_paths={self._directory_paths}"
)
if len(object_refs) == 0:
logger.info(
"[Kunchd] FileSystemStorage.spill_objects: No objects to spill, returning empty"
)
return []
# Choose the current directory path by round robin order.
self._current_directory_index = (self._current_directory_index + 1) % len(
Expand All @@ -331,8 +358,17 @@ def spill_objects(self, object_refs, owner_addresses) -> List[str]:

filename = _get_unique_spill_filename(object_refs)
url = f"{os.path.join(directory_path, filename)}"
logger.info(
f"[Kunchd] FileSystemStorage.spill_objects: Writing to url={url}, "
f"directory_path={directory_path}, filename={filename}"
)
with open(url, "wb", buffering=self._buffer_size) as f:
return self._write_multiple_objects(f, object_refs, owner_addresses, url)
result = self._write_multiple_objects(f, object_refs, owner_addresses, url)
logger.info(
f"[Kunchd] FileSystemStorage.spill_objects: Wrote {len(result)} objects to {url}, "
f"file_exists={os.path.exists(url)}"
)
return result

def restore_spilled_objects(
self, object_refs: List[ObjectRef], url_with_offset_list: List[str]
Expand Down Expand Up @@ -576,10 +612,17 @@ def spill_objects(self, object_refs, owner_addresses) -> List[str]:

def setup_external_storage(config, node_id, session_name):
"""Setup the external storage according to the config."""
logger.info(
f"[Kunchd] setup_external_storage: Called with config={config}, "
f"node_id={node_id}, session_name={session_name}"
)
assert node_id is not None, "node_id should be provided."
global _external_storage
if config:
storage_type = config["type"]
logger.info(
f"[Kunchd] setup_external_storage: Setting up storage_type={storage_type}"
)
if storage_type == "filesystem":
_external_storage = FileSystemStorage(node_id, **config["params"])
elif storage_type == "smart_open":
Expand All @@ -599,7 +642,14 @@ def setup_external_storage(config, node_id, session_name):
_external_storage = SlowFileStorage(node_id, **config["params"])
else:
raise ValueError(f"Unknown external storage type: {storage_type}")
logger.info(
f"[Kunchd] setup_external_storage: Created storage "
f"type={type(_external_storage).__name__}"
)
else:
logger.info(
"[Kunchd] setup_external_storage: No config provided, using NullStorage"
)
_external_storage = NullStorage()
return _external_storage

Expand All @@ -619,7 +669,16 @@ def spill_objects(object_refs, owner_addresses):
Returns:
A list of keys corresponding to the input object refs.
"""
return _external_storage.spill_objects(object_refs, owner_addresses)
logger.info(
f"[Kunchd] spill_objects (module function): Called with "
f"num_object_refs={len(object_refs)}, storage_type={type(_external_storage).__name__}"
)
result = _external_storage.spill_objects(object_refs, owner_addresses)
logger.info(
f"[Kunchd] spill_objects (module function): Completed, "
f"num_urls_returned={len(result)}"
)
return result


def restore_spilled_objects(
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,10 @@ def _get_object_spilling_config(self):
variable, and system config. The object spilling directory specified through
ray params will override the one specified through environment variable and
system config."""
logger.info(
f"[Kunchd] _get_object_spilling_config: Called, "
f"session_dir={self._session_dir}"
)

object_spilling_directory = self._ray_params.object_spilling_directory
if not object_spilling_directory:
Expand Down Expand Up @@ -1856,6 +1860,10 @@ def _get_object_spilling_config(self):
"Ray team."
)

logger.info(
f"[Kunchd] _get_object_spilling_config: Returning "
f"object_spilling_config={object_spilling_config}"
)
return object_spilling_config

def _record_stats(self):
Expand Down
16 changes: 16 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,10 @@ cdef c_vector[c_string] spill_objects_handler(
c_vector[c_string] owner_addresses

with gil:
logger.info(
f"[Kunchd] spill_objects_handler: Received spill request for "
f"{object_refs_to_spill.size()} objects"
)
object_refs = VectorToObjectRefs(
object_refs_to_spill,
skip_adding_local_ref=False)
Expand All @@ -2364,20 +2368,32 @@ cdef c_vector[c_string] spill_objects_handler(
with ray._private.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE):
logger.info(
f"[Kunchd] spill_objects_handler: Calling external_storage.spill_objects "
f"with {len(object_refs)} object_refs"
)
urls = external_storage.spill_objects(
object_refs, owner_addresses)
logger.info(
f"[Kunchd] spill_objects_handler: external_storage.spill_objects returned "
f"{len(urls)} urls"
)
for url in urls:
return_urls.push_back(url)
except Exception as err:
exception_str = (
"An unexpected internal error occurred while the IO worker "
"was spilling objects: {}".format(err))
logger.exception(exception_str)
logger.error(f"[Kunchd] spill_objects_handler: Exception occurred: {err}")
ray._private.utils.push_error_to_driver(
ray._private.worker.global_worker,
"spill_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
logger.info(
f"[Kunchd] spill_objects_handler: Returning {return_urls.size()} urls"
)
return return_urls


Expand Down
Loading