@@ -806,6 +806,114 @@ def _remNonAuthAttachments(self, key):
806806 self .db .kramBSSS .rem (key )
807807 self .db .kramTMQS .rem (key )
808808
809+ @staticmethod
810+ def _dedupeAttachmentItems (items ):
811+ """Return items in first-seen order with duplicates removed.
812+
813+ Items are CESR tuples or bytes; identity uses a nested qb64/raw key.
814+ """
815+ seen = set ()
816+ out = []
817+ for item in items :
818+ key = Kramer ._attachmentItemKey (item )
819+ if key in seen :
820+ continue
821+ seen .add (key )
822+ out .append (item )
823+ return out
824+
825+ @staticmethod
826+ def _attachmentItemKey (item ):
827+ if isinstance (item , (bytes , memoryview )):
828+ return ('b' , bytes (item ))
829+ if isinstance (item , (list , tuple )):
830+ return tuple (Kramer ._attachmentItemKey (x ) for x in item )
831+ if hasattr (item , 'qb64' ):
832+ return item .qb64
833+ return repr (item )
834+
835+ def _rehydrateKwaFromEscrow (self , partialKey , _senderId , kwa ):
836+ """Merge escrowed partial multisig state into ``kwa`` for downstream dispatch.
837+
838+ After threshold is satisfied on a later delivery, ``kwa`` reflects only
839+ that parse; ``kramPMKS`` and non-auth attachment DBs hold the union of
840+ state accumulated across deliveries.
841+
842+ Parameters:
843+ partialKey (tuple): ``(AID, MID)`` escrow key
844+ _senderId (str): message sender AID (qb64); reserved for filtering
845+ kwa (dict): parser attachment dict; mutated in place
846+ """
847+ escrow_sigs = self .db .kramPMKS .get (partialKey )
848+ if escrow_sigs :
849+ kwa ['sigers' ] = sorted (escrow_sigs , key = lambda s : s .index )
850+
851+ trqs_esc = list (self .db .kramTRQS .get (partialKey ) or [])
852+ kwa ['trqs' ] = self ._dedupeAttachmentItems (
853+ trqs_esc + kwa .get ('trqs' , []))
854+
855+ flat_tsgs = list (self .db .kramTSGS .get (partialKey ) or [])
856+ groups = {}
857+ for prefixer , number , diger , siger in flat_tsgs :
858+ gk = (prefixer .qb64 , number .sn , diger .qb64 )
859+ if gk not in groups :
860+ groups [gk ] = [prefixer , number , diger , []]
861+ sigers = groups [gk ][3 ]
862+ if not any (s .qb64 == siger .qb64 for s in sigers ):
863+ sigers .append (siger )
864+
865+ merged_tsgs = {}
866+ for quad in groups .values ():
867+ prefixer , number , diger , sigers = quad
868+ gk = (prefixer .qb64 , number .sn , diger .qb64 )
869+ merged_tsgs [gk ] = (prefixer , number , diger , list (sigers ))
870+
871+ for prefixer , number , diger , sigers in kwa .get ('tsgs' , []):
872+ gk = (prefixer .qb64 , number .sn , diger .qb64 )
873+ if gk not in merged_tsgs :
874+ merged_tsgs [gk ] = (prefixer , number , diger , [])
875+ bucket = merged_tsgs [gk ][3 ]
876+ for s in sigers :
877+ if not any (x .qb64 == s .qb64 for x in bucket ):
878+ bucket .append (s )
879+
880+ if merged_tsgs :
881+ kwa ['tsgs' ] = list (merged_tsgs .values ())
882+ else :
883+ kwa .pop ('tsgs' , None )
884+
885+ ssts_esc = list (self .db .kramSSTS .get (partialKey ) or [])
886+ kwa ['ssts' ] = self ._dedupeAttachmentItems (
887+ ssts_esc + kwa .get ('ssts' , []))
888+
889+ frcs_esc = list (self .db .kramFRCS .get (partialKey ) or [])
890+ kwa ['frcs' ] = self ._dedupeAttachmentItems (
891+ frcs_esc + kwa .get ('frcs' , []))
892+
893+ tdcs_esc = list (self .db .kramTDCS .get (partialKey ) or [])
894+ kwa ['tdcs' ] = self ._dedupeAttachmentItems (
895+ tdcs_esc + kwa .get ('tdcs' , []))
896+
897+ ptds_esc = list (self .db .kramPTDS .get (partialKey ) or [])
898+ kwa ['ptds' ] = self ._dedupeAttachmentItems (
899+ ptds_esc + kwa .get ('ptds' , []))
900+
901+ bsqs_esc = list (self .db .kramBSQS .get (partialKey ) or [])
902+ kwa ['bsqs' ] = self ._dedupeAttachmentItems (
903+ bsqs_esc + kwa .get ('bsqs' , []))
904+
905+ bsss_esc = list (self .db .kramBSSS .get (partialKey ) or [])
906+ kwa ['bsss' ] = self ._dedupeAttachmentItems (
907+ bsss_esc + kwa .get ('bsss' , []))
908+
909+ tmqs_esc = list (self .db .kramTMQS .get (partialKey ) or [])
910+ kwa ['tmqs' ] = self ._dedupeAttachmentItems (
911+ tmqs_esc + kwa .get ('tmqs' , []))
912+
913+ for name in ('trqs' , 'ssts' , 'frcs' , 'tdcs' , 'ptds' , 'bsqs' , 'bsss' ,
914+ 'tmqs' ):
915+ if not kwa .get (name ):
916+ kwa .pop (name , None )
809917
810918 def intake (self , serder , kwa = None ):
811919 """Process message through KRAM denial and cache logic.
@@ -981,6 +1089,7 @@ def kramit(self, msg, kwa=None):
9811089 sigIndices = [sig .index for sig in allSigs ]
9821090
9831091 if kever .tholder .satisfy (indices = sigIndices ):
1092+ self ._rehydrateKwaFromEscrow (key , senderId , kwa )
9841093 return msg
9851094
9861095 # Threshold not satisfied, message remains pending
@@ -1206,6 +1315,7 @@ def kramit(self, msg, kwa=None):
12061315 sigIndices = [sig .index for sig in allSigs ]
12071316
12081317 if kever .tholder .satisfy (indices = sigIndices ):
1318+ self ._rehydrateKwaFromEscrow (partialKey , senderId , kwa )
12091319 return msg
12101320
12111321 # Threshold not satisfied, message remains pending
0 commit comments