|
5 | 5 | "fmt" |
6 | 6 | "log" |
7 | 7 | "math" |
| 8 | + "math/bits" |
8 | 9 | "net" |
9 | 10 | "time" |
10 | 11 |
|
@@ -126,10 +127,11 @@ func (p *ConnPool) closeAll() { |
126 | 127 | } |
127 | 128 |
|
128 | 129 | type PoolRefiller struct { |
129 | | - addr string |
130 | | - pool ConnPool |
131 | | - cfg ConnConfig |
132 | | - active int |
| 130 | + addr string |
| 131 | + pool ConnPool |
| 132 | + cfg ConnConfig |
| 133 | + active int |
| 134 | + fillRandomly bool |
133 | 135 | } |
134 | 136 |
|
135 | 137 | func (r *PoolRefiller) init(ctx context.Context, host string) error { |
@@ -158,13 +160,17 @@ func (r *PoolRefiller) init(ctx context.Context, host string) error { |
158 | 160 | if v, ok := s.Options[ScyllaShardAwarePortSSL]; ok { |
159 | 161 | r.addr = net.JoinHostPort(host, v[0]) |
160 | 162 | } else { |
161 | | - return fmt.Errorf("missing encrypted shard aware port information %v", s.Options) |
| 163 | + r.cfg.Logger.Warnf("missing encrypted shard aware port information %v; falling back to random shard discovery", s.Options) |
| 164 | + r.addr = net.JoinHostPort(host, r.cfg.DefaultPort) |
| 165 | + r.fillRandomly = true |
162 | 166 | } |
163 | 167 | } else { |
164 | 168 | if v, ok := s.Options[ScyllaShardAwarePort]; ok { |
165 | 169 | r.addr = net.JoinHostPort(host, v[0]) |
166 | 170 | } else { |
167 | | - return fmt.Errorf("missing shard aware port information %v", s.Options) |
| 171 | + r.cfg.Logger.Warnf("missing shard aware port information %v; falling back to random shard discovery", s.Options) |
| 172 | + r.addr = net.JoinHostPort(host, r.cfg.DefaultPort) |
| 173 | + r.fillRandomly = true |
168 | 174 | } |
169 | 175 | } |
170 | 176 |
|
@@ -226,7 +232,9 @@ func (r *PoolRefiller) fill(ctx context.Context) { |
226 | 232 | if !r.needsFilling() { |
227 | 233 | return |
228 | 234 | } |
229 | | - |
| 235 | + if r.fillRandomly { |
| 236 | + r.fillRandom(ctx) |
| 237 | + } |
230 | 238 | si := ShardInfo{ |
231 | 239 | NrShards: uint16(r.pool.nrShards), |
232 | 240 | MsbIgnore: r.pool.msbIgnore, |
@@ -270,3 +278,41 @@ func (r *PoolRefiller) fill(ctx context.Context) { |
270 | 278 | func (r *PoolRefiller) needsFilling() bool { |
271 | 279 | return r.active < r.pool.nrShards |
272 | 280 | } |
| 281 | + |
| 282 | +func (r *PoolRefiller) fillRandom(ctx context.Context) { |
| 283 | + // https://en.wikipedia.org/wiki/Coupon_collector%27s_problem |
| 284 | + maxTries := r.pool.nrShards * bits.Len(uint(r.pool.nrShards)) |
| 285 | + r.cfg.Logger.Infof("trying random shards %d times", maxTries) |
| 286 | + for try := 0; r.needsFilling() && try < maxTries; try++ { |
| 287 | + span := startSpan() |
| 288 | + conn, err := OpenConn(ctx, r.addr, nil, r.cfg) |
| 289 | + span.stop() |
| 290 | + if err != nil { |
| 291 | + if r.pool.connObs != nil { |
| 292 | + r.pool.connObs.OnConnect(ConnectEvent{ConnEvent: ConnEvent{Addr: r.addr, Shard: 0}, span: span, Err: err}) |
| 293 | + } |
| 294 | + if conn != nil { |
| 295 | + conn.Close() |
| 296 | + } |
| 297 | + continue |
| 298 | + } |
| 299 | + |
| 300 | + if r.pool.loadConn(conn.Shard()) != nil { |
| 301 | + if r.pool.connObs != nil { |
| 302 | + r.pool.connObs.OnConnect(ConnectEvent{ |
| 303 | + ConnEvent: ConnEvent{Addr: r.addr, Shard: uint16(conn.Shard())}, |
| 304 | + span: span, |
| 305 | + Err: fmt.Errorf("shard already in pool")}) |
| 306 | + } |
| 307 | + conn.Close() |
| 308 | + continue |
| 309 | + } |
| 310 | + |
| 311 | + if r.pool.connObs != nil { |
| 312 | + r.pool.connObs.OnConnect(ConnectEvent{ConnEvent: conn.Event(), span: span}) |
| 313 | + } |
| 314 | + conn.setOnClose(r.onConnClose) |
| 315 | + r.pool.storeConn(conn) |
| 316 | + r.active++ |
| 317 | + } |
| 318 | +} |
0 commit comments