44package org .lfdecentralizedtrust .splice .environment
55
66import cats .data .{EitherT , OptionT }
7- import cats .implicits .catsSyntaxOptionId
7+ import cats .implicits .{ catsSyntaxApplicativeId , catsSyntaxOptionId }
88import cats .syntax .either .*
99import com .daml .nonempty .NonEmpty
1010import com .digitalasset .canton .admin .api .client .commands .TopologyAdminCommands .Init .GetIdResult
@@ -41,9 +41,13 @@ import com.digitalasset.canton.protocol.DynamicSynchronizerParameters
4141import com .digitalasset .canton .time .FetchTimeResponse
4242import com .digitalasset .canton .topology .*
4343import com .digitalasset .canton .topology .admin .grpc
44- import com .digitalasset .canton .topology .admin .grpc .BaseQuery
44+ import com .digitalasset .canton .topology .admin .grpc .{
45+ BaseQuery ,
46+ TopologyStoreId as ProtoTopologyStoreId ,
47+ }
4548import com .digitalasset .canton .topology .admin .v30 .ExportTopologySnapshotResponse
4649import com .digitalasset .canton .topology .store .TimeQuery .HeadState
50+ import com .digitalasset .canton .topology .store .TopologyStoreId .AuthorizedStore
4751import com .digitalasset .canton .topology .store .{
4852 StoredTopologyTransaction ,
4953 TimeQuery ,
@@ -65,8 +69,6 @@ import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.{
6569 AuthorizedStateChanged ,
6670 TopologyTransactionType ,
6771}
68- import com .digitalasset .canton .topology .admin .grpc .TopologyStoreId as ProtoTopologyStoreId
69- import com .digitalasset .canton .topology .store .TopologyStoreId .AuthorizedStore
7072
7173import java .util .concurrent .atomic .AtomicReference
7274import scala .concurrent .{ExecutionContextExecutor , Future }
@@ -596,18 +598,46 @@ abstract class TopologyAdminConnection(
596598 )
597599 )
598600
599- private def proposeMapping [M <: TopologyMapping : ClassTag ](
600- store : TopologyStoreId ,
601- mapping : Either [String , M ],
602- serial : PositiveInt ,
603- isProposal : Boolean ,
604- )(implicit traceContext : TraceContext ): Future [SignedTopologyTransaction [TopologyChangeOp , M ]] =
605- proposeMapping(
606- store,
607- mapping.valueOr(err => throw new IllegalArgumentException (s " Invalid topology mapping: $err" )),
608- serial,
609- isProposal,
601+ private def ensureInitialMapping [M <: TopologyMapping : ClassTag ](
602+ mappingE : Either [String , M ]
603+ )(implicit traceContext : TraceContext ): Future [SignedTopologyTransaction [TopologyChangeOp , M ]] = {
604+ val mapping =
605+ mappingE.valueOr(err => throw new IllegalArgumentException (s " Invalid topology mapping: $err" ))
606+ listAllTransactions(
607+ TopologyStoreId .AuthorizedStore ,
608+ includeMappings = Set (
609+ mapping.code
610+ ),
611+ ).flatMap(sameCodeTopologyMappings =>
612+ sameCodeTopologyMappings
613+ .find(_.mapping.uniqueKey == mapping.uniqueKey)
614+ .flatMap(_.selectMapping[M ])
615+ .fold({
616+ logger.info(
617+ s " Proposing initial mapping for ${mapping.code} with serial 1: $mapping"
618+ )
619+ proposeMapping(
620+ TopologyStoreId .AuthorizedStore ,
621+ mapping,
622+ PositiveInt .one,
623+ isProposal = false ,
624+ )
625+ }) { existingTxWithSameUniqueCode =>
626+ if (existingTxWithSameUniqueCode == mapping) {
627+ logger.info(
628+ s " Existing mapping found for ${mapping.code}: $mapping, returning existing transaction with serial ${existingTxWithSameUniqueCode.serial}"
629+ )
630+ existingTxWithSameUniqueCode.transaction.pure[Future ]
631+ } else {
632+ throw Status .ALREADY_EXISTS
633+ .withDescription(
634+ s " Mapping with unique key ${mapping.uniqueKey} already exists with a different mapping: $existingTxWithSameUniqueCode"
635+ )
636+ .asRuntimeException()
637+ }
638+ }
610639 )
640+ }
611641
612642 private def proposeMapping [M <: TopologyMapping : ClassTag ](
613643 store : TopologyStoreId ,
@@ -762,16 +792,13 @@ abstract class TopologyAdminConnection(
762792 )(implicit
763793 traceContext : TraceContext
764794 ): Future [SignedTopologyTransaction [TopologyChangeOp , SequencerSynchronizerState ]] = {
765- proposeMapping(
766- TopologyStoreId .AuthorizedStore ,
795+ ensureInitialMapping(
767796 SequencerSynchronizerState .create(
768797 synchronizerId,
769798 PositiveInt .one,
770799 active,
771800 observers,
772- ),
773- serial = PositiveInt .one,
774- isProposal = false ,
801+ )
775802 )
776803 }
777804
@@ -864,17 +891,14 @@ abstract class TopologyAdminConnection(
864891 )(implicit
865892 traceContext : TraceContext
866893 ): Future [SignedTopologyTransaction [TopologyChangeOp , MediatorSynchronizerState ]] =
867- proposeMapping(
868- TopologyStoreId .AuthorizedStore ,
894+ ensureInitialMapping(
869895 MediatorSynchronizerState .create(
870896 synchronizerId = synchronizerId,
871897 group = group,
872898 threshold = PositiveInt .one,
873899 active = active,
874900 observers = observers,
875- ),
876- serial = PositiveInt .one,
877- isProposal = false ,
901+ )
878902 )
879903
880904 def ensureMediatorSynchronizerStateAdditionProposal (
@@ -961,15 +985,12 @@ abstract class TopologyAdminConnection(
961985 )(implicit
962986 traceContext : TraceContext
963987 ): Future [SignedTopologyTransaction [TopologyChangeOp , DecentralizedNamespaceDefinition ]] =
964- proposeMapping(
965- TopologyStoreId .AuthorizedStore ,
988+ ensureInitialMapping(
966989 DecentralizedNamespaceDefinition .create(
967990 namespace,
968991 threshold,
969992 owners,
970- ),
971- serial = PositiveInt .one,
972- isProposal = false ,
993+ )
973994 )
974995
975996 def ensureDecentralizedNamespaceDefinitionProposalAccepted (
@@ -1194,16 +1215,13 @@ abstract class TopologyAdminConnection(
11941215 )(implicit
11951216 traceContext : TraceContext
11961217 ): Future [SignedTopologyTransaction [TopologyChangeOp , NamespaceDelegation ]] =
1197- proposeMapping(
1198- TopologyStoreId .AuthorizedStore ,
1218+ ensureInitialMapping(
11991219 NamespaceDelegation .create(
12001220 namespace,
12011221 target,
12021222 if (isRootDelegation) DelegationRestriction .CanSignAllMappings
12031223 else DelegationRestriction .CanSignAllButNamespaceDelegations ,
1204- ),
1205- serial = PositiveInt .one,
1206- isProposal = false ,
1224+ )
12071225 )
12081226
12091227 def initId (id : NodeIdentity )(implicit traceContext : TraceContext ): Future [Unit ] = {
0 commit comments