Skip to content

Commit b9457b7

Browse files
authored
[BEAM-12513] Schemas and Coders (#15632)
1 parent d6b9156 commit b9457b7

File tree

5 files changed

+535
-12
lines changed

5 files changed

+535
-12
lines changed

sdks/go/examples/snippets/04transforms.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool, emitAbo
300300
var cutOff float64
301301
ok := lengthCutOffIter(&cutOff)
302302
if !ok {
303-
return fmt.Errorf("No length cutoff provided.")
303+
return fmt.Errorf("no length cutoff provided")
304304
}
305305
if float64(len(word)) > cutOff {
306306
emitAboveCutoff(word)
+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package snippets
17+
18+
import (
19+
"fmt"
20+
"io"
21+
"reflect"
22+
"time"
23+
24+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
25+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
26+
)
27+
28+
// [START schema_define]
29+
30+
// Purchase is an example struct describing a single purchase. Every field
// carries a `beam` struct tag giving the name used for that field in the
// schema derived from this type.
type Purchase struct {
31+
// ID of the user who made the purchase.
32+
UserID string `beam:"userId"`
33+
// Identifier of the item that was purchased.
34+
ItemID int64 `beam:"itemId"`
35+
// The shipping address, a nested type.
36+
ShippingAddress ShippingAddress `beam:"shippingAddress"`
37+
// The cost of the item in cents.
38+
Cost int64 `beam:"cost"`
39+
// The transactions that paid for this purchase.
40+
// A slice since the purchase might be spread out over multiple
41+
// credit cards.
42+
Transactions []Transaction `beam:"transactions"`
43+
}
44+
45+
// ShippingAddress is the nested address type used by Purchase.
// State is a pointer, unlike the other fields, so it can be nil.
type ShippingAddress struct {
46+
StreetAddress string `beam:"streetAddress"`
47+
City string `beam:"city"`
48+
State *string `beam:"state"`
49+
Country string `beam:"country"`
50+
PostCode string `beam:"postCode"`
51+
}
52+
53+
// Transaction is the element type of Purchase.Transactions: the bank used
// and the amount paid through it.
type Transaction struct {
54+
Bank string `beam:"bank"`
55+
PurchaseAmount float64 `beam:"purchaseAmount"`
56+
}
57+
58+
// [END schema_define]
59+
60+
// Validate that the interface is being implemented.
61+
var _ beam.SchemaProvider = &TimestampNanosProvider{}
62+
63+
// [START schema_logical_provider]
64+
65+
// TimestampNanos is a logical type using time.Time, but
// encodes as a schema type.
type TimestampNanos time.Time

// Seconds returns the number of whole seconds since the Unix epoch,
// as reported by time.Time.Unix.
func (tn TimestampNanos) Seconds() int64 {
	return time.Time(tn).Unix()
}

// Nanos returns the nanosecond offset within the second reported by Seconds,
// always in the range [0, 999999999].
//
// The offset is read with Time.Nanosecond rather than UnixNano() % 1e9:
// Go's remainder takes the sign of the dividend, so the modulo form yields a
// negative offset for instants before the epoch, which no longer pairs with
// Seconds to reconstruct the instant via time.Unix. UnixNano is also
// undefined for instants that do not fit in an int64 of nanoseconds.
func (tn TimestampNanos) Nanos() int32 {
	return int32(time.Time(tn).Nanosecond())
}
75+
76+
// tnStorage is the storage schema for TimestampNanos.
// Its two fields hold the values returned by TimestampNanos.Seconds and
// TimestampNanos.Nanos respectively.
type tnStorage struct {
78+
Seconds int64 `beam:"seconds"`
79+
Nanos int32 `beam:"nanos"`
80+
}
81+
82+
var (
83+
// tnType is the reflect.Type of the value (non-pointer) type TimestampNanos.
84+
tnType = reflect.TypeOf((*TimestampNanos)(nil)).Elem()
85+
// tnStorageType is the reflect.Type of the value (non-pointer) type tnStorage.
tnStorageType = reflect.TypeOf((*tnStorage)(nil)).Elem()
86+
)
87+
88+
// TimestampNanosProvider implements the beam.SchemaProvider interface.
89+
type TimestampNanosProvider struct{}
90+
91+
// FromLogicalType checks whether the given type is TimestampNanos, and if so
92+
// returns the storage type. Any other type yields a nil type and an error.
93+
func (p *TimestampNanosProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
94+
if rt != tnType {
95+
return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, tnType)
96+
}
97+
return tnStorageType, nil
98+
}
99+
100+
// BuildEncoder builds a Beam schema encoder for the TimestampNanos type.
//
// It returns an error if rt is rejected by FromLogicalType or if the row
// encoder for the storage struct cannot be built. The returned function
// converts its argument to a tnStorage value and writes it to w using that
// row encoder.
101+
func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
102+
// Only the type check matters here; the storage type is already known.
if _, err := p.FromLogicalType(rt); err != nil {
103+
return nil, err
104+
}
105+
enc, err := coder.RowEncoderForStruct(tnStorageType)
106+
if err != nil {
107+
return nil, err
108+
}
109+
return func(iface interface{}, w io.Writer) error {
110+
// Unchecked assertion: panics if iface does not hold a TimestampNanos.
v := iface.(TimestampNanos)
111+
return enc(tnStorage{
112+
Seconds: v.Seconds(),
113+
Nanos: v.Nanos(),
114+
}, w)
115+
}, nil
116+
}
117+
118+
// BuildDecoder builds a Beam schema decoder for the TimestampNanos type.
//
// It returns an error if rt is rejected by FromLogicalType or if the row
// decoder for the storage struct cannot be built. The returned function
// reads one tnStorage value from r and converts it back to a TimestampNanos.
119+
func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
120+
// Only the type check matters here; the storage type is already known.
if _, err := p.FromLogicalType(rt); err != nil {
121+
return nil, err
122+
}
123+
dec, err := coder.RowDecoderForStruct(tnStorageType)
124+
if err != nil {
125+
return nil, err
126+
}
127+
return func(r io.Reader) (interface{}, error) {
128+
s, err := dec(r)
129+
if err != nil {
130+
return nil, err
131+
}
132+
// Unchecked assertion: panics if the decoder returned anything other
// than a tnStorage value.
tn := s.(tnStorage)
133+
// Rebuild the instant from the stored seconds and nanosecond offset.
return TimestampNanos(time.Unix(tn.Seconds, int64(tn.Nanos))), nil
134+
}, nil
135+
}
136+
137+
// [END schema_logical_provider]
138+
139+
// LogicalTypeExample registers a TimestampNanosProvider as the schema
// provider for the TimestampNanos type. The START/END comments delimit the
// snippet region and must be kept as they are.
func LogicalTypeExample() {
140+
// [START schema_logical_register]
141+
beam.RegisterSchemaProvider(tnType, &TimestampNanosProvider{})
142+
// [END schema_logical_register]
143+
}
+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package snippets
17+
18+
import (
19+
"fmt"
20+
"reflect"
21+
"testing"
22+
"time"
23+
24+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
25+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder/testutil"
26+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
27+
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
28+
"github.com/google/go-cmp/cmp"
29+
"google.golang.org/protobuf/proto"
30+
"google.golang.org/protobuf/testing/protocmp"
31+
)
32+
33+
// atomicSchemaField returns a schema field with the given name whose type is
// the atomic type typ.
func atomicSchemaField(name string, typ pipepb.AtomicType) *pipepb.Field {
34+
return &pipepb.Field{
35+
Name: name,
36+
Type: &pipepb.FieldType{
37+
TypeInfo: &pipepb.FieldType_AtomicType{
38+
AtomicType: typ,
39+
},
40+
},
41+
}
42+
}
43+
44+
// rowSchemaField returns a schema field with the given name whose type is a
// row described by the schema typ.
func rowSchemaField(name string, typ *pipepb.Schema) *pipepb.Field {
45+
return &pipepb.Field{
46+
Name: name,
47+
Type: &pipepb.FieldType{
48+
TypeInfo: &pipepb.FieldType_RowType{
49+
RowType: &pipepb.RowType{
50+
Schema: typ,
51+
},
52+
},
53+
},
54+
}
55+
}
56+
57+
// listSchemaField returns a schema field with the given name whose type is an
// array. The element type is taken from typ's field type; typ's own name is
// not used.
func listSchemaField(name string, typ *pipepb.Field) *pipepb.Field {
58+
return &pipepb.Field{
59+
Name: name,
60+
Type: &pipepb.FieldType{
61+
TypeInfo: &pipepb.FieldType_ArrayType{
62+
ArrayType: &pipepb.ArrayType{
63+
ElementType: typ.GetType(),
64+
},
65+
},
66+
},
67+
}
68+
}
69+
70+
// nillable marks f's type as nullable and returns f. The field is modified
// in place, not copied.
func nillable(f *pipepb.Field) *pipepb.Field {
71+
f.Type.Nullable = true
72+
return f
73+
}
74+
75+
// TestSchemaTypes checks that a fresh schema registry derives the expected
// schema proto for each snippet type. Each case may first register logical
// types on the registry through its preReg hook.
func TestSchemaTypes(t *testing.T) {
76+
// Expected schemas for the nested types, reused by the Purchase case below.
transactionSchema := &pipepb.Schema{
77+
Fields: []*pipepb.Field{
78+
atomicSchemaField("bank", pipepb.AtomicType_STRING),
79+
atomicSchemaField("purchaseAmount", pipepb.AtomicType_DOUBLE),
80+
},
81+
}
82+
shippingAddressSchema := &pipepb.Schema{
83+
Fields: []*pipepb.Field{
84+
atomicSchemaField("streetAddress", pipepb.AtomicType_STRING),
85+
atomicSchemaField("city", pipepb.AtomicType_STRING),
86+
nillable(atomicSchemaField("state", pipepb.AtomicType_STRING)),
87+
atomicSchemaField("country", pipepb.AtomicType_STRING),
88+
atomicSchemaField("postCode", pipepb.AtomicType_STRING),
89+
},
90+
}
91+
92+
// rt is the Go type under test, st the schema expected for it, and preReg
// an optional hook run against the registry before the lookup.
tests := []struct {
93+
rt reflect.Type
94+
st *pipepb.Schema
95+
preReg func(reg *schema.Registry)
96+
}{{
97+
rt: reflect.TypeOf(Transaction{}),
98+
st: transactionSchema,
99+
}, {
100+
rt: reflect.TypeOf(ShippingAddress{}),
101+
st: shippingAddressSchema,
102+
}, {
103+
rt: reflect.TypeOf(Purchase{}),
104+
st: &pipepb.Schema{
105+
Fields: []*pipepb.Field{
106+
atomicSchemaField("userId", pipepb.AtomicType_STRING),
107+
atomicSchemaField("itemId", pipepb.AtomicType_INT64),
108+
rowSchemaField("shippingAddress", shippingAddressSchema),
109+
atomicSchemaField("cost", pipepb.AtomicType_INT64),
110+
listSchemaField("transactions",
111+
rowSchemaField("n/a", transactionSchema)),
112+
},
113+
},
114+
}, {
115+
rt: tnType,
116+
st: &pipepb.Schema{
117+
Fields: []*pipepb.Field{
118+
atomicSchemaField("seconds", pipepb.AtomicType_INT64),
119+
atomicSchemaField("nanos", pipepb.AtomicType_INT32),
120+
},
121+
},
122+
preReg: func(reg *schema.Registry) {
123+
reg.RegisterLogicalType(schema.ToLogicalType(tnType.Name(), tnType, tnStorageType))
124+
},
125+
}}
126+
for _, test := range tests {
127+
t.Run(fmt.Sprintf("%v", test.rt), func(t *testing.T) {
128+
// A new registry per case, so registrations do not carry over.
reg := schema.NewRegistry()
129+
if test.preReg != nil {
130+
test.preReg(reg)
131+
}
132+
{
133+
got, err := reg.FromType(test.rt)
134+
if err != nil {
135+
t.Fatalf("error FromType(%v) = %v", test.rt, err)
136+
}
137+
// Compare as protos, ignoring the schema "id" and "options" fields.
if d := cmp.Diff(test.st, got,
138+
protocmp.Transform(),
139+
protocmp.IgnoreFields(proto.Message(&pipepb.Schema{}), "id", "options"),
140+
); d != "" {
141+
t.Errorf("diff (-want, +got): %v", d)
142+
}
143+
}
144+
})
145+
}
146+
}
147+
148+
// TestSchema_validate runs testutil.SchemaCoder.Validate against each schema
// provider in the table, passing its BuildEncoder and BuildDecoder together
// with a storage value and a logical value.
func TestSchema_validate(t *testing.T) {
149+
tests := []struct {
150+
rt reflect.Type
151+
p beam.SchemaProvider
152+
logical, storage interface{}
153+
}{
154+
{
155+
rt: tnType,
156+
p: &TimestampNanosProvider{},
157+
logical: TimestampNanos(time.Unix(2300003, 456789)),
158+
storage: tnStorage{},
159+
},
160+
}
161+
for _, test := range tests {
162+
sc := &testutil.SchemaCoder{
163+
CmpOptions: cmp.Options{
164+
// Two TimestampNanos values are equal when both their seconds and
// their nanosecond offsets match.
cmp.Comparer(func(a, b TimestampNanos) bool {
165+
return a.Seconds() == b.Seconds() && a.Nanos() == b.Nanos()
166+
})},
167+
}
168+
sc.Validate(t, test.rt, test.p.BuildEncoder, test.p.BuildDecoder, test.storage, test.logical)
169+
}
170+
}

sdks/go/pkg/beam/schema.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package beam
1717

1818
import (
19+
"fmt"
1920
"io"
2021
"reflect"
2122

@@ -55,7 +56,24 @@ import (
5556
// is called in a package init() function.
5657
func RegisterSchemaProvider(rt reflect.Type, provider interface{}) {
5758
p := provider.(SchemaProvider)
58-
schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType)
59+
switch rt.Kind() {
60+
case reflect.Interface:
61+
schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType)
62+
case reflect.Ptr:
63+
if rt.Elem().Kind() != reflect.Struct {
64+
panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind()))
65+
}
66+
fallthrough
67+
case reflect.Struct:
68+
st, err := p.FromLogicalType(rt)
69+
if err != nil {
70+
panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type provider for %v, doesn't support that type", rt))
71+
}
72+
schema.RegisterLogicalType(schema.ToLogicalType(rt.Name(), rt, st))
73+
default:
74+
panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind()))
75+
}
76+
5977
coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
6078
}
6179

0 commit comments

Comments
 (0)