Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Hard deletion for Clusters and NodePools: resources and their adapter statuses are permanently removed from the database once all required adapters report `Finalized=True` and no child resources remain ([#119](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/119))
- `Finalized` condition aggregation with `WaitingForChildResources` intermediate state when all adapters are finalized but child node pools still exist ([#119](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/119))
- Soft deletion for Clusters and NodePools with `deleted_time` and `deleted_by` fields for tracking deletion requests ([#106](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/106))
- Aggregation logic for resource data ([#91](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/91))
- Version subcommand to CLI ([#84](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/84))
Expand All @@ -26,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Refactored `AdapterStatusDao.Upsert()` to accept a pre-fetched existing record, moving lookup and `LastTransitionTime` preservation logic to the service layer ([#119](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/119))
- Refactored DAO methods to remove Unscoped calls for fetching Clusters and NodePools ([#106](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/106))
- Bumped oapi-codegen version to fix missing `omitempty` on generated response objects ([#106](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/106))
- Updated OpenAPI spec with examples for Cluster and NodePool schemas ([#106](https://github.com/openshift-hyperfleet/hyperfleet-api/pull/106))
Expand Down
14 changes: 14 additions & 0 deletions pkg/api/adapter_status_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -52,3 +53,16 @@ func (as *AdapterStatus) BeforeCreate(tx *gorm.DB) error {
as.ID = id
return nil
}

func (as *AdapterStatus) IsFinalized() bool {
var conditions []AdapterCondition
if err := json.Unmarshal(as.Conditions, &conditions); err != nil {
return false
}
for _, cond := range conditions {
if cond.Type == ConditionTypeFinalized && cond.Status == AdapterConditionTrue {
return true
}
}
return false
}
94 changes: 15 additions & 79 deletions pkg/dao/adapter_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,20 @@ package dao

import (
"context"
"encoding/json"
"errors"
"time"

"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/openshift-hyperfleet/hyperfleet-api/pkg/api"
"github.com/openshift-hyperfleet/hyperfleet-api/pkg/api/openapi"
"github.com/openshift-hyperfleet/hyperfleet-api/pkg/db"
)

type AdapterStatusDao interface {
Get(ctx context.Context, id string) (*api.AdapterStatus, error)
Create(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
Replace(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus) (*api.AdapterStatus, error)
Upsert(ctx context.Context, adapterStatus *api.AdapterStatus, existing *api.AdapterStatus) (*api.AdapterStatus, error)
Delete(ctx context.Context, id string) error
DeleteByResource(ctx context.Context, resourceType, resourceID string) error
FindByResource(ctx context.Context, resourceType, resourceID string) (api.AdapterStatusList, error)
FindByResourceIDs(ctx context.Context, resourceType string, resourceIDs []string) (api.AdapterStatusList, error)
FindByResourcePaginated(
Expand Down Expand Up @@ -73,30 +68,12 @@ func (d *sqlAdapterStatusDao) Replace(
return adapterStatus, nil
}

// Upsert creates or updates an adapter status based on resource_type, resource_id, and adapter
// This implements the upsert semantic required by the new API spec
func (d *sqlAdapterStatusDao) Upsert(
ctx context.Context, adapterStatus *api.AdapterStatus,
ctx context.Context, adapterStatus *api.AdapterStatus, existing *api.AdapterStatus,
) (*api.AdapterStatus, error) {
g2 := (*d.sessionFactory).New(ctx)

// Keep deterministic observed time from the incoming report when provided (observed_time).
if adapterStatus.LastReportTime.IsZero() {
adapterStatus.LastReportTime = time.Now()
}

existing, err := d.FindByResourceAndAdapter(
ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter,
)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
db.MarkForRollback(ctx, err)
return nil, err
}

if err == nil && existing != nil {
// Preserve LastTransitionTime for conditions whose status hasn't changed.
adapterStatus.Conditions = preserveLastTransitionTime(existing.Conditions, adapterStatus.Conditions)

if existing != nil {
updateResult := g2.Model(&api.AdapterStatus{}).
Where("resource_type = ? AND resource_id = ? AND adapter = ?",
adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter).
Expand All @@ -118,9 +95,8 @@ func (d *sqlAdapterStatusDao) Upsert(
return nil, updateResult.Error
}

// No-op when the stored row is fresher or equal.
if updateResult.RowsAffected == 0 {
return d.FindByResourceAndAdapter(ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter)
return existing, nil
}
Comment thread
mliptak0 marked this conversation as resolved.

return d.FindByResourceAndAdapter(ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter)
Expand All @@ -135,7 +111,6 @@ func (d *sqlAdapterStatusDao) Upsert(
return adapterStatus, nil
}

// A row was inserted concurrently; return the latest stored row without overwriting it.
return d.FindByResourceAndAdapter(ctx, adapterStatus.ResourceType, adapterStatus.ResourceID, adapterStatus.Adapter)
}

Expand All @@ -150,6 +125,16 @@ func (d *sqlAdapterStatusDao) Delete(ctx context.Context, id string) error {
return nil
}

func (d *sqlAdapterStatusDao) DeleteByResource(ctx context.Context, resourceType, resourceID string) error {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Where("resource_type = ? AND resource_id = ?", resourceType, resourceID).
Delete(&api.AdapterStatus{}).Error; err != nil {
db.MarkForRollback(ctx, err)
return err
}
return nil
}

func (d *sqlAdapterStatusDao) FindByResource(
ctx context.Context, resourceType, resourceID string,
) (api.AdapterStatusList, error) {
Expand Down Expand Up @@ -220,52 +205,3 @@ func (d *sqlAdapterStatusDao) All(ctx context.Context) (api.AdapterStatusList, e
}
return statuses, nil
}

// preserveLastTransitionTime preserves LastTransitionTime for conditions whose status hasn't changed
// This implements the Kubernetes condition semantic where LastTransitionTime is only updated when status changes
func preserveLastTransitionTime(oldConditionsJSON, newConditionsJSON datatypes.JSON) datatypes.JSON {
// Unmarshal old conditions
var oldConditions []openapi.AdapterCondition
if len(oldConditionsJSON) > 0 {
if err := json.Unmarshal(oldConditionsJSON, &oldConditions); err != nil {
// If we can't unmarshal old conditions, return new conditions as-is
return newConditionsJSON
}
}

// Unmarshal new conditions
var newConditions []openapi.AdapterCondition
if len(newConditionsJSON) > 0 {
if err := json.Unmarshal(newConditionsJSON, &newConditions); err != nil {
// If we can't unmarshal new conditions, return new conditions as-is
return newConditionsJSON
}
}

// Build a map of old conditions by type for quick lookup
oldConditionsMap := make(map[string]openapi.AdapterCondition)
for _, oldCond := range oldConditions {
oldConditionsMap[oldCond.Type] = oldCond
}

// Update new conditions: preserve LastTransitionTime if status hasn't changed
for i := range newConditions {
if oldCond, exists := oldConditionsMap[newConditions[i].Type]; exists {
// If status hasn't changed, preserve the old LastTransitionTime
if oldCond.Status == newConditions[i].Status {
newConditions[i].LastTransitionTime = oldCond.LastTransitionTime
}
// If status changed, keep the new LastTransitionTime (already set to now)
}
// If this is a new condition type, keep the new LastTransitionTime
}

// Marshal back to JSON
updatedJSON, err := json.Marshal(newConditions)
if err != nil {
// If we can't marshal, return new conditions as-is
return newConditionsJSON
}

return updatedJSON
}
21 changes: 21 additions & 0 deletions pkg/dao/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

type ClusterDao interface {
Get(ctx context.Context, id string) (*api.Cluster, error)
GetForUpdate(ctx context.Context, id string) (*api.Cluster, error)
Create(ctx context.Context, cluster *api.Cluster) (*api.Cluster, error)
Replace(ctx context.Context, cluster *api.Cluster) (*api.Cluster, error)
Save(ctx context.Context, cluster *api.Cluster) error
SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error
Delete(ctx context.Context, id string) error
FindByIDs(ctx context.Context, ids []string) (api.ClusterList, error)
All(ctx context.Context) (api.ClusterList, error)
Expand All @@ -39,6 +41,15 @@ func (d *sqlClusterDao) Get(ctx context.Context, id string) (*api.Cluster, error
return &cluster, nil
}

func (d *sqlClusterDao) GetForUpdate(ctx context.Context, id string) (*api.Cluster, error) {
g2 := (*d.sessionFactory).New(ctx)
var cluster api.Cluster
if err := g2.Clauses(clause.Locking{Strength: "UPDATE"}).Take(&cluster, "id = ?", id).Error; err != nil {
return nil, err
}
return &cluster, nil
}

func (d *sqlClusterDao) Create(ctx context.Context, cluster *api.Cluster) (*api.Cluster, error) {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Create(cluster).Error; err != nil {
Expand Down Expand Up @@ -83,6 +94,16 @@ func (d *sqlClusterDao) Save(ctx context.Context, cluster *api.Cluster) error {
return nil
}

func (d *sqlClusterDao) SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error {
g2 := (*d.sessionFactory).New(ctx)
result := g2.Model(&api.Cluster{}).Where("id = ?", id).Update("status_conditions", statusConditions)
if result.Error != nil {
db.MarkForRollback(ctx, result.Error)
return result.Error
}
return nil
}
Comment thread
mliptak0 marked this conversation as resolved.

func (d *sqlClusterDao) Delete(ctx context.Context, id string) error {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Delete(&api.Cluster{Meta: api.Meta{ID: id}}).Error; err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/dao/mocks/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (d *clusterDaoMock) Get(ctx context.Context, id string) (*api.Cluster, erro
return nil, gorm.ErrRecordNotFound
}

func (d *clusterDaoMock) GetForUpdate(ctx context.Context, id string) (*api.Cluster, error) {
return d.Get(ctx, id)
}

func (d *clusterDaoMock) Create(ctx context.Context, cluster *api.Cluster) (*api.Cluster, error) {
d.clusters = append(d.clusters, cluster)
return cluster, nil
Expand All @@ -39,6 +43,16 @@ func (d *clusterDaoMock) Save(ctx context.Context, cluster *api.Cluster) error {
return nil
}

func (d *clusterDaoMock) SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error {
for _, c := range d.clusters {
if c.ID == id {
c.StatusConditions = statusConditions
return nil
}
}
return gorm.ErrRecordNotFound
}

func (d *clusterDaoMock) Delete(ctx context.Context, id string) error {
return errors.NotImplemented("Cluster").AsError()
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/dao/mocks/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func (d *nodePoolDaoMock) Get(ctx context.Context, id string) (*api.NodePool, er
return nil, gorm.ErrRecordNotFound
}

func (d *nodePoolDaoMock) GetForUpdate(ctx context.Context, id string) (*api.NodePool, error) {
return d.Get(ctx, id)
}

func (d *nodePoolDaoMock) SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error {
for _, np := range d.nodePools {
if np.ID == id {
np.StatusConditions = statusConditions
return nil
}
}
return gorm.ErrRecordNotFound
}

func (d *nodePoolDaoMock) Create(ctx context.Context, nodePool *api.NodePool) (*api.NodePool, error) {
d.nodePools = append(d.nodePools, nodePool)
return nodePool, nil
Expand Down Expand Up @@ -56,10 +70,23 @@ func (d *nodePoolDaoMock) FindByIDs(ctx context.Context, ids []string) (api.Node
return nil, errors.NotImplemented("NodePool").AsError()
}

func (d *nodePoolDaoMock) FindByOwner(ctx context.Context, ownerID string) (api.NodePoolList, error) {
return nil, errors.NotImplemented("NodePool").AsError()
}

func (d *nodePoolDaoMock) UpdateStatusConditionsByIDs(ctx context.Context, updates map[string][]byte) error {
return errors.NotImplemented("NodePool").AsError()
}

func (d *nodePoolDaoMock) ExistsByOwner(ctx context.Context, ownerID string) (bool, error) {
for _, np := range d.nodePools {
if np.OwnerID == ownerID {
return true, nil
}
}
return false, nil
}

func (d *nodePoolDaoMock) All(ctx context.Context) (api.NodePoolList, error) {
return d.nodePools, nil
}
41 changes: 41 additions & 0 deletions pkg/dao/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ import (

type NodePoolDao interface {
Get(ctx context.Context, id string) (*api.NodePool, error)
GetForUpdate(ctx context.Context, id string) (*api.NodePool, error)
Create(ctx context.Context, nodePool *api.NodePool) (*api.NodePool, error)
Replace(ctx context.Context, nodePool *api.NodePool) (*api.NodePool, error)
Save(ctx context.Context, nodePool *api.NodePool) error
SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error
Delete(ctx context.Context, id string) error
FindByIDs(ctx context.Context, ids []string) (api.NodePoolList, error)
FindByOwner(ctx context.Context, ownerID string) (api.NodePoolList, error)
FindSoftDeletedByOwner(ctx context.Context, ownerID string) (api.NodePoolList, error)
SoftDeleteByOwner(ctx context.Context, ownerID string, t time.Time, deletedBy string) error
UpdateStatusConditionsByIDs(ctx context.Context, updates map[string][]byte) error
ExistsByOwner(ctx context.Context, ownerID string) (bool, error)
All(ctx context.Context) (api.NodePoolList, error)
}

Expand All @@ -44,6 +48,25 @@ func (d *sqlNodePoolDao) Get(ctx context.Context, id string) (*api.NodePool, err
return &nodePool, nil
}

func (d *sqlNodePoolDao) GetForUpdate(ctx context.Context, id string) (*api.NodePool, error) {
g2 := (*d.sessionFactory).New(ctx)
var nodePool api.NodePool
if err := g2.Clauses(clause.Locking{Strength: "UPDATE"}).Take(&nodePool, "id = ?", id).Error; err != nil {
return nil, err
}
return &nodePool, nil
}

func (d *sqlNodePoolDao) SaveStatusConditions(ctx context.Context, id string, statusConditions []byte) error {
g2 := (*d.sessionFactory).New(ctx)
result := g2.Model(&api.NodePool{}).Where("id = ?", id).Update("status_conditions", statusConditions)
if result.Error != nil {
db.MarkForRollback(ctx, result.Error)
return result.Error
}
return nil
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

func (d *sqlNodePoolDao) Create(ctx context.Context, nodePool *api.NodePool) (*api.NodePool, error) {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Create(nodePool).Error; err != nil {
Expand Down Expand Up @@ -131,6 +154,15 @@ func (d *sqlNodePoolDao) FindByIDs(ctx context.Context, ids []string) (api.NodeP
return nodePools, nil
}

func (d *sqlNodePoolDao) FindByOwner(ctx context.Context, ownerID string) (api.NodePoolList, error) {
g2 := (*d.sessionFactory).New(ctx)
var nodePools api.NodePoolList
if err := g2.Where("owner_id = ?", ownerID).Find(&nodePools).Error; err != nil {
return nil, err
}
return nodePools, nil
}

func (d *sqlNodePoolDao) UpdateStatusConditionsByIDs(ctx context.Context, updates map[string][]byte) error {
g2 := (*d.sessionFactory).New(ctx)
if len(updates) == 0 {
Expand All @@ -149,6 +181,15 @@ func (d *sqlNodePoolDao) UpdateStatusConditionsByIDs(ctx context.Context, update
return nil
}

func (d *sqlNodePoolDao) ExistsByOwner(ctx context.Context, ownerID string) (bool, error) {
g2 := (*d.sessionFactory).New(ctx)
var count int64
if err := g2.Model(&api.NodePool{}).Where("owner_id = ?", ownerID).Limit(1).Count(&count).Error; err != nil {
return false, err
}
return count > 0, nil
}

func (d *sqlNodePoolDao) All(ctx context.Context) (api.NodePoolList, error) {
g2 := (*d.sessionFactory).New(ctx)
nodePools := api.NodePoolList{}
Expand Down
Loading