diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 98fb6b791..17986178e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -4,8 +4,9 @@ on: push: branches: - master + - ci-test pull_request: - types: [ opened, synchronize, reopened ] + types: [opened, synchronize, reopened] env: CCM_VERSION: "6e71061146f7ae67b84ccd2b1d90d7319b640e4c" @@ -16,7 +17,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: [ '1.19', '1.20' ] + go: ["1.19", "1.20"] steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 @@ -32,7 +33,6 @@ jobs: name: Integration Tests runs-on: ubuntu-latest strategy: - fail-fast: false matrix: go: [ '1.19', '1.20' ] cassandra_version: [ '4.0.8', '4.1.1' ] @@ -199,3 +199,30 @@ jobs: run: | export JVM_EXTRA_OPTS="${{env.JVM_EXTRA_OPTS}}" go test -v -run=TestAuthentication -tags "${{ matrix.tags }} gocql_debug" -timeout=15s -runauth ${{ env.args }} + integration-yugabyte: + timeout-minutes: 10 + needs: + - build + name: Integration Tests (Yugabyte) + runs-on: ubuntu-latest + strategy: + matrix: + go: [ '1.19', '1.20' ] + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go }} + - uses: actions/cache@v2 + id: gomod-cache + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('go.mod') }} + restore-keys: | + ${{ runner.os }}-go- + - name: Setup YugabyteDB cluster + uses: jameshartig/yugabyte-db-action@master + id: yb + - name: Integration tests + run: | + go test -tags "yugabyte gocql_debug" -timeout=5m -race ./... diff --git a/helpers.go b/helpers.go index 00f339779..3cd3426db 100644 --- a/helpers.go +++ b/helpers.go @@ -30,7 +30,7 @@ func goType(t TypeInfo) (reflect.Type, error) { return reflect.TypeOf(*new(time.Duration)), nil case TypeTimestamp: return reflect.TypeOf(*new(time.Time)), nil - case TypeBlob: + case TypeBlob, TypeJsonb: return reflect.TypeOf(*new([]byte)), nil case TypeBoolean: return reflect.TypeOf(*new(bool)), nil @@ -137,6 +137,8 @@ func getCassandraBaseType(name string) Type { return TypeSet case "TupleType": return TypeTuple + case "jsonb": + return TypeJsonb default: return TypeCustom } diff --git a/marshal.go b/marshal.go index d47644901..a1efc8f0a 100644 --- a/marshal.go +++ b/marshal.go @@ -110,7 +110,7 @@ func Marshal(info TypeInfo, value interface{}) ([]byte, error) { } switch info.Type() { - case TypeVarchar, TypeAscii, TypeBlob, TypeText: + case TypeVarchar, TypeAscii, TypeBlob, TypeText, TypeJsonb: return marshalVarchar(info, value) case TypeBoolean: return marshalBool(info, value) @@ -212,7 +212,7 @@ func Unmarshal(info TypeInfo, data []byte, value interface{}) error { } switch info.Type() { - case TypeVarchar, TypeAscii, TypeBlob, TypeText: + case TypeVarchar, TypeAscii, TypeBlob, TypeText, TypeJsonb: return unmarshalVarchar(info, data, value) case TypeBoolean: return unmarshalBool(info, data, value) @@ -2615,6 +2615,7 @@ const ( TypeSet Type = 0x0022 TypeUDT Type = 0x0030 TypeTuple Type = 0x0031 + TypeJsonb Type = 0x0080 // Yugabyte YCQL JSONB ) // String returns the name of the identifier. @@ -2672,6 +2673,8 @@ func (t Type) String() string { return "varint" case TypeTuple: return "tuple" + case TypeJsonb: + return "jsonb" default: return fmt.Sprintf("unknown_type_%d", t) } diff --git a/metadata.go b/metadata.go index 13345d937..3597cc047 100644 --- a/metadata.go +++ b/metadata.go @@ -7,6 +7,7 @@ package gocql import ( "encoding/hex" "encoding/json" + "errors" "fmt" "strconv" "strings" @@ -1048,13 +1049,17 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { return nil, nil } + var withoutBody bool var tableName string if session.useSystemSchema { tableName = "system_schema.functions" } else { tableName = "system.schema_functions" } - stmt := fmt.Sprintf(` +FuncsStmt: + var stmt string + if !withoutBody { + stmt = fmt.Sprintf(` SELECT function_name, argument_types, @@ -1065,6 +1070,18 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta return_type FROM %s WHERE keyspace_name = ?`, tableName) + } else { + stmt = fmt.Sprintf(` + SELECT + function_name, + argument_types, + argument_names, + called_on_null_input, + language, + return_type + FROM %s + WHERE keyspace_name = ?`, tableName) + } var functions []FunctionMetadata @@ -1073,14 +1090,25 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta function := FunctionMetadata{Keyspace: keyspaceName} var argumentTypes []string var returnType string - err := rows.Scan(&function.Name, - &argumentTypes, - &function.ArgumentNames, - &function.Body, - &function.CalledOnNullInput, - &function.Language, - &returnType, - ) + var err error + if !withoutBody { + err = rows.Scan(&function.Name, + &argumentTypes, + &function.ArgumentNames, + &function.Body, + &function.CalledOnNullInput, + &function.Language, + &returnType, + ) + } else { + err = rows.Scan(&function.Name, + &argumentTypes, + &function.ArgumentNames, + &function.CalledOnNullInput, + &function.Language, + &returnType, + ) + } if err != nil { return nil, err } @@ -1093,6 +1121,14 @@ func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMeta } if err := rows.Err(); err != nil { + if !withoutBody { + // Yugabyte doesn't support the body column in the functions table + var rerr RequestError + if errors.As(err, &rerr) && rerr.Code() == ErrCodeInvalid { + withoutBody = true + goto FuncsStmt + } + } return nil, err } @@ -1103,14 +1139,17 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { return nil, nil } + var withoutReturnType bool var tableName string if session.useSystemSchema { tableName = "system_schema.aggregates" } else { tableName = "system.schema_aggregates" } - - stmt := fmt.Sprintf(` +AggsStmt: + var stmt string + if !withoutReturnType { + stmt = fmt.Sprintf(` SELECT aggregate_name, argument_types, @@ -1121,6 +1160,18 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe state_type FROM %s WHERE keyspace_name = ?`, tableName) + } else { + stmt = fmt.Sprintf(` + SELECT + aggregate_name, + argument_types, + final_func, + initcond, + state_func, + state_type + FROM %s + WHERE keyspace_name = ?`, tableName) + } var aggregates []AggregateMetadata @@ -1130,14 +1181,25 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe var argumentTypes []string var returnType string var stateType string - err := rows.Scan(&aggregate.Name, - &argumentTypes, - &aggregate.finalFunc, - &aggregate.InitCond, - &returnType, - &aggregate.stateFunc, - &stateType, - ) + var err error + if !withoutReturnType { + err = rows.Scan(&aggregate.Name, + &argumentTypes, + &aggregate.finalFunc, + &aggregate.InitCond, + &returnType, + &aggregate.stateFunc, + &stateType, + ) + } else { + err = rows.Scan(&aggregate.Name, + &argumentTypes, + &aggregate.finalFunc, + &aggregate.InitCond, + &aggregate.stateFunc, + &stateType, + ) + } if err != nil { return nil, err } @@ -1151,6 +1213,14 @@ func getAggregatesMetadata(session *Session, keyspaceName string) ([]AggregateMe } if err := rows.Err(); err != nil { + if !withoutReturnType { + // Yugabyte doesn't support the return_type column in the aggregates table + var rerr RequestError + if errors.As(err, &rerr) && rerr.Code() == ErrCodeInvalid { + withoutReturnType = true + goto AggsStmt + } + } return nil, err } diff --git a/yugabyte_test.go b/yugabyte_test.go new file mode 100644 index 000000000..fad97caa3 --- /dev/null +++ b/yugabyte_test.go @@ -0,0 +1,70 @@ +//go:build yugabyte +// +build yugabyte + +package gocql + +import ( + "bytes" + "testing" +) + +func TestJSONB(t *testing.T) { + session := createSession(t) + defer session.Close() + + defer func() { + err := createTable(session, "DROP TABLE IF EXISTS gocql_test.jsonb") + if err != nil { + t.Logf("failed to delete jsonb table: %v", err) + } + }() + + if err := createTable(session, "CREATE TABLE gocql_test.jsonb (id int, my_jsonb jsonb, PRIMARY KEY (id))"); err != nil { + t.Fatalf("failed to create table with error '%v'", err) + } + + b := session.NewBatch(LoggedBatch) + b.Query("INSERT INTO gocql_test.jsonb(id, my_jsonb) VALUES (?,?)", 1, []byte("true")) + b.Query("INSERT INTO gocql_test.jsonb(id, my_jsonb) VALUES (?,?)", 2, []byte(`{"foo":"bar"}`)) + + if err := session.ExecuteBatch(b); err != nil { + t.Fatalf("query failed. %v", err) + } else { + if b.Attempts() < 1 { + t.Fatal("expected at least 1 attempt, but got 0") + } + if b.Latency() <= 0 { + t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency()) + } + } + + var id int + var myJSONB []byte + if err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb WHERE id = 1;").Scan(&id, &myJSONB); err != nil { + t.Fatalf("Failed to select with err: %v", err) + } else if id != 1 { + t.Fatalf("Expected id = 1, got %v", id) + } else if !bytes.Equal(myJSONB, []byte("true")) { + t.Fatalf("Expected my_jsonb = true, got %v", string(myJSONB)) + } + + if err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb WHERE id = 2;").Scan(&id, &myJSONB); err != nil { + t.Fatalf("Failed to select with err: %v", err) + } else if id != 2 { + t.Fatalf("Expected id = 2, got %v", id) + } else if !bytes.Equal(myJSONB, []byte(`{"foo":"bar"}`)) { + t.Fatalf(`Expected my_jsonb = {"foo":"bar"}, got %v`, string(myJSONB)) + } + + if rd, err := session.Query("SELECT id, my_jsonb FROM gocql_test.jsonb;").Iter().RowData(); err != nil { + t.Fatalf("Failed to select with err: %v", err) + } else if len(rd.Columns) != 2 || rd.Columns[0] != "id" || rd.Columns[1] != "my_jsonb" { + t.Fatalf("Expected [id, my_jsonb], got %v", rd.Columns) + } else if len(rd.Values) != 2 { + t.Fatalf("Expected 2 values, got %v", rd.Values) + } else if _, ok := rd.Values[0].(*int); !ok { + t.Fatalf("Expected values[0] = *int, got %T", rd.Values[0]) + } else if _, ok := rd.Values[1].(*[]byte); !ok { + t.Fatalf("Expected values[1] = *[]byte, got %T", rd.Values[1]) + } +}