Skip to content

Commit 8a05f17

Browse files
staging -> master v0.3.12
chore: Staging -> master v0.3.12
2 parents c72b04a + 6bab8e7 commit 8a05f17

40 files changed

Lines changed: 6042 additions & 40 deletions

.github/workflows/release-approval.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919

2020
strategy:
2121
matrix:
22-
driver: [mongodb, mysql, postgres, oracle, kafka] # Add new drivers here as they become available
22+
driver: [mongodb, mysql, postgres, oracle, kafka, s3] # Add new drivers here as they become available
2323

2424
uses: ./.github/workflows/build-and-release-driver.yml
2525
with:

CONTRIBUTING.md

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,24 @@ To ensure consistency, we follow a structured contribution process. All guidelin
99

1010
---
1111

12-
## 🎃 Hacktoberfest 2025 @ OLake
12+
## ❄️ Social Winter of Code (SWoC 2026) @OLake
1313

14-
OLake is officially open for **Hacktoberfest contributions**! 🚀
14+
OLake is officially open for **Social Winter of Code 2026 contributions**! 🚀
1515

16-
If you’re participating in Hacktoberfest, look out for any issues labeled:
17-
- **`hacktoberfest`**
18-
- **`good first issue`**
16+
If you’re participating in SWoC 2026, look out for issues labeled:
1917

20-
These are designed to help new contributors get started quickly.
21-
We welcome everything — bug fixes, documentation updates, tests, or feature enhancements.
18+
* **`SWoC26`**
19+
* **`beginner`**
20+
* **`intermediate`**
21+
* **`advanced`**
2222

23-
👉 [Check our open issues here](../../issues)
23+
These labels help you choose issues based on your experience level.
24+
We welcome all kinds of contributions — bug fixes, documentation improvements, tests, and feature enhancements.
2425

25-
Let’s hack, learn, and grow together this Hacktoberfest. Happy contributing & happy engineering! ⚡
26+
👉 [Check our open issues here](../../issues)
2627

28+
---
29+
Let’s hack, learn, and grow together this Hacktoberfest. Happy contributing & happy engineering! ⚡
2730
---
2831

2932
## Getting Help

constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141
Postgres DriverType = "postgres"
4242
MySQL DriverType = "mysql"
4343
Oracle DriverType = "oracle"
44+
S3 DriverType = "s3"
4445
Kafka DriverType = "kafka"
4546
)
4647

constants/state_version.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package constants
2+
3+
// State version constants for backward compatibility
4+
// State files can have different versions to support migration and backward compatibility
5+
// when the state file format or behavior changes.
6+
7+
// LatestStateVersion is the current version of the state file format.
8+
// This version is used when creating new state files.
9+
//
10+
// Version History:
11+
// - Version 0: Legacy format (backward compatibility)
12+
// * More lenient date/timestamp parsing behavior
13+
// * When a string cannot be parsed as a timestamp, it returns epoch time (1970-01-01)
14+
// * Used for state files created before version 1 was introduced
15+
//
16+
// - Version 1: Current format (introduced stricter validation)
17+
// * Stricter date/timestamp parsing validation
18+
// * When a string cannot be parsed as a timestamp, it will be returned as string. Earlier it was returning epoch time (1970-01-01)
19+
// * This prevents data corruption by failing fast on invalid date strings
20+
21+
const (
22+
LatestStateVersion = 1
23+
)
24+
25+
// Used as the current version of the state when the program is running
26+
var LoadedStateVersion = 1

destination/iceberg/arrow-writer/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func appendValueToBuilder(builder array.Builder, val interface{}) error {
216216
return err
217217
}
218218
case *array.TimestampBuilder:
219-
if timeVal, err := typeutils.ReformatDate(val); err == nil {
219+
if timeVal, err := typeutils.ReformatDate(val, true); err == nil {
220220
ts := arrow.Timestamp(timeVal.UnixMicro())
221221
builder.Append(ts)
222222
} else {

destination/iceberg/legacy-writer/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (w *LegacyWriter) Write(ctx context.Context, records []types.RawRecord) err
9292
}
9393
protoColumnsValue = append(protoColumnsValue, &proto.IcebergPayload_IceRecord_FieldValue{Value: &proto.IcebergPayload_IceRecord_FieldValue_DoubleValue{DoubleValue: doubleValue}})
9494
case "timestamptz":
95-
timeValue, err := typeutils.ReformatDate(val)
95+
timeValue, err := typeutils.ReformatDate(val, true)
9696
if err != nil {
9797
return fmt.Errorf("failed to reformat rawValue[%v] of type[%T] as time value: %s", val, val, err)
9898
}

drivers/kafka/internal/cdc.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package driver
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"fmt"
@@ -181,14 +182,21 @@ func (k *Kafka) processKafkaMessages(ctx context.Context, reader *kafka.Reader,
181182

182183
var data map[string]interface{}
183184
if message.Value != nil {
184-
if err := json.Unmarshal(message.Value, &data); err != nil {
185+
// decode message value
186+
decoder := json.NewDecoder(bytes.NewReader(message.Value))
187+
// to avoid automatic conversion of numbers to float64
188+
decoder.UseNumber()
189+
if err := decoder.Decode(&data); err != nil {
185190
logger.Warnf("failed to unmarshal message value: %s", err)
186191
continue
187192
}
188193
data[Partition] = message.Partition
189194
data[Offset] = message.Offset
190195
data[Key] = string(message.Key)
191-
data[KafkaTimestamp], _ = typeutils.ReformatDate(message.Time)
196+
data[KafkaTimestamp], err = typeutils.ReformatDate(message.Time, true)
197+
if err != nil {
198+
return fmt.Errorf("failed to reformat date: %s", err)
199+
}
192200
}
193201

194202
stopProcessing, err := stopProcessFn(types.KafkaRecord{Data: data, Message: message})

drivers/mongodb/internal/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func buildMongoCondition(cond types.Condition) bson.D {
415415
if strings.ToLower(val) == "true" || strings.ToLower(val) == "false" {
416416
return strings.ToLower(val) == "true"
417417
}
418-
if timeVal, err := typeutils.ReformatDate(val); err == nil {
418+
if timeVal, err := typeutils.ReformatDate(val, false); err == nil {
419419
return timeVal
420420
}
421421
if intVal, err := typeutils.ReformatInt64(val); err == nil {

drivers/mysql/internal/datatype_conversion.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ var mysqlTypeToDataTypes = map[string]types.DataType{
1818
"bigint": types.Int64,
1919

2020
// Floating point types
21-
"float": types.Float32,
22-
"real": types.Float32,
23-
"double": types.Float64,
21+
"float": types.Float32,
22+
"real": types.Float32,
23+
"double": types.Float64,
2424

2525
// Can handle up to 15 significant digits accurately (e.g., DECIMAL(15,2) or DECIMAL(15,7))
2626
// Values with 16 digits may have minor rounding. Beyond 16 (from 17) digits will have precision loss.

drivers/s3/README.md

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# Olake S3 Source Driver
2+
Production-ready S3 source connector for Olake that ingests data directly from AWS S3 or S3-compatible storage (MinIO, LocalStack, etc.).
3+
4+
## Highlights
5+
- **Multi-format**: CSV (plain or `.gz`), JSON (JSONL/array/object), and Parquet with schema inference.
6+
- **Incremental sync**: Tracks `_last_modified_time` per stream and processes only newer files.
7+
- **Parallel processing**: Chunked downloads and configurable `max_threads` keep throughput high.
8+
- **Stateful**: Stream-level state keeps cursor information so you can resume syncs reliably.
9+
10+
## Configuration
11+
### Required fields
12+
| Field | Type | Description |
13+
| --- | --- | --- |
14+
| `bucket_name` | string | Target S3 bucket name |
15+
| `region` | string | AWS region (e.g., `us-east-1`) |
16+
| `file_format` | string | `csv`, `json`, or `parquet` |
17+
| `path_prefix` | string | Prefix used to group files into streams |
18+
19+
### Optional fields
20+
| Field | Type | Default | Description |
21+
| --- | --- | --- | --- |
22+
| `access_key_id` | string || Static AWS access key (pair with `secret_access_key`) |
23+
| `secret_access_key` | string || Static AWS secret key (pair with `access_key_id`) |
24+
| `endpoint` | string | AWS S3 | Override S3 endpoint (MinIO/LocalStack) |
25+
| `max_threads` | integer | 10 | Concurrent file download/parsing workers |
26+
| `retry_count` | integer | 3 | Retries for transient failures |
27+
| `compression` | string | — | Override auto-detected compression (`gzip` or `none`)
28+
29+
**Authentication note**: Omitting credentials lets the driver fall back to the AWS default credential chain (environment variables, IAM roles, instance profiles, etc.). If you provide one static credential, include the other as well.
30+
31+
### CSV-specific tuning
32+
Use the nested `csv` block to customize parsing.
33+
| Field | Type | Default | Description |
34+
| --- | --- | --- | --- |
35+
| `delimiter` | string | `","` | Field delimiter |
36+
| `has_header` | boolean | `true` | Whether the first row is a header |
37+
| `skip_rows` | integer | 0 | Rows to skip before parsing |
38+
| `quote_character` | string | `\"\"\"` | Quote character for fields |
39+
40+
### JSON-specific tuning
41+
Use the `json` block to override parsing of JSON files.
42+
| Field | Type | Default | Description |
43+
| --- | --- | --- | --- |
44+
| `line_delimited` | boolean | `true` | When true, treat each line as a separate record; set to false for JSON arrays or single objects |
45+
46+
## Commands
47+
Run the driver binaries through the repository root `build.sh` helper:
48+
```
49+
./build.sh driver-s3 discover --config /path/to/source.json
50+
./build.sh driver-s3 sync --config /path/to/source.json --catalog /path/to/catalog.json --destination /path/to/destination.json --state /path/to/state.json
51+
```
52+
- `discover` generates a `streams.json` catalog describing each folder and inferred columns.
53+
- `sync` processes files in ~2 GB chunks, injects `_last_modified_time` as the cursor, and pushes records to the destination.
54+
- The `state.json` file records `_last_modified_time` per stream so subsequent runs only process changed files.
55+
56+
## Example configuration snippets
57+
Use these as a starting point; substitute your bucket, path, and authentication values.
58+
### JSON stream example
59+
```json
60+
{
61+
"bucket_name": "your-bucket",
62+
"region": "us-east-1",
63+
"path_prefix": "data/json/",
64+
"file_format": "json",
65+
"json": { "line_delimited": true },
66+
"compression": "gzip",
67+
"max_threads": 5,
68+
"retry_count": 3
69+
}
70+
```
71+
### CSV stream example
72+
```json
73+
{
74+
"bucket_name": "your-bucket",
75+
"region": "us-east-1",
76+
"path_prefix": "data/csv/",
77+
"file_format": "csv",
78+
"csv": { "has_header": true, "delimiter": "," },
79+
"max_threads": 5,
80+
"retry_count": 3
81+
}
82+
```
83+
### Parquet stream example
84+
```json
85+
{
86+
"bucket_name": "your-bucket",
87+
"region": "us-east-1",
88+
"path_prefix": "data/parquet/",
89+
"file_format": "parquet",
90+
"compression": "none",
91+
"max_threads": 5,
92+
"retry_count": 3
93+
}
94+
```
95+
96+
## Catalog guidance (streams)
97+
- `selected_streams` selects which folders (streams) to sync; each entry maps to a stream name under your prefix (e.g., `users`).
98+
- Each `streams[]` entry includes the inferred schema, available sync modes (`full_refresh`, `incremental`), and the cursor field (`_last_modified_time`).
99+
- `_last_modified_time` is added to every stream so you can configure incremental syncs per folder.
100+
101+
Example catalog structure:
102+
```json
103+
{
104+
"selected_streams": {
105+
"data": [
106+
{ "stream_name": "users", "partition_regex": "" },
107+
{ "stream_name": "orders", "partition_regex": "" }
108+
]
109+
},
110+
"streams": [
111+
{
112+
"stream": {
113+
"name": "users",
114+
"namespace": "data",
115+
"supported_sync_modes": ["full_refresh", "incremental"],
116+
"cursor_field": "_last_modified_time",
117+
"sync_mode": "incremental"
118+
}
119+
}
120+
]
121+
}
122+
```
123+
124+
## State guidance
125+
The `state.json` structure mirrors the catalog streams and records the latest `_last_modified_time` per stream.
126+
```json
127+
{
128+
"type": "STREAM",
129+
"streams": [
130+
{ "stream": "users", "state": { "_last_modified_time": "2025-01-01T00:00:00Z", "chunks": [] } }
131+
]
132+
}
133+
```
134+
Use this file when re-running syncs to resume from the last `_last_modified_time` per stream.
135+
136+
Find more at [S3 Docs](https://olake.io/docs/category/s3)

0 commit comments

Comments
 (0)