Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions flytestdlib/cache/in_memory_auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type InMemoryAutoRefresh struct {
processing *sync.Map
toDelete *syncSet
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
workqueue workqueue.TypedRateLimitingInterface[*Batch]
parallelizm uint
lock sync.RWMutex
clock clock.Clock // pluggable clock for unit testing
Expand All @@ -120,7 +120,7 @@ type InMemoryAutoRefresh struct {
func NewInMemoryAutoRefresh(
name string,
syncCb SyncFunc,
syncRateLimiter workqueue.RateLimiter,
syncRateLimiter workqueue.TypedRateLimiter[*Batch],
resyncPeriod time.Duration,
parallelizm uint,
size uint,
Expand Down Expand Up @@ -149,7 +149,7 @@ func NewInMemoryAutoRefresh(
processing: &sync.Map{},
toDelete: newSyncSet(),
syncPeriod: resyncPeriod,
workqueue: workqueue.NewRateLimitingQueueWithConfig(syncRateLimiter, workqueue.RateLimitingQueueConfig{
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(syncRateLimiter, workqueue.TypedRateLimitingQueueConfig[*Batch]{
Name: scope.CurrentScope(),
Clock: opts.clock,
}),
Expand Down Expand Up @@ -341,8 +341,8 @@ func (w *InMemoryAutoRefresh) sync(ctx context.Context) (err error) {
w.workqueue.Forget(batch)
w.workqueue.Done(batch)

newBatch := make(Batch, 0, len(*batch.(*Batch)))
for _, b := range *batch.(*Batch) {
newBatch := make(Batch, 0, len(*batch))
for _, b := range *batch {
itemID := b.GetID()
w.processing.Delete(itemID)
item, ok := w.lruMap.Get(itemID)
Expand Down Expand Up @@ -404,13 +404,13 @@ func (w *InMemoryAutoRefresh) inProcessing(key interface{}) bool {
}

// Instantiates a new AutoRefresh Cache that syncs items in batches.
func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.TypedRateLimiter[*Batch],
resyncPeriod time.Duration, parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error) {
return NewInMemoryAutoRefresh(name, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope, WithCreateBatchesFunc(createBatches))
}

// Instantiates a new AutoRefresh Cache that syncs items periodically.
func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.TypedRateLimiter[*Batch], resyncPeriod time.Duration,
parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error) {
return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope)
}
Expand Down
2 changes: 1 addition & 1 deletion flytestdlib/cli/pflags/api/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ func loadPackage(pkg string) (*types.Package, error) {
}

if len(loadedPkgs) == 0 {
return nil, fmt.Errorf("No packages loaded")
return nil, fmt.Errorf("no packages loaded")
}

targetPackage := loadedPkgs[0].Types
Expand Down
16 changes: 8 additions & 8 deletions flytestdlib/config/config_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func NewConfigCommand(accessorProvider AccessorProvider) *cobra.Command {
Short: "Generate configuration documentation in rst format",
RunE: func(cmd *cobra.Command, args []string) error {
sections := GetRootSection().GetSections()
orderedSectionKeys := sets.NewString()
orderedSectionKeys := sets.New[string]()
for s := range sections {
orderedSectionKeys.Insert(s)
}
printToc(orderedSectionKeys)
visitedSection := map[string]bool{}
visitedType := map[reflect.Type]bool{}
for _, sectionKey := range orderedSectionKeys.List() {
for _, sectionKey := range orderedSectionKeys.UnsortedList() {
if canPrint(sections[sectionKey].GetConfig()) {
printDocs(sectionKey, false, sections[sectionKey], visitedSection, visitedType)
}
Expand Down Expand Up @@ -148,11 +148,11 @@ func printDocs(title string, isSubsection bool, section Section, visitedSection

if section != nil {
sections := section.GetSections()
orderedSectionKeys := sets.NewString()
orderedSectionKeys := sets.New[string]()
for s := range sections {
orderedSectionKeys.Insert(s)
}
for _, sectionKey := range orderedSectionKeys.List() {
for _, sectionKey := range orderedSectionKeys.UnsortedList() {
fieldName := sectionKey
fieldType := reflect.TypeOf(sections[sectionKey].GetConfig())
fieldTypeString := getFieldTypeString(fieldType)
Expand All @@ -162,19 +162,19 @@ func printDocs(title string, isSubsection bool, section Section, visitedSection
printSection(fieldName, fieldTypeString, fieldDefaultValue, "", isSubsection)
}
}
orderedSectionKeys := sets.NewString()
orderedSectionKeys := sets.New[string]()
for s := range subsections {
orderedSectionKeys.Insert(s)
}

for _, sectionKey := range orderedSectionKeys.List() {
for _, sectionKey := range orderedSectionKeys.UnsortedList() {
printDocs(sectionKey, true, NewSection(subsections[sectionKey], nil), visitedSection, visitedType)
}
}

// Print Table of contents
func printToc(orderedSectionKeys sets.String) {
for _, sectionKey := range orderedSectionKeys.List() {
func printToc(orderedSectionKeys sets.Set[string]) {
for _, sectionKey := range orderedSectionKeys.UnsortedList() {
fmt.Printf("- `%s <#section-%s>`_\n\n", sectionKey, sectionKey)
}
}
Expand Down
8 changes: 4 additions & 4 deletions flytestdlib/config/viper/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type viperAccessor struct {
rootConfig config.Section
// Ensures we initialize the file Watcher once.
watcherInitializer *sync.Once
existingFlagKeys sets.String
existingFlagKeys sets.Set[string]
}

func (viperAccessor) ID() string {
Expand All @@ -53,7 +53,7 @@ func (viperAccessor) InitializeFlags(cmdFlags *flag.FlagSet) {
}

func (v *viperAccessor) InitializePflags(cmdFlags *pflag.FlagSet) {
existingFlagKeys := sets.NewString()
existingFlagKeys := sets.New[string]()
cmdFlags.VisitAll(func(f *pflag.Flag) {
existingFlagKeys.Insert(f.Name)
if len(f.Shorthand) > 0 {
Expand Down Expand Up @@ -235,7 +235,7 @@ func stringToByteArray(f, t reflect.Type, data interface{}) (interface{}, error)
// object. Otherwise, it'll just pass on the original data.
func jsonUnmarshallerHook(_, to reflect.Type, data interface{}) (interface{}, error) {
unmarshalerType := reflect.TypeOf((*json.Unmarshaler)(nil)).Elem()
if to.Implements(unmarshalerType) || reflect.PtrTo(to).Implements(unmarshalerType) ||
if to.Implements(unmarshalerType) || reflect.PointerTo(to).Implements(unmarshalerType) ||
(canGetElement(to.Kind()) && to.Elem().Implements(unmarshalerType)) {

ctx := context.Background()
Expand Down Expand Up @@ -268,7 +268,7 @@ func (v viperAccessor) parseViperConfigRecursive(root config.Section, settings i
errs := stdLibErrs.ErrorCollection{}
var mine interface{}
myKeysCount := 0
discoveredKeys := sets.NewString()
discoveredKeys := sets.New[string]()
if asMap, casted := settings.(map[string]interface{}); casted {
myMap := map[string]interface{}{}
for childKey, childValue := range asMap {
Expand Down
3 changes: 1 addition & 2 deletions flytestdlib/database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"

Expand All @@ -30,7 +29,7 @@ func resolvePassword(ctx context.Context, passwordVal, passwordPath string) stri
logger.Fatalf(ctx,
"missing database password at specified path [%s]", passwordPath)
}
passwordVal, err := ioutil.ReadFile(passwordPath)
passwordVal, err := os.ReadFile(passwordPath)
if err != nil {
logger.Fatalf(ctx, "failed to read database password from path [%s] with err: %v",
passwordPath, err)
Expand Down
3 changes: 1 addition & 2 deletions flytestdlib/ioutils/timed_readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ioutils

import (
"io"
"io/ioutil"
)

// Defines a common interface for timers.
Expand All @@ -13,5 +12,5 @@ type Timer interface {

func ReadAll(r io.Reader, t Timer) ([]byte, error) {
defer t.Stop()
return ioutil.ReadAll(r)
return io.ReadAll(r)
}
2 changes: 1 addition & 1 deletion flytestdlib/logger/gcp_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (f *GcpFormatter) Format(entry *logrus.Entry) ([]byte, error) {

serialized, err := json.Marshal(log)
if err != nil {
return nil, fmt.Errorf("Failed to marshal fields to JSON, %w", err)
return nil, fmt.Errorf("failed to marshal fields to JSON: %w", err)
}
return append(serialized, '\n'), nil
}
4 changes: 2 additions & 2 deletions flytestdlib/pbhash/pbhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"encoding/base64"

goObjectHash "github.com/benlaurie/objecthash/go/objecthash"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/jsonpb" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/encoding/protojson
"github.com/golang/protobuf/proto" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/proto

"github.com/flyteorg/flyte/v2/flytestdlib/logger"
)
Expand Down
5 changes: 3 additions & 2 deletions flytestdlib/resolver/k8s_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1" //nolint:staticcheck // TODO: migrate to discoveryv1.EndpointSlice
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,6 +133,7 @@ func (k *kResolver) Close() {
logger.Infof(k.ctx, "k8s resolver: closed")
}

//nolint:staticcheck // TODO: migrate to discoveryv1.EndpointSlice
func (k *kResolver) resolve(e *v1.Endpoints) {
var newAddrs []resolver.Address
for _, subset := range e.Subsets {
Expand Down Expand Up @@ -187,7 +188,7 @@ func (k *kResolver) run() {
if event.Object == nil {
continue
}
k.resolve(event.Object.(*v1.Endpoints))
k.resolve(event.Object.(*v1.Endpoints)) //nolint:staticcheck // TODO: migrate to discoveryv1.EndpointSlice
}
}
}
2 changes: 1 addition & 1 deletion flytestdlib/sandboxutils/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/proto

"github.com/flyteorg/flyte/v2/flytestdlib/logger"
)
Expand Down
5 changes: 2 additions & 3 deletions flytestdlib/storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (i
defer s.rwMutex.RUnlock()

if raw, found := s.cache[reference]; found {
return ioutil.NopCloser(bytes.NewReader(raw)), nil
return io.NopCloser(bytes.NewReader(raw)), nil
}

return nil, os.ErrNotExist
Expand All @@ -107,7 +106,7 @@ func (s *InMemoryStore) WriteRaw(ctx context.Context, reference DataReference, s
s.rwMutex.Lock()
defer s.rwMutex.Unlock()

rawBytes, err := ioutil.ReadAll(raw)
rawBytes, err := io.ReadAll(raw)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion flytestdlib/storage/protobuf_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"time"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/proto
errs "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

Expand Down
2 changes: 1 addition & 1 deletion flytestdlib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/proto

"github.com/flyteorg/stow"
)
Expand Down
4 changes: 2 additions & 2 deletions flytestdlib/storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
s32 "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/aws/awserr" //nolint:staticcheck // TODO: migrate to aws-sdk-go-v2
s32 "github.com/aws/aws-sdk-go/service/s3" //nolint:staticcheck // TODO: migrate to aws-sdk-go-v2
errs "github.com/pkg/errors"

"github.com/flyteorg/flyte/v2/flytestdlib/contextutils"
Expand Down
4 changes: 2 additions & 2 deletions flytestdlib/utils/marshal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"strings"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/jsonpb" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/encoding/protojson
"github.com/golang/protobuf/proto" //nolint:staticcheck // TODO: migrate to google.golang.org/protobuf/proto
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/pkg/errors"
)
Expand Down
4 changes: 2 additions & 2 deletions flytestdlib/yamlutils/yaml_json.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package yamlutils

import (
"io/ioutil"
"os"
"path/filepath"

"github.com/ghodss/yaml"
)

func ReadYamlFileAsJSON(path string) ([]byte, error) {
r, err := ioutil.ReadFile(filepath.Clean(path))
r, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type KubernetesConfig struct {
KubeConfig string `json:"kubeconfig"` // Optional, defaults to in-cluster or ~/.kube/config
}

//nolint:unused // Used by go:generate pflags
var defaultConfig = &Config{
RunsService: ServiceConfig{
Host: "0.0.0.0",
Expand Down
12 changes: 10 additions & 2 deletions manager/testclient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ func watchRuns(ctx context.Context, client workflowconnect.RunServiceClient) {
log.Printf("❌ Failed to start WatchRuns: %v", err)
return
}
defer stream.Close()
defer func() {
if err := stream.Close(); err != nil {
log.Printf("❌ Failed to close WatchRuns stream: %v", err)
}
}()

log.Println("✅ WatchRuns stream connected")

Expand Down Expand Up @@ -148,7 +152,11 @@ func watchActions(ctx context.Context, client workflowconnect.RunServiceClient,
log.Printf("❌ Failed to start WatchActions: %v", err)
return
}
defer stream.Close()
defer func() {
if err := stream.Close(); err != nil {
log.Printf("❌ Failed to close WatchActions stream: %v", err)
}
}()

log.Println("✅ WatchActions stream connected")

Expand Down
Loading