Skip to content

Commit eca42d6

Browse files
committed
wip
1 parent 7899906 commit eca42d6

7 files changed

Lines changed: 24 additions & 38 deletions

File tree

datacatalog/pkg/manager/impl/artifact_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
223223
artifactModel = tag.Artifact
224224
}
225225

226+
// If the artifact is expired consider this tag expired too
227+
if artifactModel.ExpiresAt != nil && artifactModel.ExpiresAt.Before(m.clock.Now()) {
228+
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.NotFound, "entry not found")
229+
}
230+
226231
if len(artifactModel.ArtifactData) == 0 {
227232
return models.Artifact{}, errors.NewDataCatalogErrorf(codes.Internal, "artifact [%+v] with key %v does not have artifact data associated", artifactModel, key)
228233
}

datacatalog/pkg/manager/impl/artifact_manager_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ func getExpectedArtifactModel(ctx context.Context, t *testing.T, datastore *stor
201201
Tags: []models.Tag{
202202
{TagKey: models.TagKey{TagName: "test-tag"}, DatasetUUID: expectedDataset.GetUUID(), ArtifactID: artifact.GetId()},
203203
},
204-
CreatedAt: getTestTimestamp(),
204+
BaseModel: models.BaseModel{
205+
CreatedAt: getTestTimestamp(),
206+
},
205207
}
206208
}
207209

datacatalog/pkg/repositories/gormimpl/artifact.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ package gormimpl
33
import (
44
"context"
55

6-
"google.golang.org/grpc/codes"
76
"gorm.io/gorm"
87
"gorm.io/gorm/clause"
98
"k8s.io/utils/clock"
109

1110
"github.com/flyteorg/flyte/datacatalog/pkg/common"
12-
catalogErrors "github.com/flyteorg/flyte/datacatalog/pkg/errors"
1311
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
1412
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/interfaces"
1513
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/models"
@@ -43,38 +41,21 @@ func (h *artifactRepo) Create(ctx context.Context, artifact models.Artifact) err
4341
timer := h.repoMetrics.CreateDuration.Start(ctx)
4442
defer timer.Stop()
4543

46-
err := h.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
47-
var existing models.Artifact
48-
getResult := tx.
49-
Clauses(clause.Locking{Strength: "UPDATE"}).
50-
Order("artifacts.created_at DESC"). // Always pick the most recent
51-
First(
52-
&existing,
53-
&models.Artifact{ArtifactKey: artifact.ArtifactKey},
54-
)
55-
56-
if getResult.Error == nil {
57-
if existing.ExpiresAt != nil && h.clock.Now().Before(*existing.ExpiresAt) {
58-
// If the previous artifact is not expired return already exists
59-
return catalogErrors.NewDataCatalogErrorf(codes.AlreadyExists, "value already exists")
60-
}
61-
} else {
62-
if getResult.Error.Error() != gorm.ErrRecordNotFound.Error() {
63-
return h.errorTransformer.ToDataCatalogError(getResult.Error)
64-
}
65-
// Not found is desirable
66-
}
44+
tx := h.db.WithContext(ctx).Begin()
6745

68-
createResult := tx.Create(&artifact)
46+
tx = tx.Create(&artifact)
6947

70-
if createResult.Error != nil {
71-
return h.errorTransformer.ToDataCatalogError(createResult.Error)
72-
}
48+
if tx.Error != nil {
49+
tx.Rollback()
50+
return h.errorTransformer.ToDataCatalogError(tx.Error)
51+
}
7352

74-
return nil
75-
})
53+
tx = tx.Commit()
54+
if tx.Error != nil {
55+
return h.errorTransformer.ToDataCatalogError(tx.Error)
56+
}
7657

77-
return err
58+
return nil
7859
}
7960

8061
func (h *artifactRepo) GetAndFilterExpired(ctx context.Context, in models.ArtifactKey) (models.Artifact, error) {

datacatalog/pkg/repositories/gormimpl/tag.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package gormimpl
22

33
import (
44
"context"
5-
65
"gorm.io/gorm"
76

87
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"

datacatalog/pkg/repositories/models/artifact.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ type ArtifactKey struct {
1111
}
1212

1313
type Artifact struct {
14-
CreatedAt time.Time `gorm:"primary_key"`
15-
UpdatedAt time.Time
16-
DeletedAt *time.Time `sql:"index"`
17-
14+
BaseModel
1815
ArtifactKey
1916
DatasetUUID string `gorm:"type:uuid;index:artifacts_dataset_uuid_idx"`
2017
Dataset Dataset `gorm:"association_autocreate:false"`

datacatalog/pkg/repositories/models/tag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type TagKey struct {
1111
type Tag struct {
1212
BaseModel
1313
TagKey
14-
ArtifactID string
14+
ArtifactID string `gorm:"primary_key"`
1515
DatasetUUID string `gorm:"type:uuid;index:tags_dataset_uuid_idx"`
1616
Artifact Artifact `gorm:"references:DatasetProject,DatasetName,DatasetDomain,DatasetVersion,ArtifactID;foreignkey:DatasetProject,DatasetName,DatasetDomain,DatasetVersion,ArtifactID"`
1717
}

datacatalog/pkg/repositories/transformers/artifact_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ func TestFromArtifactModel(t *testing.T) {
123123
SerializedMetadata: []byte{},
124124
Partitions: getTestPartitions(),
125125
Tags: getTestTags(),
126-
CreatedAt: createdAt,
126+
BaseModel: models.BaseModel{
127+
CreatedAt: createdAt,
128+
},
127129
}
128130

129131
actual, err := FromArtifactModel(artifactModel)

0 commit comments

Comments
 (0)