Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rsky-common/src/ipld.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use sha2::{Digest, Sha256};

const SHA2_256: u64 = 0x12;
const DAGCBORCODEC: u64 = 0x71;
// https://docs.rs/libipld-core/0.16.0/src/libipld_core/raw.rs.html#19
const RAWCODEC: u64 = 0x77;
// ATProto requires raw codec (0x55) for blob CIDs — NOT 0x77 (dag-raw/obsolete)
const RAWCODEC: u64 = 0x55;

pub fn cid_for_cbor<T: Serialize>(data: &T) -> Result<Cid> {
let bytes = crate::struct_to_cbor(data)?;
Expand Down
108 changes: 83 additions & 25 deletions rsky-pds/src/actor_store/aws/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ struct MoveObject {
#[derive(Debug, Clone)]
pub struct S3BlobStore {
client: s3::Client,
/// The actual S3 bucket name (from PDS_BLOBSTORE_S3_BUCKET env var).
/// Falls back to the DID for backwards compatibility with single-tenant setups.
s3_bucket: String,
/// The actor DID, used as a path prefix within the bucket.
pub bucket: String,
}

// Intended to work with DigitalOcean Spaces Object Storage which is an
// S3-compatible object storage service
// Intended to work with S3-compatible object storage services
// (DigitalOcean Spaces, AWS S3, Google Cloud Storage, MinIO, etc.)
impl S3BlobStore {
pub fn new(did: String, cfg: &SdkConfig) -> Self {
let client = aws_sdk_s3::Client::new(cfg);
let s3_bucket = env_str("PDS_BLOBSTORE_S3_BUCKET").unwrap_or_else(|| did.clone());
S3BlobStore {
client,
s3_bucket,
bucket: did,
}
}
Expand All @@ -52,15 +58,25 @@ impl S3BlobStore {
format!("quarantine/{0}/{1}", self.bucket, cid)
}

fn should_apply_public_read_acl() -> bool {
endpoint_supports_object_acl(env_str("AWS_ENDPOINT").as_deref())
}

pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
let key = self.gen_key();
let body = ByteStream::from(bytes);
self.client
let request = self
.client
.put_object()
.body(body)
.bucket(&self.bucket)
.key(self.get_tmp_path(&key))
.acl(ObjectCannedAcl::PublicRead)
.bucket(&self.s3_bucket)
.key(self.get_tmp_path(&key));
let request = if Self::should_apply_public_read_acl() {
request.acl(ObjectCannedAcl::PublicRead)
} else {
request
};
request
.send()
.await?;
Ok(key)
Expand All @@ -83,12 +99,18 @@ impl S3BlobStore {

pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
let body = ByteStream::from(bytes);
self.client
let request = self
.client
.put_object()
.body(body)
.bucket(&self.bucket)
.key(self.get_stored_path(cid))
.acl(ObjectCannedAcl::PublicRead)
.bucket(&self.s3_bucket)
.key(self.get_stored_path(cid));
let request = if Self::should_apply_public_read_acl() {
request.acl(ObjectCannedAcl::PublicRead)
} else {
request
};
request
.send()
.await?;
Ok(())
Expand All @@ -114,7 +136,7 @@ impl S3BlobStore {
let res = self
.client
.get_object()
.bucket(&self.bucket)
.bucket(&self.s3_bucket)
.key(self.get_stored_path(cid))
.send()
.await;
Expand Down Expand Up @@ -160,7 +182,7 @@ impl S3BlobStore {
let res = self
.client
.head_object()
.bucket(&self.bucket)
.bucket(&self.s3_bucket)
.key(key)
.send()
.await;
Expand All @@ -170,7 +192,7 @@ impl S3BlobStore {
async fn delete_key(&self, key: String) -> Result<()> {
self.client
.delete_object()
.bucket(&self.bucket)
.bucket(&self.s3_bucket)
.key(key)
.send()
.await?;
Expand All @@ -185,33 +207,69 @@ impl S3BlobStore {
let deletes = Delete::builder().set_objects(Some(objects)).build()?;
self.client
.delete_objects()
.bucket(&self.bucket)
.bucket(&self.s3_bucket)
.delete(deletes)
.send()
.await?;
Ok(())
}

async fn move_object(&self, keys: MoveObject) -> Result<()> {
self.client
let request = self
.client
.copy_object()
.bucket(&self.bucket)
.copy_source(format!(
"{0}/{1}/{2}",
env_str("AWS_ENDPOINT_BUCKET").unwrap(),
self.bucket,
keys.from
))
.key(keys.to)
.acl(ObjectCannedAcl::PublicRead)
.bucket(&self.s3_bucket)
.copy_source(format_copy_source(&self.s3_bucket, &keys.from))
.key(keys.to);
let request = if Self::should_apply_public_read_acl() {
request.acl(ObjectCannedAcl::PublicRead)
} else {
request
};
request
.send()
.await?;
self.client
.delete_object()
.bucket(&self.bucket)
.bucket(&self.s3_bucket)
.key(keys.from)
.send()
.await?;
Ok(())
}
}

fn endpoint_supports_object_acl(endpoint: Option<&str>) -> bool {
!matches!(endpoint, Some(value) if value.contains("storage.googleapis.com"))
}

fn format_copy_source(bucket: &str, key: &str) -> String {
format!("{bucket}/{key}")
}

#[cfg(test)]
mod tests {
use super::{endpoint_supports_object_acl, format_copy_source};

#[test]
fn gcs_endpoint_disables_object_acl() {
assert!(!endpoint_supports_object_acl(Some("https://storage.googleapis.com")));
}

#[test]
fn non_gcs_endpoint_keeps_object_acl() {
assert!(endpoint_supports_object_acl(Some("https://s3.us-west-2.amazonaws.com")));
assert!(endpoint_supports_object_acl(None));
}

#[test]
fn copy_source_uses_bucket_and_object_key_once() {
assert_eq!(
format_copy_source(
"my-pds-blobs",
"tmp/did:plc:abc123/randomkey",
),
"my-pds-blobs/tmp/did:plc:abc123/randomkey",
);
}
}
97 changes: 50 additions & 47 deletions rsky-pds/src/actor_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,65 +272,68 @@ impl ActorStore {
delete_and_update_uris.push(d_at_uri)
}
}
if write.swap_cid().is_none() {
continue;
}

let write_at_uri: &AtUri = &write.uri().try_into()?;
let record = self
.record
.get_record(write_at_uri, None, Some(true))
.await?;
let current_record = match record {
Some(record) => Some(Cid::from_str(&record.cid)?),
None => None,
};
let cid = match &write {
&PreparedWrite::Delete(_) => None,
&PreparedWrite::Create(w) | &PreparedWrite::Update(w) => Some(w.cid),
};

// Always build the op for the firehose
let mut op = CommitOp {
action: commit_action,
path: format_data_key(write_at_uri.get_collection(), write_at_uri.get_rkey()),
cid,
prev: None,
};
if let Some(_) = current_record {
op.prev = current_record;
};
commit_ops.push(op);
match write {
// There should be no current record for a create
PreparedWrite::Create(_) if write.swap_cid().is_some() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
.into(),
)
}
// There should be a current record for an update
PreparedWrite::Update(_) if write.swap_cid().is_none() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
.into(),
)
}
// There should be a current record for a delete
PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))

// Look up current record for prev and swap_cid validation
if write.swap_cid().is_some() {
let record = self
.record
.get_record(write_at_uri, None, Some(true))
.await?;
let current_record = match record {
Some(record) => Some(Cid::from_str(&record.cid)?),
None => None,
};
if let Some(_) = current_record {
op.prev = current_record;
};

match write {
PreparedWrite::Create(_) if write.swap_cid().is_some() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
.into(),
)
}
PreparedWrite::Update(_) if write.swap_cid().is_none() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
.into(),
)
}
PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
Err::<(), anyhow::Error>(
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
.into(),
)
}
_ => Ok::<(), anyhow::Error>(()),
}?;
match (current_record, write.swap_cid()) {
(Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
Ok::<(), anyhow::Error>(())
}
_ => Err::<(), anyhow::Error>(
FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record))
.into(),
)
}
_ => Ok::<(), anyhow::Error>(()),
}?;
match (current_record, write.swap_cid()) {
(Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
Ok::<(), anyhow::Error>(())
}
_ => Err::<(), anyhow::Error>(
FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record))
.into(),
),
}?;
),
}?;
}

commit_ops.push(op);
}
let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?;
let previous_data = repo.commit.data;
Expand Down
7 changes: 3 additions & 4 deletions rsky-pds/src/plc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::plc::operations::update_handle_op;
use crate::plc::types::{CompatibleOp, OpOrTombstone};
use crate::APP_USER_AGENT;
use anyhow::{bail, Result};
use rsky_common::encode_uri_component;
use secp256k1::SecretKey;
use serde::de::DeserializeOwned;
use types::{CompatibleOpOrTombstone, DocumentData};
Expand All @@ -17,7 +16,7 @@ impl Client {
}

pub fn post_op_url(&self, did: &String) -> String {
format!("{0}/{1}", self.url, encode_uri_component(did))
format!("{0}/{1}", self.url, did)
}

// @TODO: Add better failure mode here
Expand Down Expand Up @@ -61,7 +60,7 @@ impl Client {
pub async fn get_document_data(&self, did: &String) -> Result<DocumentData> {
match self
.make_get_req(
format!("{0}/{1}/data", self.url, encode_uri_component(did)),
format!("{0}/{1}/data", self.url, did),
None,
)
.await
Expand All @@ -74,7 +73,7 @@ impl Client {
pub async fn get_last_op(&self, did: &String) -> Result<CompatibleOpOrTombstone> {
match self
.make_get_req(
format!("{0}/{1}/log/last", self.url, encode_uri_component(did)),
format!("{0}/{1}/log/last", self.url, did),
None,
)
.await
Expand Down
Loading