Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Move "execute batch" methods to Batch type (CASSGO-57)

- Make `Session` immutable by removing setters and associated mutex (CASSGO-23)

### Fixed
- Cassandra version unmarshal fix (CASSGO-49)

Expand Down
3 changes: 1 addition & 2 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,10 @@ func TestTracing(t *testing.T) {
}

// also works from session tracer
session.SetTrace(trace)
trace.mu.Lock()
buf.Reset()
trace.mu.Unlock()
if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Scan(&value); err != nil {
if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
t.Fatal("select:", err)
}
if buf.Len() == 0 {
Expand Down
12 changes: 12 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,17 @@ type ClusterConfig struct {
// If not specified, defaults to the gocql.defaultLogger.
Logger StdLogger

// Tracer will be used for all queries. Alternatively it can be set of on a
// per query basis.
// default: nil
Tracer Tracer

// NextPagePrefetch sets the default threshold for pre-fetching new pages. If
// there are only p*pageSize rows remaining, the next page will be requested
// automatically. This value can also be changed on a per-query basis.
// default: 0.25.
NextPagePrefetch float64

// internal config for testing
disableControlConn bool
}
Expand Down Expand Up @@ -298,6 +309,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
WriteCoalesceWaitTime: 200 * time.Microsecond,
NextPagePrefetch: 0.25,
}
return cfg
}
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@
//
// # Paging
//
// The driver supports paging of results with automatic prefetch, see ClusterConfig.PageSize, Session.SetPrefetch,
// The driver supports paging of results with automatic prefetch, see ClusterConfig.PageSize,
// Query.PageSize, and Query.Prefetch.
//
// It is also possible to control the paging manually with Query.PageState (this disables automatic prefetch).
Expand Down
44 changes: 2 additions & 42 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ type Session struct {
ring ring
metadata clusterMetadata

mu sync.RWMutex

control *controlConn

// event handlers
Expand Down Expand Up @@ -153,14 +151,15 @@ func NewSession(cfg ClusterConfig) (*Session, error) {

s := &Session{
cons: cfg.Consistency,
prefetch: 0.25,
prefetch: cfg.NextPagePrefetch,
cfg: cfg,
pageSize: cfg.PageSize,
stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
connectObserver: cfg.ConnectObserver,
ctx: ctx,
cancel: cancel,
logger: cfg.logger(),
trace: cfg.Tracer,
}

s.schemaDescriber = newSchemaDescriber(s)
Expand Down Expand Up @@ -416,41 +415,6 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
}
}

// SetConsistency sets the default consistency level for this session. This
// setting can also be changed on a per-query basis and the default value
// is Quorum.
func (s *Session) SetConsistency(cons Consistency) {
s.mu.Lock()
s.cons = cons
s.mu.Unlock()
}

// SetPageSize sets the default page size for this session. A value <= 0 will
// disable paging. This setting can also be changed on a per-query basis.
func (s *Session) SetPageSize(n int) {
s.mu.Lock()
s.pageSize = n
s.mu.Unlock()
}

// SetPrefetch sets the default threshold for pre-fetching new pages. If
// there are only p*pageSize rows remaining, the next page will be requested
// automatically. This value can also be changed on a per-query basis and
// the default value is 0.25.
func (s *Session) SetPrefetch(p float64) {
s.mu.Lock()
s.prefetch = p
s.mu.Unlock()
}

// SetTrace sets the default tracer for this session. This setting can also
// be changed on a per-query basis.
func (s *Session) SetTrace(trace Tracer) {
s.mu.Lock()
s.trace = trace
s.mu.Unlock()
}

// Query generates a new query object for interacting with the database.
// Further details of the query may be tweaked using the resulting query
// value before the query is executed. Query is automatically prepared
Expand Down Expand Up @@ -1005,7 +969,6 @@ type queryRoutingInfo struct {
func (q *Query) defaultsFromSession() {
s := q.session

s.mu.RLock()
q.cons = s.cons
q.pageSize = s.pageSize
q.trace = s.trace
Expand All @@ -1018,7 +981,6 @@ func (q *Query) defaultsFromSession() {
q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}

q.spec = &NonSpeculativeExecution{}
s.mu.RUnlock()
}

// Statement returns the statement that was used to generate this query.
Expand Down Expand Up @@ -1855,7 +1817,6 @@ func (s *Session) NewBatch(typ BatchType) *Batch {

// Batch creates a new batch operation using defaults defined in the cluster
func (s *Session) Batch(typ BatchType) *Batch {
s.mu.RLock()
batch := &Batch{
Type: typ,
rt: s.cfg.RetryPolicy,
Expand All @@ -1871,7 +1832,6 @@ func (s *Session) Batch(typ BatchType) *Batch {
routingInfo: &queryRoutingInfo{},
}

s.mu.RUnlock()
return batch
}

Expand Down
26 changes: 2 additions & 24 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,21 @@ import (
)

func TestSessionAPI(t *testing.T) {
cfg := &ClusterConfig{}
cfg := NewCluster()

s := &Session{
cfg: *cfg,
cons: Quorum,
policy: RoundRobinHostPolicy(),
logger: cfg.logger(),
}
defer s.Close()

s.pool = cfg.PoolConfig.buildPool(s)
s.executor = &queryExecutor{
pool: s.pool,
policy: s.policy,
}
defer s.Close()

s.SetConsistency(All)
if s.cons != All {
t.Fatalf("expected consistency 'All', got '%v'", s.cons)
}

s.SetPageSize(100)
if s.pageSize != 100 {
t.Fatalf("expected pageSize 100, got %v", s.pageSize)
}

s.SetPrefetch(0.75)
if s.prefetch != 0.75 {
t.Fatalf("expceted prefetch 0.75, got %v", s.prefetch)
}

trace := &traceWriter{}

s.SetTrace(trace)
if s.trace != trace {
t.Fatalf("expected traceWriter '%v',got '%v'", trace, s.trace)
}

qry := s.Query("test", 1)
if v, ok := qry.values[0].(int); !ok {
Expand Down