Skip to content

Commit e70ec5c

Browse files
committed
Minor fixes
1 parent 37ff80e commit e70ec5c

File tree

3 files changed

+46
-47
lines changed

3 files changed

+46
-47
lines changed

historyserver/pkg/storage/gcs/config.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@ func (c *config) completeCollectorConfig(rcc *types.RayCollectorConfig, jd map[s
2929
if bucket, ok := jd["gcsBucket"]; ok {
3030
c.Bucket = bucket.(string)
3131
}
32-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
32+
if gcpProjectID, ok := jd["gcpProjectID"]; ok {
33+
c.GCPProjectID = gcpProjectID.(string)
34+
}
35+
} else {
36+
if os.Getenv("GCP_PROJECT_ID") != "" {
37+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
38+
}
3339
}
3440
}
3541

@@ -42,6 +48,12 @@ func (c *config) completeHistoryServerConfig(rcc *types.RayHistoryServerConfig,
4248
if bucket, ok := jd["gcsBucket"]; ok {
4349
c.Bucket = bucket.(string)
4450
}
45-
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
51+
if gcpProjectID, ok := jd["gcpProjectID"]; ok {
52+
c.GCPProjectID = gcpProjectID.(string)
53+
}
54+
} else {
55+
if os.Getenv("GCP_PROJECT_ID") != "" {
56+
c.GCPProjectID = os.Getenv("GCP_PROJECT_ID")
57+
}
4658
}
4759
}

historyserver/pkg/storage/gcs/gcs_handler.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
gstorage "cloud.google.com/go/storage"
15-
"github.com/google/martian/v3/log"
1615
"github.com/ray-project/kuberay/historyserver/pkg/collector/types"
1716
"github.com/ray-project/kuberay/historyserver/pkg/storage"
1817
"github.com/ray-project/kuberay/historyserver/pkg/utils"
@@ -27,8 +26,6 @@ type RayLogsHandler struct {
2726
LogFiles chan string
2827
RootDir string
2928
SessionDir string
30-
HttpClient *http.Client
31-
GCSEndpoint string
3229
RayClusterName string
3330
RayClusterID string
3431
RayNodeName string
@@ -50,7 +47,7 @@ func (h *RayLogsHandler) CreateDirectory(directoryPath string) error {
5047
_, err := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).Attrs(ctx)
5148
if errors.Is(err, gstorage.ErrObjectNotExist) {
5249
writer := h.StorageClient.Bucket(h.GCSBucket).Object(objectPath).NewWriter(ctx)
53-
if createErr := writer.Close(); err != nil {
50+
if createErr := writer.Close(); createErr != nil {
5451
logrus.Errorf("Failed to create directory: %s, error: %v", objectPath, createErr)
5552
return createErr
5653
}
@@ -73,6 +70,7 @@ func (h *RayLogsHandler) WriteFile(file string, reader io.ReadSeeker) error {
7370
_, err := io.Copy(writer, reader)
7471
if err != nil {
7572
logrus.Error("GCS Client failed to Copy from source.")
73+
writer.Close()
7674
return err
7775
}
7876

@@ -88,7 +86,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
8886
// TODO(chiayi): Look into potential timeout issues
8987
ctx := context.Background()
9088

91-
pathPrefix := path.Join(h.RootDir, clusterId, directory)
89+
pathPrefix := strings.TrimPrefix(path.Join(h.RootDir, clusterId, directory), "/") + "/"
9290

9391
query := &gstorage.Query{
9492
Prefix: pathPrefix,
@@ -121,7 +119,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
121119
// We only want files, so check if attrs.Name is non-empty.
122120
// Exclude the placeholder object if it exists for the directory itself.
123121
// attrs.Name contains the whole object path.
124-
if !strings.HasSuffix(attrs.Name, "/") {
122+
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
125123
fileList = append(fileList, attrs.Name)
126124
}
127125
}
@@ -149,7 +147,8 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
149147
break
150148
}
151149
if err != nil {
152-
logrus.Fatalf("Failed to get attribute of ray clusters: %v", err)
150+
logrus.Errorf("Failed to get attribute of ray clusters: %v", err)
151+
return nil
153152
}
154153

155154
fullObjectPath := objectAttr.Name
@@ -161,7 +160,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
161160
clusterMeta := strings.Split(metaInfo[0], "_")
162161
if len(clusterMeta) != 2 {
163162
logrus.Errorf("Unable to get cluster name and namespace from directory: %s", metaInfo[0])
164-
return nil
163+
continue
165164
}
166165
cluster.Name = clusterMeta[0]
167166
cluster.Namespace = clusterMeta[1]
@@ -170,6 +169,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
170169
time, err := utils.GetDateTimeFromSessionID(metaInfo[1])
171170
if err != nil {
172171
logrus.Errorf("Failed to get date time from the given sessionID: %s, error: %v", metaInfo[1], err)
172+
continue
173173
}
174174
cluster.CreateTimeStamp = time.Unix()
175175
cluster.CreateTime = time.UTC().Format(("2006-01-02T15:04:05Z"))
@@ -210,17 +210,23 @@ func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
210210
// Change into bufio.Scanner if needed or limit the size of the read
211211
data, err := io.ReadAll(reader)
212212
if err != nil {
213-
logrus.Fatalf("Failed to get all content of the file: %s, %v", fileName, err)
213+
logrus.Errorf("Failed to get all content of the file: %s, %v", fileName, err)
214+
return nil
214215
}
215216
return bytes.NewReader(data)
216217
}
217218

218219
func createGCSBucket(gcsClient *gstorage.Client, projectID, bucketName string) error {
219220
ctx := context.Background()
220221

222+
if projectID == "" {
223+
logrus.Errorf("Project ID cannot be empty. Failed to create GCS bucket: %s", bucketName)
224+
return errors.New("Project ID cannot be empty when creating a GCS Bucket")
225+
}
226+
221227
bucket := gcsClient.Bucket(bucketName)
222228
if err := bucket.Create(ctx, projectID, nil); err != nil {
223-
log.Errorf("Failed to create GCS bucket: %s", bucketName)
229+
logrus.Errorf("Failed to create GCS bucket: %s", bucketName)
224230
return err
225231
}
226232

@@ -262,6 +268,10 @@ func New(c *config) (*RayLogsHandler, error) {
262268
baseTransport,
263269
option.WithScopes(gstorage.ScopeFullControl),
264270
)
271+
if err != nil {
272+
logrus.Errorf("Failed to create authentication transport object")
273+
return nil, err
274+
}
265275

266276
// Create a custom client with the authenticated transport
267277
customHttpTransportClient := &http.Client{
@@ -281,9 +291,10 @@ func New(c *config) (*RayLogsHandler, error) {
281291
// Check if bucket exists
282292
_, err = storageClient.Bucket(c.Bucket).Attrs(ctx)
283293
if err == gstorage.ErrBucketNotExist {
284-
logrus.Warn("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
294+
logrus.Warnf("Bucket %s does not exist, will attempt to create bucket", c.Bucket)
285295
err = createGCSBucket(storageClient, c.GCPProjectID, c.Bucket)
286296
if err != nil {
297+
storageClient.Close()
287298
return nil, err
288299
}
289300
} else if err != nil {

historyserver/pkg/storage/gcs/gcs_handler_test.go

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package gcs
22

33
import (
4-
"context"
54
"io"
6-
"net/http"
7-
"path"
85
"sort"
96
"strings"
107
"testing"
@@ -14,7 +11,6 @@ import (
1411
"github.com/fsouza/fake-gcs-server/fakestorage"
1512
"github.com/google/go-cmp/cmp"
1613
"github.com/ray-project/kuberay/historyserver/pkg/utils"
17-
gIterator "google.golang.org/api/iterator"
1814
)
1915

2016
// setupFakeGCS creates a fake GCS server and a client connected to it.
@@ -53,7 +49,6 @@ func createRayLogsHandler(client *gstorage.Client, bucketName string) *RayLogsHa
5349
StorageClient: client,
5450
GCSBucket: bucketName,
5551
RootDir: "ray_historyserver",
56-
HttpClient: &http.Client{}, // Not used by fake, but required by struct
5752
}
5853
}
5954

@@ -125,42 +120,42 @@ func TestListFiles(t *testing.T) {
125120
{
126121
ObjectAttrs: fakestorage.ObjectAttrs{
127122
BucketName: "test-bucket",
128-
Name: "cluster1/logs/file1.txt",
123+
Name: "ray_historyserver/cluster1/logs/file1.txt",
129124
},
130125
Content: []byte("a"),
131126
},
132127
{
133128
ObjectAttrs: fakestorage.ObjectAttrs{
134129
BucketName: "test-bucket",
135-
Name: "cluster1/logs/file2.log",
130+
Name: "ray_historyserver/cluster1/logs/file2.log",
136131
},
137132
Content: []byte("b"),
138133
},
139134
{
140135
ObjectAttrs: fakestorage.ObjectAttrs{
141136
BucketName: "test-bucket",
142-
Name: "cluster1/logs/subdir/",
137+
Name: "ray_historyserver/cluster1/logs/subdir/",
143138
},
144139
Content: []byte(""),
145140
},
146141
{
147142
ObjectAttrs: fakestorage.ObjectAttrs{
148143
BucketName: "test-bucket",
149-
Name: "cluster1/logs/subdir/file3.txt",
144+
Name: "ray_historyserver/cluster1/logs/subdir/file3.txt",
150145
},
151146
Content: []byte("c"),
152147
},
153148
{
154149
ObjectAttrs: fakestorage.ObjectAttrs{
155150
BucketName: "test-bucket",
156-
Name: "cluster1/other/file4.txt",
151+
Name: "ray_historyserver/cluster1/other/file4.txt",
157152
},
158153
Content: []byte("d"),
159154
},
160155
{
161156
ObjectAttrs: fakestorage.ObjectAttrs{
162157
BucketName: "test-bucket",
163-
Name: "cluster2/logs/file5.txt",
158+
Name: "ray_historyserver/cluster2/logs/file5.txt",
164159
},
165160
Content: []byte("e"),
166161
},
@@ -178,13 +173,13 @@ func TestListFiles(t *testing.T) {
178173
name: "list_logs",
179174
clusterID: "cluster1",
180175
directory: "logs",
181-
expected: []string{"cluster1/logs/file1.txt", "cluster1/logs/file2.log"},
176+
expected: []string{"ray_historyserver/cluster1/logs/file1.txt", "ray_historyserver/cluster1/logs/file2.log"},
182177
},
183178
{
184179
name: "list_other",
185180
clusterID: "cluster1",
186181
directory: "other",
187-
expected: []string{"cluster1/other/file4.txt"},
182+
expected: []string{"ray_historyserver/cluster1/other/file4.txt"},
188183
},
189184
{
190185
name: "list_nonexistent",
@@ -196,32 +191,13 @@ func TestListFiles(t *testing.T) {
196191
name: "list_cluster2",
197192
clusterID: "cluster2",
198193
directory: "logs",
199-
expected: []string{"cluster2/logs/file5.txt"},
194+
expected: []string{"ray_historyserver/cluster2/logs/file5.txt"},
200195
},
201196
}
202197

203198
for _, tc := range tests {
204199
t.Run(tc.name, func(t *testing.T) {
205-
ctx := context.Background()
206-
pathPrefix := strings.TrimPrefix(path.Join(tc.clusterID, tc.directory), "/") + "/"
207-
query := &gstorage.Query{
208-
Prefix: pathPrefix,
209-
Delimiter: "/",
210-
}
211-
fileIterator := handler.StorageClient.Bucket(handler.GCSBucket).Objects(ctx, query)
212-
var files []string
213-
for {
214-
attrs, err := fileIterator.Next()
215-
if err == gIterator.Done {
216-
break
217-
}
218-
if err != nil {
219-
t.Fatalf("Iterator error: %v", err)
220-
}
221-
if attrs.Name != "" && !strings.HasSuffix(attrs.Name, "/") {
222-
files = append(files, attrs.Name)
223-
}
224-
}
200+
files := handler.ListFiles(tc.clusterID, tc.directory)
225201
sort.Strings(files)
226202
sort.Strings(tc.expected)
227203
if diff := cmp.Diff(tc.expected, files); diff != "" {

0 commit comments

Comments
 (0)