Skip to content

Commit 020734c

Browse files
committed
Minor fixes
1 parent 37ff80e commit 020734c

File tree

3 files changed

+37
-38
lines changed

3 files changed

+37
-38
lines changed

historyserver/pkg/storage/gcs/config.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@ func (c *config) completeCollectorConfig(rcc *types.RayCollectorConfig, jd map[s
2929
if bucket, ok := jd["gcsBucket"]; ok {
3030
c.Bucket = bucket.(string)
3131
}
32-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
32+
if gcpProjectID, ok := jd["gcpProkectID"]; ok {
33+
c.GCPProjectID = gcpProjectID.(string)
34+
}
35+
} else {
36+
if os.Getenv("GCP_PROJECT_ID") != "" {
37+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
38+
}
3339
}
3440
}
3541

@@ -42,6 +48,12 @@ func (c *config) completeHistoryServerConfig(rcc *types.RayHistoryServerConfig,
4248
if bucket, ok := jd["gcsBucket"]; ok {
4349
c.Bucket = bucket.(string)
4450
}
45-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
51+
if gcpProjectID, ok := jd["gcpProjectID"]; ok {
52+
c.GCPProjectID = gcpProjectID.(string)
53+
}
54+
} else {
55+
if os.Getenv("GCP_PROJECT_ID") != "" {
56+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
57+
}
4658
}
4759
}

historyserver/pkg/storage/gcs/gcs_handler.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
gstorage "cloud.google.com/go/storage"
15-
"github.com/google/martian/v3/log"
1615
"github.com/ray-project/kuberay/historyserver/pkg/collector/types"
1716
"github.com/ray-project/kuberay/historyserver/pkg/storage"
1817
"github.com/ray-project/kuberay/historyserver/pkg/utils"
@@ -27,8 +26,6 @@ type RayLogsHandler struct {
2726
LogFiles chan string
2827
RootDir string
2928
SessionDir string
30-
HttpClient *http.Client
31-
GCSEndpoint string
3229
RayClusterName string
3330
RayClusterID string
3431
RayNodeName string
@@ -50,7 +47,7 @@ func (h *RayLogsHandler) CreateDirectory(directoryPath string) error {
5047
_, err := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).Attrs(ctx)
5148
if errors.Is(err, gstorage.ErrObjectNotExist) {
5249
writer := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).NewWriter(ctx)
53-
if createErr := writer.Close(); err != nil {
50+
if createErr := writer.Close(); createErr != nil {
5451
logrus.Errorf("Failed to create directory: %s, error: %v", objectPath, createErr)
5552
return createErr
5653
}
@@ -73,6 +70,7 @@ func (h *RayLogsHandler) WriteFile(file string, reader io.ReadSeeker) error {
7370
_, err := io.Copy(writer, reader)
7471
if err != nil {
7572
logrus.Error("GCS Client failed to Copy from source.")
73+
writer.Close()
7674
return err
7775
}
7876

@@ -88,7 +86,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
8886
// TODO(chiayi): Look into potential timeout issues
8987
ctx := context.Background()
9088

91-
pathPrefix := path.Join(h.RootDir, clusterId, directory)
89+
pathPrefix := strings.TrimPrefix(path.Join(clusterId, directory), "/") + "/"
9290

9391
query := &gstorage.Query{
9492
Prefix: pathPrefix,
@@ -121,7 +119,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
121119
// We only want files, so check if attrs.Name is non-empty.
122120
// Exclude the placeholder object if it exists for the directory itself.
123121
// attrs.Name contains the whole object path.
124-
if !strings.HasSuffix(attrs.Name, "/") {
122+
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
125123
fileList = append(fileList, attrs.Name)
126124
}
127125
}
@@ -149,7 +147,8 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
149147
break
150148
}
151149
if err != nil {
152-
logrus.Fatalf("Failed to get attribute of ray clusters: %v", err)
150+
logrus.Errorf("Failed to get attribute of ray clusters: %v", err)
151+
return nil
153152
}
154153

155154
fullObjectPath := objectAttr.Name
@@ -161,7 +160,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
161160
clusterMeta := strings.Split(metaInfo[0], "_")
162161
if len(clusterMeta) != 2 {
163162
logrus.Errorf("Unable to get cluster name and namespace from directory: %s", metaInfo[0])
164-
return nil
163+
continue
165164
}
166165
cluster.Name = clusterMeta[0]
167166
cluster.Namespace = clusterMeta[1]
@@ -170,6 +169,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
170169
time, err := utils.GetDateTimeFromSessionID(metaInfo[1])
171170
if err != nil {
172171
logrus.Errorf("Failed to get date time from the given sessionID: %s, error: %v", metaInfo[1], err)
172+
continue
173173
}
174174
cluster.CreateTimeStamp = time.Unix()
175175
cluster.CreateTime = time.UTC().Format(("2006-01-02T15:04:05Z"))
@@ -210,17 +210,23 @@ func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
210210
// Change into bufio.Scanner if needed or limit the size of the read
211211
data, err := io.ReadAll(reader)
212212
if err != nil {
213-
logrus.Fatalf("Failed to get all content of the file: %s, %v", fileName, err)
213+
logrus.Errorf("Failed to get all content of the file: %s, %v", fileName, err)
214+
return nil
214215
}
215216
return bytes.NewReader(data)
216217
}
217218

218219
func createGCSBucket(gcsClient *gstorage.Client, projectID, bucketName string) error {
219220
ctx := context.Background()
220221

222+
if projectID == "" {
223+
logrus.Errorf("Project ID cannot be empty. Failed to create GCS bucket: %s", bucketName)
224+
return errors.New("Project ID cannot be empty when creating a GCS Bucket")
225+
}
226+
221227
bucket := gcsClient.Bucket(bucketName)
222228
if err := bucket.Create(ctx, projectID, nil); err != nil {
223-
log.Errorf("Failed to create GCS bucket: %s", bucketName)
229+
logrus.Errorf("Failed to create GCS bucket: %s", bucketName)
224230
return err
225231
}
226232

@@ -262,6 +268,10 @@ func New(c *config) (*RayLogsHandler, error) {
262268
baseTransport,
263269
option.WithScopes(gstorage.ScopeFullControl),
264270
)
271+
if err != nil {
272+
logrus.Errorf("Failed to create authentication transport object")
273+
return nil, err
274+
}
265275

266276
// Create a custom client with the authenticated transport
267277
customHttpTransportClient := &http.Client{
@@ -281,9 +291,10 @@ func New(c *config) (*RayLogsHandler, error) {
281291
// Check if bucket exists
282292
_, err = storageClient.Bucket(c.Bucket).Attrs(ctx)
283293
if err == gstorage.ErrBucketNotExist {
284-
logrus.Warn("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
294+
logrus.Warnf("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
285295
err = createGCSBucket(storageClient, c.GCPProjectID, c.Bucket)
286296
if err != nil {
297+
storageClient.Close()
287298
return nil, err
288299
}
289300
} else if err != nil {

historyserver/pkg/storage/gcs/gcs_handler_test.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package gcs
22

33
import (
4-
"context"
54
"io"
6-
"net/http"
7-
"path"
85
"sort"
96
"strings"
107
"testing"
@@ -14,7 +11,6 @@ import (
1411
"github.com/fsouza/fake-gcs-server/fakestorage"
1512
"github.com/google/go-cmp/cmp"
1613
"github.com/ray-project/kuberay/historyserver/pkg/utils"
17-
gIterator "google.golang.org/api/iterator"
1814
)
1915

2016
// setupFakeGCS creates a fake GCS server and a client connected to it.
@@ -53,7 +49,6 @@ func createRayLogsHandler(client *gstorage.Client, bucketName string) *RayLogsHa
5349
StorageClient: client,
5450
GCSBucket: bucketName,
5551
RootDir: "ray_historyserver",
56-
HttpClient: &http.Client{}, // Not used by fake, but required by struct
5752
}
5853
}
5954

@@ -202,26 +197,7 @@ func TestListFiles(t *testing.T) {
202197

203198
for _, tc := range tests {
204199
t.Run(tc.name, func(t *testing.T) {
205-
ctx := context.Background()
206-
pathPrefix := strings.TrimPrefix(path.Join(tc.clusterID, tc.directory), "/") + "/"
207-
query := &gstorage.Query{
208-
Prefix: pathPrefix,
209-
Delimiter: "/",
210-
}
211-
fileIterator := handler.StorageClient.Bucket(handler.GCSBucket).Objects(ctx, query)
212-
var files []string
213-
for {
214-
attrs, err := fileIterator.Next()
215-
if err == gIterator.Done {
216-
break
217-
}
218-
if err != nil {
219-
t.Fatalf("Iterator error: %v", err)
220-
}
221-
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
222-
files = append(files, attrs.Name)
223-
}
224-
}
200+
files := handler.ListFiles(tc.clusterID, tc.directory)
225201
sort.Strings(files)
226202
sort.Strings(tc.expected)
227203
if diff := cmp.Diff(tc.expected, files); diff != "" {

0 commit comments

Comments
 (0)