@@ -12,7 +12,6 @@ import (
1212 "time"
1313
1414 gstorage "cloud.google.com/go/storage"
15- "github.com/google/martian/v3/log"
1615 "github.com/ray-project/kuberay/historyserver/pkg/collector/types"
1716 "github.com/ray-project/kuberay/historyserver/pkg/storage"
1817 "github.com/ray-project/kuberay/historyserver/pkg/utils"
@@ -27,8 +26,6 @@ type RayLogsHandler struct {
2726 LogFiles chan string
2827 RootDir string
2928 SessionDir string
30- HttpClient * http.Client
31- GCSEndpoint string
3229 RayClusterName string
3330 RayClusterID string
3431 RayNodeName string
@@ -50,7 +47,7 @@ func (h *RayLogsHandler) CreateDirectory(directoryPath string) error {
5047 _ , err := h .StorageClient .Bucket (h .GCSBucket ).Object (objectPath ).Attrs (ctx )
5148 if errors .Is (err , gstorage .ErrObjectNotExist ) {
5249 writer := h .StorageClient .Bucket (h .GCSBucket ).Object (objectPath ).NewWriter (ctx )
53- if createErr := writer .Close (); err != nil {
50+ if createErr := writer .Close (); createErr != nil {
5451 logrus .Errorf ("Failed to create directory: %s, error: %v" , objectPath , createErr )
5552 return createErr
5653 }
@@ -73,6 +70,7 @@ func (h *RayLogsHandler) WriteFile(file string, reader io.ReadSeeker) error {
7370 _ , err := io .Copy (writer , reader )
7471 if err != nil {
7572 logrus .Error ("GCS Client failed to Copy from source." )
73+ writer .Close ()
7674 return err
7775 }
7876
@@ -88,7 +86,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
8886 // TODO(chiayi): Look into potential timeout issues
8987 ctx := context .Background ()
9088
91- pathPrefix := path .Join (h .RootDir , clusterId , directory )
89+ pathPrefix := strings . TrimPrefix ( path .Join (h .RootDir , clusterId , directory ), "/" ) + "/"
9290
9391 query := & gstorage.Query {
9492 Prefix : pathPrefix ,
@@ -121,7 +119,7 @@ func (h *RayLogsHandler) ListFiles(clusterId string, directory string) []string
121119 // We only want files, so check if attrs.Name is non-empty.
122120 // Exclude the placeholder object if it exists for the directory itself.
123121 // attrs.Name contains the whole object path.
124- if ! strings .HasSuffix (attrs .Name , "/" ) {
122+ if attrs . Name != "" && ! strings .HasSuffix (attrs .Name , "/" ) {
125123 fileList = append (fileList , attrs .Name )
126124 }
127125 }
@@ -149,7 +147,8 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
149147 break
150148 }
151149 if err != nil {
152- logrus .Fatalf ("Failed to get attribute of ray clusters: %v" , err )
150+ logrus .Errorf ("Failed to get attribute of ray clusters: %v" , err )
151+ return nil
153152 }
154153
155154 fullObjectPath := objectAttr .Name
@@ -161,7 +160,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
161160 clusterMeta := strings .Split (metaInfo [0 ], "_" )
162161 if len (clusterMeta ) != 2 {
163162 logrus .Errorf ("Unable to get cluster name and namespace from directory: %s" , metaInfo [0 ])
164- return nil
163+ continue
165164 }
166165 cluster .Name = clusterMeta [0 ]
167166 cluster .Namespace = clusterMeta [1 ]
@@ -170,6 +169,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
170169 time , err := utils .GetDateTimeFromSessionID (metaInfo [1 ])
171170 if err != nil {
172171 logrus .Errorf ("Failed to get date time from the given sessionID: %s, error: %v" , metaInfo [1 ], err )
172+ continue
173173 }
174174 cluster .CreateTimeStamp = time .Unix ()
175175 cluster .CreateTime = time .UTC ().Format (("2006-01-02T15:04:05Z" ))
@@ -210,17 +210,23 @@ func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
210210 // Change into bufio.Scanner if needed or limit the size of the read
211211 data , err := io .ReadAll (reader )
212212 if err != nil {
213- logrus .Fatalf ("Failed to get all content of the file: %s, %v" , fileName , err )
213+ logrus .Errorf ("Failed to get all content of the file: %s, %v" , fileName , err )
214+ return nil
214215 }
215216 return bytes .NewReader (data )
216217}
217218
218219func createGCSBucket (gcsClient * gstorage.Client , projectID , bucketName string ) error {
219220 ctx := context .Background ()
220221
222+ if projectID == "" {
223+ logrus .Errorf ("Project ID cannot be empty. Failed to create GCS bucket: %s" , bucketName )
224+ return errors .New ("Project ID cannot be empty when creating a GCS Bucket" )
225+ }
226+
221227 bucket := gcsClient .Bucket (bucketName )
222228 if err := bucket .Create (ctx , projectID , nil ); err != nil {
223- log .Errorf ("Failed to create GCS bucket: %s" , bucketName )
229+ logrus .Errorf ("Failed to create GCS bucket: %s" , bucketName )
224230 return err
225231 }
226232
@@ -262,6 +268,10 @@ func New(c *config) (*RayLogsHandler, error) {
262268 baseTransport ,
263269 option .WithScopes (gstorage .ScopeFullControl ),
264270 )
271+ if err != nil {
272+ logrus .Errorf ("Failed to create authentication transport object" )
273+ return nil , err
274+ }
265275
266276 // Create a custom client with the authenticated transport
267277 customHttpTransportClient := & http.Client {
@@ -281,9 +291,10 @@ func New(c *config) (*RayLogsHandler, error) {
281291 // Check if bucket exists
282292 _ , err = storageClient .Bucket (c .Bucket ).Attrs (ctx )
283293 if err == gstorage .ErrBucketNotExist {
284- logrus .Warn ("Bucket %s does not exist, will attempt to create bucket" , c .Bucket )
294+ logrus .Warnf ("Bucket %s does not exist, will attempt to create bucket" , c .Bucket )
285295 err = createGCSBucket (storageClient , c .GCPProjectID , c .Bucket )
286296 if err != nil {
297+ storageClient .Close ()
287298 return nil , err
288299 }
289300 } else if err != nil {
0 commit comments