Skip to content

Commit d3d3a28

Browse files
authored
Merge pull request #322 from ozer550/implement-self-ref-fk-ordering
Implement self ref fk ordering
2 parents 4879cc2 + 7b427cb commit d3d3a28

20 files changed

Lines changed: 919 additions & 67 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,5 @@ Pipfile
101101

102102
# Version file generated by setuptools-scm
103103
morango/_version.py
104+
105+
graphify-out/

morango/api/serializers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,5 +178,6 @@ class Meta:
178178
"profile",
179179
"rmcb_list",
180180
"_self_ref_fk",
181+
"_self_ref_order",
181182
)
182183
read_only_fields = fields

morango/constants/capabilities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING"
33
ASYNC_OPERATIONS = "ASYNC_OPERATIONS"
44
FSIC_V2_FORMAT = "FSIC_V2_FORMAT"
5+
SELF_REF_ORDER = "SELF_REF_ORDER"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Generated by Django 3.2.25 on 2026-04-22 10:53
2+
3+
import django.core.validators
4+
from django.db import migrations, models
5+
6+
7+
class Migration(migrations.Migration):
8+
9+
dependencies = [
10+
('morango', '0003_store_deserialization_errors'),
11+
]
12+
13+
operations = [
14+
migrations.AddField(
15+
model_name='buffer',
16+
name='_self_ref_order',
17+
field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]),
18+
),
19+
migrations.AddField(
20+
model_name='store',
21+
name='_self_ref_order',
22+
field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0)]),
23+
),
24+
]

morango/models/core.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from functools import reduce
77

88
from django.core import exceptions
9+
from django.core.validators import MinValueValidator
910
from django.db import connection, models, router, transaction
1011
from django.db.models import F, Func, Max, Q, TextField, Value, signals
1112
from django.db.models.deletion import Collector
@@ -391,6 +392,9 @@ class AbstractStore(models.Model):
391392
conflicting_serialized_data = models.TextField(blank=True)
392393

393394
_self_ref_fk = models.CharField(max_length=32, blank=True)
395+
_self_ref_order = models.IntegerField(
396+
blank=True, null=True, validators=[MinValueValidator(0)]
397+
)
394398

395399
class Meta:
396400
abstract = True
@@ -786,6 +790,7 @@ class SyncableModel(UUIDModelMixin):
786790

787791
_morango_internal_fields_not_to_serialize = ("_morango_dirty_bit",)
788792
morango_model_dependencies = ()
793+
morango_ordering = ()
789794
morango_fields_not_to_serialize = ()
790795
morango_profile = None
791796

morango/registry.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
import inspect
77
import sys
8-
from collections import OrderedDict
9-
from typing import Generator
8+
from collections import OrderedDict, defaultdict
9+
from typing import Generator, Optional
1010

11-
from django.db.models import QuerySet
11+
from django.db.models import F, QuerySet
1212
from django.db.models.fields.related import ForeignKey
1313

1414
from morango.constants import transfer_stages
@@ -17,7 +17,9 @@
1717
ModelRegistryNotReady,
1818
UnsupportedFieldType,
1919
)
20-
from morango.utils import SETTINGS, do_import
20+
from morango.utils import SETTINGS, do_import, self_referential_fk
21+
22+
_UNSET = object()
2123

2224

2325
def _get_foreign_key_classes(m):
@@ -57,6 +59,7 @@ def __init__(self):
5759
self.profile_models = {}
5860
self.ready = False
5961
self.models_ready = {}
62+
self.self_referential_fks = defaultdict(dict)
6063
if hasattr(sys.modules[__name__], "syncable_models"):
6164
raise RuntimeError("Master registry has already been initialized.")
6265

@@ -79,13 +82,45 @@ def get_models(self, profile):
7982
self.check_models_ready(profile)
8083
return list(self.profile_models.get(profile, {}).values())
8184

85+
def get_self_referential_fk(self, model) -> Optional[str]:
86+
"""
87+
Cached helper for determining a syncable model's self-referential foreign key attribute name
88+
:param model: The Morango syncable model
89+
:type model: Type[MorangoSyncableModel]
90+
"""
91+
profile_self_ref_fks = self.self_referential_fks[model.morango_profile]
92+
model_self_ref_fk = profile_self_ref_fks.get(model.morango_model_name, _UNSET)
93+
if model_self_ref_fk is _UNSET:
94+
model_self_ref_fk = self_referential_fk(model)
95+
profile_self_ref_fks[model.morango_model_name] = model_self_ref_fk
96+
return model_self_ref_fk
97+
8298
def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
8399
"""
84100
Method for future enhancement to iterate over model's and their querysets in a fashion
85101
(particularly, an order) that is aware of FK dependencies.
86102
"""
87103
for model in self.get_models(profile):
88-
yield model.syncing_objects.all()
104+
queryset = model.syncing_objects.all()
105+
ordering = getattr(model, "morango_ordering", ())
106+
if ordering:
107+
queryset = queryset.order_by(*self._get_nulls_last_ordering(ordering))
108+
yield queryset
109+
110+
@staticmethod
111+
def _get_nulls_last_ordering(ordering):
112+
normalized = []
113+
for order_expr in ordering:
114+
if isinstance(order_expr, str):
115+
descending = order_expr.startswith("-")
116+
field_name = order_expr[1:] if descending else order_expr
117+
if descending:
118+
normalized.append(F(field_name).desc(nulls_last=True))
119+
else:
120+
normalized.append(F(field_name).asc(nulls_last=True))
121+
else:
122+
normalized.append(order_expr)
123+
return normalized
89124

90125
def _insert_model_in_dependency_order(self, model, profile):
91126
# When we add models to be synced, we need to make sure

morango/sync/backends/postgres.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
217217
conflicting_serialized_data,
218218
dirty_bit,
219219
_self_ref_fk,
220+
_self_ref_order,
220221
deserialization_error,
221222
deserialization_exception,
222223
last_transfer_session_id
@@ -233,6 +234,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
233234
CASE buffer.hard_deleted WHEN TRUE THEN '' ELSE buffer.serialized || '\n' || store.conflicting_serialized_data END,
234235
TRUE,
235236
store._self_ref_fk,
237+
store._self_ref_order,
236238
NULL,
237239
NULL,
238240
'{transfer_session_id}'
@@ -320,7 +322,8 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
320322
buffer.partition,
321323
buffer.source_id,
322324
buffer.conflicting_serialized_data,
323-
buffer._self_ref_fk
325+
buffer._self_ref_fk,
326+
buffer._self_ref_order
324327
FROM {buffer} as buffer
325328
WHERE buffer.transfer_session_id = '{transfer_session_id}'
326329
),
@@ -339,6 +342,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
339342
conflicting_serialized_data,
340343
dirty_bit,
341344
_self_ref_fk,
345+
_self_ref_order,
342346
deserialization_error,
343347
deserialization_exception,
344348
last_transfer_session_id
@@ -355,6 +359,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
355359
nv.conflicting_serialized_data,
356360
TRUE,
357361
nv._self_ref_fk,
362+
nv._self_ref_order,
358363
NULL,
359364
NULL,
360365
'{transfer_session_id}'
@@ -377,6 +382,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
377382
conflicting_serialized_data,
378383
dirty_bit,
379384
_self_ref_fk,
385+
_self_ref_order,
380386
deserialization_error,
381387
deserialization_exception,
382388
last_transfer_session_id
@@ -395,6 +401,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
395401
ut.conflicting_serialized_data,
396402
TRUE,
397403
ut._self_ref_fk,
404+
ut._self_ref_order,
398405
NULL,
399406
NULL,
400407
'{transfer_session_id}'

morango/sync/backends/sqlite.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
151151
conflicting_serialized_data,
152152
dirty_bit,
153153
_self_ref_fk,
154+
_self_ref_order,
154155
deserialization_error,
155156
deserialization_exception,
156157
last_transfer_session_id
@@ -175,6 +176,7 @@ def _dequeuing_merge_conflict_buffer(self, cursor, current_id, transfersession_i
175176
),
176177
1,
177178
store._self_ref_fk,
179+
store._self_ref_order,
178180
NULL,
179181
NULL,
180182
'{transfer_session_id}'
@@ -244,6 +246,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
244246
conflicting_serialized_data,
245247
dirty_bit,
246248
_self_ref_fk,
249+
_self_ref_order,
247250
deserialization_error,
248251
deserialization_exception,
249252
last_transfer_session_id
@@ -262,6 +265,7 @@ def _dequeuing_insert_remaining_buffer(self, cursor, transfersession_id):
262265
buffer.conflicting_serialized_data,
263266
1,
264267
buffer._self_ref_fk,
268+
buffer._self_ref_order,
265269
NULL,
266270
NULL,
267271
'{transfer_session_id}'

morango/sync/operations.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
from django.db import transaction
1111
from django.db.models import CharField
1212
from django.db.models import F
13+
from django.db.models import Exists
14+
from django.db.models import OuterRef
1315
from django.db.models import Q
16+
from django.db.models import Subquery
1417
from django.db.models import signals
1518
from django.db.models import Value
1619
from django.db.models.fields import BooleanField
17-
from django.db.models.functions import NullIf
20+
from django.db.models.functions import NullIf, Cast
1821
from django.db.utils import IntegrityError
1922
from django.db.utils import OperationalError
2023
from django.utils import timezone
@@ -25,6 +28,7 @@
2528
from morango.constants import transfer_statuses
2629
from morango.constants.capabilities import ASYNC_OPERATIONS
2730
from morango.constants.capabilities import FSIC_V2_FORMAT
31+
from morango.constants.capabilities import SELF_REF_ORDER
2832
from morango.errors import MorangoDirtyParent
2933
from morango.errors import MorangoInvalidFSICPartition
3034
from morango.errors import MorangoLimitExceeded
@@ -525,7 +529,7 @@ def _queue_into_buffer_v1(transfersession):
525529
"""SELECT
526530
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
527531
partition, source_id, conflicting_serialized_data,
528-
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
532+
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
529533
FROM {store} WHERE {condition}
530534
""".format(
531535
transfer_session_id=transfersession.id,
@@ -556,7 +560,7 @@ def _queue_into_buffer_v1(transfersession):
556560
"""INSERT INTO {outgoing_buffer}
557561
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
558562
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
559-
transfer_session_id, _self_ref_fk)
563+
transfer_session_id, _self_ref_fk, _self_ref_order)
560564
{select}
561565
""".format(
562566
outgoing_buffer=Buffer._meta.db_table,
@@ -674,7 +678,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
674678
"""SELECT
675679
id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile,
676680
partition, source_id, conflicting_serialized_data,
677-
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk
681+
CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk, _self_ref_order
678682
FROM {store} WHERE {condition}
679683
""".format(
680684
transfer_session_id=transfersession.id,
@@ -703,7 +707,7 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
703707
"""INSERT INTO {outgoing_buffer}
704708
(model_uuid, serialized, deleted, last_saved_instance, last_saved_counter,
705709
hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data,
706-
transfer_session_id, _self_ref_fk)
710+
transfer_session_id, _self_ref_fk, _self_ref_order)
707711
{select}
708712
""".format(
709713
outgoing_buffer=Buffer._meta.db_table,
@@ -721,7 +725,41 @@ def _queue_into_buffer_v2(transfersession, chunk_size=200):
721725
)
722726

723727

724-
def _dequeue_into_store(transfer_session, fsic, v2_format=False):
728+
def _update_legacy_self_ref_order_for_model(queryset):
729+
# root nodes set the _self_ref_order to 0
730+
queryset.filter(_self_ref_fk="").exclude(_self_ref_order=0).update(_self_ref_order=0)
731+
# reset the _self_ref_order to None for all records that have a parent
732+
queryset.exclude(_self_ref_fk="").exclude(_self_ref_order=None).update(
733+
_self_ref_order=None
734+
)
735+
736+
parent = Store.objects.filter(
737+
id=Cast(OuterRef("_self_ref_fk"), UUIDField()),
738+
_self_ref_order__isnull=False,
739+
)
740+
parent_order = parent.values("_self_ref_order")[:1]
741+
pending = queryset.exclude(_self_ref_fk="").filter(_self_ref_order=None)
742+
743+
while pending.filter(Exists(parent)).update(_self_ref_order=Subquery(parent_order) + 1):
744+
pass
745+
746+
747+
def _update_legacy_self_ref_order(transfer_session):
748+
profile = transfer_session.sync_session.profile
749+
transferred_store_records = Store.objects.filter(
750+
last_transfer_session_id=transfer_session.id,
751+
profile=profile,
752+
)
753+
754+
for Model in syncable_models.get_models(profile):
755+
queryset = transferred_store_records.filter(model_name=Model.morango_model_name)
756+
if self_referential_fk(Model):
757+
_update_legacy_self_ref_order_for_model(queryset)
758+
else:
759+
queryset.exclude(_self_ref_order=None).update(_self_ref_order=None)
760+
761+
762+
def _dequeue_into_store(transfer_session, fsic, v2_format=False, self_ref_order=True):
725763
"""
726764
Takes data from the buffers and merges into the store and record max counters.
727765
@@ -745,6 +783,8 @@ def _dequeue_into_store(transfer_session, fsic, v2_format=False):
745783
DBBackend._dequeuing_delete_mc_buffer(cursor, transfer_session.id)
746784
DBBackend._dequeuing_insert_remaining_buffer(cursor, transfer_session.id)
747785
DBBackend._dequeuing_insert_remaining_rmcb(cursor, transfer_session.id)
786+
if not self_ref_order:
787+
_update_legacy_self_ref_order(transfer_session)
748788
DBBackend._dequeuing_delete_remaining_rmcb(cursor, transfer_session.id)
749789
DBBackend._dequeuing_delete_remaining_buffer(cursor, transfer_session.id)
750790

@@ -1083,6 +1123,7 @@ def handle(self, context):
10831123
context.transfer_session,
10841124
fsic,
10851125
v2_format=FSIC_V2_FORMAT in context.capabilities,
1126+
self_ref_order=SELF_REF_ORDER in context.capabilities,
10861127
)
10871128

10881129
return transfer_statuses.COMPLETED

0 commit comments

Comments
 (0)