Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions denokv/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,76 @@ async fn read_key_1<P: RemotePermissions, T: RemoteTransport>(
assert_eq!(range.entries[0].key, vec![1]);
range.entries.into_iter().next().unwrap()
}

#[tokio::test]
async fn atomic_write_reports_failed_checks() {
let (_child, addr) = start_server().await;
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
url,
access_token: ACCESS_TOKEN.to_string(),
};

let remote =
denokv_remote::Remote::new(client, DummyPermissions, metadata_endpoint);

remote
.atomic_write(AtomicWrite {
checks: vec![],
mutations: vec![denokv_proto::Mutation {
key: vec![1],
kind: denokv_proto::MutationKind::Set(denokv_proto::KvValue::U64(1)),
expire_at: None,
}],
enqueues: vec![],
})
.await
.unwrap()
.expect("commit success");

// The first and third checks expect the (existing) key [1] to be
// absent and must fail; the second check expects the absent key [2]
// to be absent and must pass.
let write = denokv_proto::datapath::AtomicWrite {
checks: vec![
denokv_proto::datapath::Check {
key: vec![1],
versionstamp: vec![],
},
denokv_proto::datapath::Check {
key: vec![2],
versionstamp: vec![],
},
denokv_proto::datapath::Check {
key: vec![1],
versionstamp: vec![],
},
],
mutations: vec![],
enqueues: vec![],
};

let response = reqwest::Client::new()
.post(format!("http://localhost:{}/v2/atomic_write", addr.port()))
.header("authorization", format!("Bearer {ACCESS_TOKEN}"))
.header("x-denokv-version", "3")
.header(
"x-denokv-database-id",
"00000000-0000-0000-0000-000000000000",
)
.body(prost::Message::encode_to_vec(&write))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let output: denokv_proto::datapath::AtomicWriteOutput =
prost::Message::decode(response.bytes().await.unwrap()).unwrap();

assert_eq!(
output.status(),
denokv_proto::datapath::AtomicWriteStatus::AwCheckFailure
);
assert_eq!(output.failed_checks, vec![0, 2]);
}
29 changes: 23 additions & 6 deletions proto/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::num::NonZeroU32;

use crate::datapath as pb;
use crate::AtomicWrite;
use crate::AtomicWriteOutcome;
use crate::Check;
use crate::CommitResult;
use crate::Enqueue;
Expand Down Expand Up @@ -260,12 +261,28 @@ impl From<Vec<ReadRangeOutput>> for pb::SnapshotReadOutput {
impl From<Option<CommitResult>> for pb::AtomicWriteOutput {
fn from(commit_result: Option<CommitResult>) -> pb::AtomicWriteOutput {
match commit_result {
None => pb::AtomicWriteOutput {
status: pb::AtomicWriteStatus::AwCheckFailure as i32,
failed_checks: vec![], // todo!
..Default::default()
},
Some(commit_result) => pb::AtomicWriteOutput {
None => AtomicWriteOutcome::CheckFailed {
failed_checks: vec![],
}
.into(),
Some(commit_result) => {
AtomicWriteOutcome::Committed(commit_result).into()
}
}
}
}

impl From<AtomicWriteOutcome> for pb::AtomicWriteOutput {
fn from(outcome: AtomicWriteOutcome) -> pb::AtomicWriteOutput {
match outcome {
AtomicWriteOutcome::CheckFailed { failed_checks } => {
pb::AtomicWriteOutput {
status: pb::AtomicWriteStatus::AwCheckFailure as i32,
failed_checks,
..Default::default()
}
}
AtomicWriteOutcome::Committed(commit_result) => pb::AtomicWriteOutput {
status: pb::AtomicWriteStatus::AwSuccess as i32,
versionstamp: commit_result.versionstamp.to_vec(),
..Default::default()
Expand Down
22 changes: 22 additions & 0 deletions proto/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,28 @@ pub struct CommitResult {
pub versionstamp: Versionstamp,
}

/// The outcome of an atomic write operation.
#[derive(Debug)]
pub enum AtomicWriteOutcome {
/// All checks passed and the write was committed.
Committed(CommitResult),
/// One or more checks failed and the write was not committed.
CheckFailed {
/// Indexes into the `checks` of the originating [`AtomicWrite`]
/// identifying the checks that failed.
failed_checks: Vec<u32>,
},
}

impl From<AtomicWriteOutcome> for Option<CommitResult> {
fn from(outcome: AtomicWriteOutcome) -> Option<CommitResult> {
match outcome {
AtomicWriteOutcome::Committed(commit_result) => Some(commit_result),
AtomicWriteOutcome::CheckFailed { .. } => None,
}
}
}

#[derive(Debug)]
/// The message notifying about the status of a single key in a watch request.
pub enum WatchKeyOutput {
Expand Down
6 changes: 3 additions & 3 deletions proto/kv-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Protobuf message in the format `com.deno.kv.datapath.AtomicWriteResponse`. If
the server is unable to perform the write because the database is not available
from this server, the `status` field MUST be set to `AW_WRITE_DISABLED`. If a
write operation fails due to a check conflict, the `status` field MUST be set to
`AW_CHECK_FAILED`. If the write operation succeeds, the `status` field MUST be
`AW_CHECK_FAILURE`. If the write operation succeeds, the `status` field MUST be
set to `AW_SUCCESS`. If the request succeeds, the server MUST include the
`versionstamp` field with the versionstamp of the write operation.

Expand All @@ -367,10 +367,10 @@ If the response has a `status` field set to `AW_WRITE_DISABLED`, the client
SHOULD perform a metadata exchange with the server to get a new list of
endpoints, and then retry the request.

If the response has a `status` field set to `AW_CHECK_FAILED`, the client MUST
If the response has a `status` field set to `AW_CHECK_FAILURE`, the client MUST
return an error to the user indicating that the write operation failed due to a
check conflict. The client SHOULD report the checks that failed to the user by
interpereting the `check_failures` field of the response.
interpreting the `failed_checks` field of the response.

If the response has a `status` field set to `AW_UNDEFINED`, the client MUST
return an error to the user indicating that the write operation failed due to an
Expand Down
17 changes: 11 additions & 6 deletions sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use denokv_proto::decode_value;
use denokv_proto::encode_value;
use denokv_proto::encode_value_owned;
use denokv_proto::AtomicWrite;
use denokv_proto::AtomicWriteOutcome;
use denokv_proto::CommitResult;
use denokv_proto::KvEntry;
use denokv_proto::KvValue;
Expand Down Expand Up @@ -278,7 +279,7 @@ impl SqliteBackend {
pub fn atomic_write_batched(
&mut self,
writes: Vec<AtomicWrite>,
) -> Vec<Result<Option<CommitResult>, SqliteBackendError>> {
) -> Vec<Result<AtomicWriteOutcome, SqliteBackendError>> {
if self.readonly {
return writes
.iter()
Expand Down Expand Up @@ -325,7 +326,7 @@ impl SqliteBackend {
}

for (write, commit_result) in writes.iter().zip(commit_results.iter()) {
if let Ok(Some(commit_result)) = &commit_result {
if let Ok(AtomicWriteOutcome::Committed(commit_result)) = &commit_result {
for mutation in &write.mutations {
self
.notifier
Expand All @@ -341,17 +342,21 @@ impl SqliteBackend {
tx: &mut rusqlite::Transaction,
rng: &mut dyn RngCore,
write: &AtomicWrite,
) -> Result<(bool, Option<CommitResult>), SqliteBackendError> {
for check in &write.checks {
) -> Result<(bool, AtomicWriteOutcome), SqliteBackendError> {
let mut failed_checks = Vec::new();
for (index, check) in write.checks.iter().enumerate() {
let real_versionstamp = tx
.prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
.query_row([check.key.as_slice()], |row| row.get(0))
.optional()?
.map(version_to_versionstamp);
if real_versionstamp != check.versionstamp {
return Ok((false, None));
failed_checks.push(index as u32);
}
}
if !failed_checks.is_empty() {
return Ok((false, AtomicWriteOutcome::CheckFailed { failed_checks }));
}

let incrementer_count = rng.gen_range(1..10);
let version: i64 = tx
Expand Down Expand Up @@ -470,7 +475,7 @@ impl SqliteBackend {

Ok((
has_enqueues,
Some(CommitResult {
AtomicWriteOutcome::Committed(CommitResult {
versionstamp: new_versionstamp,
}),
))
Expand Down
7 changes: 4 additions & 3 deletions sqlite/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use chrono::DateTime;
use chrono::Utc;
use deno_error::JsErrorBox;
use denokv_proto::AtomicWrite;
use denokv_proto::AtomicWriteOutcome;
use denokv_proto::CommitResult;
use denokv_proto::Database;
use denokv_proto::QueueMessageHandle;
Expand Down Expand Up @@ -77,7 +78,7 @@ enum SqliteRequest {
},
AtomicWrite {
write: AtomicWrite,
sender: oneshot::Sender<Result<Option<CommitResult>, SqliteBackendError>>,
sender: oneshot::Sender<Result<AtomicWriteOutcome, SqliteBackendError>>,
},
QueueDequeueMessage {
sender: oneshot::Sender<DequeuedMessage>,
Expand Down Expand Up @@ -512,7 +513,7 @@ impl Sqlite {
pub async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, SqliteBackendError> {
) -> Result<AtomicWriteOutcome, SqliteBackendError> {
let (sender, receiver) = oneshot::channel();
self
.write_worker
Expand Down Expand Up @@ -644,7 +645,7 @@ impl Database for Sqlite {
let res = Sqlite::atomic_write(self, write)
.await
.map_err(JsErrorBox::from_err)?;
Ok(res)
Ok(res.into())
}

async fn dequeue_next_message(
Expand Down
Loading