Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

# [1.1.1] - 2025-07-31
### Features
* Support select float32 column as float.

### Bug Fixes
* Make GetCassandraColumnType space agnostic.
* Handle boolean as string in string bool map.

# [1.1.0] - 2025-04-29
### Features
* Enable multiplexed session by default for all writes.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
> We introduced the newer official [go-spanner-cassandra](https://github.com/googleapis/go-spanner-cassandra/blob/main/README.md) and [java-spanner-cassandra](https://github.com/googleapis/java-spanner-cassandra/blob/main/README.md) clients(currently in public preview). These clients enable in-process use of gocql (for Go) or the Java Cassandra driver with [Spanner's native v4 Cassandra protocol support](https://cloud.google.com/spanner/docs/non-relational/connect-cassandra-adapter), eliminating the need for running this proxy adapter as a sidecar.
## Current Released Version

Version `1.1.0`
Version `1.1.1`

## Introduction
Cassandra to Cloud Spanner Proxy Adapter is designed to forward your application's CQL traffic to Spanner database service. It listens on a local address and securely forwards that traffic.
Expand Down
142 changes: 98 additions & 44 deletions third_party/datastax/proxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
clusterReleaseversion = "4.0.0.6816"
defaultCqlVersion = "3.4.5"
TCP_BIND_PORT = "0.0.0.0:%s"
proxyReleaseVersion = "v1.1.0"
proxyReleaseVersion = "v1.1.1"
)
var readFile = os.ReadFile
var atomicStartedListenersCounter atomic.Int32
Expand Down Expand Up @@ -132,26 +132,26 @@ type Otel struct {
}

type runConfig struct {
Version bool `yaml:"version" help:"Show current proxy version" short:"v" default:"false" env:"PROXY_VERSION"`
Username string `yaml:"username" help:"Username to use for authentication" short:"u" env:"USERNAME"`
Password string `yaml:"password" help:"Password to use for authentication" short:"p" env:"PASSWORD"`
ProtocolVersion string `yaml:"protocol-version" help:"Initial protocol version to use when connecting to the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" default:"v4" short:"n" env:"PROTOCOL_VERSION"`
MaxProtocolVersion string `yaml:"max-protocol-version" help:"Max protocol version supported by the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" default:"v4" short:"m" env:"MAX_PROTOCOL_VERSION"`
Bind string `yaml:"bind" help:"Address to use to bind server" short:"a" default:":9042" env:"BIND"`
Config *os.File `yaml:"-" help:"YAML configuration file" short:"f" env:"CONFIG_FILE"` // Not available in the configuration file
Debug bool `yaml:"debug" help:"Show debug logging" default:"false" env:"DEBUG"`
HeartbeatInterval time.Duration `yaml:"heartbeat-interval" help:"Interval between performing heartbeats to the cluster" default:"30s" env:"HEARTBEAT_INTERVAL"`
ConnectTimeout time.Duration `yaml:"connect-timeout" help:"Duration before an attempt to connect to a cluster is considered timed out" default:"10s" env:"CONNECT_TIMEOUT"`
IdleTimeout time.Duration `yaml:"idle-timeout" help:"Duration between successful heartbeats before a connection to the cluster is considered unresponsive and closed" default:"60s" env:"IDLE_TIMEOUT"`
ReadinessTimeout time.Duration `yaml:"readiness-timeout" help:"Duration the proxy is unable to connect to the backend cluster before it is considered not ready" default:"30s" env:"READINESS_TIMEOUT"`
ProxyCertFile string `yaml:"proxy-cert-file" help:"Path to a PEM encoded certificate file with its intermediate certificate chain. This is used to encrypt traffic for proxy clients" env:"PROXY_CERT_FILE"`
ProxyKeyFile string `yaml:"proxy-key-file" help:"Path to a PEM encoded private key file. This is used to encrypt traffic for proxy clients" env:"PROXY_KEY_FILE"`
DataCenter string `yaml:"data-center" help:"Data center to use in system tables" default:"datacenter1" env:"DATA_CENTER"`
ReleaseVersion string `yaml:"release-version" help:"Cluster Release version" default:"4.0.0.6816" env:"RELEASE_VERSION"`
Partitioner string `yaml:"partitioner" help:"Partitioner partitioner" default:"org.apache.cassandra.dht.Murmur3Partitioner" env:"PARTITIONER"`
Tokens []string `yaml:"tokens" help:"Tokens to use in the system tables. It's not recommended" env:"TOKENS"`
CQLVersion string `yaml:"cql-version" help:"CQL version" default:"3.4.5" env:"CQLVERSION"`
LogLevel string `yaml:"log-level" help:"Log level configuration." default:"info" env:"LOG_LEVEL"`
Version bool `yaml:"version" help:"Show current proxy version" short:"v" default:"false" env:"PROXY_VERSION"`
Username string `yaml:"username" help:"Username to use for authentication" short:"u" env:"USERNAME"`
Password string `yaml:"password" help:"Password to use for authentication" short:"p" env:"PASSWORD"`
ProtocolVersion string `yaml:"protocol-version" help:"Initial protocol version to use when connecting to the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" short:"n" default:"v4" env:"PROTOCOL_VERSION"`
MaxProtocolVersion string `yaml:"max-protocol-version" help:"Max protocol version supported by the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" short:"m" default:"v4" env:"MAX_PROTOCOL_VERSION"`
Bind string `yaml:"bind" help:"Address to use to bind server" short:"a" default:":9042" env:"BIND"`
Config *os.File `yaml:"-" help:"YAML configuration file" short:"f" env:"CONFIG_FILE"` // Not available in the configuration file
Debug bool `yaml:"debug" help:"Show debug logging" default:"false" env:"DEBUG"`
HeartbeatInterval time.Duration `yaml:"heartbeat-interval" help:"Interval between performing heartbeats to the cluster" default:"30s" env:"HEARTBEAT_INTERVAL"`
ConnectTimeout time.Duration `yaml:"connect-timeout" help:"Duration before an attempt to connect to a cluster is considered timed out" default:"10s" env:"CONNECT_TIMEOUT"`
IdleTimeout time.Duration `yaml:"idle-timeout" help:"Duration between successful heartbeats before a connection to the cluster is considered unresponsive and closed" default:"60s" env:"IDLE_TIMEOUT"`
ReadinessTimeout time.Duration `yaml:"readiness-timeout" help:"Duration the proxy is unable to connect to the backend cluster before it is considered not ready" default:"30s" env:"READINESS_TIMEOUT"`
ProxyCertFile string `yaml:"proxy-cert-file" help:"Path to a PEM encoded certificate file with its intermediate certificate chain. This is used to encrypt traffic for proxy clients" env:"PROXY_CERT_FILE"`
ProxyKeyFile string `yaml:"proxy-key-file" help:"Path to a PEM encoded private key file. This is used to encrypt traffic for proxy clients" env:"PROXY_KEY_FILE"`
DataCenter string `yaml:"data-center" help:"Data center to use in system tables" default:"datacenter1" env:"DATA_CENTER"`
ReleaseVersion string `yaml:"release-version" help:"Cluster Release version" default:"4.0.0.6816" env:"RELEASE_VERSION"`
Partitioner string `yaml:"partitioner" help:"Partitioner partitioner" default:"org.apache.cassandra.dht.Murmur3Partitioner" env:"PARTITIONER"`
Tokens []string `yaml:"tokens" help:"Tokens to use in the system tables. It's not recommended" env:"TOKENS"`
CQLVersion string `yaml:"cql-version" help:"CQL version" default:"3.4.5" env:"CQLVERSION"`
LogLevel string `yaml:"log-level" help:"Log level configuration." default:"info" env:"LOG_LEVEL"`
}

func readinessCheckHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -166,7 +166,8 @@ func readinessCheckHandler(w http.ResponseWriter, r *http.Request) {
}
}

// Run starts the proxy command. 'args' shouldn't include the executable (i.e. os.Args[1:]). It returns the exit code
// Run starts the proxy command. 'args' shouldn't include the executable (i.e.
// os.Args[1:]). It returns the exit code
// for the proxy.
func Run(ctx context.Context, args []string) int {
var cfg runConfig
Expand All @@ -185,12 +186,16 @@ func Run(ctx context.Context, args []string) int {
atomicStartedListenersCounter.Store(0)
expectedListeners = len(UserConfig.Listeners)

// Start HTTP server for readiness check if user has specified an endpoint for it.
// Start HTTP server for readiness check if user has specified an endpoint for
// it.
if UserConfig.CassandraToSpannerConfigs.ReadinessCheckEndpoint != "" {
http.HandleFunc("/debug/health", readinessCheckHandler)
httpServer := &http.Server{Addr: UserConfig.CassandraToSpannerConfigs.ReadinessCheckEndpoint}
httpServer := &http.Server{
Addr: UserConfig.CassandraToSpannerConfigs.ReadinessCheckEndpoint,
}
go func() {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := httpServer.ListenAndServe(); err != nil &&
err != http.ErrServerClosed {
log.Printf("Error when registering http health check endpoint:%v", err)
}
}()
Expand All @@ -211,19 +216,30 @@ func Run(ctx context.Context, args []string) int {
if cfg.Config != nil {
bytes, err := ioutil.ReadAll(cfg.Config)
if err != nil {
cliCtx.Errorf("unable to read contents of configuration file '%s': %v", cfg.Config.Name(), err)
cliCtx.Errorf(
"unable to read contents of configuration file '%s': %v",
cfg.Config.Name(),
err,
)
return 1
}
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
cliCtx.Errorf("invalid YAML in configuration file '%s': %v", cfg.Config.Name(), err)
cliCtx.Errorf(
"invalid YAML in configuration file '%s': %v",
cfg.Config.Name(),
err,
)
}
}

var resolver proxycore.EndpointResolver
if cfg.HeartbeatInterval >= cfg.IdleTimeout {
cliCtx.Errorf("idle-timeout must be greater than heartbeat-interval (heartbeat interval: %s, idle timeout: %s)",
cfg.HeartbeatInterval, cfg.IdleTimeout)
cliCtx.Errorf(
"idle-timeout must be greater than heartbeat-interval (heartbeat interval: %s, idle timeout: %s)",
cfg.HeartbeatInterval,
cfg.IdleTimeout,
)
return 1
}

Expand All @@ -241,7 +257,9 @@ func Run(ctx context.Context, args []string) int {
}

if version > maxVersion {
cliCtx.Errorf("default protocol version is greater than max protocol version")
cliCtx.Errorf(
"default protocol version is greater than max protocol version",
)
return 1
}

Expand Down Expand Up @@ -319,11 +337,21 @@ func Run(ctx context.Context, args []string) int {
logger.Info("Protocol Version:" + version.String())
logger.Info("CQL Version:" + cqlVersion)
logger.Info("Release Version:" + releaseVersion)
logger.Info("Cassandra to Cloud Spanner Proxy Adapter Version:" + proxyReleaseVersion)
logger.Info(
"Cassandra to Cloud Spanner Proxy Adapter Version:" + proxyReleaseVersion,
)
logger.Info("Partitioner:" + partitioner)
logger.Info("Data Center:" + cfg.DataCenter)
logger.Info("Configured keyspace name flattening status", zap.Bool("isKeyspaceFlatteningEnabled", UserConfig.CassandraToSpannerConfigs.KeyspaceFlatter))
enableDirectPath, err := strconv.ParseBool(strings.TrimSpace(os.Getenv("GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS")))
logger.Info(
"Configured keyspace name flattening status",
zap.Bool(
"isKeyspaceFlatteningEnabled",
UserConfig.CassandraToSpannerConfigs.KeyspaceFlatter,
),
)
enableDirectPath, err := strconv.ParseBool(
strings.TrimSpace(os.Getenv("GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS")),
)
if err == nil && enableDirectPath {
logger.Info("Google Spanner Direct Path Feature Enabled")
} else {
Expand Down Expand Up @@ -382,7 +410,11 @@ func Run(ctx context.Context, args []string) int {
wg.Add(1)
go func(cfg runConfig, p *Proxy) {
defer wg.Done()
err := cfg.listenAndServe(p, ctx, logger) // Use cfg2 or other instances as needed
err := cfg.listenAndServe(
p,
ctx,
logger,
) // Use cfg2 or other instances as needed
if err != nil {
logger.Fatal("Error while serving - ", zap.Error(err))
}
Expand Down Expand Up @@ -412,7 +444,9 @@ func LoadConfig(filename string) (*UserConfig, error) {
return &config, nil
}

func parseProtocolVersion(s string) (version primitive.ProtocolVersion, ok bool) {
func parseProtocolVersion(
s string,
) (version primitive.ProtocolVersion, ok bool) {
ok = true
lowered := strings.ToLower(s)
if lowered == "3" || lowered == "v3" {
Expand All @@ -431,35 +465,49 @@ func parseProtocolVersion(s string) (version primitive.ProtocolVersion, ok bool)
return version, ok
}

// maybeAddPort adds the default port to an IP; otherwise, it returns the original address.
// maybeAddPort adds the default port to an IP; otherwise, it returns the
// original address.
func maybeAddPort(addr string, defaultPort string) string {
if net.ParseIP(addr) != nil {
return net.JoinHostPort(addr, defaultPort)
}
return addr
}

// listenAndServe correctly handles serving both the proxy and an HTTP server simultaneously.
func (c *runConfig) listenAndServe(p *Proxy, ctx context.Context, logger *zap.Logger) (err error) {
// listenAndServe correctly handles serving both the proxy and an HTTP server
// simultaneously.
func (c *runConfig) listenAndServe(
p *Proxy,
ctx context.Context,
logger *zap.Logger,
) (err error) {
var wg sync.WaitGroup

ch := make(chan error)
numServers := 1 // Without the HTTP server

// Connect and listen is called first to set up the listening server connection and establish initial client
// connections to the backend cluster so that when the readiness check is hit the proxy is actually ready.
// Connect and listen is called first to set up the listening server
// connection and establish initial client connections to the backend cluster
// so that when the readiness check is hit the proxy is actually ready.

err = p.Connect()
if err != nil {
return err
}

proxyListener, err := resolveAndListen(c.Bind, c.ProxyCertFile, c.ProxyKeyFile)
proxyListener, err := resolveAndListen(
c.Bind,
c.ProxyCertFile,
c.ProxyKeyFile,
)
if err != nil {
return err
}

logger.Info("proxy is listening", zap.Stringer("address", proxyListener.Addr()))
logger.Info(
"proxy is listening",
zap.Stringer("address", proxyListener.Addr()),
)
atomicStartedListenersCounter.Add(1)

wg.Add(numServers)
Expand Down Expand Up @@ -497,13 +545,19 @@ func (c *runConfig) listenAndServe(p *Proxy, ctx context.Context, logger *zap.Lo
func resolveAndListen(address, cert, key string) (net.Listener, error) {
if len(cert) > 0 || len(key) > 0 {
if len(cert) == 0 || len(key) == 0 {
return nil, errors.New("both certificate and private key are required for TLS")
return nil, errors.New(
"both certificate and private key are required for TLS",
)
}
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, fmt.Errorf("unable to load TLS certificate pair: %v", err)
}
return tls.Listen("tcp", address, &tls.Config{Certificates: []tls.Certificate{cert}})
return tls.Listen(
"tcp",
address,
&tls.Config{Certificates: []tls.Certificate{cert}},
)
} else {
return net.Listen("tcp", address)
}
Expand Down