
Not able to read parquet using S3 source #76

Description

@gmohmad

Hey. I have a problem reading a Parquet file from S3 using the S3 source. The file was originally written to S3 with the same S3 source, and I'm able to open it with Python's pandas without any problems.
struct:

type UserHistoryRecord struct {
	SearchText     string `parquet:"name=search_text, type=BYTE_ARRAY, convertedtype=UTF8"`
	EventType      string `parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8"`
	WbUserID       int64  `parquet:"name=wb_user_id, type=INT64"`
	UTCEventTime   int64  `parquet:"name=utc_event_time, type=INT64, convertedtype=TIMESTAMP_MILLIS"`
	EventCode      int32  `parquet:"name=event_code, type=INT32, convertedtype=UINT_8"`
	SearchTextProc string `parquet:"name=search_text_processed, type=BYTE_ARRAY, convertedtype=UTF8"`
}

writer:

func (p *UserHistoryRecord) Build(
	awsCfg *aws.Config, bucket, key string, eventsMap models.EventsMap,
) error {
	pf, err := s3.NewS3FileWriter(context.Background(), bucket, key, nil, awsCfg)
	if err != nil {
		return fmt.Errorf("failed to create S3 file writer: %v", err)
	}
	defer pf.Close()

	pw, err := writer.NewParquetWriter(pf, new(UserHistoryRecord), 8)
	if err != nil {
		return fmt.Errorf("failed to create Parquet writer: %v", err)
	}
	defer pw.WriteStop()

	pw.RowGroupSize = 10 << 17 // 1.25 MB
	pw.PageSize = 10 << 15     // 320 KB
	pw.CompressionType = parquet.CompressionCodec_GZIP

	for _, event := range eventsMap {
		if err := pw.Write(p.FromEventDTO(event)); err != nil {
			return err
		}
	}
	if err := pw.WriteStop(); err != nil {
		return err
	}

	return nil
}

reader:

func (p *UserHistoryRecord) ReadParquetToMap(
	ctx context.Context, awsCfg *aws.Config, bucket, key string, eventsMap models.EventsMap,
) error {
	pf, err := s3.NewS3FileReader(ctx, bucket, key, awsCfg)
	if err != nil {
		return err
	}
	defer pf.Close()

	pr, err := reader.NewParquetReader(pf, new(UserHistoryRecord), 8)
	if err != nil {
		return err
	}
	defer pr.ReadStop()

	const batchSize = 1000
	for {
		rows, err := pr.ReadByNumber(batchSize)
		if err != nil {
			return err
		}
		if len(rows) == 0 {
			break
		}

		for _, rec := range rows {
			pm, ok := rec.(*UserHistoryRecord)
			if !ok {
				log.Printf("unexpected type %T, skipping", rec)
				continue
			}
			dto := pm.ToEventDTO()
			eventsMap[dto.GetHash()] = dto
		}
	}
	return nil
}

error:

2025/06/19 18:01:54 [NextRowGroup] Column not found: Parquet_go_rootSearchText

What's wrong here? I'm reading the same file I'm writing to S3. I've also tried the local source (downloading the file from S3, saving it to local storage, and then loading it) and the buffer source (loading the file into memory first); a sketch of those attempts is below. I either get this error or panics (index out of range, nil pointer dereference).
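
For reference, the alternative read paths look roughly like this. This is a minimal sketch, assuming the local and buffer sources that ship alongside the S3 source (import paths may differ depending on the parquet-go fork in use); readFromLocal and readFromBuffer are just illustrative names, and UserHistoryRecord is the struct defined above.

import (
	"github.com/xitongsys/parquet-go-source/buffer"
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

// readFromLocal reads the file after it has been downloaded from S3 and saved to disk.
func readFromLocal(path string) error {
	pf, err := local.NewLocalFileReader(path)
	if err != nil {
		return err
	}
	defer pf.Close()

	pr, err := reader.NewParquetReader(pf, new(UserHistoryRecord), 8)
	if err != nil {
		return err
	}
	defer pr.ReadStop()

	rows := make([]UserHistoryRecord, pr.GetNumRows())
	return pr.Read(&rows)
}

// readFromBuffer reads the same bytes from memory via the buffer source.
func readFromBuffer(data []byte) error {
	pf := buffer.NewBufferFileFromBytes(data)
	defer pf.Close()

	pr, err := reader.NewParquetReader(pf, new(UserHistoryRecord), 8)
	if err != nil {
		return err
	}
	defer pr.ReadStop()

	rows := make([]UserHistoryRecord, pr.GetNumRows())
	return pr.Read(&rows)
}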
