Skip to content

Commit 9a47881

Browse files
chore: staging -> master v0.3.6 (#689)
Signed-off-by: badalprasadsingh <badal@datazip.io> Signed-off-by: Badal Prasad Singh <badal@datazip.io> Co-authored-by: Ankit Sharma <111491139+hash-data@users.noreply.github.com> Co-authored-by: hash-data <ankit@datazip.io>
1 parent 42cddeb commit 9a47881

21 files changed

Lines changed: 7365 additions & 399 deletions

File tree

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package arrowwriter
2+
3+
import (
4+
"encoding/binary"
5+
"errors"
6+
"fmt"
7+
"math"
8+
"net/url"
9+
"regexp"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"github.com/twmb/murmur3"
15+
)
16+
17+
// The current transform logic is limited to the data types handled by OLake.
18+
// As OLake starts supporting more data types, we will update the transformations logic here.
19+
//
20+
// Supported Transforms:
21+
// - Identity, Void: All data types
22+
// - Bucket: int, long, string, timestamptz
23+
// - Truncate: int, long, string
24+
// - Year, Month, Day, Hour: timestamptz
25+
26+
const NULL = "null"
27+
28+
var transformPattern = regexp.MustCompile(`^([a-zA-Z]+)(?:\[(\d+)\])?$`)
29+
30+
func parseTransform(transform string) (base string, arg int, err error) {
31+
if transform == "" {
32+
return "", -1, errors.New("empty transform")
33+
}
34+
35+
m := transformPattern.FindStringSubmatch(transform)
36+
if m == nil {
37+
return "", -1, errors.New("invalid transform")
38+
}
39+
40+
base = strings.ToLower(m[1])
41+
if m[2] != "" {
42+
arg, err = strconv.Atoi(m[2])
43+
if err != nil {
44+
return "", -1, fmt.Errorf("invalid numeric argument in transform %s: %s", transform, err)
45+
}
46+
}
47+
48+
return base, arg, nil
49+
}
50+
51+
func hashInt[T ~int32 | ~int64 | ~int](v T) uint32 {
52+
var buf [8]byte
53+
binary.LittleEndian.PutUint64(buf[:], uint64(v))
54+
55+
return murmur3.Sum32(buf[:])
56+
}
57+
58+
func hashString(s string) uint32 {
59+
return murmur3.Sum32([]byte(s))
60+
}
61+
62+
func identityTransform(val any, colType string) (string, error) {
63+
switch colType {
64+
case "boolean":
65+
return strconv.FormatBool(val.(bool)), nil
66+
case "timestamptz":
67+
t := val.(time.Time).UTC()
68+
return t.Format("2006-01-02T15:04:05-07:00"), nil
69+
default:
70+
return fmt.Sprintf("%v", val), nil
71+
}
72+
}
73+
74+
func timeTransform(val any, unit string, colType string) (string, error) {
75+
if colType != "timestamptz" {
76+
return "", fmt.Errorf("unsupported time transform %q", unit)
77+
}
78+
79+
v, _ := val.(time.Time)
80+
v = v.UTC()
81+
82+
switch unit {
83+
case "year":
84+
return strconv.Itoa(v.Year()), nil
85+
case "month":
86+
return v.Format("2006-01"), nil
87+
case "day":
88+
return v.Format("2006-01-02"), nil
89+
case "hour":
90+
return v.Format("2006-01-02-15"), nil
91+
default:
92+
return "", fmt.Errorf("unsupported time transform %q", unit)
93+
}
94+
}
95+
96+
func bucketTransform(val any, num int, colType string) (string, error) {
97+
if num <= 0 {
98+
return "", fmt.Errorf("invalid number of buckets: %d (must be > 0)", num)
99+
}
100+
101+
var h uint32
102+
switch colType {
103+
case "int":
104+
v, _ := val.(int32)
105+
h = hashInt(v)
106+
case "long":
107+
v, _ := val.(int64)
108+
h = hashInt(v)
109+
case "timestamptz":
110+
tm, ok := val.(time.Time)
111+
if !ok {
112+
return "", fmt.Errorf("expected time.Time for colType %q, got %T", colType, val)
113+
}
114+
h = hashInt(tm.UnixMicro())
115+
case "string":
116+
str, ok := val.(string)
117+
if !ok {
118+
return "", fmt.Errorf("expected string for colType %q, got %T", colType, val)
119+
}
120+
h = hashString(str)
121+
default:
122+
return "", fmt.Errorf("unsupported colType %q for bucket transform", colType)
123+
}
124+
125+
masked := int(h & 0x7FFFFFFF)
126+
bucket := masked % num
127+
return strconv.Itoa(bucket), nil
128+
}
129+
130+
func truncateTransform(val any, n int, colType string) (string, error) {
131+
if n <= 0 {
132+
return "", fmt.Errorf("invalid truncate width: %d (must be > 0)", n)
133+
}
134+
135+
switch colType {
136+
case "int":
137+
v, _ := val.(int32)
138+
if n > math.MaxInt32 {
139+
return "", fmt.Errorf("truncate width %d exceeds int32 range", n)
140+
}
141+
n32 := int32(n)
142+
// Using Iceberg's formula for proper negative number handling
143+
trunc := v - (((v % n32) + n32) % n32)
144+
145+
return fmt.Sprintf("%d", trunc), nil
146+
case "long":
147+
v, _ := val.(int64)
148+
n64 := int64(n)
149+
// Using Iceberg's formula for proper negative number handling
150+
trunc := v - (((v % n64) + n64) % n64)
151+
152+
return fmt.Sprintf("%d", trunc), nil
153+
case "string":
154+
v, ok := val.(string)
155+
if !ok {
156+
return "", fmt.Errorf("expected string for colType %q, got %T", colType, val)
157+
}
158+
159+
// Truncate by unicode code points, not bytes
160+
runes := []rune(v)
161+
if len(runes) <= n {
162+
return v, nil
163+
}
164+
165+
return string(runes[:n]), nil
166+
default:
167+
return "", fmt.Errorf("unsupported colType %q for truncate transform", colType)
168+
}
169+
}
170+
171+
func ConstructColPath(valueStr, field, transform string) string {
172+
base, _, _ := parseTransform(transform)
173+
174+
encodedField := url.QueryEscape(field)
175+
encodedValue := url.QueryEscape(valueStr)
176+
177+
if base == "identity" {
178+
return fmt.Sprintf("%s=%s", encodedField, encodedValue)
179+
}
180+
181+
switch base {
182+
case "bucket":
183+
return fmt.Sprintf("%s_bucket=%s", encodedField, encodedValue)
184+
case "truncate":
185+
return fmt.Sprintf("%s_trunc=%s", encodedField, encodedValue)
186+
default:
187+
return fmt.Sprintf("%s_%s=%s", encodedField, base, encodedValue)
188+
}
189+
}
190+
191+
func TransformValue(val any, transform string, colType string) (string, error) {
192+
transform = strings.TrimSpace(strings.ToLower(transform))
193+
if val == nil {
194+
return NULL, nil
195+
}
196+
197+
base, arg, err := parseTransform(transform)
198+
if err != nil {
199+
return "", err
200+
}
201+
202+
switch base {
203+
case "identity":
204+
return identityTransform(val, colType)
205+
case "void":
206+
return NULL, nil
207+
case "year", "month", "day", "hour":
208+
return timeTransform(val, base, colType)
209+
case "bucket":
210+
return bucketTransform(val, arg, colType)
211+
case "truncate":
212+
return truncateTransform(val, arg, colType)
213+
default:
214+
return "", fmt.Errorf("unknown partition transform %q", transform)
215+
}
216+
}

0 commit comments

Comments
 (0)