Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ on:
push:
branches:
- master
- ci-test
pull_request:
types: [ opened, synchronize, reopened ]
types: [opened, synchronize, reopened]

env:
CCM_VERSION: "6e71061146f7ae67b84ccd2b1d90d7319b640e4c"
Expand All @@ -16,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.19', '1.20' ]
go: ["1.19", "1.20"]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
Expand All @@ -32,7 +33,6 @@ jobs:
name: Integration Tests
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
go: [ '1.19', '1.20' ]
cassandra_version: [ '4.0.8', '4.1.1' ]
Expand Down Expand Up @@ -199,3 +199,30 @@ jobs:
run: |
export JVM_EXTRA_OPTS="${{env.JVM_EXTRA_OPTS}}"
go test -v -run=TestAuthentication -tags "${{ matrix.tags }} gocql_debug" -timeout=15s -runauth ${{ env.args }}
integration-yugabyte:
timeout-minutes: 10
needs:
- build
name: Integration Tests (Yugabyte)
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.19', '1.20' ]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v2
id: gomod-cache
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('go.mod') }}
restore-keys: |
${{ runner.os }}-go-
- name: Setup YugabyteDB cluster
uses: jameshartig/yugabyte-db-action@master
id: yb
- name: Integration tests
run: |
go test -tags "yugabyte gocql_debug" -timeout=5m -race ./...
4 changes: 3 additions & 1 deletion helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func goType(t TypeInfo) (reflect.Type, error) {
return reflect.TypeOf(*new(time.Duration)), nil
case TypeTimestamp:
return reflect.TypeOf(*new(time.Time)), nil
case TypeBlob:
case TypeBlob, TypeJsonb:
return reflect.TypeOf(*new([]byte)), nil
case TypeBoolean:
return reflect.TypeOf(*new(bool)), nil
Expand Down Expand Up @@ -137,6 +137,8 @@ func getCassandraBaseType(name string) Type {
return TypeSet
case "TupleType":
return TypeTuple
case "jsonb":
return TypeJsonb
default:
return TypeCustom
}
Expand Down
7 changes: 5 additions & 2 deletions marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func Marshal(info TypeInfo, value interface{}) ([]byte, error) {
}

switch info.Type() {
case TypeVarchar, TypeAscii, TypeBlob, TypeText:
case TypeVarchar, TypeAscii, TypeBlob, TypeText, TypeJsonb:
return marshalVarchar(info, value)
case TypeBoolean:
return marshalBool(info, value)
Expand Down Expand Up @@ -212,7 +212,7 @@ func Unmarshal(info TypeInfo, data []byte, value interface{}) error {
}

switch info.Type() {
case TypeVarchar, TypeAscii, TypeBlob, TypeText:
case TypeVarchar, TypeAscii, TypeBlob, TypeText, TypeJsonb:
return unmarshalVarchar(info, data, value)
case TypeBoolean:
return unmarshalBool(info, data, value)
Expand Down Expand Up @@ -2615,6 +2615,7 @@ const (
TypeSet Type = 0x0022
TypeUDT Type = 0x0030
TypeTuple Type = 0x0031
TypeJsonb Type = 0x0080 // Yugabyte YCQL JSONB
)

// String returns the name of the identifier.
Expand Down Expand Up @@ -2672,6 +2673,8 @@ func (t Type) String() string {
return "varint"
case TypeTuple:
return "tuple"
case TypeJsonb:
return "jsonb"
default:
return fmt.Sprintf("unknown_type_%d", t)
}
Expand Down
108 changes: 89 additions & 19 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gocql
import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -1048,13 +1049,17 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
return nil, nil
}
var withoutBody bool
var tableName string
if session.useSystemSchema {
tableName = "system_schema.functions"
} else {
tableName = "system.schema_functions"
}
stmt := fmt.Sprintf(`
FuncsStmt:
var stmt string
if !withoutBody {
stmt = fmt.Sprintf(`
SELECT
function_name,
argument_types,
Expand All @@ -1065,6 +1070,18 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
return_type
FROM %s
WHERE keyspace_name = ?`, tableName)
} else {
stmt = fmt.Sprintf(`
SELECT
function_name,
argument_types,
argument_names,
called_on_null_input,
language,
return_type
FROM %s
WHERE keyspace_name = ?`, tableName)
}

var functions []FunctionMetadata

Expand All @@ -1073,14 +1090,25 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
function := FunctionMetadata{Keyspace: keyspaceName}
var argumentTypes []string
var returnType string
err := rows.Scan(&function.Name,
&argumentTypes,
&function.ArgumentNames,
&function.Body,
&function.CalledOnNullInput,
&function.Language,
&returnType,
)
var err error
if !withoutBody {
err = rows.Scan(&function.Name,
&argumentTypes,
&function.ArgumentNames,
&function.Body,
&function.CalledOnNullInput,
&function.Language,
&returnType,
)
} else {
err = rows.Scan(&function.Name,
&argumentTypes,
&function.ArgumentNames,
&function.CalledOnNullInput,
&function.Language,
&returnType,
)
}
if err != nil {
return nil, err
}
Expand All @@ -1093,6 +1121,14 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta
}

if err := rows.Err(); err != nil {
if !withoutBody {
// Yugabyte doesn't support the body column in the functions table
var rerr RequestError
if errors.As(err, &rerr) && rerr.Code() == ErrCodeInvalid {
withoutBody = true
goto FuncsStmt
}
}
return nil, err
}

Expand All @@ -1103,14 +1139,17 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions {
return nil, nil
}
var withoutReturnType bool
var tableName string
if session.useSystemSchema {
tableName = "system_schema.aggregates"
} else {
tableName = "system.schema_aggregates"
}

stmt := fmt.Sprintf(`
AggsStmt:
var stmt string
if !withoutReturnType {
stmt = fmt.Sprintf(`
SELECT
aggregate_name,
argument_types,
Expand All @@ -1121,6 +1160,18 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
state_type
FROM %s
WHERE keyspace_name = ?`, tableName)
} else {
stmt = fmt.Sprintf(`
SELECT
aggregate_name,
argument_types,
final_func,
initcond,
state_func,
state_type
FROM %s
WHERE keyspace_name = ?`, tableName)
}

var aggregates []AggregateMetadata

Expand All @@ -1130,14 +1181,25 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
var argumentTypes []string
var returnType string
var stateType string
err := rows.Scan(&aggregate.Name,
&argumentTypes,
&aggregate.finalFunc,
&aggregate.InitCond,
&returnType,
&aggregate.stateFunc,
&stateType,
)
var err error
if !withoutReturnType {
err = rows.Scan(&aggregate.Name,
&argumentTypes,
&aggregate.finalFunc,
&aggregate.InitCond,
&returnType,
&aggregate.stateFunc,
&stateType,
)
} else {
err = rows.Scan(&aggregate.Name,
&argumentTypes,
&aggregate.finalFunc,
&aggregate.InitCond,
&aggregate.stateFunc,
&stateType,
)
}
if err != nil {
return nil, err
}
Expand All @@ -1151,6 +1213,14 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe
}

if err := rows.Err(); err != nil {
if !withoutReturnType {
// Yugabyte doesn't support the return_type column in the aggregates table
var rerr RequestError
if errors.As(err, &rerr) && rerr.Code() == ErrCodeInvalid {
withoutReturnType = true
goto AggsStmt
}
}
return nil, err
}

Expand Down
70 changes: 70 additions & 0 deletions yugabyte_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//go:build yugabyte
// +build yugabyte

package gocql

import (
"bytes"
"testing"
)

func TestJSONB(t *testing.T) {
session := createSession(t)
defer session.Close()

defer func() {
err := createTable(session, "DROP TABLE IF EXISTS gocql_test.jsonb")
if err != nil {
t.Logf("failed to delete jsonb table: %v", err)
}
}()

if err := createTable(session, "CREATE TABLE gocql_test.jsonb (id int, my_jsonb jsonb, PRIMARY KEY (id))"); err != nil {
t.Fatalf("failed to create table with error '%v'", err)
}

b := session.NewBatch(LoggedBatch)
b.Query("INSERT INTO gocql_test.jsonb(id, my_jsonb) VALUES (?,?)", 1, []byte("true"))
b.Query("INSERT INTO gocql_test.jsonb(id, my_jsonb) VALUES (?,?)", 2, []byte(`{"foo":"bar"}`))

if err := session.ExecuteBatch(b); err != nil {
t.Fatalf("query failed. %v", err)
} else {
if b.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if b.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
}
}

var id int
var myJSONB []byte
if err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb WHERE id = 1;").Scan(&id, &myJSONB); err != nil {
t.Fatalf("Failed to select with err: %v", err)
} else if id != 1 {
t.Fatalf("Expected id = 1, got %v", id)
} else if !bytes.Equal(myJSONB, []byte("true")) {
t.Fatalf("Expected my_jsonb = true, got %v", string(myJSONB))
}

if err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb WHERE id = 2;").Scan(&id, &myJSONB); err != nil {
t.Fatalf("Failed to select with err: %v", err)
} else if id != 2 {
t.Fatalf("Expected id = 2, got %v", id)
} else if !bytes.Equal(myJSONB, []byte(`{"foo":"bar"}`)) {
t.Fatalf(`Expected my_jsonb = {"foo":"bar"}, got %v`, string(myJSONB))
}

if rd, err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb;").Iter().RowData(); err != nil {
t.Fatalf("Failed to select with err: %v", err)
} else if len(rd.Columns) != 2 || rd.Columns[0] != "id" || rd.Columns[1] != "my_jsonb" {
t.Fatalf("Expected [id, my_jsonb], got %v", rd.Columns)
} else if len(rd.Values) != 2 {
t.Fatalf("Expected 2 values, got %v", rd.Values)
} else if _, ok := rd.Values[0].(*int); !ok {
t.Fatalf("Expected values[0] = *int, got %T", rd.Values[0])
} else if _, ok := rd.Values[1].(*[]byte); !ok {
t.Fatalf("Expected values[1] = *[]byte, got %T", rd.Values[1])
}
}