Reconnecting node to cluster breaks gocql.Session #1884

@isopov

Description

Sorry for the lengthy reproduction (it may be possible to make it smaller and gain more control).
What I do:

  • Install bitnami/cassandra into my local k8s (I use kind): helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"
  • Expose it so I can connect from localhost (the whole ring is reached through this single endpoint; see the AddressTranslator sketch at the end): kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra
  • Get the host for the connection with docker inspect kind-control-plane | grep IPAddress
  • Get the port for the connection with kubectl get services --namespace=cassandra | grep external
  • Get the password with kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d
  • Run this code:
package main

import (
	"fmt"
	"math/rand/v2"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gocql/gocql"
)

const workers = 100
const queries = 10_000

const readWorkers = 100
const readQueries = 10_000_000

// helm install --namespace cassandra --create-namespace cassandra bitnami/cassandra --set "replicaCount=6"
// kubectl expose service cassandra --type=NodePort --name=cassandra-external --namespace=cassandra
func main() {
	//host from docker inspect kind-control-plane | grep IPAddress
	//port from kubectl get services --namespace=cassandra | grep external
	cluster := gocql.NewCluster("172.18.0.2:32146")
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: "cassandra",
		//from kubectl get secret --namespace "cassandra" cassandra -o jsonpath="{.data.cassandra-password}" | base64 -d
		Password: "kMDfXIiu5M",
	}
	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}

	execRelease(session.Query("drop keyspace if exists k8stest"))
	execRelease(session.Query("create keyspace k8stest with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3}"))
	execRelease(session.Query("drop table if exists k8stest.test"))
	execRelease(session.Query("create table k8stest.test (a int, b text, primary key(a))"))

	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			query := session.Query("insert into k8stest.test (a, b) values (?,?)")
			for j := i * queries; j < (i+1)*queries; j++ {
				query.Bind(j, "Message"+strconv.Itoa(j))
				if err := query.Exec(); err != nil {
					panic(err)
				}
			}
			query.Release()
		}()
	}

	wg.Wait()

	var scans uint64
	var errors uint64

	var mu sync.RWMutex
	errorsCount := make(map[string]uint64)

	for i := 0; i < readWorkers; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			query := session.Query("select b from k8stest.test where a=?")
			for j := i * readQueries; j < (i+1)*readQueries; j++ {
				id := rand.IntN(queries * workers)
				query.Bind(id)
				iter := query.Iter()
				var val string
				if iter.Scan(&val) {
					if val != "Message"+strconv.Itoa(id) {
						panic("unexpected message " + val + "instead of Message" + strconv.Itoa(id))
					}
					atomic.AddUint64(&scans, 1)
				} else {
					atomic.AddUint64(&errors, 1)
					time.Sleep(time.Millisecond)
				}
				if err := iter.Close(); err != nil {
					mu.Lock()
					errorsCount[err.Error()]++
					mu.Unlock()
					// Drop the failed query and start a fresh one for the next attempt.
					query.Release()
					query = session.Query("select b from k8stest.test where a=?")
				}

			}
			query.Release()
		}()
	}

	go func() {
		for {
			time.Sleep(time.Second)
			fmt.Printf("##### %d scans, %d errors\n", scans, errors)
			mu.RLock()
			for err, count := range errorsCount {
				fmt.Printf("error %s count %d \n", err, count)
			}
			mu.RUnlock()
		}
	}()

	wg.Wait()
}

func execRelease(query *gocql.Query) {
	if err := query.Exec(); err != nil {
		println(err.Error())
		panic(err)
	}
	query.Release()
}
  • After reading starts:
##### 4494 scans, 0 errors
##### 11271 scans, 0 errors
##### 17814 scans, 0 errors
##### 23093 scans, 0 errors
...

  • I remove one pod from the StatefulSet.

  • I get a few errors and reading continues:
##### 373219 scans, 685 errors
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50
  • However, after the Cassandra node rejoins the ring, the session breaks and I receive only errors:
##### 546274 scans, 685 errors
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50 
##### 553261 scans, 685 errors
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50 
##### 556674 scans, 41433 errors
error gocql: connection closed waiting for response count 101 
error gocql: no hosts available in the pool count 40547 
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50 
##### 556674 scans, 121771 errors
error gocql: connection closed waiting for response count 101 
error gocql: no hosts available in the pool count 120903 
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50 
...
##### 556674 scans, 10370862 errors
error Server is shutting down count 635 
error writev tcp 172.18.0.1:57908->172.18.0.2:32146: writev: broken pipe count 50 
error gocql: connection closed waiting for response count 101 
error gocql: no hosts available in the pool count 10369976

I expect the gocql.Session to continue serving queries. The reaction appears to be the same when removing, and waiting for the recreation of, any node in the cluster.
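
For reference, a minimal sketch of the reconnection-related knobs gocql exposes, applied to the same cluster config as above. The values are placeholders; I have not verified that tuning them avoids the breakage:

package main

import (
	"time"

	"github.com/gocql/gocql"
)

func newCluster() *gocql.ClusterConfig {
	cluster := gocql.NewCluster("172.18.0.2:32146")
	// How often the driver retries hosts it has marked DOWN (default 60s).
	cluster.ReconnectInterval = 10 * time.Second
	// Policy for re-establishing lost connections to a host.
	cluster.ReconnectionPolicy = &gocql.ConstantReconnectionPolicy{
		MaxRetries: 10,
		Interval:   2 * time.Second,
	}
	return cluster
}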
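
Possibly relevant to this setup: since the whole ring is reached through one NodePort, the peer addresses the driver discovers from the cluster may not be dialable from my machine. Here is a hedged sketch of routing everything back through that NodePort with gocql's AddressTranslator; the IP and port are the ones from the steps above, and I have not confirmed whether this changes the outcome:

package main

import (
	"net"

	"github.com/gocql/gocql"
)

// configureTranslator is a hypothetical helper: it maps every address the
// driver discovers back to the single NodePort used as the contact point.
func configureTranslator(cluster *gocql.ClusterConfig) {
	cluster.AddressTranslator = gocql.AddressTranslatorFunc(
		func(addr net.IP, port int) (net.IP, int) {
			return net.ParseIP("172.18.0.2"), 32146
		})
}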
