Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions dgraph/cmd/dgraphimport/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ func TestEmptyBulkOutDir(t *testing.T) {
}

func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
t.Skip("Skipping... sometimes the query for schema succeeds even when the server is in draining mode")

tests := []struct {
name string
numAlphas int
Expand Down Expand Up @@ -123,8 +121,6 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
}

func TestImportApis(t *testing.T) {
t.Skip("Skipping import tests due to persistent flakiness with container networking and Raft leadership issues")

tests := []testcase{
{
name: "SingleGroupShutTwoAlphasPerGroup",
Expand Down Expand Up @@ -391,12 +387,12 @@ func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int)
}

retryDelay := time.Second
hasAllPredicates := true

// Get expected predicates first
var expectedSchemaObj map[string]interface{}
require.NoError(t, json.Unmarshal([]byte(expectedSchema), &expectedSchemaObj))
expectedPredicates := getPredicateMap(expectedSchemaObj)
var hasAllPredicates bool

for i := 0; i < maxRetries; i++ {
// Checking client connection again here because an import operation may be in progress on the rejoined alpha
Expand All @@ -412,7 +408,7 @@ func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int)
// Get actual predicates
actualPredicates := getPredicateMap(actualSchema)

// Check if all expected predicates are present
hasAllPredicates = true
for predName := range expectedPredicates {
if _, exists := actualPredicates[predName]; !exists {
hasAllPredicates = false
Expand Down
4 changes: 2 additions & 2 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ func downloadFile(fname, url string) error {
cmd := exec.Command("wget", "-O", fname, url)
cmd.Dir = datasetFilesPath

if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("error downloading file %s: %s", fname, string(out))
if _, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("error downloading file %s: %w", fname, err)
}
return nil
}
3 changes: 1 addition & 2 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,8 +930,7 @@ func (c *LocalCluster) Client() (*dgraphapi.GrpcClient, func(), error) {
var conns []*grpc.ClientConn
for _, aa := range c.alphas {
if !aa.isRunning {
// QUESTIONS(shivaji): Should this be 'continue' instead of a break from the loop
break
continue
}
url, err := aa.alphaURL(c)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions systest/backup/nfs-backup/Untitled
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alpha1_backup_clust_ha
18 changes: 11 additions & 7 deletions systest/backup/nfs-backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ var (
)

func TestBackupHAClust(t *testing.T) {
t.Skip("Skipping HA backup test via NFS")
backupRestoreTest(t, "alpha1_backup_clust_ha", "zero1_backup_clust_ha",
"alpha4_restore_clust_ha", backupDstHA)
}

func TestBackupNonHAClust(t *testing.T) {
t.Skip("Skipping Non-HA backup test via NFS")
backupRestoreTest(t, "alpha7_backup_clust_non_ha", "zero7_backup_clust_non_ha",
"alpha8_restore_clust_non_ha", backupDstNonHA)
}
Expand All @@ -54,25 +52,31 @@ func backupRestoreTest(t *testing.T, backupAlphaName string, backupZeroName stri

// Wait for containers to be healthy before proceeding
t.Logf("Waiting for %s to be healthy...", backupAlphaName)
fmt.Println("=================================================")
fmt.Println("testutil.DockerPrefix ------>", testutil.DockerPrefix)
fmt.Println("=================================================")
backupAlpha := testutil.ContainerInstance{Name: backupAlphaName, Prefix: testutil.DockerPrefix}
require.NoError(t, backupAlpha.BestEffortWaitForHealthy(8080))

t.Logf("Waiting for %s to be healthy...", backupZeroName)
backupZero := testutil.ContainerInstance{Name: backupZeroName, Prefix: testutil.DockerPrefix}
require.NoError(t, backupZero.BestEffortWaitForHealthy(6080))

// Resolve addresses after containers are healthy
backupAlphaSocketAddr := testutil.ContainerAddr(backupAlphaName, 9080)
backupAlphaSocketAddrHttp := testutil.ContainerAddr(backupAlphaName, 8080)
restoreAlphaAddr := testutil.ContainerAddr(restoreAlphaName, 8080)
backupZeroAddr := testutil.ContainerAddr(backupZeroName, 6080)
backupAlphaSocketAddr := testutil.ContainerAddrRetry(backupAlphaName, 9080)
backupAlphaSocketAddrHttp := testutil.ContainerAddrRetry(backupAlphaName, 8080)
restoreAlphaAddr := testutil.ContainerAddrRetry(restoreAlphaName, 8080)
backupZeroAddr := testutil.ContainerAddrRetry(backupZeroName, 6080)

var dg *dgo.Dgraph
var err error
ctx := context.Background()

// Wait for gRPC connection to be ready with retries
t.Log("Waiting for gRPC connection to be ready...")
fmt.Println("=================================================")
fmt.Println("backup alpha addess ------>", backupAlphaSocketAddr)
fmt.Println("=================================================")

for i := 0; i < 30; i++ {
var connErr error
dg, connErr = dgo.Open(fmt.Sprintf("dgraph://%s?sslmode=disable", backupAlphaSocketAddr))
Expand Down
11 changes: 7 additions & 4 deletions systest/backup/nfs-backup/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ version: "3.5"
services:
#HA backup cluster
alpha1_backup_clust_ha:
depends_on:
- nfs
image: dgraph-nfs-client:local
build:
context: .
Expand All @@ -24,7 +26,7 @@ services:
- /bin/sh
- -c
- |
mount -v -o vers=4,loud nfs:/ /mnt 2>&1 &
mount -v -o vers=4.0,loud nfs:/ /mnt 2>&1 &
for i in $(seq 1 30); do
if mountpoint -q /mnt 2>/dev/null; then
break
Expand Down Expand Up @@ -150,6 +152,7 @@ services:
working_dir: /data/alpha4
depends_on:
- alpha1_backup_clust_ha
- nfs
links:
- nfs:nfs
privileged: true
Expand All @@ -167,7 +170,7 @@ services:
- /bin/sh
- -c
- |
mount -v -o vers=4,loud nfs:/ /mnt 2>&1 &
mount -v -o vers=4.0,loud nfs:/ /mnt 2>&1 &
for i in $(seq 1 30); do
if mountpoint -q /mnt 2>/dev/null; then
break
Expand Down Expand Up @@ -331,7 +334,7 @@ services:
- /bin/sh
- -c
- |
mount -v -o vers=4,loud nfs:/ /mnt 2>&1 &
mount -v -o vers=4.0,loud nfs:/ /mnt 2>&1 &
for i in $(seq 1 30); do
if mountpoint -q /mnt 2>/dev/null; then
break
Expand Down Expand Up @@ -388,7 +391,7 @@ services:
- /bin/sh
- -c
- |
mount -v -o vers=4,loud nfs:/ /mnt 2>&1 &
mount -v -o vers=4.0,loud nfs:/ /mnt 2>&1 &
for i in $(seq 1 30); do
if mountpoint -q /mnt 2>/dev/null; then
break
Expand Down
53 changes: 37 additions & 16 deletions t/t.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,40 +230,55 @@ func detectRace(prefix string) bool {
}

func outputLogs(prefix string) {
fmt.Printf("Outputting logs for prefix: %s\n", prefix)
fmt.Println("=================================================")
fmt.Println("=================================================")
fmt.Println("=================================================")
f, err := os.CreateTemp(".", prefix+"*.log")
x.Check(err)
defer func() {
if err := f.Close(); err != nil {
fmt.Printf("error closing file: %v", err)
}
}()
printLogs := func(container string) {
in := testutil.GetContainerInstance(prefix, container)
c := in.GetContainer()
if c == nil {
return
}

// Get all containers with this prefix instead of hardcoding container names
// This works for both default docker-compose.yml and custom compose files
containers := testutil.AllContainers(prefix)

fmt.Println("-------------------------------------------------")
fmt.Printf("Containers: %+v\n", containers)
fmt.Println("-------------------------------------------------")

for _, c := range containers {
logCmd := exec.Command("docker", "logs", c.ID)
out, err := logCmd.CombinedOutput()
x.Check(err)
if err != nil {
fmt.Printf("Error getting logs for container %s (ID: %s): %v\n", c.Names, c.ID, err)
continue
}
if _, err := f.Write(out); err != nil {
fmt.Printf("error writing container logs to file: %v", err)
}
fmt.Printf("Docker logs for %s is %s with error %+v ", c.ID, string(out), err)
}
for i := 0; i <= 3; i++ {
printLogs("zero" + strconv.Itoa(i))
// Include container name in the log output for easier debugging
containerName := "unknown"
if len(c.Names) > 0 {
containerName = strings.TrimPrefix(c.Names[0], "/")
}
fmt.Printf("Docker logs for %s (ID: %s):\n%s\n", containerName, c.ID, string(out))
}

for i := 0; i <= 6; i++ {
printLogs("alpha" + strconv.Itoa(i))
}
fmt.Println("-------------------------------------------------")
fmt.Printf("Logs written to file: %s\n", f.Name())
fmt.Println("-------------------------------------------------")

s := fmt.Sprintf("---> LOGS for %s written to %s .\n", prefix, f.Name())
_, err = oc.Write([]byte(s))
x.Check(err)
}

func stopCluster(composeFile, prefix string, wg *sync.WaitGroup, err error) {
fmt.Println("in stop cluster function========================")
go func() {
if err != nil {
outputLogs(prefix)
Expand Down Expand Up @@ -547,9 +562,15 @@ func runTests(taskCh chan task, closer *z.Closer) error {
} else {
// we are not using err variable here because we dont want to
// print logs of default cluster in case of custom test fail.
if cerr := runCustomClusterTest(ctx, task.pkg.ID, wg, xmlFile); cerr != nil {
return cerr
if err = runCustomClusterTest(ctx, task.pkg.ID, wg, xmlFile); err != nil {
fmt.Printf("Ran custom cluster test for package: %s with error: %v\n", task.pkg.ID, err)
fmt.Println("=================================================")
fmt.Println("=================================================")
return err
}
fmt.Printf("Ran custom cluster test for package: %s with error: %v\n", task.pkg.ID, err)
fmt.Println("=================================================")
fmt.Println("=================================================")
}
}
return err
Expand Down
53 changes: 47 additions & 6 deletions testutil/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (in ContainerInstance) BestEffortWaitForHealthy(privatePort uint16) error {
if len(port) == 0 {
return nil
}
fmt.Println("--------------------------------")
fmt.Println("best effort wait for healthy port ------>", port)
fmt.Println("--------------------------------", port == "")
checkACL := func(body []byte) error {
// Zero returns OK as response
if string(body) == "OK" {
Expand All @@ -70,6 +73,9 @@ func (in ContainerInstance) BestEffortWaitForHealthy(privatePort uint16) error {
tryWith := func(host string) error {
maxAttempts := 60
for attempt := range maxAttempts {
fmt.Println("--------------------------------")
fmt.Println("best effort wait for healthy attempt ------>", attempt)
fmt.Println("-------------url-------------------", "http://"+host+":"+port+"/health")
resp, err := http.Get("http://" + host + ":" + port + "/health")
var body []byte
if resp != nil && resp.Body != nil {
Expand Down Expand Up @@ -176,16 +182,14 @@ func (in ContainerInstance) GetContainer() *types.Container {
}

func getContainer(name string) types.Container {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
x.Check(err)
containers := AllContainers(DockerPrefix)

containers, err := cli.ContainerList(context.Background(), container.ListOptions{All: true})
if err != nil {
log.Fatalf("While listing container: %v\n", err)
}
fmt.Println("-------------containers-------------------", len(containers))

q := fmt.Sprintf("/%s_%s_", DockerPrefix, name)
fmt.Println("-------------qprefix-------------------", q)
for _, c := range containers {
fmt.Println("-------------c.names-------------------", c.Names)
for _, n := range c.Names {
if !strings.HasPrefix(n, q) {
continue
Expand All @@ -207,9 +211,16 @@ func AllContainers(prefix string) []types.Container {

var out []types.Container
for _, c := range containers {
// fmt.Println("--------------------------------")
// fmt.Println("overall containers", c.Names)
// fmt.Println("-------------prefix-------------------", prefix)
for _, name := range c.Names {
if strings.HasPrefix(name, "/"+prefix) {
out = append(out, c)
fmt.Println("found container", name)
} else {
fmt.Println("-------------dont have that prefix-------------------", name)

}
}
}
Expand All @@ -226,6 +237,32 @@ func ContainerAddrWithHost(name string, privatePort uint16, host string) string
return host + ":" + strconv.Itoa(int(privatePort))
}

func ContainerAddrWithHostRetry(name string, privatePort uint16, host string) string {
maxAttempts := 60
for attempt := range maxAttempts {
fmt.Printf("Attempt %d to get container address for %s:%d with host %s\n", attempt, name, privatePort, host)
c := getContainer(name)
fmt.Println("-------------c-------------------", c)

for _, p := range c.Ports {
fmt.Println("-------------p.Private port-------------------", p.PrivatePort)
fmt.Println("-------------private port-------------------", privatePort)
if p.PrivatePort == privatePort {
// Found the mapping - return immediately without waiting
return host + ":" + strconv.Itoa(int(p.PublicPort))
}
}

if attempt < maxAttempts-1 {
time.Sleep(500 * time.Millisecond)
}
}

fmt.Printf("\n\n\ndid not find container address for %s:%d with host %s\n\n\n", name, privatePort, host)

return host + ":" + strconv.Itoa(int(privatePort))
}

func ContainerAddrLocalhost(name string, privatePort uint16) string {
return ContainerAddrWithHost(name, privatePort, "localhost")
}
Expand All @@ -234,6 +271,10 @@ func ContainerAddr(name string, privatePort uint16) string {
return ContainerAddrWithHost(name, privatePort, "0.0.0.0")
}

func ContainerAddrRetry(name string, privatePort uint16) string {
return ContainerAddrWithHostRetry(name, privatePort, "0.0.0.0")
}

// DockerStart starts the specified services.
func DockerRun(instance string, op int) error {
c := getContainer(instance)
Expand Down
10 changes: 2 additions & 8 deletions worker/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error {
return fmt.Errorf("failed to establish stream with leader: %v", err)
}
glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId)
glog.Infof("[import] [forward %d -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr)
glog.Infof("[import] [forward %v -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr)

glog.Infof("[import] sending forward true to leader of group [%v]", groupId)
forwardReq := &api.StreamExtSnapshotRequest{Forward: true}
Expand All @@ -313,12 +313,6 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error {
func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error {
currentGroup := groups().Node.gid
ctx := in.Context()
if err := out.Send(&api.StreamExtSnapshotRequest{GroupId: groupId}); err != nil {
return fmt.Errorf("send groupId downstream(%d): %w", groupId, err)
}
if _, err := out.Recv(); err != nil {
return fmt.Errorf("ack groupId downstream(%d): %w", groupId, err)
}

for {
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -487,7 +481,7 @@ func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) erro
if forward {
// We are not going to return any error from here because we care about the majority of nodes.
// If the majority of nodes are able to receive the data, the remaining ones can catch up later.
glog.Infof("[import] Streaming external snapshot to [%v] from [%v] forward [%v]", member.Addr, node.MyAddr)
glog.Infof("[import] Streaming external snapshot to [%v] from [%v]", member.Addr, node.MyAddr)
eg.Go(func() error {
glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr)
if member.AmDead {
Expand Down
Loading