Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions python/hopsworks/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file is generated by hopsworks-apigen.
# Do not edit it manually!
# It is managed by hopsworks-apigen and is subject to periodical deletion.
import hopsworks_common.core.sink_job_configuration


FeatureColumnMapping = hopsworks_common.core.sink_job_configuration.FeatureColumnMapping
FullLoadConfig = hopsworks_common.core.sink_job_configuration.FullLoadConfig
LoadingConfig = hopsworks_common.core.sink_job_configuration.LoadingConfig
SinkJobConfiguration = hopsworks_common.core.sink_job_configuration.SinkJobConfiguration
17 changes: 17 additions & 0 deletions python/hopsworks/mcp/tools/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def __init__(self, mcp: FastMCP):
self.mcp.tool(tags=[TAGS.FEATURE_GROUP, TAGS.READ, TAGS.STATEFUL])(
self.get_features
)
self.mcp.tool(tags=[TAGS.FEATURE_GROUP, TAGS.READ, TAGS.STATEFUL])(
self.get_feature_group_row_count
)

def _get_feature_group_versions(self, name: str | None = None):
# Get the current project and its feature groups
Expand Down Expand Up @@ -180,6 +183,20 @@ async def preview_feature_group( # docsig: disable
f"Unable to convert preview to dictionary. Here's the raw preview:\n{preview}"
)

async def get_feature_group_row_count( # docsig: disable
self,
ctx: Context,
name: str,
version: int | None = None,
) -> int:
"""Get the row count of a feature group with the specified name and version (latest by default)."""
await ctx.info(
f"Retrieving row count of {name}{f' v{version}' if version else ''} feature group..."
)

fg = self._get_feature_group_version(name, version)
return fg.get_row_count()

async def get_features( # docsig: disable
self,
ctx: Context,
Expand Down
27 changes: 27 additions & 0 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,33 @@ def get_generated_feature_groups(
explicit_provenance.Links.Type.FEATURE_GROUP,
)

def get_row_count(
self,
feature_group_instance: fg_mod.FeatureGroup
| fg_mod.ExternalFeatureGroup
| fg_mod.SpineGroup,
) -> int:
"""Get the row count of a feature group.

Parameters:
feature_group_instance:
metadata object of feature group.

Returns:
The number of rows in the feature group.
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
feature_group_instance.feature_store_id,
"featuregroups",
feature_group_instance.id,
"rowcount",
]
return _client._send_request("GET", path_params)

def _check_features(self, feature_group_instance) -> None:
if not feature_group_instance._features:
warnings.warn(
Expand Down
11 changes: 11 additions & 0 deletions python/hsfs/core/feature_group_base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ def __init__(self, feature_store_id):
def delete(self, feature_group):
self._feature_group_api.delete(feature_group)

def get_row_count(self, feature_group: FeatureGroup) -> int:
"""Get the row count of a feature group.

Parameters:
feature_group: The feature group to get the row count of.

Returns:
The row count of the feature group.
"""
return self._feature_group_api.get_row_count(feature_group)

def add_tag(self, feature_group: FeatureGroup, name: str, value: Any):
"""Attach a name/value tag to a feature group.

Expand Down
1 change: 1 addition & 0 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def commit_details(self, feature_group, wallclock_time, limit):
"rowsUpdated": feature_group_commit.rows_updated,
"rowsInserted": feature_group_commit.rows_inserted,
"rowsDeleted": feature_group_commit.rows_deleted,
"rowCount": feature_group_commit.row_count,
}
return commit_details

Expand Down
24 changes: 23 additions & 1 deletion python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,28 @@ def get_generated_feature_groups(self) -> explicit_provenance.Links | None:
"""
return self._feature_group_engine.get_generated_feature_groups(self)

def get_row_count(self) -> int:
"""Get the number of rows in this feature group.

Example:
```python
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

row_count = fg.get_row_count()
```

Returns:
The number of rows in the feature group.

Raises:
hopsworks.client.exceptions.RestAPIError: If the backend encounters an error when handling the request.
"""
return self._feature_group_engine.get_row_count(self)

@public
def get_feature(self, name: str) -> feature.Feature | None:
"""Retrieve a `Feature` object from the schema of the feature group.
Expand Down Expand Up @@ -4005,7 +4027,7 @@ def commit_details(
limit: Number of commits to retrieve.

Returns:
Dictionary object of commit metadata timeline, where Key is commit id and value is `dict[str, str]` with key value pairs of date committed on, number of rows updated, inserted and deleted.
Dictionary object of commit metadata timeline, where Key is commit id and value is `dict[str, str]` with key value pairs of date committed on, number of rows updated, inserted, deleted, and the cumulative row count at that commit.

Raises:
hopsworks.client.exceptions.RestAPIError: If the backend encounters an error when handling the request.
Expand Down
11 changes: 11 additions & 0 deletions python/hsfs/feature_group_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
archived=None,
last_active_commit_time=None,
table_size=None,
row_count=None,
items=None,
count=None,
href=None,
Expand All @@ -51,6 +52,7 @@ def __init__(
self._archived = archived
self._last_active_commit_time = last_active_commit_time
self._table_size = table_size
self._row_count = row_count

@classmethod
def from_response_json(cls, json_dict):
Expand Down Expand Up @@ -78,6 +80,7 @@ def to_dict(self):
"archived": self._archived,
"lastActiveCommitTime": self._last_active_commit_time,
"tableSize": self._table_size,
"rowCount": self._row_count,
}

@property
Expand Down Expand Up @@ -120,6 +123,10 @@ def last_active_commit_time(self):
def table_size(self):
return self._table_size

@property
def row_count(self):
return self._row_count

@commitid.setter
def commitid(self, commitid):
self._commitid = commitid
Expand Down Expand Up @@ -151,3 +158,7 @@ def last_active_commit_time(self, last_active_commit_time):
@table_size.setter
def table_size(self, table_size):
self._table_size = table_size

@row_count.setter
def row_count(self, row_count):
self._row_count = row_count
24 changes: 24 additions & 0 deletions python/tests/core/test_feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ def test_check_features(self, mocker, backend_fixtures):
# Assert
assert len(warning_record) == 0

def test_get_row_count(self, mocker, backend_fixtures):
# Arrange
fg_api = feature_group_api.FeatureGroupApi()

client_mock = Mock()
client_mock.configure_mock(
**{"_send_request.return_value": 42, "_project_id": 1}
)
mocker.patch(
"hopsworks_common.client.get_instance",
return_value=client_mock,
)
mocker.patch("hsfs.engine.get_instance")

json = backend_fixtures["feature_group"]["get"]["response"]
fg = fg_mod.FeatureGroup.from_response_json(json)

# Act
result = fg_api.get_row_count(fg)

# Assert
assert result == 42
assert client_mock._send_request.call_count == 1

def test_check_features_no_features(self, mocker, backend_fixtures):
# Arrange
fg_api = feature_group_api.FeatureGroupApi()
Expand Down
16 changes: 16 additions & 0 deletions python/tests/core/test_feature_group_base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ def test_get_tags(self, mocker):
# Assert
assert mock_tags_api.return_value.get.call_count == 1

def test_get_row_count(self, mocker):
# Arrange
feature_store_id = 99

mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")

fg_base_engine = feature_group_base_engine.FeatureGroupBaseEngine(
feature_store_id=feature_store_id
)

# Act
fg_base_engine.get_row_count(feature_group=None)

# Assert
assert mock_fg_api.return_value.get_row_count.call_count == 1

def test_update_statistics_config(self, mocker):
# Arrange
feature_store_id = 99
Expand Down
3 changes: 2 additions & 1 deletion python/tests/core/test_feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def test_commit_details_time_travel_format_hudi_fg_commit(self, mocker):
)

fg_commit = feature_group_commit.FeatureGroupCommit(
commitid=1, rows_inserted=2, rows_updated=3, rows_deleted=4
commitid=1, rows_inserted=2, rows_updated=3, rows_deleted=4, row_count=10
)
mock_fg_api.return_value.get_commit_details.return_value = [fg_commit]
mock_util_get_hudi_datestr_from_timestamp.return_value = "123"
Expand All @@ -762,6 +762,7 @@ def test_commit_details_time_travel_format_hudi_fg_commit(self, mocker):
"rowsUpdated": 3,
"rowsInserted": 2,
"rowsDeleted": 4,
"rowCount": 10,
}
}

Expand Down
1 change: 1 addition & 0 deletions python/tests/fixtures/feature_group_commit_fixtures.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"rows_deleted": 3,
"validation_id": 77,
"commit_time": "test_commit_time",
"row_count": 100,
"items": [],
"count": 0,
"href": "test_href"
Expand Down
12 changes: 12 additions & 0 deletions python/tests/test_feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ def test_select_features(self):
assert len(features) == 2
assert {f.name for f in features} == {"f1", "f2"}

def test_get_row_count(self, mocker):
mock_engine = mocker.patch(
"hsfs.core.feature_group_base_engine.FeatureGroupBaseEngine.get_row_count",
return_value=123,
)

fg = get_test_feature_group()
result = fg.get_row_count()

assert result == 123
mock_engine.assert_called_once_with(fg)

def test_materialization_job(self, mocker):
mock_job = mocker.Mock()
mock_job_api = mocker.patch(
Expand Down
2 changes: 2 additions & 0 deletions python/tests/test_feature_group_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def test_from_response_json(self, backend_fixtures):
assert fg_commit.rows_updated == 2
assert fg_commit.rows_deleted == 3
assert fg_commit.validation_id == 77
assert fg_commit.row_count == 100

def test_from_response_json_list(self, backend_fixtures):
# Arrange
Expand All @@ -54,6 +55,7 @@ def test_from_response_json_list(self, backend_fixtures):
assert fg_commit.rows_updated == 2
assert fg_commit.rows_deleted == 3
assert fg_commit.validation_id == 77
assert fg_commit.row_count == 100

def test_from_response_json_basic_info(self, backend_fixtures):
# Arrange
Expand Down
Loading