Skip to content

test: Fixes intermittent test failure related to CosmosDB #8660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 82 additions & 30 deletions pkg/kv/cosmosdb/main_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package cosmosdb_test

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"runtime"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/kv/cosmosdb"
Expand All @@ -27,19 +30,13 @@ var (

func TestCosmosDB(t *testing.T) {
kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store {
if runtime.GOOS == "darwin" {
t.Skipf("this test hangs for macOS users, and fails. skipping - see Issue#8476 for more details")
}

t.Helper()

databaseClient, err := client.NewDatabase(testParams.Database)
if err != nil {
log.Fatalf("creating database client: %v", err)
t.Fatalf("creating database client: %s", err)
}

testParams.Container = "test-container" + testutil.UniqueName()
log.Printf("Creating container %s", testParams.Container)
t.Logf("Creating container %s", testParams.Container)
resp2, err := databaseClient.CreateContainer(ctx, azcosmos.ContainerProperties{
ID: testParams.Container,
PartitionKeyDefinition: azcosmos.PartitionKeyDefinition{
Expand All @@ -59,58 +56,113 @@ func TestCosmosDB(t *testing.T) {
})
}

func TestMain(m *testing.M) {
// SKIP CosmoDB tests until we find a bettwe way to test with the curret emulator which fail our tests
return
// TestingTransport is a custom HTTP transport for testing purposes.
// It rewrites the JSON response of the CosmosDB account properties endpoint,
// removing the "readableLocations" and "writableLocations" fields from the body.
type TestingTransport struct{}

// RoundTrip implements http.RoundTripper. It forwards req to
// http.DefaultTransport and returns the response untouched unless all of the
// following hold: the status is 200 OK, the Content-Type header equals
// "application/json", the method is GET and the URL path is empty. For such a
// response the JSON body is decoded into a map, the "readableLocations" and
// "writableLocations" keys are deleted, and the re-encoded JSON replaces the
// response body.
//
// An error is returned when the underlying round trip fails, or when the body
// cannot be read, unmarshaled, or marshaled again.
func (t *TestingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
originalResp, err := http.DefaultTransport.RoundTrip(req)
if err != nil {
return nil, err
}

// NOTE(review): the next three lines look like removed lines from the diff
// view (old TestMain body) rather than part of RoundTrip — confirm against the
// actual file.
if runtime.GOOS == "darwin" {
// this part hangs for macOS users, and fails. skipping - see Issue#8476 for more details
return
// Check if the original request was successful and content type is JSON
if originalResp.StatusCode != http.StatusOK {
return originalResp, nil // Return original non-OK response as is
}

// Basic check for JSON content type (can be more robust)
// NOTE(review): this is an exact string comparison, so a header value carrying
// parameters (e.g. "application/json; charset=utf-8") is passed through unmodified.
contentType := originalResp.Header.Get("Content-Type")
if contentType != "application/json" {
return originalResp, nil // Return original non-JSON response as is
}

// Only a GET with an empty URL path is rewritten; presumably this is the
// account properties request. A path of "/" does not match this check.
if req.Method != http.MethodGet || req.URL.Path != "" {
return originalResp, nil // Return original response as is
}

// parse account properties response
originalBodyBytes, err := io.ReadAll(originalResp.Body)
if err != nil {
// Cannot modify if we can't read it. Return an error or the original response?
// Returning an error might be cleaner here.
// NOTE(review): the response body is not closed on this path.
return nil, fmt.Errorf("failed to read original body: %w", err)
}
// The body has been fully read; the close error is deliberately ignored.
_ = originalResp.Body.Close()

// Parse the original JSON body into a map, delete specified keys, and marshal it back
var dataMap map[string]any
if err := json.Unmarshal(originalBodyBytes, &dataMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal original JSON: %w", err)
}

// Don't want the azcosmos client to use the values from the server
delete(dataMap, "readableLocations")
delete(dataMap, "writableLocations")

modifiedBodyBytes, err := json.Marshal(dataMap)
if err != nil {
return nil, fmt.Errorf("failed to marshal modified JSON: %w", err)
}
// NOTE(review): the Content-Length header and originalResp.ContentLength are
// not updated to match the re-encoded body — confirm the client tolerates this.
originalResp.Body = streaming.NopCloser(bytes.NewReader(modifiedBodyBytes)) // Reset the body
return originalResp, nil
}

func TestMain(m *testing.M) {
// This part hangs for macOS users, and fails. skipping - see Issue#8476 for more details
//if runtime.GOOS == "darwin" {
// return
//}
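// Defer os.Exit so that it runs last, after the deferred cleanup registered below;
// calling os.Exit directly would terminate the process without running deferred functions.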
var code int
defer func() {
os.Exit(code)
}()

databaseURI, cleanupFunc, err := testutil.GetCosmosDBInstance()
if err != nil {
log.Fatalf("Could not connect to Docker: %s", err)
}
defer cleanupFunc()

const clientTimeout = 30 * time.Second
testParams = &kvparams.CosmosDB{
Endpoint: databaseURI,
Key: "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
Database: "test-db",
Client: &http.Client{Timeout: 30 * time.Second, Transport: &http.Transport{
// tests, safe
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // ignore self-signed cert for local testing using the emulator
}},
StrongConsistency: false,
Client: &http.Client{
Timeout: clientTimeout,
Transport: &TestingTransport{},
},
StrongConsistency: true,
}

cred, err := azcosmos.NewKeyCredential(testParams.Key)
if err != nil {
log.Fatalf("creating key: %v", err)
log.Fatalf("creating credential key: %v", err)
}

// Create a CosmosDB client
client, err = azcosmos.NewClientWithKey(testParams.Endpoint, cred, &azcosmos.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}},
InsecureAllowCredentialWithHTTP: true,
Transport: testParams.Client,
},
})
if err != nil {
log.Fatalf("creating client using access key: %v", err)
}

log.Printf("Creating database %s", testParams.Database)
log.Printf("creating database %s", testParams.Database)
ctx := context.Background()
resp, err := client.CreateDatabase(ctx,
_, err = client.CreateDatabase(ctx,
azcosmos.DatabaseProperties{ID: testParams.Database},
&azcosmos.CreateDatabaseOptions{ThroughputProperties: &throughput},
)
if err != nil {
log.Fatalf("creating database: %v, raw response: %v", err, resp.RawResponse)
log.Fatalf("creating database failed: %v", err)
}

code := m.Run()
os.Exit(code)
code = m.Run()
}
2 changes: 1 addition & 1 deletion pkg/kv/cosmosdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *Store) SetIf(ctx context.Context, partitionKey, key, value []byte, valu
return kv.ErrMissingValue
}

// Specifies the value of the partiton key
// Specifies the value of the partition key
item := Document{
PartitionKey: encoding.EncodeToString(partitionKey),
ID: s.hashID(key),
Expand Down
98 changes: 68 additions & 30 deletions pkg/testutil/cosmosdb.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package testutil

import (
"crypto/tls"
"bytes"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ory/dockertest/v3"
)

const (
CosmosDBLocalPort = "8081"
maxWait = 5 * time.Minute // Cosmosdb emulator takes time to start
"github.com/ory/dockertest/v3/docker"
)

var cosmosdbLocalURI string

var (
ErrContainerExited = errors.New("container exited")
ErrEmulatorNotReady = errors.New("emulator not ready")
)

func GetCosmosDBInstance() (string, func(), error) {
dockerPool, err := dockertest.NewPool("")
if err != nil {
Expand All @@ -26,60 +28,96 @@ func GetCosmosDBInstance() (string, func(), error) {

cosmosdbDockerRunOptions := &dockertest.RunOptions{
Repository: "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator",
Tag: "latest",
Tag: "vnext-preview",
Env: []string{
"AZURE_COSMOS_EMULATOR_PARTITION_COUNT=100",
"AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE=false",
"AZURE_COSMOS_EMULATOR_DISABLE_THROTTLING=true",
"ENABLE_TELEMETRY=false",
"PROTOCOL=http",
},
ExposedPorts: []string{
"8081/tcp",
"1234/tcp",
},
ExposedPorts: []string{CosmosDBLocalPort},
}

resource, err := dockerPool.RunWithOptions(cosmosdbDockerRunOptions)
if err != nil {
return "", nil, fmt.Errorf("could not start cosmosdb emulator: %w", err)
}

cosmosdbLocalURI = "https://localhost:" + resource.GetPort("8081/tcp")
// explorer endpoint to verify when container is up
cosmosdbExplorerURI := "http://localhost:" + resource.GetPort("1234/tcp")
// set cleanup
closer := func() {
err = dockerPool.Purge(resource)
if err != nil {
fmt.Println("could not kill cosmosdb local container :%w", err)
// Fetch logs from the container
var containerOut bytes.Buffer
if err := dockerPool.Client.Logs(docker.LogsOptions{
Container: resource.Container.ID,
OutputStream: &containerOut,
ErrorStream: &containerOut,
Stdout: true,
Stderr: true,
Follow: false,
}); err != nil {
log.Printf("Error in cosmosdb emulator logs: %s", err)
} else {
log.Printf("CosmosDB emulator output: %s", containerOut.String())
}

if err := dockerPool.Purge(resource); err != nil {
log.Printf("could not kill cosmosdb local container: %s", err)
}
}
// cleanup is called when we return without databaseURI
defer func() {
if cosmosdbLocalURI == "" {
closer()
}
}()

// expire, just to make sure
err = resource.Expire(dbContainerTimeoutSeconds)
if err != nil {
defer closer() // defer so that error is logged appropriately
return "", nil, fmt.Errorf("could not expire cosmosdb local emulator: %w", err)
}
p, err := url.JoinPath(cosmosdbLocalURI, "/_explorer/emulator.pem")
if err != nil {
defer closer()
return "", nil, fmt.Errorf("joining urls: %w", err)

const clientTimeout = 5 * time.Second
httpClient := http.Client{
Timeout: clientTimeout,
}

dockerPool.MaxWait = maxWait
log.Printf("Waiting up to %v for emulator to start", dockerPool.MaxWait)
// Note: this hangs for macOS users, and fails. See https://github.com/treeverse/lakeFS/issues/8476
// waiting for cosmosdb container to be ready by issuing an HTTP get request with
// exponential backoff retry. The response is not really meaningful for that case
// and so is ignored
err = dockerPool.Retry(func() error {
// waiting for cosmosdb container to be ready by issuing an HTTP get request with
// exponential backoff retry. The response is not really meaningful for that case
// and so is ignored
client := http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec // ignore self-signed cert for local testing using the emulator
}}
resp, err := client.Get(p)
// Check if the container is still running
container, err := dockerPool.Client.InspectContainer(resource.Container.ID)
if err != nil {
return backoff.Permanent(fmt.Errorf("could not inspect container: %w", err))
}
if !container.State.Running {
return backoff.Permanent(fmt.Errorf("%w with status: %s", ErrContainerExited, container.State.Status))
}
// Check if the cosmosdb emulator is up and running
resp, err := httpClient.Get(cosmosdbExplorerURI)
if err != nil {
return err
}
_ = resp.Body.Close()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("cosmosdb %w: %s", ErrEmulatorNotReady, resp.Status)
}
return nil
})
if err != nil {
defer closer()
return "", nil, fmt.Errorf("could not connect to cosmosdb emulator at %s: %w", cosmosdbLocalURI, err)
}

// cosmosdb emulator is running on port 8081.
// set the URI last: the deferred cleanup above runs only while the URI is still empty,
// i.e. when we return with a failure before reaching this point.
cosmosdbLocalURI = "http://localhost:" + resource.GetPort("8081/tcp")

return cosmosdbLocalURI, closer, nil
}
Loading