|
| 1 | +package gocql |
| 2 | + |
| 3 | +import ( |
| 4 | + "log" |
| 5 | + "time" |
| 6 | + |
| 7 | + "github.com/scylladb/scylla-go-driver" |
| 8 | + scyllalog "github.com/scylladb/scylla-go-driver/log" |
| 9 | + "github.com/scylladb/scylla-go-driver/transport" |
| 10 | +) |
| 11 | + |
| 12 | +type ClusterConfig struct { |
| 13 | + // addresses for the initial connections. It is recommended to use the value set in |
| 14 | + // the Cassandra config for broadcast_address or listen_address, an IP address not |
| 15 | + // a domain name. This is because events from Cassandra will use the configured IP |
| 16 | + // address, which is used to index connected hosts. If the domain name specified |
| 17 | + // resolves to more than 1 IP address then the driver may connect multiple times to |
| 18 | + // the same host, and will not mark the node being down or up from events. |
| 19 | + Hosts []string |
| 20 | + |
| 21 | + // CQL version (default: 3.0.0) |
| 22 | + CQLVersion string |
| 23 | + |
| 24 | + // ProtoVersion sets the version of the native protocol to use, this will |
| 25 | + // enable features in the driver for specific protocol versions, generally this |
| 26 | + // should be set to a known version (2,3,4) for the cluster being connected to. |
| 27 | + // |
| 28 | + // If it is 0 or unset (the default) then the driver will attempt to discover the |
| 29 | + // highest supported protocol for the cluster. In clusters with nodes of different |
| 30 | + // versions the protocol selected is not defined (ie, it can be any of the supported in the cluster) |
| 31 | + ProtoVersion int |
| 32 | + |
| 33 | + // Connection timeout (default: 600ms) |
| 34 | + Timeout time.Duration |
| 35 | + |
| 36 | + // Initial connection timeout, used during initial dial to server (default: 600ms) |
| 37 | + // ConnectTimeout is used to set up the default dialer and is ignored if Dialer or HostDialer is provided. |
| 38 | + ConnectTimeout time.Duration |
| 39 | + |
| 40 | + // Port used when dialing. |
| 41 | + // Default: 9042 |
| 42 | + Port int |
| 43 | + |
| 44 | + // Initial keyspace. Optional. |
| 45 | + Keyspace string |
| 46 | + |
| 47 | + // Number of connections per host. |
| 48 | + // Default: 2 |
| 49 | + NumConns int |
| 50 | + |
| 51 | + // Default consistency level. |
| 52 | + // Default: Quorum |
| 53 | + Consistency Consistency |
| 54 | + |
| 55 | + // Compression algorithm. |
| 56 | + // Default: nil |
| 57 | + Compressor Compressor |
| 58 | + |
| 59 | + // Default: nil |
| 60 | + Authenticator Authenticator |
| 61 | + |
| 62 | + // An Authenticator factory. Can be used to create alternative authenticators. |
| 63 | + // Default: nil |
| 64 | + // AuthProvider func(h *HostInfo) (Authenticator, error) |
| 65 | + |
| 66 | + // Default retry policy to use for queries. |
| 67 | + // Default: no retries. |
| 68 | + RetryPolicy RetryPolicy |
| 69 | + |
| 70 | + // ConvictionPolicy decides whether to mark host as down based on the error and host info. |
| 71 | + // Default: SimpleConvictionPolicy |
| 72 | + ConvictionPolicy ConvictionPolicy // TODO: use it? |
| 73 | + |
| 74 | + // Default reconnection policy to use for reconnecting before trying to mark host as down. |
| 75 | + // ReconnectionPolicy ReconnectionPolicy |
| 76 | + |
| 77 | + // The keepalive period to use, enabled if > 0 (default: 0) |
| 78 | + // SocketKeepalive is used to set up the default dialer and is ignored if Dialer or HostDialer is provided. |
| 79 | + SocketKeepalive time.Duration |
| 80 | + |
| 81 | + // Maximum cache size for prepared statements globally for gocql. |
| 82 | + // Default: 1000 |
| 83 | + MaxPreparedStmts int |
| 84 | + |
| 85 | + // Maximum cache size for query info about statements for each session. |
| 86 | + // Default: 1000 |
| 87 | + MaxRoutingKeyInfo int |
| 88 | + |
| 89 | + // Default page size to use for created sessions. |
| 90 | + // Default: 5000 |
| 91 | + PageSize int |
| 92 | + |
| 93 | + // Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL. |
| 94 | + // Default: unset |
| 95 | + // SerialConsistency SerialConsistency |
| 96 | + |
| 97 | + // SslOpts configures TLS use when HostDialer is not set. |
| 98 | + // SslOpts is ignored if HostDialer is set. |
| 99 | + SslOpts *SslOptions |
| 100 | + |
| 101 | + // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. |
| 102 | + // Default: true, only enabled for protocol 3 and above. |
| 103 | + DefaultTimestamp bool |
| 104 | + |
| 105 | + // PoolConfig configures the underlying connection pool, allowing the |
| 106 | + // configuration of host selection and connection selection policies. |
| 107 | + PoolConfig PoolConfig |
| 108 | + |
| 109 | + // If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectInterval. |
| 110 | + ReconnectInterval time.Duration // FIXME: unused |
| 111 | + |
| 112 | + // The maximum amount of time to wait for schema agreement in a cluster after |
| 113 | + // receiving a schema change frame. (default: 60s) |
| 114 | + MaxWaitSchemaAgreement time.Duration |
| 115 | + |
| 116 | + // HostFilter will filter all incoming events for host, any which don't pass |
| 117 | + // the filter will be ignored. If set will take precedence over any options set |
| 118 | + // via Discovery |
| 119 | + // HostFilter HostFilter |
| 120 | + |
| 121 | + // AddressTranslator will translate addresses found on peer discovery and/or |
| 122 | + // node change events. |
| 123 | + // AddressTranslator AddressTranslator |
| 124 | + |
| 125 | + // If IgnorePeerAddr is true and the address in system.peers does not match |
| 126 | + // the supplied host by either initial hosts or discovered via events then the |
| 127 | + // host will be replaced with the supplied address. |
| 128 | + // |
| 129 | + // For example if an event comes in with host=10.0.0.1 but when looking up that |
| 130 | + // address in system.local or system.peers returns 127.0.0.1, the peer will be |
| 131 | + // set to 10.0.0.1 which is what will be used to connect to. |
| 132 | + IgnorePeerAddr bool |
| 133 | + |
| 134 | + // If DisableInitialHostLookup then the driver will not attempt to get host info |
| 135 | + // from the system.peers table, this will mean that the driver will connect to |
| 136 | + // hosts supplied and will not attempt to lookup the hosts information, this will |
| 137 | + // mean that data_centre, rack and token information will not be available and as |
| 138 | + // such host filtering and token aware query routing will not be available. |
| 139 | + DisableInitialHostLookup bool |
| 140 | + |
| 141 | + // Configure events the driver will register for |
| 142 | + Events struct { |
| 143 | + // disable registering for status events (node up/down) |
| 144 | + DisableNodeStatusEvents bool |
| 145 | + // disable registering for topology events (node added/removed/moved) |
| 146 | + DisableTopologyEvents bool |
| 147 | + // disable registering for schema events (keyspace/table/function removed/created/updated) |
| 148 | + DisableSchemaEvents bool |
| 149 | + } |
| 150 | + |
| 151 | + // DisableSkipMetadata will override the internal result metadata cache so that the driver does not |
| 152 | + // send skip_metadata for queries, this means that the result will always contain |
| 153 | + // the metadata to parse the rows and will not reuse the metadata from the prepared |
| 154 | + // statement. |
| 155 | + // |
| 156 | + // See https://issues.apache.org/jira/browse/CASSANDRA-10786 |
| 157 | + DisableSkipMetadata bool |
| 158 | + |
| 159 | + // QueryObserver will set the provided query observer on all queries created from this session. |
| 160 | + // Use it to collect metrics / stats from queries by providing an implementation of QueryObserver. |
| 161 | + // QueryObserver QueryObserver |
| 162 | + |
| 163 | + // BatchObserver will set the provided batch observer on all queries created from this session. |
| 164 | + // Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver. |
| 165 | + // BatchObserver BatchObserver |
| 166 | + |
| 167 | + // ConnectObserver will set the provided connect observer on all queries |
| 168 | + // created from this session. |
| 169 | + // ConnectObserver ConnectObserver |
| 170 | + |
| 171 | + // FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session. |
| 172 | + // Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver. |
| 173 | + // FrameHeaderObserver FrameHeaderObserver |
| 174 | + |
| 175 | + // Default idempotence for queries |
| 176 | + DefaultIdempotence bool |
| 177 | + |
| 178 | + // The time to wait for frames before flushing the frames connection to Cassandra. |
| 179 | + // Can help reduce syscall overhead by making less calls to write. Set to 0 to |
| 180 | + // disable. |
| 181 | + // |
| 182 | + // (default: 200 microseconds) |
| 183 | + WriteCoalesceWaitTime time.Duration |
| 184 | + |
| 185 | + // Dialer will be used to establish all connections created for this Cluster. |
| 186 | + // If not provided, a default dialer configured with ConnectTimeout will be used. |
| 187 | + // Dialer is ignored if HostDialer is provided. |
| 188 | + // Dialer Dialer |
| 189 | + |
| 190 | + // HostDialer will be used to establish all connections for this Cluster. |
| 191 | + // Unlike Dialer, HostDialer is responsible for setting up the entire connection, including the TLS session. |
| 192 | + // To support shard-aware port, HostDialer should implement ShardDialer. |
| 193 | + // If not provided, Dialer will be used instead. |
| 194 | + // HostDialer HostDialer |
| 195 | + |
| 196 | + // DisableShardAwarePort will prevent the driver from connecting to Scylla's shard-aware port, |
| 197 | + // even if there are nodes in the cluster that support it. |
| 198 | + // |
| 199 | + // It is generally recommended to leave this option turned off because gocql can use |
| 200 | + // the shard-aware port to make the process of establishing more robust. |
| 201 | + // However, if you have a cluster with nodes which expose shard-aware port |
| 202 | + // but the port is unreachable due to network configuration issues, you can use |
| 203 | + // this option to work around the issue. Set it to true only if you neither can fix |
| 204 | + // your network nor disable shard-aware port on your nodes. |
| 205 | + DisableShardAwarePort bool |
| 206 | + |
| 207 | + // Logger for this ClusterConfig. |
| 208 | + // If not specified, defaults to the global gocql.Logger. |
| 209 | + Logger StdLogger |
| 210 | + |
| 211 | + // internal config for testing |
| 212 | + disableControlConn bool |
| 213 | + disableInit bool |
| 214 | +} |
| 215 | + |
| 216 | +func NewCluster(hosts ...string) *ClusterConfig { |
| 217 | + cfg := ClusterConfig{Hosts: hosts, WriteCoalesceWaitTime: 200 * time.Microsecond} |
| 218 | + return &cfg |
| 219 | +} |
| 220 | + |
| 221 | +func sessionConfigFromGocql(cfg *ClusterConfig) (scylla.SessionConfig, error) { |
| 222 | + scfg := scylla.DefaultSessionConfig(cfg.Keyspace, cfg.Hosts...) |
| 223 | + scfg.Hosts = cfg.Hosts |
| 224 | + scfg.WriteCoalesceWaitTime = cfg.WriteCoalesceWaitTime |
| 225 | + if _, ok := cfg.Compressor.(SnappyCompressor); ok { |
| 226 | + scfg.Compression = scylla.Snappy |
| 227 | + } |
| 228 | + |
| 229 | + if auth, ok := cfg.Authenticator.(PasswordAuthenticator); ok { |
| 230 | + scfg.Username = auth.Username |
| 231 | + scfg.Password = auth.Password |
| 232 | + } |
| 233 | + |
| 234 | + if policy, ok := cfg.PoolConfig.HostSelectionPolicy.(transport.HostSelectionPolicy); ok { |
| 235 | + scfg.HostSelectionPolicy = policy |
| 236 | + } |
| 237 | + |
| 238 | + if retryPolicy, ok := cfg.RetryPolicy.(transport.RetryPolicy); ok { |
| 239 | + scfg.RetryPolicy = retryPolicy |
| 240 | + } |
| 241 | + |
| 242 | + if cfg.Logger == nil { |
| 243 | + if Logger == nil { |
| 244 | + scfg.Logger = scyllalog.NewDefaultLogger() |
| 245 | + } else { |
| 246 | + scfg.Logger = stdLoggerWrapper{Logger} |
| 247 | + } |
| 248 | + } else { |
| 249 | + if cfg.Logger == nil { |
| 250 | + cfg.Logger = log.Default() |
| 251 | + } |
| 252 | + scfg.Logger = stdLoggerWrapper{cfg.Logger} |
| 253 | + } |
| 254 | + |
| 255 | + if cfg.SslOpts != nil { |
| 256 | + tlsConfig, err := setupTLSConfig(cfg.SslOpts) |
| 257 | + if err != nil { |
| 258 | + return scylla.SessionConfig{}, err |
| 259 | + } |
| 260 | + scfg.TLSConfig = tlsConfig |
| 261 | + } |
| 262 | + |
| 263 | + return scfg, nil |
| 264 | +} |
| 265 | + |
| 266 | +func (cfg *ClusterConfig) CreateSession() (*Session, error) { |
| 267 | + return NewSession(*cfg) |
| 268 | +} |
0 commit comments