Skip to content

Commit 0592a90

Browse files
committed
1. Updated TestPrepareExecuteMetadataChangedFlag to validate Metadata_changed handling logic by forcing the driver to execute queries with old metadata
2. Added checking of integrity of the result to the test
1 parent bf0d7fa commit 0592a90

File tree

1 file changed

+87
-21
lines changed

1 file changed

+87
-21
lines changed

cassandra_test.go

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3467,7 +3467,15 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
34673467
t.Fatal(err)
34683468
}
34693469

3470-
err := session.Query("INSERT INTO gocql_test.metadata_changed (id) VALUES (?)", 1).Exec()
3470+
type record struct {
3471+
id int
3472+
newCol int
3473+
}
3474+
3475+
firstRecord := record{
3476+
id: 1,
3477+
}
3478+
err := session.Query("INSERT INTO gocql_test.metadata_changed (id) VALUES (?)", firstRecord.id).Exec()
34713479
if err != nil {
34723480
t.Fatal(err)
34733481
}
@@ -3486,6 +3494,8 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
34863494
}
34873495

34883496
require.Len(t, row, 1, "Expected to retrieve a single column")
3497+
require.Equal(t, 1, row["id"])
3498+
34893499
stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt)
34903500
inflight, _ := session.stmtsLRU.get(stmtCacheKey)
34913501
preparedStatementBeforeTableAltering := inflight.preparedStatment
@@ -3498,45 +3508,101 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
34983508
t.Fatal(err)
34993509
}
35003510

3511+
secondRecord := record{
3512+
id: 2,
3513+
newCol: 10,
3514+
}
3515+
err = session.Query("INSERT INTO gocql_test.metadata_changed (id, new_col) VALUES (?, ?)", secondRecord.id, secondRecord.newCol).
3516+
Exec()
3517+
if err != nil {
3518+
t.Fatal(err)
3519+
}
3520+
3521+
// Handles result from iter and ensures integrity of the result,
3522+
// closes iter and handles error
3523+
handleRows := func(iter *Iter) {
3524+
t.Helper()
3525+
3526+
var scannedID int
3527+
var scannedNewCol *int // to perform null values
3528+
3529+
// when the driver handling null values during unmarshalling
3530+
// it sets to dest type its zero value, which is (*int)(nil) for this case
3531+
var nilIntPtr *int
3532+
3533+
// Scanning first row
3534+
if iter.Scan(&scannedID, &scannedNewCol) {
3535+
require.Equal(t, firstRecord.id, scannedID)
3536+
require.Equal(t, nilIntPtr, scannedNewCol)
3537+
}
3538+
3539+
// Scanning second row
3540+
if iter.Scan(&scannedID, &scannedNewCol) {
3541+
require.Equal(t, secondRecord.id, scannedID)
3542+
require.Equal(t, &secondRecord.newCol, scannedNewCol)
3543+
}
3544+
3545+
err := iter.Close()
3546+
if err != nil {
3547+
if errors.Is(err, context.DeadlineExceeded) {
3548+
t.Fatal("It is likely failed due deadlock")
3549+
}
3550+
t.Fatal(err)
3551+
}
3552+
}
3553+
35013554
// Expecting C* will return RESULT/ROWS Metadata_changed
35023555
// and it will be properly handled
35033556
queryAfterTableAltering := session.Query(selectStmt)
35043557
queryAfterTableAltering.conn = conn
3505-
row = make(map[string]interface{})
3506-
err = queryAfterTableAltering.MapScan(row)
3507-
if err != nil {
3508-
t.Fatal(err)
3509-
}
3558+
iter := queryAfterTableAltering.Iter()
3559+
handleRows(iter)
35103560

35113561
// Ensuring if cache contains updated prepared statement
3512-
require.Len(t, row, 2, "Expected to retrieve both columns")
35133562
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
35143563
preparedStatementAfterTableAltering := inflight.preparedStatment
35153564
require.NotEqual(t, preparedStatementBeforeTableAltering.resultMetadataID, preparedStatementAfterTableAltering.resultMetadataID)
35163565
require.NotEqual(t, preparedStatementBeforeTableAltering.response, preparedStatementAfterTableAltering.response)
35173566

3518-
// Executing prepared stmt and expecting that C* won't return
3519-
// Metadata_changed because the table is not being changed.
3567+
// FORCE SEND OLD RESULT METADATA ID (https://issues.apache.org/jira/browse/CASSANDRA-20028)
3568+
closedCh := make(chan struct{})
3569+
close(closedCh)
3570+
session.stmtsLRU.add(stmtCacheKey, &inflightPrepare{
3571+
done: closedCh,
3572+
err: nil,
3573+
preparedStatment: preparedStatementBeforeTableAltering,
3574+
})
3575+
35203576
// Running query with timeout to ensure there is no deadlocks.
35213577
// However, it doesn't 100% proves that there is a deadlock...
3522-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30000)
3578+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
35233579
defer cancel()
35243580

35253581
queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx)
35263582
queryAfterTableAltering2.conn = conn
3527-
row = make(map[string]interface{})
3528-
err = queryAfterTableAltering2.MapScan(row)
3529-
if err != nil {
3530-
if errors.Is(err, context.DeadlineExceeded) {
3531-
t.Fatal("It is likely failed due deadlock")
3532-
}
3533-
t.Fatal(err)
3534-
}
3583+
iter = queryAfterTableAltering2.Iter()
3584+
handleRows(iter)
3585+
err = iter.Close()
35353586

3536-
// Ensuring metadata of prepared stmt is not changed
3537-
require.Len(t, row, 2, "Expected to retrieve both columns")
35383587
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
35393588
preparedStatementAfterTableAltering2 := inflight.preparedStatment
3589+
require.NotEqual(t, preparedStatementBeforeTableAltering.resultMetadataID, preparedStatementAfterTableAltering2.resultMetadataID)
3590+
require.NotEqual(t, preparedStatementBeforeTableAltering.response, preparedStatementAfterTableAltering2.response)
3591+
35403592
require.Equal(t, preparedStatementAfterTableAltering.resultMetadataID, preparedStatementAfterTableAltering2.resultMetadataID)
3541-
require.Equal(t, preparedStatementAfterTableAltering.response, preparedStatementAfterTableAltering2.response)
3593+
require.NotEqual(t, preparedStatementAfterTableAltering.response, preparedStatementAfterTableAltering2.response) // METADATA_CHANGED flag
3594+
require.True(t, preparedStatementAfterTableAltering2.response.flags&flagMetaDataChanged != 0)
3595+
3596+
// Executing prepared stmt and expecting that C* won't return
3597+
// Metadata_changed because the table is not being changed.
3598+
queryAfterTableAltering3 := session.Query(selectStmt).WithContext(ctx)
3599+
queryAfterTableAltering3.conn = conn
3600+
iter = queryAfterTableAltering2.Iter()
3601+
handleRows(iter)
3602+
3603+
// Ensuring metadata of prepared stmt is not changed
3604+
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
3605+
preparedStatementAfterTableAltering3 := inflight.preparedStatment
3606+
require.Equal(t, preparedStatementAfterTableAltering2.resultMetadataID, preparedStatementAfterTableAltering3.resultMetadataID)
3607+
require.Equal(t, preparedStatementAfterTableAltering2.response, preparedStatementAfterTableAltering3.response)
35423608
}

0 commit comments

Comments
 (0)