Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0]

### Added

- Session.StatementMetadata (CASSGO-92)
- NewLogFieldIP, NewLogFieldError, NewLogFieldStringer, NewLogFieldString, NewLogFieldInt, NewLogFieldBool (CASSGO-92)

### Fixed

- Prevent panic with queries during session init (CASSGO-92)

## [2.0.0]

### Removed
Expand Down
139 changes: 72 additions & 67 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2794,66 +2794,62 @@ func TestKeyspaceMetadata(t *testing.T) {
}

// Integration test of the routing key calculation
func TestRoutingKey(t *testing.T) {
func TestRoutingStatementMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id varchar, PRIMARY KEY (first_id, second_id))"); err != nil {
t.Fatalf("failed to create table with error '%v'", err)
}
if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id varchar, PRIMARY KEY ((first_id, second_id)))"); err != nil {
t.Fatalf("failed to create table with error '%v'", err)
}

routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
meta, err := session.routingStatementMetadata(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
if err != nil {
t.Fatalf("failed to get routing key info due to error: %v", err)
t.Fatalf("failed to get routing statement metadata due to error: %v", err)
}
if routingKeyInfo == nil {
t.Fatal("Expected routing key info, but was nil")
if meta == nil {
t.Fatal("Expected routing statement metadata, but was nil")
}
if len(routingKeyInfo.indexes) != 1 {
t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
if len(meta.PKBindColumnIndexes) != 1 {
t.Fatalf("Expected routing statement metadata PKBindColumnIndexes length to be 1 but was %d", len(meta.PKBindColumnIndexes))
}
if routingKeyInfo.indexes[0] != 1 {
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
if meta.PKBindColumnIndexes[0] != 1 {
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[0] to be 1 but was %d", meta.PKBindColumnIndexes[0])
}
if len(routingKeyInfo.types) != 1 {
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
if len(meta.BindColumns) != 2 {
t.Fatalf("Expected routing statement metadata BindColumns length to be 2 but was %d", len(meta.BindColumns))
}
if routingKeyInfo.types[0] == nil {
t.Fatal("Expected routing key types[0] to be non-nil")
if meta.BindColumns[0].TypeInfo.Type() != TypeVarchar {
t.Fatalf("Expected routing statement metadata BindColumns[0].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.BindColumns[0].TypeInfo.Type())
}
if routingKeyInfo.types[0].Type() != TypeInt {
t.Fatalf("Expected routing key types[0].Type to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
if meta.BindColumns[1].TypeInfo.Type() != TypeInt {
t.Fatalf("Expected routing statement metadata BindColumns[1].TypeInfo.Type to be %v but was %v", TypeInt, meta.BindColumns[1].TypeInfo.Type())
}

// verify the cache is working
routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
if err != nil {
t.Fatalf("failed to get routing key info due to error: %v", err)
if len(meta.ResultColumns) != 2 {
t.Fatalf("Expected routing statement metadata ResultColumns length to be 2 but was %d", len(meta.ResultColumns))
}
if len(routingKeyInfo.indexes) != 1 {
t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
if meta.ResultColumns[0].Name != "first_id" {
t.Fatalf("Expected routing statement metadata ResultColumns[0].Name to be %v but was %v", "first_id", meta.ResultColumns[0].Name)
}
if routingKeyInfo.indexes[0] != 1 {
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
if meta.ResultColumns[0].TypeInfo.Type() != TypeInt {
t.Fatalf("Expected routing statement metadata ResultColumns[0].TypeInfo.Type to be %v but was %v", TypeInt, meta.ResultColumns[0].TypeInfo.Type())
}
if len(routingKeyInfo.types) != 1 {
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
if meta.ResultColumns[1].Name != "second_id" {
t.Fatalf("Expected routing statement metadata ResultColumns[1].Name to be %v but was %v", "second_id", meta.ResultColumns[1].Name)
}
if routingKeyInfo.types[0] == nil {
t.Fatal("Expected routing key types[0] to be non-nil")
if meta.ResultColumns[1].TypeInfo.Type() != TypeVarchar {
t.Fatalf("Expected routing statement metadata ResultColumns[1].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.ResultColumns[1].TypeInfo.Type())
}
if routingKeyInfo.types[0].Type() != TypeInt {
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
}
cacheSize := session.routingKeyInfoCache.lru.Len()

// verify the cache is working
cacheSize := session.routingMetadataCache.lru.Len()
if cacheSize != 1 {
t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
}

query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "1", 2), nil)
routingKey, err := query.GetRoutingKey()
if err != nil {
t.Fatalf("Failed to get routing key due to error: %v", err)
Expand All @@ -2863,50 +2859,59 @@ func TestRoutingKey(t *testing.T) {
t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
}

routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "")
meta, err = session.routingStatementMetadata(context.Background(), "SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "")
if err != nil {
t.Fatalf("failed to get routing key info due to error: %v", err)
t.Fatalf("failed to get routing statement metadata due to error: %v", err)
}
if meta == nil {
t.Fatal("Expected routing statement metadata, but was nil")
}
if len(meta.PKBindColumnIndexes) != 2 {
t.Fatalf("Expected routing statement metadata PKBindColumnIndexes length to be 2 but was %d", len(meta.PKBindColumnIndexes))
}
if meta.PKBindColumnIndexes[0] != 1 {
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[0] to be 1 but was %d", meta.PKBindColumnIndexes[0])
}
if routingKeyInfo == nil {
t.Fatal("Expected routing key info, but was nil")
if meta.PKBindColumnIndexes[1] != 0 {
t.Errorf("Expected routing statement metadata PKBindColumnIndexes[1] to be 0 but was %d", meta.PKBindColumnIndexes[1])
}
if len(routingKeyInfo.indexes) != 2 {
t.Fatalf("Expected routing key indexes length to be 2 but was %d", len(routingKeyInfo.indexes))
if len(meta.BindColumns) != 2 {
t.Fatalf("Expected routing statement metadata BindColumns length to be 2 but was %d", len(meta.BindColumns))
}
if routingKeyInfo.indexes[0] != 1 {
t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
if meta.BindColumns[0].TypeInfo.Type() != TypeVarchar {
t.Fatalf("Expected routing statement metadata BindColumns[0].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.BindColumns[0].TypeInfo.Type())
}
if routingKeyInfo.indexes[1] != 0 {
t.Errorf("Expected routing key index[1] to be 0 but was %d", routingKeyInfo.indexes[1])
if meta.BindColumns[1].TypeInfo.Type() != TypeInt {
t.Fatalf("Expected routing statement metadata BindColumns[1].TypeInfo.Type to be %v but was %v", TypeInt, meta.BindColumns[1].TypeInfo.Type())
}
if len(routingKeyInfo.types) != 2 {
t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
if len(meta.ResultColumns) != 2 {
t.Fatalf("Expected routing statement metadata ResultColumns length to be 2 but was %d", len(meta.ResultColumns))
}
if routingKeyInfo.types[0] == nil {
t.Fatal("Expected routing key types[0] to be non-nil")
if meta.ResultColumns[0].Name != "first_id" {
t.Fatalf("Expected routing statement metadata ResultColumns[0].Name to be %v but was %v", "first_id", meta.ResultColumns[0].Name)
}
if routingKeyInfo.types[0].Type() != TypeInt {
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
if meta.ResultColumns[0].TypeInfo.Type() != TypeInt {
t.Fatalf("Expected routing statement metadata ResultColumns[0].TypeInfo.Type to be %v but was %v", TypeInt, meta.ResultColumns[0].TypeInfo.Type())
}
if routingKeyInfo.types[1] == nil {
t.Fatal("Expected routing key types[1] to be non-nil")
if meta.ResultColumns[1].Name != "second_id" {
t.Fatalf("Expected routing statement metadata ResultColumns[1].Name to be %v but was %v", "second_id", meta.ResultColumns[1].Name)
}
if routingKeyInfo.types[1].Type() != TypeInt {
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
if meta.ResultColumns[1].TypeInfo.Type() != TypeVarchar {
t.Fatalf("Expected routing statement metadata ResultColumns[1].TypeInfo.Type to be %v but was %v", TypeVarchar, meta.ResultColumns[1].TypeInfo.Type())
}

query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", "1", 2), nil)
routingKey, err = query.GetRoutingKey()
if err != nil {
t.Fatalf("Failed to get routing key due to error: %v", err)
}
expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 4, 0, 0, 0, 1, 0}
expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 1, 49, 0}
if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
}

// verify the cache is working
cacheSize = session.routingKeyInfoCache.lru.Len()
cacheSize = session.routingMetadataCache.lru.Len()
if cacheSize != 2 {
t.Errorf("Expected cache size to be 2 but was %d", cacheSize)
}
Expand Down Expand Up @@ -3956,17 +3961,17 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
t.Fatal(err)
}

getRoutingKeyInfo := func(key string) *routingKeyInfo {
getStatementMetadata := func(key string) *StatementMetadata {
t.Helper()
session.routingKeyInfoCache.mu.Lock()
value, ok := session.routingKeyInfoCache.lru.Get(key)
session.routingMetadataCache.mu.Lock()
value, ok := session.routingMetadataCache.lru.Get(key)
if !ok {
t.Fatalf("routing key not found in cache for key %v", key)
}
session.routingKeyInfoCache.mu.Unlock()
session.routingMetadataCache.mu.Unlock()

inflight := value.(*inflightCachedEntry)
return inflight.value.(*routingKeyInfo)
return inflight.value.(*StatementMetadata)
}

const insertQuery = "INSERT INTO routing_key_cache_uses_overridden_ks (id) VALUES (?)"
Expand All @@ -3979,8 +3984,8 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
require.NoError(t, err)

// Ensuring that the cache contains the query with default ks
routingKeyInfo1 := getRoutingKeyInfo("gocql_test" + b1.Entries[0].Stmt)
require.Equal(t, "gocql_test", routingKeyInfo1.keyspace)
meta1 := getStatementMetadata("gocql_test" + b1.Entries[0].Stmt)
require.Equal(t, "gocql_test", meta1.Keyspace)

// Running batch in gocql_test_routing_key_cache ks
b2 := session.Batch(LoggedBatch)
Expand All @@ -3991,8 +3996,8 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
require.NoError(t, err)

// Ensuring that the cache contains the query with gocql_test_routing_key_cache ks
routingKeyInfo2 := getRoutingKeyInfo("gocql_test_routing_key_cache" + b2.Entries[0].Stmt)
require.Equal(t, "gocql_test_routing_key_cache", routingKeyInfo2.keyspace)
meta2 := getStatementMetadata("gocql_test_routing_key_cache" + b2.Entries[0].Stmt)
require.Equal(t, "gocql_test_routing_key_cache", meta2.Keyspace)

const selectStmt = "SELECT * FROM routing_key_cache_uses_overridden_ks WHERE id=?"

Expand Down
4 changes: 2 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger Str
}
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
logger.Debug("Translating address.",
newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port),
newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort))
NewLogFieldIP("old_addr", addr), NewLogFieldInt("old_port", port),
NewLogFieldIP("new_addr", newAddr), NewLogFieldInt("new_port", newPort))
return newAddr, newPort
}

Expand Down
16 changes: 8 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
delete(c.calls, head.stream)
c.mu.Unlock()
if call == nil || !ok {
c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String()))
c.logger.Warning("Received response for stream which has no handler.", NewLogFieldString("header", head.String()))
return c.discardFrame(r, head)
} else if head.stream != call.streamID {
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
Expand Down Expand Up @@ -1316,7 +1316,7 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
responseFrame, err := resp.framer.parseFrame()
if err != nil {
c.logger.Warning("Framer error while attempting to parse potential protocol error.",
newLogFieldError("err", err))
NewLogFieldError("err", err))
return nil, errProtocol
}
//goland:noinspection GoTypeAssertionOnErrors
Expand All @@ -1333,17 +1333,17 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
case <-timeoutCh:
close(call.timeout)
c.logger.Debug("Request timed out on connection.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()))
return nil, ErrTimeoutNoResponse
case <-ctxDone:
c.logger.Debug("Request failed because context elapsed out on connection.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()),
newLogFieldError("ctx_err", ctx.Err()))
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()),
NewLogFieldError("ctx_err", ctx.Err()))
close(call.timeout)
return nil, ctx.Err()
case <-c.ctx.Done():
c.logger.Debug("Request failed because connection closed.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
NewLogFieldString("host_id", c.host.HostID()), NewLogFieldIP("addr", c.host.ConnectAddress()))
close(call.timeout)
return nil, ErrConnectionClosed
}
Expand Down Expand Up @@ -1698,7 +1698,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
iter.framer = framer
if err := c.awaitSchemaAgreement(ctx); err != nil {
// TODO: should have this behind a flag
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err))
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", NewLogFieldError("err", err))
}
// dont return an error from this, might be a good idea to give a warning
// though. The impact of this returning an error would be that the cluster
Expand Down Expand Up @@ -1956,7 +1956,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
goto cont
}
if !isValidPeer(host) || host.schemaVersion == "" {
c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress()))
c.logger.Warning("Invalid peer or peer with empty schema_version.", NewLogFieldIP("peer", host.ConnectAddress()))
continue
}

Expand Down
14 changes: 7 additions & 7 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
host := q.Host.ConnectAddress().String()
o.metrics[host] = q.Metrics
o.logger.Debug("Observed query.",
newLogFieldString("stmt", q.Statement),
newLogFieldInt("rows", q.Rows),
newLogFieldString("duration", q.End.Sub(q.Start).String()),
newLogFieldString("host", host),
newLogFieldInt("attempts", q.Metrics.Attempts),
newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
newLogFieldError("err", q.Err))
NewLogFieldString("stmt", q.Statement),
NewLogFieldInt("rows", q.Rows),
NewLogFieldString("duration", q.End.Sub(q.Start).String()),
NewLogFieldString("host", host),
NewLogFieldInt("attempts", q.Metrics.Attempts),
NewLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
NewLogFieldError("err", q.Err))
}

func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics {
Expand Down
Loading
Loading