Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions lib/compat/wordpress-7.0/collaboration.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,124 @@
add_action( 'init', 'gutenberg_rest_api_crdt_post_meta' );
}

if ( ! function_exists( 'gutenberg_crdt_intercept_post_meta_update' ) ) {
/**
* Intercepts post meta updates for the persisted CRDT document to
* implement optimistic concurrency control. Clients embed a `baseVersion`
* field in the serialized document. Before writing, this filter checks
* that the client's base version matches the version currently stored on
* the server. When versions match the write proceeds and the version is
* incremented atomically using a compare-and-swap on the database row.
*
* When the stored version does not match the client's base version the
* write is rejected — another client has already updated the document.
* The rejected client will retry after receiving the latest version
* through the next server response.
*
* @param mixed $check Whether to allow the update. Returning a
* non-null value short-circuits update_metadata().
* @param int $object_id Post ID.
* @param string $meta_key Meta key being updated.
* @param mixed $meta_value New meta value (JSON string).
* @return mixed Null to allow WordPress to proceed, false to reject, true
* when the write was handled directly.
*/
function gutenberg_crdt_intercept_post_meta_update( $check, $object_id, $meta_key, $meta_value ) {
if ( '_crdt_document' !== $meta_key ) {
return $check;
}

$incoming = json_decode( $meta_value, true );
if ( ! is_array( $incoming ) ) {
return $check;
}

$base_version = (int) ( $incoming['baseVersion'] ?? 0 );

global $wpdb;

$row = $wpdb->get_row(
$wpdb->prepare(
"SELECT meta_id, meta_value FROM $wpdb->postmeta
WHERE post_id = %d AND meta_key = %s",
$object_id,
'_crdt_document'
)
);

if ( $row ) {
// Existing meta — check version before writing.
$current = json_decode( $row->meta_value, true );

Check warning on line 148 in lib/compat/wordpress-7.0/collaboration.php

View workflow job for this annotation

GitHub Actions / PHP coding standards

Equals sign not aligned with surrounding assignments; expected 9 spaces but found 1 space
$current_version = (int) ( is_array( $current ) ? ( $current['baseVersion'] ?? 0 ) : 0 );

if ( $current_version !== $base_version ) {
return false; // Stale — client's base version does not match the server.
}

$incoming['baseVersion'] = $current_version + 1;
$new_value = wp_json_encode( $incoming );

Check warning on line 156 in lib/compat/wordpress-7.0/collaboration.php

View workflow job for this annotation

GitHub Actions / PHP coding standards

Equals sign not aligned with surrounding assignments; expected 15 spaces but found 1 space

// Atomic compare-and-swap: only update if the row hasn't changed
// since we last read it.
$affected = $wpdb->update(
$wpdb->postmeta,
array( 'meta_value' => $new_value ),
array(
'meta_id' => $row->meta_id,
'meta_value' => $row->meta_value,
)
);

if ( 0 === $affected ) {
return false; // Lost the race — another request updated first.
}

wp_cache_delete( $object_id, 'post_meta' );

/**
* Fires immediately after a CRDT-document post meta row is updated.
* Mirrors WordPress Core's `updated_post_meta` action to keep
* caches and hooks consistent.
*
* @param int $meta_id Meta ID.
* @param int $object_id Post ID.
* @param string $meta_key Meta key.
* @param mixed $meta_value Meta value.
*/
do_action( 'updated_post_meta', $row->meta_id, $object_id, $meta_key, $meta_value );

return true; // Handled — short-circuit WordPress.
}

// First-time write: no stored row yet. Let WordPress do the INSERT.
// The `pre_update_post__crdt_document` filter will inject
// baseVersion=1 before the row is created.
return $check;
}
add_filter( 'update_post_metadata', 'gutenberg_crdt_intercept_post_meta_update', 10, 4 );
}

if ( ! function_exists( 'gutenberg_crdt_set_initial_base_version' ) ) {
/**
* Sets the initial `baseVersion` on the first write of a persisted CRDT
* document. WordPress's `update_post_metadata` filter cannot modify the
* value for INSERTs (only short-circuit them), so this companion filter
* injects `baseVersion=1` when the stored row doesn't exist yet.
*
* @param mixed $meta_value New meta value (JSON string).
* @return mixed Modified meta value with baseVersion set, or the original.
*/
function gutenberg_crdt_set_initial_base_version( $meta_value ) {
$decoded = json_decode( $meta_value, true );
if ( is_array( $decoded ) && empty( $decoded['baseVersion'] ) ) {
$decoded['baseVersion'] = 1;
return wp_json_encode( $decoded );
}
return $meta_value;
}
add_filter( 'pre_update_post__crdt_document', 'gutenberg_crdt_set_initial_base_version', 10, 1 );
}

if ( ! function_exists( 'wp_collaboration_inject_setting' ) ) {
/**
* Registers the real-time collaboration setting.
Expand Down
16 changes: 16 additions & 0 deletions packages/core-data/src/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,22 @@ export const saveEntityRecord =
// is intentionally excluded to avoid stale values
// overriding reverted fields.
const merged = { ...persistedRecord, ...record };

// The persisted CRDT document is managed through explicit
// saves (which call __unstablePrePersist to serialize a
// fresh copy). Autosaves carry a stale copy from the last
// server response and will either be rejected by the
// server's version check or, worse, overwrite a newer
// document. Strip it so autosaves don't touch sync data.
if ( merged.meta ) {
const {
/* eslint-disable-next-line camelcase */
_crdt_document,
...metaWithoutCRDT
} = merged.meta;
merged.meta = metaWithoutCRDT;
}

const data = [
'title',
'excerpt',
Expand Down
15 changes: 14 additions & 1 deletion packages/core-data/src/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,22 @@ export const prePersistPostType = async (
if ( persistedRecord ) {
const objectType = `postType/${ name }`;
const objectId = persistedRecord.id;

let baseVersion = 0;
try {
const persistedCrdtDoc =
persistedRecord.meta?.[
POST_META_KEY_FOR_CRDT_DOC_PERSISTENCE
];
if ( persistedCrdtDoc ) {
const parsed = JSON.parse( persistedCrdtDoc );
baseVersion = parsed.baseVersion ?? 0;
}
} catch {}
const serializedDoc = await getSyncManager()?.createPersistedCRDTDoc(
objectType,
objectId
objectId,
baseVersion
);

if ( serializedDoc ) {
Expand Down
85 changes: 68 additions & 17 deletions packages/core-data/src/resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,26 +236,77 @@ export const getEntityRecord =
// This effectively means that only post entities support CRDT
// persistence. As we add support for syncing additional entity,
// we'll need to revisit where persisted CRDT documents are stored.
persistCRDTDoc: () => {
resolveSelect
.getEditedEntityRecord( kind, name, key )
.then( ( editedRecord ) => {
// Don't persist the CRDT document if the record is still an
// auto-draft or if the entity does not support meta.
const { meta, status } = editedRecord;
if ( 'auto-draft' === status || ! meta ) {
return;
}

// Trigger a save to persist the CRDT document. The entity's
// pre-persist hooks will create the persisted CRDT document
// and apply it to the record's meta.
dispatch.saveEntityRecord(
persistCRDTDoc: async () => {
const editedRecord =
await resolveSelect.getEditedEntityRecord(
kind,
name,
key
);

const { meta, status } = editedRecord;
if ( 'auto-draft' === status || ! meta ) {
return;
}

let baseVersionSent = 0;
try {
const persistedDoc = meta._crdt_document;
if ( persistedDoc ) {
const parsed = JSON.parse( persistedDoc );
baseVersionSent = parsed.baseVersion ?? 0;
}
} catch {}

const savedRecord = await dispatch.saveEntityRecord(
kind,
name,
editedRecord
);
if ( ! savedRecord?.meta?._crdt_document ) {
return;
}

let serverVersion = 0;
try {
const savedDoc =
savedRecord.meta._crdt_document;
const parsed = JSON.parse( savedDoc );
serverVersion = parsed.baseVersion ?? 0;
} catch {}

if ( serverVersion === baseVersionSent ) {
return;
}

// Another client's CRDT doc was persisted between
// our read and write. Merge the server's CRDT doc
// into our local Y.Doc so we don't overwrite their
// changes on retry.
try {
const syncManager = getSyncManager();
if ( syncManager ) {
await resolveSelect.getEntityRecord(
kind,
name,
key
);
// getEntityRecord re-applies the persisted CRDT
// doc to the local Y.Doc via load(). The
// edited record now reflects merged state.
const mergedRecord =
await resolveSelect.getEditedEntityRecord(
kind,
name,
key
);
await dispatch.saveEntityRecord(
kind,
name,
editedRecord
mergedRecord
);
} );
}
} catch {}
},
addUndoMeta: ( ydoc, meta ) => {
const selectionHistory =
Expand Down
7 changes: 5 additions & 2 deletions packages/sync/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,18 @@ export function createSyncManager( debug = false ): SyncManager {
handlers.editRecord( changes );
}

/* eslint-disable-next-line jsdoc/check-line-alignment */
/**
* Create object meta to persist the CRDT document in the entity record.
*
* @param {ObjectType} objectType Object type.
* @param {ObjectID} objectId Object ID.
* @param {number} [baseVersion=0] Base version from the server.
*/
async function createPersistedCRDTDoc(
objectType: ObjectType,
objectId: ObjectID
objectId: ObjectID,
baseVersion: number = 0
): Promise< string | null > {
const entityId = getEntityId( objectType, objectId );
const entityState = entityStates.get( entityId );
Expand All @@ -659,7 +662,7 @@ export function createSyncManager( debug = false ): SyncManager {
// before we serialize the document.
await new Promise( ( resolve ) => setTimeout( resolve, 0 ) );

return serializeCrdtDoc( entityState.ydoc );
return serializeCrdtDoc( entityState.ydoc, baseVersion );
}

// Collect internal functions so that they can be wrapped before calling.
Expand Down
3 changes: 2 additions & 1 deletion packages/sync/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ export interface SyncConfig {
export interface SyncManager {
createPersistedCRDTDoc: (
objectType: ObjectType,
objectId: ObjectID
objectId: ObjectID,
baseVersion?: number
) => Promise< string | null >;
getAwareness: < State extends Awareness >(
objectType: ObjectType,
Expand Down
6 changes: 5 additions & 1 deletion packages/sync/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ function pseudoRandomID(): number {
return Math.floor( Math.random() * 1000000000 );
}

export function serializeCrdtDoc( crdtDoc: CRDTDoc ): string {
export function serializeCrdtDoc(
crdtDoc: CRDTDoc,
baseVersion: number = 0
): string {
return JSON.stringify( {
document: buffer.toBase64( Y.encodeStateAsUpdateV2( crdtDoc ) ),
baseVersion,
updateId: pseudoRandomID(), // helps with debugging
} );
}
Expand Down
Loading