forked from ai-dynamo/nixl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_mem_example.py
More file actions
executable file
·134 lines (111 loc) · 4.74 KB
/
query_mem_example.py
File metadata and controls
executable file
·134 lines (111 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import tempfile
try:
from nixl._api import nixl_agent, nixl_agent_config
from nixl.logging import get_logger
logger = get_logger(__name__)
NIXL_AVAILABLE = True
except ImportError:
import logging
logger = logging.getLogger(__name__)
logger.error("NIXL API missing install NIXL.")
NIXL_AVAILABLE = False
if __name__ == "__main__":
logger.info("NIXL queryMem Python API Example")
logger.info("=" * 40)
if not NIXL_AVAILABLE:
logger.warning("Skipping example - NIXL bindings not available")
sys.exit(0)
# Create temporary test files
os.makedirs("files_for_query", exist_ok=True)
temp_files = []
for i in range(3):
with tempfile.NamedTemporaryFile(
dir="files_for_query", delete=False, suffix=f"_{i}.txt", mode="wb"
) as temp_file:
temp_file.write(f"Test content for file {i}".encode())
temp_files.append(str(temp_file.name))
# Create a non-existent file path
non_existent_file = "/tmp/nixl_example_nonexistent.txt"
try:
logger.info(
"Using NIXL Plugins from: %s",
os.environ.get("NIXL_PLUGIN_DIR", "default location"),
)
# Create an NIXL agent
logger.info("Creating NIXL agent...")
config = nixl_agent_config(
enable_prog_thread=False, enable_listen_thread=False, backends=[]
)
agent = nixl_agent("example_agent", config)
# Prepare a list of tuples as file paths in metaInfo field for querying.
# Addr and length and devID fields are set to 0 for file queries.
logger.info("Preparing file paths for querying...")
file_paths = [
(0, 0, 0, temp_files[0]), # Existing file 1
(0, 0, 0, temp_files[1]), # Existing file 2
(0, 0, 0, non_existent_file), # Non-existent file
(0, 0, 0, temp_files[2]), # Existing file 3
]
# Query memory using queryMem
logger.info("Querying memory/storage information...")
# Try to create a backend with POSIX plugin
try:
params = agent.get_plugin_params("POSIX")
agent.create_backend("POSIX", params)
logger.info("Created backend: POSIX")
# Query with specific backend
resp = agent.query_memory(file_paths, "POSIX", mem_type="FILE")
except Exception as e:
logger.exception("POSIX backend creation failed: %s", e)
# Try MOCK_DRAM as fallback
try:
params = agent.get_plugin_params("MOCK_DRAM")
agent.create_backend("MOCK_DRAM", params)
logger.info("Created backend: MOCK_DRAM")
# Query with specific backend
resp = agent.query_memory(file_paths, "MOCK_DRAM", mem_type="FILE")
except Exception as e2:
logger.exception("MOCK_DRAM also failed: %s", e2)
logger.exception("No working backends available")
sys.exit(0)
# Display results
logger.info("\nQuery results (%d responses):", len(resp))
logger.info("-" * 50)
for i, result in enumerate(resp):
logger.info("Descriptor %d:", i)
if result is not None:
logger.info(" File size: %s bytes", result.get("size", "N/A"))
logger.info(" File mode: %s", result.get("mode", "N/A"))
logger.info(" Modified time: %s", result.get("mtime", "N/A"))
else:
logger.info(" File does not exist or is not accessible")
logger.info("")
logger.info("Example completed successfully!")
except Exception as e:
logger.exception("Error in example: %s", e)
import traceback
traceback.print_exc()
finally:
# Clean up temporary files
logger.info("Cleaning up temporary files...")
for temp_file_path in temp_files:
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
logger.info("Removed: %s", temp_file_path)