Merged

26 commits
e431c2e
tests numpy
mariakrzywnicka Feb 6, 2025
419a7ed
add tests for data transfer between pytorch and daphne
irhox Feb 9, 2025
18b5096
add tests for data transfer between tensorflow and daphne
irhox Feb 10, 2025
4a5b643
fix bug in getDtype method in daphnelib matrix operator
irhox Feb 10, 2025
e627b5e
test pandas
mariakrzywnicka Feb 17, 2025
01cc9ec
Merge pull request #2 from mariakrzywnicka/899_benchmarks-tensorflow-…
irhox Feb 17, 2025
4487a6d
Merge pull request #3 from daphne-eu/main
irhox Feb 17, 2025
8f854ea
Merge pull request #1 from mariakrzywnicka/main
irhox Feb 28, 2025
ecf7eec
string data transfer via files for np.arrays and pd.series
mariakrzywnicka Mar 2, 2025
d7f181f
Merge pull request #4 from daphne-eu/main
irhox Mar 8, 2025
e3b7b54
Merge remote-tracking branch 'origin/main'
irhox Mar 8, 2025
1607a2e
Merge branch 'main' into update-fork
irhox Mar 8, 2025
b8bbc98
Merge pull request #5 from irhox/update-fork
irhox Mar 8, 2025
4774f97
Merge remote-tracking branch 'origin/main'
irhox Mar 8, 2025
b858728
unify address parameter in ReceiveFromNumpy Method
irhox Mar 8, 2025
82d7b7d
add method from_python() in daphne_context in order to support data t…
irhox Mar 10, 2025
df55149
fix bug in issue #950 with pandas series data transfer
irhox Mar 15, 2025
dcab758
fix bug mentioned in issue #616 - numpy vectors are now supported
irhox Mar 17, 2025
ef3c930
add performance benchmarking and results graphs for matrix multiplica…
irhox Mar 24, 2025
635fccb
add documentation for new from_python() method
irhox Mar 29, 2025
929f3de
add benchmarks for data transfer and exchange between daphne and diff…
irhox Mar 29, 2025
7e4f351
fix title in some of the graphs
irhox Mar 29, 2025
a89b9e6
Reformat ReceiveFromNumpy.h
irhox Mar 31, 2025
e722c80
Update consts.py
irhox Mar 31, 2025
646fa13
Delete send-to-server.sh
irhox Mar 31, 2025
6931118
Clean-up of the PR.
pdamme May 16, 2025
4 changes: 4 additions & 0 deletions doc/DaphneLib/APIRef.md
@@ -29,6 +29,10 @@ However, as the methods largely map to DaphneDSL built-in functions, you can fin

### `DaphneContext` API Reference

**Importing data from Python lists:**

- **`from_python`**`(mat: list, shared_memory=True, verbose=False, return_shape=False) -> Matrix`

**Importing data from other Python libraries:**

- **`from_numpy`**`(mat: np.array, shared_memory=True, verbose=False, return_shape=False) -> Matrix`
54 changes: 53 additions & 1 deletion doc/DaphneLib/Overview.md
@@ -239,13 +239,65 @@ A comprehensive list of these methods can be found in the [DaphneLib API referen
The data transfer from DaphneLib back to Python happens during the call to `compute()`.
If the result of the computation in DAPHNE is a matrix, `compute()` returns a `numpy.ndarray` (or optionally a `tensorflow.Tensor` or `torch.Tensor`); if the result is a frame, it returns a `pandas.DataFrame`; and if the result is a scalar, it returns a plain Python scalar.

So far, DaphneLib can exchange data with numpy, pandas, TensorFlow, and PyTorch.
So far, DaphneLib can exchange data with numpy, pandas, TensorFlow, PyTorch, and plain Python lists.
By default, the data transfer is via shared memory (and in many cases zero-copy).
Numpy and pandas are *required* dependencies for DaphneLib, so they should be installed anyway.
TensorFlow and PyTorch are *optional* for DaphneLib; if these libraries are not installed, DaphneLib cannot exchange data with them, but all remaining features still work.
In case you run DAPHNE inside the [`daphne-dev` container](/doc/GettingStarted.md), please note that TensorFlow and PyTorch are *not* included in the `daphne-dev` container due to their large footprint.
Please follow the [instructions](/doc/development/InstallPythonLibsInContainer.md) on installing Python libraries in the `daphne-dev` container if you need them.

### Data Exchange with Plain Python Lists

*Example:*

```python
from daphne.context.daphne_context import DaphneContext

dc = DaphneContext()

# Create a python list.
a = [10, 20, 30, 40, 50, 60]

# Transfer data to DaphneLib (lazily evaluated).
X = dc.from_python(a)

print("How DAPHNE sees the data:")
X.print().compute()

# Add 100 to each value in X.
X = X + 100.0

# Compute in DAPHNE, transfer result back to Python.
print("\nResult of adding 100 to each value, back in Python:")
print(X.compute())
```

*Run by:*
```shell
python3 scripts/examples/daphnelib/data-exchange-python.py
```

*Output:*
```text
How DAPHNE sees the data:
DenseMatrix(6x1, int64_t)
10
20
30
40
50
60

Result of adding 100 to each value, back in Python:
[[110.]
[120.]
[130.]
[140.]
[150.]
[160.]]
```
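The result above is a 6x1 matrix, so `compute()` returns a 2-D `numpy.ndarray` with a single column. A minimal sketch (plain numpy, no running DAPHNE instance required; the `result` literal stands in for the actual return value) of turning that back into a flat Python list:

```python
import numpy as np

# Stand-in for what compute() returns in the example above: a (6, 1) array.
result = np.array([[110.0], [120.0], [130.0], [140.0], [150.0], [160.0]])

# ravel() drops the column dimension; tolist() yields a plain Python list.
flat = result.ravel().tolist()
print(flat)  # [110.0, 120.0, 130.0, 140.0, 150.0, 160.0]
```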


### Data Exchange with numpy

*Example:*
33 changes: 33 additions & 0 deletions scripts/examples/daphnelib/data-exchange-python.py
@@ -0,0 +1,33 @@
# Copyright 2025 The DAPHNE Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from daphne.context.daphne_context import DaphneContext

dc = DaphneContext()

# Create a python list.
a = [10, 20, 30, 40, 50, 60]

# Transfer data to DaphneLib (lazily evaluated).
X = dc.from_python(a)

print("How DAPHNE sees the data:")
X.print().compute()

# Add 100 to each value in X.
X = X + 100.0

# Compute in DAPHNE, transfer result back to Python.
print("\nResult of adding 100 to each value, back in Python:")
print(X.compute())
58 changes: 56 additions & 2 deletions src/api/python/daphne/context/daphne_context.py
@@ -32,7 +32,7 @@
from daphne.operator.nodes.do_while_loop import DoWhileLoop
from daphne.operator.nodes.multi_return import MultiReturn
from daphne.operator.operation_node import OperationNode
from daphne.utils.consts import VALID_INPUT_TYPES, VALID_COMPUTED_TYPES, TMP_PATH, F64, F32, SI64, SI32, SI8, UI64, UI32, UI8
from daphne.utils.consts import VALID_INPUT_TYPES, VALID_COMPUTED_TYPES, TMP_PATH, F64, F32, SI64, SI32, SI8, UI64, UI32, UI8, STR

import numpy as np
import pandas as pd
@@ -69,6 +69,55 @@ def readFrame(self, file: str) -> Frame:
"""
unnamed_params = ['\"'+file+'\"']
return Frame(self, 'readFrame', unnamed_params)

def from_python(self, mat: list, shared_memory=True, verbose=False, return_shape=False):
"""Generates a `DAGNode` representing a matrix with data given by a Python `list`.
:param mat: The Python list.
:param shared_memory: Whether to use shared memory data transfer (True) or not (False).
:param verbose: Whether to print timing information (True) or not (False).
:param return_shape: Whether to return the original shape of the input array.
:return: The data from Python as a Matrix.
"""

original_mat_length = len(mat)
original_mat_dim2_length = None
original_mat_dim3_length = None

# Check if mat has one, two, or more dimensions.
if isinstance(mat[0], list):
original_mat_dim2_length = len(mat[0])
if isinstance(mat[0][0], list):
original_mat_dim3_length = len(mat[0][0])

if verbose:
start_time = time.time()

# Check if the python list is 2d or higher dimensional.
if original_mat_dim2_length is not None and original_mat_dim3_length is None:
# If 2d, handle as a matrix, convert to numpy array.
mat = np.array(mat)
# Using the existing from_numpy method for 2d arrays.
matrix = self.from_numpy(mat, shared_memory, verbose)
else:
# If higher dimensional, reshape to 2d and handle as a matrix.
# Store the original numpy representation.
original_mat = np.array(mat)
# Reshape to 2d using numpy's zero copy reshape.
reshaped_mat = original_mat.reshape((original_mat_length, -1))

if verbose:
# Check if the original and reshaped arrays share memory.
shares_memory = np.shares_memory(original_mat, reshaped_mat)
print(f"from_python(): original and reshaped arrays share memory: {shares_memory}")

# Use the existing from_numpy method for the reshaped 2D array
matrix = self.from_numpy(mat=reshaped_mat, shared_memory=shared_memory, verbose=verbose)

if verbose:
print(f"from_python(): total Python-side execution time: {(time.time() - start_time):.10f} seconds")

# Return the matrix, and the original shape if return_shape is set to True.
return (matrix, (original_mat_length, original_mat_dim2_length, original_mat_dim3_length)) if return_shape else matrix
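The higher-dimensional branch above relies on numpy's reshape returning a view rather than a copy. A standalone sketch of that mechanism (plain numpy; the variable names are illustrative, not part of DaphneLib):

```python
import numpy as np

# A 2x2x2 nested Python list, as from_python() might receive it.
nested = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]

original = np.array(nested)                     # shape (2, 2, 2)
reshaped = original.reshape((len(nested), -1))  # shape (2, 4)

# The reshaped array is a view on the same buffer -- no data is copied.
print(np.shares_memory(original, reshaped))  # True
print(reshaped.tolist())  # [[1, 2, 3, 4], [5, 6, 7, 8]]
```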

def from_numpy(self, mat: np.array, shared_memory=True, verbose=False, return_shape=False):
"""Generates a `DAGNode` representing a matrix with data given by a numpy `array`.
@@ -88,6 +137,7 @@ def from_numpy(self, mat: np.array, shared_memory=True, verbose=False, return_sh
if mat.ndim == 1:
rows = mat.shape[0]
cols = 1
mat = mat.reshape(-1, 1)
elif mat.ndim >= 2:
if mat.ndim > 2:
mat = mat.reshape((original_shape[0], -1))
@@ -121,6 +171,8 @@ def from_numpy(self, mat: np.array, shared_memory=True, verbose=False, return_sh
vtc = UI32
elif d_type == np.uint64:
vtc = UI64
elif mat.dtype.kind in {'U', 'S', 'O'}:
raise RuntimeError("transferring a numpy array of strings to DAPHNE via shared memory is not supported yet")
else:
# TODO Raise an error here?
print("unsupported numpy dtype")
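The chain of `elif` branches above maps numpy dtypes to DAPHNE value-type codes. The same mapping can be sketched as a lookup table (illustrative only; the string names below stand in for the integer codes defined in `daphne/utils/consts.py`):

```python
import numpy as np

# Illustrative mapping; DaphneLib uses integer codes from consts.py instead.
DTYPE_TO_VTC = {
    np.dtype(np.float64): "F64",
    np.dtype(np.float32): "F32",
    np.dtype(np.int64): "SI64",
    np.dtype(np.int32): "SI32",
    np.dtype(np.int8): "SI8",
    np.dtype(np.uint64): "UI64",
    np.dtype(np.uint32): "UI32",
    np.dtype(np.uint8): "UI8",
}

def vtc_for(arr: np.ndarray) -> str:
    # String-like arrays (unicode, bytes, object) are handled separately.
    if arr.dtype.kind in ("U", "S", "O"):
        return "STR"
    try:
        return DTYPE_TO_VTC[arr.dtype]
    except KeyError:
        raise RuntimeError(f"unsupported numpy dtype: {arr.dtype}")

print(vtc_for(np.array([1.0])))  # F64
```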
@@ -216,6 +268,8 @@ def from_pandas(self, df: pd.DataFrame, shared_memory=True, verbose=False, keepI
vtc = UI32
elif d_type == np.uint64:
vtc = UI64
elif mat.dtype.kind in {'U', 'S', 'O'}:
vtc = STR
else:
raise TypeError(f'Unsupported numpy dtype in column "{column}" ({idx})')

@@ -251,7 +305,7 @@ def from_pandas(self, df: pd.DataFrame, shared_memory=True, verbose=False, keepI

# This feature is only available if TensorFlow is available.
if isinstance(tf, ImportError):
def from_tensorflow(self, tensor , shared_memory=True, verbose=False, return_shape=False):
def from_tensorflow(self, tensor, shared_memory=True, verbose=False, return_shape=False):
raise tf
else:
def from_tensorflow(self, tensor: tf.Tensor, shared_memory=True, verbose=False, return_shape=False):
4 changes: 3 additions & 1 deletion src/api/python/daphne/operator/nodes/frame.py
@@ -73,7 +73,7 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_inpu
"numCols": self._pd_dataframe.shape[1],
"schema": [
{
"label": self._pd_dataframe.columns[i],
"label": str(self._pd_dataframe.columns[i]),
"valueType": self.getDType(self._pd_dataframe.dtypes.iloc[i])
}
for i in range(self._pd_dataframe.shape[1])
@@ -94,6 +94,8 @@ def getDType(self, d_type):
return "si64"
elif d_type == "float64":
return "f64"
elif d_type.kind in {'U', 'S', 'O'}:
return "str"
else:
print("Error")

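The `d_type.kind` checks above distinguish string-like columns. For reference, numpy's dtype `kind` codes used here are `'U'` (unicode strings), `'S'` (byte strings), and `'O'` (Python objects, which is how pandas has traditionally stored string columns). A quick sketch:

```python
import numpy as np

print(np.array(["a", "b"]).dtype.kind)    # U  (unicode strings)
print(np.array([b"a", b"b"]).dtype.kind)  # S  (byte strings)
print(np.array(["a", None]).dtype.kind)   # O  (mixed values fall back to object)
```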
42 changes: 27 additions & 15 deletions src/api/python/daphne/operator/nodes/matrix.py
@@ -48,13 +48,13 @@ def __init__(self, daphne_context: 'DaphneContext', operation:str, unnamed_input
local_data: np.array = None, brackets:bool = False, left_brackets: bool = False, copy: bool = False,
consumer_list: List['OperationNode'] = None)->'Matrix':
self.__copy = copy
is_python_local_data = False
if local_data is not None:

self._np_array = local_data
is_python_local_data = True
else:
self._np_array = None
is_python_local_data = False

super().__init__(daphne_context, operation, unnamed_input_nodes, named_input_nodes, OutputType.MATRIX,is_python_local_data, brackets, left_brackets, consumer_list)

def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
@@ -66,7 +66,15 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],

if self._is_numpy() and self.operation == "readMatrix":
with open(TMP_PATH+"/"+var_name+".csv", "wb") as f:
np.savetxt(f, self._np_array, delimiter=",")
if self._np_array.dtype in [np.float32, np.float64]:
fmt = "%.18e"
elif self._np_array.dtype in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]:
fmt = "%i"
elif self._np_array.dtype.kind in {'U', 'S', 'O'}:
fmt = "%s"
else:
raise RuntimeError("unsupported numpy dtype")
np.savetxt(f, self._np_array, delimiter=",", fmt=fmt)
with open(TMP_PATH+"/"+var_name+".csv.meta", "w") as f:
json.dump(
{
@@ -79,24 +87,28 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
return code_line
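The format string selected in `code_line()` above controls how `np.savetxt` renders each value type. A sketch of that selection logic against an in-memory buffer (the helper name `fmt_for` is illustrative, not part of DaphneLib):

```python
import io
import numpy as np

def fmt_for(arr: np.ndarray) -> str:
    # Mirrors the dtype-to-format selection in code_line() (illustrative).
    if arr.dtype in (np.float32, np.float64):
        return "%.18e"
    if arr.dtype.kind in ("i", "u"):       # signed/unsigned integers
        return "%i"
    if arr.dtype.kind in ("U", "S", "O"):  # string-like
        return "%s"
    raise RuntimeError("unsupported numpy dtype")

data = np.array([[1, 2], [3, 4]])
buf = io.StringIO()
np.savetxt(buf, data, delimiter=",", fmt=fmt_for(data))
print(buf.getvalue())  # one comma-separated row per line
```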

def getDType(self, d_type):
if d_type == np.dtype('f4'):
if d_type == np.dtype('float32'):
return "f32"
elif d_type == np.dtype('f8'):
elif d_type == np.dtype('float64'):
return "f64"
elif d_type == np.dtype('int8'):
return "si8"
elif d_type == np.dtype('si4'):
elif d_type == np.dtype('int16'):
return "si16"
elif d_type == np.dtype('int32'):
return "si32"
elif d_type == np.dtype('si8'):
elif d_type == np.dtype('int64'):
return "si64"
elif d_type == np.dtype('ui2'):
return "ui8"
elif d_type == np.dtype('ui4'):
return "ui8"
elif d_type == np.dtype('ui8'):
elif d_type == np.dtype('uint8'):
return "ui8"
elif d_type == np.dtype('uint16'):
return "ui16"
elif d_type == np.dtype('uint32'):
return "ui32"
elif d_type == np.dtype('uint64'):
return "ui64"
elif d_type.kind in {'U', 'S', 'O'}:
return "str"
else:
print("Error")
raise RuntimeError("unsupported numpy dtype")

def _is_numpy(self) -> bool:
return self._np_array is not None
13 changes: 11 additions & 2 deletions src/api/python/daphne/operator/operation_node.py
@@ -91,7 +91,7 @@ def update_node_in_input_list(self, new_node, current_node):
current_index = self._unnamed_input_nodes.index(current_node)
self._unnamed_input_nodes[current_index] = new_node

def compute(self, type="shared memory", verbose=False, asTensorFlow=False, asPyTorch=False, shape=None, useIndexColumn=False):
def compute(self, type="shared memory", verbose=False, asTensorFlow=False, asPyTorch=False, shape=None, useIndexColumn=False) -> Union[np.array, pd.DataFrame, 'tf.Tensor', 'torch.Tensor', float]:
"""
Compute function for processing the Daphne Object or operation node and returning the results.
The function builds a DaphneDSL script from the node and its context, executes it, and processes the results
@@ -194,7 +194,16 @@ def compute(self, type="shared memory", verbose=False, asTensorFlow=False, asPyT
)
self.clear_tmp()
elif self._output_type == OutputType.MATRIX and type=="files":
arr = np.genfromtxt(result, delimiter=',')
# Ensure string data is handled correctly
arr = np.genfromtxt(result, delimiter=',', dtype=None, encoding='utf-8')
meta_file_name = result + ".meta"
if os.path.exists(meta_file_name):
with open(meta_file_name, "r") as meta_file:
meta_data = json.load(meta_file)
if meta_data.get("valueType") == "str":
arr = arr.astype(str)
else:
print(f"metadata file not found: {meta_file_name}")
self.clear_tmp()
return arr
elif self._output_type == OutputType.SCALAR:
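Passing `dtype=None` to `np.genfromtxt` above makes it infer the type per column instead of forcing everything to float, which is what lets string results survive the file-based transfer. A sketch with an in-memory file standing in for the result CSV:

```python
import io
import numpy as np

csv = io.StringIO("alpha,beta\ngamma,delta\n")
arr = np.genfromtxt(csv, delimiter=",", dtype=None, encoding="utf-8")

# With homogeneous columns the result is a regular 2-D array of strings;
# the default float dtype would have produced a matrix of NaNs here.
print(arr.dtype.kind)  # U
```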
9 changes: 6 additions & 3 deletions src/api/python/daphne/script_building/script.py
@@ -90,9 +90,12 @@ def clear(self, dag_root:DAGNode):

def execute(self):
temp_out_path = os.path.join(TMP_PATH, "tmpdaphne.daphne")
temp_out_file = open(temp_out_path, "w")
temp_out_file.writelines(self.daphnedsl_script)
temp_out_file.close()
with open(temp_out_path, "w") as temp_out_file:
temp_out_file.writelines(self.daphnedsl_script)

# Check if the file exists
if not os.path.exists(temp_out_path):
raise RuntimeError(f"file '{temp_out_path}' does not exist")

#os.environ['OPENBLAS_NUM_THREADS'] = '1'
res = DaphneLib.daphne(ctypes.c_char_p(str.encode(PROTOTYPE_PATH)), ctypes.c_char_p(str.encode(temp_out_path)))
3 changes: 2 additions & 1 deletion src/api/python/daphne/utils/consts.py
@@ -30,7 +30,7 @@
from daphne.operator.nodes.frame import Frame
from daphne.operator.nodes.scalar import Scalar

VALID_INPUT_TYPES = Union['DAGNode', str, int, float, bool]
VALID_INPUT_TYPES = Union['DAGNode', str, int, float, bool, object]
# These are the operator symbols used in DaphneDSL (not in Python).
BINARY_OPERATIONS = ['+', '-', '/', '*', '^', '%', '<', '<=', '>', '>=', '==', '!=', '@', '&&', '||']
VALID_ARITHMETIC_TYPES = Union['DAGNode', int, float]
@@ -55,3 +55,4 @@
UI64 = 5
F32 = 6
F64 = 7
STR = 8