Skip to content

Commit c8a1d3d

Browse files
committed
datasource.go: switch from whitelist to gocql.HostFilterFunc
1 parent d6bd90e commit c8a1d3d

File tree

1 file changed

+143
-11
lines changed

1 file changed

+143
-11
lines changed

pkg/plugin/datasource.go

Lines changed: 143 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"gopkg.in/inf.v0"
7+
"fmt"
88
"math/big"
9+
"net"
910
"strconv"
11+
"strings"
1012
"time"
1113

12-
"fmt"
14+
"gopkg.in/inf.v0"
15+
1316
"github.com/gocql/gocql"
1417
"github.com/grafana/grafana-plugin-sdk-go/backend"
1518
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
1619
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1720
"github.com/grafana/grafana-plugin-sdk-go/data"
18-
"strings"
1921
)
2022

2123
// NewDatasource creates a new datasource instance.
@@ -207,6 +209,7 @@ func (td *Datasource) query(_ context.Context, pCtx backend.PluginContext, insta
207209
session, err := instance.getSession(strings.TrimSpace(specificHost), addHost)
208210
if err != nil {
209211
log.DefaultLogger.Warn("Failed getting session", "err", err, "host", specificHost)
212+
response.Error = err
210213
return response
211214
}
212215
iter := session.Query(querytxt).Iter()
@@ -282,6 +285,29 @@ func (d *Datasource) CheckHealth(_ context.Context, req *backend.CheckHealthRequ
282285
}, nil
283286
}
284287

288+
// customAddressTranslator forces the driver to use a specific target IP
289+
// instead of the addresses discovered through gossip
290+
type customAddressTranslator struct {
291+
targetIP string
292+
port int
293+
}
294+
295+
func (t *customAddressTranslator) Translate(ip net.IP, port int) (net.IP, int) {
296+
log.DefaultLogger.Debug("Address translation", "from", ip.String(), "fromPort", port, "to", t.targetIP, "toPort", t.port)
297+
298+
// First, try to parse targetIP as a literal IP address.
299+
if parsedIP := net.ParseIP(t.targetIP); parsedIP != nil {
300+
return parsedIP, t.port
301+
}
302+
// If parsing fails, try to resolve targetIP as a hostname.
303+
if addrs, err := net.LookupIP(t.targetIP); err == nil && len(addrs) > 0 {
304+
return addrs[0], t.port
305+
}
306+
// As a last resort, fall back to the original IP/port to avoid returning a nil IP.
307+
log.DefaultLogger.Warn("Failed to translate address, falling back to original", "targetIP", t.targetIP, "originalIP", ip.String(), "originalPort", port)
308+
return ip, port
309+
}
310+
285311
type instanceSettings struct {
286312
cluster *gocql.ClusterConfig
287313
authenticator *gocql.PasswordAuthenticator
@@ -318,28 +344,134 @@ func (settings *instanceSettings) getSession(hostRef interface{}, specificHost b
318344
cluster = settings.cluster
319345
} else if settings.clusters[host] == nil {
320346
settings.clusters[host] = gocql.NewCluster(host)
321-
settings.clusters[host].HostFilter = gocql.WhiteListHostFilter(host)
347+
// Custom host filter that handles both public and private IPs
348+
targetIP := host
349+
// Remove port if present (host might be "ip:port" or "[ipv6]:port")
350+
if h, _, err := net.SplitHostPort(host); err == nil {
351+
targetIP = h
352+
}
353+
log.DefaultLogger.Debug("Setting up host filter", "targetIP", targetIP, "originalHost", host)
354+
355+
// Configure cluster to handle private/public IP scenarios
356+
// IgnorePeerAddr = true prevents the driver from connecting to addresses discovered via gossip
357+
// and forces it to only use the addresses we explicitly provided
358+
settings.clusters[host].IgnorePeerAddr = true
359+
// DisableInitialHostLookup = true to use the exact host we specified
360+
settings.clusters[host].DisableInitialHostLookup = true
361+
362+
// Add connection timeout to avoid hanging indefinitely
363+
settings.clusters[host].ConnectTimeout = 5 * time.Second
364+
settings.clusters[host].Timeout = 10 * time.Second
365+
366+
// Extract port from host if present
367+
port := 9042
368+
if _, portStr, err := net.SplitHostPort(host); err == nil {
369+
if p, err := strconv.Atoi(portStr); err == nil {
370+
port = p
371+
}
372+
}
373+
374+
// Set up custom address translator to force connections to use the target IP
375+
settings.clusters[host].AddressTranslator = &customAddressTranslator{
376+
targetIP: targetIP,
377+
port: port,
378+
}
379+
380+
// Custom host filter to force using the target IP
381+
// This ensures we connect to the private IP even if the driver discovers public IPs
382+
settings.clusters[host].HostFilter = gocql.HostFilterFunc(func(hostInfo *gocql.HostInfo) bool {
383+
connectAddr := hostInfo.ConnectAddress().String()
384+
broadcastAddr := hostInfo.BroadcastAddress().String()
385+
listenAddr := hostInfo.ListenAddress().String()
386+
rpcAddr := hostInfo.RPCAddress().String()
387+
388+
// Log all available hosts for debugging
389+
log.DefaultLogger.Debug("Available host detected",
390+
"connectAddress", connectAddr,
391+
"broadcastAddress", broadcastAddr,
392+
"listenAddress", listenAddr,
393+
"rpcAddress", rpcAddr,
394+
"targetIP", targetIP,
395+
"hostID", hostInfo.HostID())
396+
397+
// Extract and normalize IPs from addresses (they might include port)
398+
connectIP := connectAddr
399+
if h, _, err := net.SplitHostPort(connectAddr); err == nil {
400+
connectIP = h
401+
}
402+
403+
broadcastIP := broadcastAddr
404+
if h, _, err := net.SplitHostPort(broadcastAddr); err == nil {
405+
broadcastIP = h
406+
}
407+
408+
listenIP := listenAddr
409+
if h, _, err := net.SplitHostPort(listenAddr); err == nil {
410+
listenIP = h
411+
}
412+
413+
rpcIP := rpcAddr
414+
if h, _, err := net.SplitHostPort(rpcAddr); err == nil {
415+
rpcIP = h
416+
}
417+
418+
// Helper function to check if two IPs match (handles IPv6 normalization)
419+
ipMatches := func(ip1Str, ip2Str string) bool {
420+
ip1 := net.ParseIP(ip1Str)
421+
ip2 := net.ParseIP(ip2Str)
422+
if ip1 == nil || ip2 == nil {
423+
// Fall back to string comparison if parsing fails
424+
return ip1Str == ip2Str
425+
}
426+
return ip1.Equal(ip2)
427+
}
428+
429+
// Check if target IP matches any of the addresses
430+
if ipMatches(connectIP, targetIP) || ipMatches(broadcastIP, targetIP) ||
431+
ipMatches(listenIP, targetIP) || ipMatches(rpcIP, targetIP) {
432+
log.DefaultLogger.Debug("Host matched - connection allowed",
433+
"targetIP", targetIP,
434+
"connectIP", connectIP,
435+
"broadcastIP", broadcastIP,
436+
"listenIP", listenIP,
437+
"rpcIP", rpcIP)
438+
return true
439+
}
440+
441+
log.DefaultLogger.Debug("Host filtered out",
442+
"targetIP", targetIP,
443+
"connectIP", connectIP,
444+
"broadcastIP", broadcastIP,
445+
"listenIP", listenIP,
446+
"rpcIP", rpcIP)
447+
return false
448+
})
449+
322450
log.DefaultLogger.Debug("getSession creating cluster from host", "host", host)
323451
if settings.authenticator != nil {
324452
settings.clusters[host].Authenticator = *settings.authenticator
325453
}
326454
if settings.cluster == nil {
327455
// good opportunity to create a default cluster
328456
settings.cluster = gocql.NewCluster(host)
329-
}
330-
if specificHost {
331-
cluster = settings.clusters[host]
332-
} else {
333-
cluster = settings.cluster
457+
if settings.authenticator != nil {
458+
settings.cluster.Authenticator = *settings.authenticator
459+
}
334460
}
335461

336462
}
337-
log.DefaultLogger.Debug("getSession, creating new session", "host", host)
463+
// Cluster already exists for this host
464+
if specificHost {
465+
cluster = settings.clusters[host]
466+
} else {
467+
cluster = settings.cluster
468+
}
338469
session, err := gocql.NewSession(*cluster)
339470
if err != nil {
340-
log.DefaultLogger.Info("unable to connect to scylla", "err", err, "session", session, "host", host)
471+
log.DefaultLogger.Info("unable to connect to scylla", "err", err, "host", host, "clusterHosts", cluster.Hosts)
341472
return nil, err
342473
}
474+
log.DefaultLogger.Debug("Session created successfully", "host", host)
343475
settings.sessions[host] = session
344476
return session, nil
345477
}

0 commit comments

Comments
 (0)