Skip to content

Commit 192ba5f

Browse files
committed
Minor fixes
1 parent 37ff80e commit 192ba5f

File tree

3 files changed

+27
-36
lines changed

3 files changed

+27
-36
lines changed

historyserver/pkg/storage/gcs/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ func (c *config) completeCollectorConfig(rcc *types.RayCollectorConfig, jd map[s
2929
if bucket, ok := jd["gcsBucket"]; ok {
3030
c.Bucket = bucket.(string)
3131
}
32-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
32+
if os.Getenv("GCP_PROJECT_ID") != "" {
33+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
34+
}
3335
}
3436
}
3537

@@ -42,6 +44,8 @@ func (c *config) completeHistoryServerConfig(rcc *types.RayHistoryServerConfig,
4244
if bucket, ok := jd["gcsBucket"]; ok {
4345
c.Bucket = bucket.(string)
4446
}
45-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
47+
if os.Getenv("GCP_PROJECT_ID") != "" {
48+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
49+
}
4650
}
4751
}

historyserver/pkg/storage/gcs/gcs_handler.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ type RayLogsHandler struct {
2727
LogFiles chan string
2828
RootDir string
2929
SessionDir string
30-
HttpClient *http.Client
31-
GCSEndpoint string
3230
RayClusterName string
3331
RayClusterID string
3432
RayNodeName string
@@ -50,7 +48,7 @@ func (h *RayLogsHandler) CreateDirectory(directoryPath string) error {
5048
_, err := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).Attrs(ctx)
5149
if errors.Is(err, gstorage.ErrObjectNotExist) {
5250
writer := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).NewWriter(ctx)
53-
if createErr := writer.Close(); err != nil {
51+
if createErr := writer.Close(); createErr != nil {
5452
logrus.Errorf("Failed to create directory: %s, error: %v", objectPath, createErr)
5553
return createErr
5654
}
@@ -73,6 +71,7 @@ func (h *RayLogsHandler) WriteFile(file string, reader io.ReadSeeker) error {
7371
_, err := io.Copy(writer, reader)
7472
if err != nil {
7573
logrus.Error("GCS Client failed to Copy from source.")
74+
writer.Close()
7675
return err
7776
}
7877

@@ -88,7 +87,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
8887
// TODO(chiayi): Look into potential timeout issues
8988
ctx := context.Background()
9089

91-
pathPrefix := path.Join(h.RootDir, clusterId, directory)
90+
pathPrefix := strings.TrimPrefix(path.Join(clusterId, directory), "/") + "/"
9291

9392
query := &gstorage.Query{
9493
Prefix: pathPrefix,
@@ -121,7 +120,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
121120
// We only want files, so check if attrs.Name is non-empty.
122121
// Exclude the placeholder object if it exists for the directory itself.
123122
// attrs.Name contains the whole object path.
124-
if !strings.HasSuffix(attrs.Name, "/") {
123+
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
125124
fileList = append(fileList, attrs.Name)
126125
}
127126
}
@@ -149,7 +148,8 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
149148
break
150149
}
151150
if err != nil {
152-
logrus.Fatalf("Failed to get attribute of ray clusters: %v", err)
151+
logrus.Errorf("Failed to get attribute of ray clusters: %v", err)
152+
return nil
153153
}
154154

155155
fullObjectPath := objectAttr.Name
@@ -161,7 +161,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
161161
clusterMeta := strings.Split(metaInfo[0], "_")
162162
if len(clusterMeta) != 2 {
163163
logrus.Errorf("Unable to get cluster name and namespace from directory: %s", metaInfo[0])
164-
return nil
164+
continue
165165
}
166166
cluster.Name = clusterMeta[0]
167167
cluster.Namespace = clusterMeta[1]
@@ -170,6 +170,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
170170
time, err := utils.GetDateTimeFromSessionID(metaInfo[1])
171171
if err != nil {
172172
logrus.Errorf("Failed to get date time from the given sessionID: %s, error: %v", metaInfo[1], err)
173+
continue
173174
}
174175
cluster.CreateTimeStamp = time.Unix()
175176
cluster.CreateTime = time.UTC().Format(("2006-01-02T15:04:05Z"))
@@ -210,14 +211,20 @@ func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
210211
// Change into bufio.Scanner if needed or limit the size of the read
211212
data, err := io.ReadAll(reader)
212213
if err != nil {
213-
logrus.Fatalf("Failed to get all content of the file: %s, %v", fileName, err)
214+
logrus.Errorf("Failed to get all content of the file: %s, %v", fileName, err)
215+
return nil
214216
}
215217
return bytes.NewReader(data)
216218
}
217219

218220
func createGCSBucket(gcsClient *gstorage.Client, projectID, bucketName string) error {
219221
ctx := context.Background()
220222

223+
if projectID == "" {
224+
log.Errorf("Project ID cannot be empty. Failed to create GCS bucket: %s", bucketName)
225+
return errors.New("Project ID cannot be empty when creating a GCS Bucket")
226+
}
227+
221228
bucket := gcsClient.Bucket(bucketName)
222229
if err := bucket.Create(ctx, projectID, nil); err != nil {
223230
log.Errorf("Failed to create GCS bucket: %s", bucketName)
@@ -262,6 +269,10 @@ func New(c *config) (*RayLogsHandler, error) {
262269
baseTransport,
263270
option.WithScopes(gstorage.ScopeFullControl),
264271
)
272+
if err != nil {
273+
logrus.Errorf("Failed to create authentication transport object")
274+
return nil, err
275+
}
265276

266277
// Create a custom client with the authenticated transport
267278
customHttpTransportClient := &http.Client{
@@ -281,7 +292,7 @@ func New(c *config) (*RayLogsHandler, error) {
281292
// Check if bucket exists
282293
_, err = storageClient.Bucket(c.Bucket).Attrs(ctx)
283294
if err == gstorage.ErrBucketNotExist {
284-
logrus.Warn("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
295+
logrus.Warnf("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
285296
err = createGCSBucket(storageClient, c.GCPProjectID, c.Bucket)
286297
if err != nil {
287298
return nil, err

historyserver/pkg/storage/gcs/gcs_handler_test.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package gcs
22

33
import (
4-
"context"
54
"io"
6-
"net/http"
7-
"path"
85
"sort"
96
"strings"
107
"testing"
@@ -14,7 +11,6 @@ import (
1411
"github.com/fsouza/fake-gcs-server/fakestorage"
1512
"github.com/google/go-cmp/cmp"
1613
"github.com/ray-project/kuberay/historyserver/pkg/utils"
17-
gIterator "google.golang.org/api/iterator"
1814
)
1915

2016
// setupFakeGCS creates a fake GCS server and a client connected to it.
@@ -53,7 +49,6 @@ func createRayLogsHandler(client *gstorage.Client, bucketName string) *RayLogsHa
5349
StorageClient: client,
5450
GCSBucket: bucketName,
5551
RootDir: "ray_historyserver",
56-
HttpClient: &http.Client{}, // Not used by fake, but required by struct
5752
}
5853
}
5954

@@ -202,26 +197,7 @@ func TestListFiles(t *testing.T) {
202197

203198
for _, tc := range tests {
204199
t.Run(tc.name, func(t *testing.T) {
205-
ctx := context.Background()
206-
pathPrefix := strings.TrimPrefix(path.Join(tc.clusterID, tc.directory), "/") + "/"
207-
query := &gstorage.Query{
208-
Prefix: pathPrefix,
209-
Delimiter: "/",
210-
}
211-
fileIterator := handler.StorageClient.Bucket(handler.GCSBucket).Objects(ctx, query)
212-
var files []string
213-
for {
214-
attrs, err := fileIterator.Next()
215-
if err == gIterator.Done {
216-
break
217-
}
218-
if err != nil {
219-
t.Fatalf("Iterator error: %v", err)
220-
}
221-
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
222-
files = append(files, attrs.Name)
223-
}
224-
}
200+
files := handler.ListFiles(tc.clusterID, tc.directory)
225201
sort.Strings(files)
226202
sort.Strings(tc.expected)
227203
if diff := cmp.Diff(tc.expected, files); diff != "" {

0 commit comments

Comments
 (0)