diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/Runner.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/Runner.scala index 77cb61ba408f..91179e4e280b 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/Runner.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/Runner.scala @@ -26,7 +26,6 @@ import com.digitalasset.daml.lf.engine.script.ledgerinteraction.{ IdeLedgerClient, ScriptLedgerClient, } -import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.AdminLedgerClient import com.digitalasset.daml.lf.language.Ast._ import com.digitalasset.daml.lf.script.{IdeLedger, IdeLedgerRunner} import com.digitalasset.daml.lf.engine.ScriptEngine.{ @@ -278,16 +277,7 @@ object Runner { clientChannelConfig, namedLoggerFactory, ) - maybeAdminLedgerClient <- params.adminPort - .traverse(adminPort => - AdminLedgerClient.singleHostWithUnknownParticipantId( - params.host, - adminPort, - clientConfig.token(), - clientChannelConfig, - ) - ) - } yield GrpcLedgerClient(ledgerClient, userId, maybeAdminLedgerClient) + } yield GrpcLedgerClient(ledgerClient, userId) } // We might want to have one config per participant at some point but for now this should be sufficient. def connect( diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/ledgerinteraction/ScriptLedgerClient.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/ledgerinteraction/ScriptLedgerClient.scala index 35a9d20ffc0d..3ba40f496566 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/ledgerinteraction/ScriptLedgerClient.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/ledgerinteraction/ScriptLedgerClient.scala @@ -6,7 +6,6 @@ package engine.script.ledgerinteraction import com.digitalasset.canton.ledger.client.LedgerClient import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.AdminLedgerClient import com.digitalasset.daml.lf.engine.ScriptEngine.{TraceLog, WarningLog} // Ledger clients before implementation is chosen @@ -14,8 +13,7 @@ sealed trait ScriptLedgerClient extends Product with Serializable final case class GrpcLedgerClient( grpcClient: LedgerClient, - val userId: Option[Ref.UserId], - val grpcAdminClient: Option[AdminLedgerClient] = None, + userId: Option[Ref.UserId], ) extends ScriptLedgerClient object GrpcLedgerClient {} diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ScriptF.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ScriptF.scala index 3119a94683d7..8db98b3a5716 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ScriptF.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ScriptF.scala @@ -513,41 +513,16 @@ object ScriptF { */ final case class AllocParty( partyHint: String, - participants: Option[ - (Participant, List[Participant]) - ], + owningParticipant: Option[Participant], ) extends Cmd { override def execute(env: Env)(implicit ec: ExecutionContext, mat: Materializer, esf: ExecutionSequencerFactory, ): Future[ExtendedValue] = { - val owningParticipant = participants.map(_._1) for { owningClient <- env.clients.assertGetParticipantFuture(owningParticipant) - - party <- - if (participants.map(_._2.isEmpty).getOrElse(true)) { - owningClient.allocateParty(partyHint) - } else { - for { - otherClients <- Future.traverse(participants.map(_._2).getOrElse(List.empty))( - participant => env.clients.assertGetParticipantFuture(participant) - ) - clients = owningClient +: otherClients - participantIds = clients.map(_.getParticipantUid) - - p <- owningClient.aggregateAllocatePartyOnMultipleParticipants( - clients, - partyHint, - owningClient.getParticipantUid.split("::").last, - participantIds, - ) - _ <- Future.traverse(env.clients.participants.values)( - _.waitUntilHostingVisible(p, participantIds) - ) - } yield p - } + party <- owningClient.allocateParty(partyHint) } yield { owningParticipant.foreach(env.addPartyParticipantMapping(party, _)) ValueParty(party) @@ -883,8 +858,9 @@ object ScriptF { for { client <- Converter.toFuture(env.clients.getParticipant(participant)) _ <- client.vetPackages(packages) + participantUid <- client.getParticipantUid() _ <- Future.traverse(env.clients.participants.values)( - _.waitUntilVettingVisible(packages, client.getParticipantUid) + _.waitUntilVettingVisible(packages, participantUid) ) } yield ValueUnit } @@ -901,8 +877,9 @@ object ScriptF { for { client <- Converter.toFuture(env.clients.getParticipant(participant)) _ <- client.unvetPackages(packages) + participantUid <- client.getParticipantUid() _ <- Future.traverse(env.clients.participants.values)( - _.waitUntilUnvettingVisible(packages, client.getParticipantUid) + _.waitUntilUnvettingVisible(packages, participantUid) ) } yield ValueUnit } @@ -1128,7 +1105,7 @@ object ScriptF { for { participantName <- Converter.toOptionalParticipantName(participantName) idHint <- Converter.toPartyIdHint(givenHint, requestedName, globalRandom) - } yield AllocParty(idHint, participantName.map(p => (p, List.empty))) + } yield AllocParty(idHint, participantName) case _ => Left(s"Expected AllocParty payload but got $v") } @@ -1139,17 +1116,22 @@ object ScriptF { ImmArray( (_, ValueText(requestedName)), (_, ValueText(givenHint)), - (_, participantNames), + (_, participantNames @ ValueList(vs)), ), - ) => + ) if vs.length <= 1 => for { participantNames <- Converter.toParticipantNames(participantNames) idHint <- Converter.toPartyIdHint(givenHint, requestedName, globalRandom) - allocArg = participantNames match { - case head :: tail => Some((head, tail)) - case Nil => None - } - } yield AllocParty(idHint, allocArg) + } yield AllocParty(idHint, participantNames.headOption) + case ValueRecord( + _, + ImmArray( + (_, ValueText(_)), + (_, ValueText(_)), + (_, ValueList(_)), + ), + ) => + Left(s"Expected AllocParty payload with at most one participant name but got $v") case _ => Left(s"Expected AllocParty payload but got $v") } diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/IdeLedgerClient.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/IdeLedgerClient.scala index 549a318e3aa5..e141c9c25b06 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/IdeLedgerClient.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/IdeLedgerClient.scala @@ -20,10 +20,10 @@ import com.digitalasset.daml.lf.command.ApiCommand import com.digitalasset.daml.lf.data.Ref._ import com.digitalasset.daml.lf.data.{Bytes, ImmArray, Ref, Time} import com.digitalasset.daml.lf.engine.ScriptEngine.{ + ExtendedValueComputationMode, TraceLog, WarningLog, runExtendedValueComputation, - ExtendedValueComputationMode, } import com.digitalasset.daml.lf.interpretation.Error.ContractIdInContractKey import com.digitalasset.daml.lf.language.{Ast, LanguageVersion, LookupError, Reference} @@ -1084,33 +1084,6 @@ class IdeLedgerClient( ): Future[List[ScriptLedgerClient.ReadablePackageId]] = Future.successful(getPackageIdMap().keys.toList) - override def allocatePartyOnMultipleParticipants( - party: Ref.Party, - toParticipantIds: Iterable[String], - )(implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Unit] = Future.failed( - new RuntimeException( - "allocatePartyOnMultipleParticipants should not be called on IDE ledger, use aggregateAllocatePartyOnMultipleParticipants instead" - ) - ) - - override def aggregateAllocatePartyOnMultipleParticipants( - clients: List[ScriptLedgerClient], - partyHint: String, - namespace: String, - toParticipantIds: Iterable[String], - )(implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Ref.Party] = allocateParty(partyHint) - - override def waitUntilHostingVisible( - party: Ref.Party, - onParticipantUid: Iterable[String], - ): Future[Unit] = - Future.successful(()) - - override def getParticipantUid: String = "" + override def getParticipantUid()(implicit ec: ExecutionContext): Future[String] = + Future.successful("") } diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/ScriptLedgerClient.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/ScriptLedgerClient.scala index 9cc3ccdd074f..ab154b0aed01 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/ScriptLedgerClient.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/ScriptLedgerClient.scala @@ -82,11 +82,10 @@ object ScriptLedgerClient { compiledPackages: CompiledPackages, )(implicit namedLoggerFactory: NamedLoggerFactory): ScriptLedgerClient = ledger match { - case abstractLedgers.GrpcLedgerClient(grpcClient, userId, oAdminClient) => + case abstractLedgers.GrpcLedgerClient(grpcClient, userId) => new grpcLedgerClient.GrpcLedgerClient( grpcClient, userId, - oAdminClient, compiledPackages, ) case abstractLedgers.IdeLedgerClient(pureCompiledPackages, traceLog, warningLog, canceled) => @@ -104,6 +103,25 @@ object ScriptLedgerClient { name: PackageName, version: PackageVersion, ) + + object ReadablePackageId { + private val versionedNamePattern = raw"(.+)-(\d+\.\d+\.\d+)".r + + @throws[IllegalArgumentException] + def assertFromString(versionedName: String): ReadablePackageId = + versionedName match { + case versionedNamePattern(pkgName, pkgVersion) => + ReadablePackageId( + name = PackageName.assertFromString(pkgName), + version = PackageVersion.assertFromString(pkgVersion), + ) + + case _ => + throw new IllegalArgumentException( + s"""versioned name "$versionedName" does not match regex "$versionedNamePattern"""" + ) + } + } } // This abstracts over the interaction with the ledger. This allows @@ -293,23 +311,5 @@ trait ScriptLedgerClient { mat: Materializer, ): Future[List[ScriptLedgerClient.ReadablePackageId]] - def allocatePartyOnMultipleParticipants(party: Ref.Party, toParticipantIds: Iterable[String])( - implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Unit] - - def aggregateAllocatePartyOnMultipleParticipants( - clients: List[ScriptLedgerClient], - partyHint: String, - namespace: String, - toParticipantIds: Iterable[String], - )(implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Ref.Party] - - def waitUntilHostingVisible(party: Ref.Party, onParticipantUid: Iterable[String]): Future[Unit] - - def getParticipantUid: String + def getParticipantUid()(implicit ec: ExecutionContext): Future[String] } diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/AdminLedgerClient.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/AdminLedgerClient.scala deleted file mode 100644 index d9ebe8cae3d5..000000000000 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/AdminLedgerClient.scala +++ /dev/null @@ -1,544 +0,0 @@ -// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -// Temporary stand-in for the real admin api clients defined in canton. Needed only for upgrades testing -// We should intend to replace this as soon as possible -package com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction -package grpcLedgerClient - -import com.daml.grpc.AuthCallCredentials -import com.daml.timer.RetryStrategy -import com.digitalasset.canton.ledger.client.configuration.LedgerClientChannelConfiguration -import com.digitalasset.canton.ledger.client.GrpcChannel -import com.digitalasset.canton.admin.participant.{v30 => admin_participant} -import com.digitalasset.canton.topology.admin.v30.ForceFlag -import com.digitalasset.canton.topology.admin.{v30 => admin_topology} -import com.digitalasset.canton.protocol.{v30 => protocol} -import com.digitalasset.daml.lf.data.Ref.{PackageName, PackageVersion} -import com.google.protobuf.ByteString -import io.grpc.Channel -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder -import io.grpc.stub.AbstractStub - -import java.io.{Closeable, File, FileInputStream} -import scala.concurrent.duration.{Duration, DurationInt} -import scala.concurrent.{ExecutionContext, Future} - -class AdminLedgerClient private[grpcLedgerClient] ( - val channel: Channel, - token: Option[String], - val participantUid: String, -)(implicit ec: ExecutionContext) - extends Closeable { - - // Follow community/app-base/src/main/scala/com/digitalasset/canton/console/commands/TopologyAdministration.scala:1149 - // Shows how to do a list request - // Try filtering for just Adds, assuming a Remove cancels an Add. - // If it doesn't, change the filter to all and fold them - - private[grpcLedgerClient] val packageServiceStub = - AdminLedgerClient.stub(admin_participant.PackageServiceGrpc.stub(channel), token) - - private[grpcLedgerClient] val topologyReadServiceStub = - AdminLedgerClient.stub( - admin_topology.TopologyManagerReadServiceGrpc.stub(channel), - token, - ) - - private[grpcLedgerClient] val topologyWriteServiceStub = - AdminLedgerClient.stub( - admin_topology.TopologyManagerWriteServiceGrpc.stub(channel), - token, - ) - - private[grpcLedgerClient] val synchronizerConnectivityStub = - AdminLedgerClient.stub( - admin_participant.SynchronizerConnectivityServiceGrpc.stub(channel), - token, - ) - - def listVettedPackages(): Future[Map[String, Seq[protocol.VettedPackages.VettedPackage]]] = for { - synchronizerId <- getSynchronizerId - res <- topologyReadServiceStub - .listVettedPackages(makeListVettedPackagesRequest(synchronizerId)) - .map(_.results.view.map(res => (res.item.get.participantUid -> res.item.get.packages)).toMap) - } yield res - - private[this] def makeListVettedPackagesRequest(synchronizerId: String) = - admin_topology.ListVettedPackagesRequest( - baseQuery = Some( - admin_topology.BaseQuery( - store = Some( - admin_topology.StoreId( - admin_topology.StoreId.Store.Synchronizer( - admin_topology.Synchronizer( - admin_topology.Synchronizer.Kind.Id(synchronizerId) - ) - ) - ) - ), - proposals = false, - operation = protocol.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_UNSPECIFIED, - timeQuery = admin_topology.BaseQuery.TimeQuery - .HeadState(com.google.protobuf.empty.Empty()), - filterSignedKey = "", - protocolVersion = None, - ) - ), - filterParticipant = "", - ) - - def vetPackagesById(packageIds: Iterable[String]): Future[Unit] = { - for { - vettedPackages <- listVettedPackages() - newVettedPackages = packageIds.map(pkgId => - protocol.VettedPackages.VettedPackage(pkgId, None, None) - ) ++ vettedPackages(participantUid) - synchronizerId <- getSynchronizerId - _ <- topologyWriteServiceStub.authorize( - makeAuthorizeRequest(participantUid, synchronizerId, newVettedPackages) - ) - } yield () - } - - def unvetPackagesById(packageIds: Iterable[String]): Future[Unit] = { - val packageIdsSet = packageIds.toSet - for { - vettedPackages <- listVettedPackages() - newVettedPackages = vettedPackages(participantUid).filterNot(pkg => - packageIdsSet.contains(pkg.packageId) - ) - synchronizerId <- getSynchronizerId - _ <- topologyWriteServiceStub.authorize( - makeAuthorizeRequest(participantUid, synchronizerId, newVettedPackages) - ) - } yield () - } - - private[this] def makeAuthorizeRequest( - participantId: String, - synchronizerId: String, - vettedPackages: Iterable[protocol.VettedPackages.VettedPackage], - ): admin_topology.AuthorizeRequest = - admin_topology.AuthorizeRequest( - admin_topology.AuthorizeRequest.Type.Proposal( - admin_topology.AuthorizeRequest.Proposal( - protocol.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_ADD_REPLACE, - 0, // will be picked by the participant - Some( - protocol.TopologyMapping( - protocol.TopologyMapping.Mapping.VettedPackages( - protocol.VettedPackages( - participantId, - Seq.empty, - vettedPackages.toSeq, - ) - ) - ) - ), - ) - ), - mustFullyAuthorize = true, - forceChanges = Seq( - ForceFlag.FORCE_FLAG_ALLOW_UNVETTED_DEPENDENCIES - ), - signedBy = Seq.empty, - store = Some( - admin_topology.StoreId( - admin_topology.StoreId.Store.Synchronizer( - admin_topology.Synchronizer( - admin_topology.Synchronizer.Kind.Id(synchronizerId) - ) - ) - ) - ), - waitToBecomeEffective = None, - ) - - def unvetPackages(packages: Iterable[ScriptLedgerClient.ReadablePackageId]): Future[Unit] = for { - packageIds <- getPackageIds(packages) - _ <- unvetPackagesById(packageIds) - } yield () - - def waitUntilUnvettingVisible( - packages: Iterable[ScriptLedgerClient.ReadablePackageId], - onParticipantUid: String, - attempts: Int = 10, - firstWaitTime: Duration = 100.millis, - ): Future[Unit] = for { - packageIds <- getPackageIds(packages) - _ <- RetryStrategy - .exponentialBackoff(attempts, firstWaitTime) { (_, _) => - for { - vettedPackages <- listVettedPackages() - _ <- Future { - val vettedPackageIds = vettedPackages - .getOrElse(onParticipantUid, Seq.empty) - .map(_.packageId) - .toSet - assert( - packageIds.toSet.intersect(vettedPackageIds).isEmpty, - s"Participant $participantUid does not see that $onParticipantUid unvets ${packages.mkString(",")}", - ) - } - } yield () - } - } yield () - - def vetPackages(packages: Iterable[ScriptLedgerClient.ReadablePackageId]): Future[Unit] = for { - packageIds <- getPackageIds(packages) - _ <- vetPackagesById(packageIds) - } yield () - - def waitUntilVettingVisible( - packages: Iterable[ScriptLedgerClient.ReadablePackageId], - onParticipantUid: String, - attempts: Int = 10, - firstWaitTime: Duration = 100.millis, - ): Future[Unit] = for { - packageIds <- getPackageIds(packages) - _ <- RetryStrategy - .exponentialBackoff(attempts, firstWaitTime) { (_, _) => - for { - vettedPackages <- listVettedPackages() - _ <- Future { - val vettedPackageIds = vettedPackages - .getOrElse(onParticipantUid, Seq.empty) - .map(_.packageId) - .toSet - assert( - packageIds.toSet.subsetOf(vettedPackageIds), - s"Participant $participantUid does not see that $onParticipantUid vets ${packages.mkString(",")}", - ) - } - } yield () - } - } yield () - - private[this] def getPackageIds( - packages: Iterable[ScriptLedgerClient.ReadablePackageId] - ): Future[Iterable[String]] = for { - packageMap <- getPackageMap() - packageIds = packages.map(pkg => - packageMap.getOrElse( - pkg, - throw new IllegalArgumentException(s"Package $pkg not found on participant"), - ) - ) - } yield packageIds - - private[this] def getPackageMap(): Future[Map[ScriptLedgerClient.ReadablePackageId, String]] = - for { - mainPkgIds <- listMainPackageIds() - darContentsResps <- Future.traverse(mainPkgIds)(pkgId => - packageServiceStub.getDarContents(admin_participant.GetDarContentsRequest(pkgId)) - ) - } yield { - darContentsResps.view - .flatMap(_.packages) - .map(pkgDesc => { - def invalidPackageDesc = - throw new IllegalStateException(s"Invalid package description: $pkgDesc") - val pname = PackageName.fromString(pkgDesc.name).getOrElse(invalidPackageDesc) - val pversion = PackageVersion.fromString(pkgDesc.version).getOrElse(invalidPackageDesc) - (ScriptLedgerClient.ReadablePackageId(pname, pversion), pkgDesc.packageId) - }) - .toMap - } - - /** Lists the main package IDs of up to 1000 dars hosted on the participant. - */ - private[this] def listMainPackageIds(): Future[Seq[String]] = - packageServiceStub - .listDars( - admin_participant.ListDarsRequest(1000, "") - ) // Empty filterName is the default value - .map { res => - if (res.dars.length == 1000) - println( - "Warning: AdminLedgerClient.listDars gave the maximum number of results, some may have been truncated." - ) - res.dars.map(_.main) - } - - def uploadDar(file: File): Future[Either[String, String]] = - packageServiceStub - .uploadDar( - admin_participant.UploadDarRequest( - dars = Seq( - admin_participant.UploadDarRequest.UploadDarData( - ByteString.readFrom(new FileInputStream(file)), - description = Some(file.getName), - expectedMainPackageId = None, // empty string is the default expected_main_package_id - ) - ), - vetAllPackages = true, - synchronizeVetting = true, - synchronizerId = None, - ) - ) - .map { response => - import admin_participant.UploadDarResponse - response match { - case UploadDarResponse(hash) => Right(hash.head) - } - } - - def allocatePartyOnMultipleParticipants( - partyId: String, - toParticipantUids: Iterable[String], - ): Future[Unit] = - for { - synchronizerId <- getSynchronizerId - _ <- topologyWriteServiceStub.authorize( - makeAllocatePartyOnMultipleParticipantsRequest( - partyId, - toParticipantUids, - synchronizerId, - ) - ) - } yield () - - def proposePartyReplication(partyId: String, toParticipantUid: String): Future[Unit] = { - for { - synchronizerId <- getSynchronizerId - hostingParticipants <- listHostingParticipants(partyId, synchronizerId) - _ <- topologyWriteServiceStub.authorize( - makePartyReplicationAuthorizeRequest( - hostingParticipants, - partyId, - toParticipantUid, - synchronizerId, - ) - ) - } yield () - } - - def waitUntilHostingVisible( - partyId: String, - onParticipantUids: Iterable[String], - attempts: Int = 10, - firstWaitTime: Duration = 100.millis, - ): Future[Unit] = { - val expectedSet = onParticipantUids.toSet - for { - synchronizerId <- getSynchronizerId - _ <- RetryStrategy - .exponentialBackoff(attempts, firstWaitTime) { (_, _) => - for { - hostingParticipants <- listHostingParticipants(partyId, synchronizerId) - visibleSet = hostingParticipants.map(_.participantUid).toSet - _ <- Future { - assert( - visibleSet == expectedSet, - s"Participant $participantUid does not yet see that $expectedSet host $partyId but instead lists $visibleSet", - ) - } - } yield () - } - } yield () - } - - private[this] def getSynchronizerId: Future[String] = - synchronizerConnectivityStub - .listConnectedSynchronizers(admin_participant.ListConnectedSynchronizersRequest()) - .map(_.connectedSynchronizers.head.synchronizerId) - - private[this] def listHostingParticipants( - partyId: String, - synchronizerId: String, - ): Future[Seq[protocol.PartyToParticipant.HostingParticipant]] = - topologyReadServiceStub - .listPartyToParticipant(makeListPartyToParticipantRequest(partyId, synchronizerId)) - .map(response => { - // We expect at most one result because makeListPartyToParticipantRequest filters by partyId - if (response.results.length > 1) - throw new IllegalStateException( - s"Expected at most one result, but got ${response.results.length}" - ) - response.results.headOption.toList.flatMap(_.item.get.participants) - }) - - private[this] def makeListPartyToParticipantRequest( - partyId: String, - synchronizerId: String, - ): admin_topology.ListPartyToParticipantRequest = - admin_topology.ListPartyToParticipantRequest( - baseQuery = Some( - admin_topology.BaseQuery( - store = Some( - admin_topology.StoreId( - admin_topology.StoreId.Store.Synchronizer( - admin_topology.Synchronizer( - admin_topology.Synchronizer.Kind.Id(synchronizerId) - ) - ) - ) - ), - proposals = false, - operation = protocol.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_UNSPECIFIED, - timeQuery = admin_topology.BaseQuery.TimeQuery - .HeadState(com.google.protobuf.empty.Empty()), - filterSignedKey = "", - protocolVersion = None, - ) - ), - filterParty = partyId, - filterParticipant = "", - ) - - private[this] def makeAllocatePartyOnMultipleParticipantsRequest( - partyId: String, - participantIds: Iterable[String], - synchronizerId: String, - ): admin_topology.AuthorizeRequest = { - - val entries = participantIds.map( - protocol.PartyToParticipant.HostingParticipant( - _, - protocol.Enums.ParticipantPermission.PARTICIPANT_PERMISSION_SUBMISSION, - None, - ) - ) - admin_topology.AuthorizeRequest( - admin_topology.AuthorizeRequest.Type.Proposal( - admin_topology.AuthorizeRequest.Proposal( - protocol.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_ADD_REPLACE, - 1, // first topology transaction for this party - Some( - protocol.TopologyMapping( - protocol.TopologyMapping.Mapping.PartyToParticipant( - protocol.PartyToParticipant( - partyId, - 1, - entries.toSeq, - None, - ) - ) - ) - ), - ) - ), - mustFullyAuthorize = false, - forceChanges = Seq.empty, - signedBy = Seq.empty, - store = Some( - admin_topology.StoreId( - admin_topology.StoreId.Store.Synchronizer( - admin_topology.Synchronizer( - admin_topology.Synchronizer.Kind.Id(synchronizerId) - ) - ) - ) - ), - waitToBecomeEffective = Some(com.google.protobuf.duration.Duration(1, 0)), - ) - } - - private[this] def makePartyReplicationAuthorizeRequest( - currentHostingParticipants: Seq[protocol.PartyToParticipant.HostingParticipant], - partyId: String, - participantId: String, - synchronizerId: String, - ): admin_topology.AuthorizeRequest = { - val newEntry = protocol.PartyToParticipant.HostingParticipant( - participantId, - protocol.Enums.ParticipantPermission.PARTICIPANT_PERMISSION_SUBMISSION, - None, - ) - admin_topology.AuthorizeRequest( - admin_topology.AuthorizeRequest.Type.Proposal( - admin_topology.AuthorizeRequest.Proposal( - protocol.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_ADD_REPLACE, - 0, // will be picked by the participant - Some( - protocol.TopologyMapping( - protocol.TopologyMapping.Mapping.PartyToParticipant( - protocol.PartyToParticipant( - partyId, - 1, - newEntry +: currentHostingParticipants, - None, - ) - ) - ) - ), - ) - ), - mustFullyAuthorize = false, - forceChanges = Seq.empty, - signedBy = Seq.empty, - store = Some( - admin_topology.StoreId( - admin_topology.StoreId.Store.Synchronizer( - admin_topology.Synchronizer( - admin_topology.Synchronizer.Kind.Id(synchronizerId) - ) - ) - ) - ), - waitToBecomeEffective = Some(com.google.protobuf.duration.Duration(1, 0)), - ) - } - - override def close(): Unit = GrpcChannel.close(channel) -} - -object AdminLedgerClient { - private[grpcLedgerClient] def stub[A <: AbstractStub[A]](stub: A, token: Option[String]): A = - token.fold(stub)(AuthCallCredentials.authorizingStub(stub, _)) - - /** Retrieves the identifier of the participant hosted at hostIp:port and calls [[singleHost]] - * with the result. - */ - def singleHostWithUnknownParticipantId( - hostIp: String, - port: Int, - token: Option[String] = None, - channelConfig: LedgerClientChannelConfiguration, - )(implicit ec: ExecutionContext): Future[AdminLedgerClient] = { - for { - participantId <- { - val identityServiceClient = IdentityServiceClient - .singleHost(hostIp, port, token, channelConfig) - val future = identityServiceClient.getId() - val _ = future.onComplete(_ => identityServiceClient.close()) - future - } - } yield AdminLedgerClient - .singleHost( - hostIp, - port, - token, - channelConfig, - participantId.getOrElse( - throw new IllegalStateException("unexpected uninitialized participant") - ), - ) - } - - /** A convenient shortcut to build a [[AdminLedgerClient]], use [[fromBuilder]] for a more - * flexible alternative. - */ - def singleHost( - hostIp: String, - port: Int, - token: Option[String] = None, - channelConfig: LedgerClientChannelConfiguration, - participantId: String, - )(implicit - ec: ExecutionContext - ): AdminLedgerClient = - fromBuilder(channelConfig.builderFor(hostIp, port), token, participantId) - - def fromBuilder( - builder: NettyChannelBuilder, - token: Option[String] = None, - participantId: String, - )(implicit ec: ExecutionContext): AdminLedgerClient = - new AdminLedgerClient( - GrpcChannel.withShutdownHook(builder), - token, - participantId, - ) -} diff --git a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/GrpcLedgerClient.scala b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/GrpcLedgerClient.scala index deb8af297956..6a5e1f6ef5e7 100644 --- a/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/GrpcLedgerClient.scala +++ b/sdk/daml-script/runner/src/main/scala/com/digitalasset/daml/lf/engine/script/v2/ledgerinteraction/grpcLedgerClient/GrpcLedgerClient.scala @@ -7,7 +7,6 @@ package grpcLedgerClient import java.time.Instant import java.util.UUID - import org.apache.pekko.stream.Materializer import com.daml.grpc.adapter.ExecutionSequencerFactory import com.digitalasset.canton.ledger.api.{PartyDetails, User, UserRight} @@ -18,8 +17,8 @@ import com.daml.ledger.api.v2.admin.package_management_service.{ } import com.daml.ledger.api.v2.admin.package_management_service.VettedPackagesChange.{ Operation, - Vet, Unvet, + Vet, } import com.daml.ledger.api.v2.commands.Commands import com.daml.ledger.api.v2.commands._ @@ -77,7 +76,6 @@ import scala.concurrent.duration.DurationInt class GrpcLedgerClient( val grpcClient: LedgerClient, val userId: Option[Ref.UserId], - val oAdminClient: Option[AdminLedgerClient], val compiledPackages: CompiledPackages, ) extends ScriptLedgerClient { override val transport = "gRPC API" @@ -852,53 +850,6 @@ class GrpcLedgerClient( mat: Materializer, ): Future[List[ScriptLedgerClient.ReadablePackageId]] = unsupportedOn("listAllPackages") - override def allocatePartyOnMultipleParticipants( - party: Ref.Party, - participantIds: Iterable[String], - )(implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Unit] = { - val adminClient = oAdminClient.getOrElse( - throw new IllegalArgumentException( - "Attempted to use exportParty without specifying a adminPort" - ) - ) - adminClient.allocatePartyOnMultipleParticipants(party, participantIds) - } - - override def aggregateAllocatePartyOnMultipleParticipants( - clients: List[ScriptLedgerClient], - partyHint: String, - namespace: String, - participantIds: Iterable[String], - )(implicit - ec: ExecutionContext, - mat: Materializer, - ): Future[Ref.Party] = { - val party = Party.assertFromString(partyHint + "::" + namespace) - for { - _ <- Future.traverse(clients)(_.allocatePartyOnMultipleParticipants(party, participantIds)) - } yield party - } - - override def waitUntilHostingVisible( - party: Ref.Party, - onParticipantUids: Iterable[String], - ): Future[Unit] = { - val adminClient = oAdminClient.getOrElse( - throw new IllegalArgumentException( - "Attempted to use waitUntilHostingVisible without specifying a adminPort" - ) - ) - adminClient.waitUntilHostingVisible(party, onParticipantUids) - } - - override def getParticipantUid: String = oAdminClient - .getOrElse( - throw new IllegalArgumentException( - "Attempted to use getParticipantUid without specifying a adminPort" - ) - ) - .participantUid + override def getParticipantUid()(implicit ec: ExecutionContext): Future[String] = + grpcClient.partyManagementClient.getParticipantId().map(_.asInstanceOf[String]) } diff --git a/sdk/daml-script/test/BUILD.bazel b/sdk/daml-script/test/BUILD.bazel index 14384ea73c14..ac4d7f7ec5cb 100644 --- a/sdk/daml-script/test/BUILD.bazel +++ b/sdk/daml-script/test/BUILD.bazel @@ -401,7 +401,6 @@ da_scala_test( deps = [ ":test-utils", "//bazel_tools/runfiles:scala_runfiles", - "//canton:community_base_proto_scala", "//canton:community_ledger_ledger-common", "//canton:community_util-observability", "//canton:ledger-resources", @@ -410,7 +409,6 @@ da_scala_test( "//canton:resources", "//canton:rs-grpc-bridge", "//canton:testing-utils", - "//canton:timer-utils", "//canton/community/daml-lf/data", "//canton/community/daml-lf/engine:script-engine", "//canton/community/daml-lf/interpreter", diff --git a/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesIT.scala b/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesIT.scala index af8dc561c941..49c63fa6ca7a 100644 --- a/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesIT.scala +++ b/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesIT.scala @@ -5,29 +5,29 @@ package com.digitalasset.daml.lf.engine.script package test import com.daml.bazeltools.BazelRunfiles.rlocation -import com.daml.timer.RetryStrategy -import com.digitalasset.canton.ledger.client.configuration.LedgerClientChannelConfiguration import com.digitalasset.daml.lf.UpgradeTestUtil import com.digitalasset.daml.lf.UpgradeTestUtil.TestCase import com.digitalasset.daml.lf.data.Ref._ import com.digitalasset.daml.lf.data.{FrontStack, ImmArray} import com.digitalasset.daml.lf.engine.script.ScriptTimeMode -import com.digitalasset.daml.lf.engine.script.test.DarUtil.Dar -import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.AdminLedgerClient +import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.ScriptLedgerClient +import com.digitalasset.daml.lf.PureCompiledPackages import com.digitalasset.daml.lf.language.LanguageVersion import com.digitalasset.daml.lf.engine.ScriptEngine.{ + defaultCompilerConfig, newTraceLog, newWarningLog, - defaultCompilerConfig, } +import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.GrpcLedgerClient import com.digitalasset.daml.lf.value.Value +import com.google.protobuf.ByteString import org.scalatest.Inside import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec +import java.io.FileInputStream import java.nio.file.{Path, Paths} import scala.concurrent.Future -import scala.concurrent.duration.DurationInt class UpgradesIT( runCantonInDevMode: Boolean, @@ -103,40 +103,39 @@ class UpgradesIT( // Build dars (testDarPath, deps) <- testUtil.buildTestCaseDarMemoized(languageVersion, testCase) - // Connection - clients <- scriptClients(provideAdminPorts = true) - adminClients <- traverseSequential(ledgerPorts) { portInfo => - AdminLedgerClient - .singleHostWithUnknownParticipantId( - "localhost", - portInfo.adminPort.value, - None, - LedgerClientChannelConfiguration.InsecureDefaults, - ) - .map(portInfo.ledgerPort.value -> _) - } - _ <- traverseSequential(adminClients) { case (ledgerPort, adminClient) => + // Upload dars + client <- defaultLedgerClient() + ledgerClient = new GrpcLedgerClient( + client, + None, + PureCompiledPackages.Empty(defaultCompilerConfig), + ) + defaultParticipantUid <- ledgerClient.getParticipantUid() + scriptClients <- scriptClients() + _ <- traverseSequential(scriptClients.participants.toSeq) { case (participant, client) => Future.traverse(deps.reverse) { dep => Thread.sleep(500) println( - s"Uploading ${dep.versionedName} (${dep.mainPackageId}) to participant on port ${ledgerPort}" + s"Uploading ${dep.versionedName} (${dep.mainPackageId}) to participant ${participant.participant}" ) - adminClient - .uploadDar(dep.path.toFile) - .map(_.left.map(msg => throw new Exception(msg))) + + client.grpcClient.packageManagementClient + .uploadDarFile(ByteString.readFrom(new FileInputStream(dep.path.toFile))) } } - // Wait for upload - _ <- RetryStrategy.constant(attempts = 20, waitTime = 1.seconds) { (_, _) => - assertDepsVetted(adminClients.head._2, deps) - } + // Vet dars + pkgs = deps + .map(dep => ScriptLedgerClient.ReadablePackageId.assertFromString(dep.versionedName)) + .toList + _ <- ledgerClient.vetPackages(pkgs) + _ <- ledgerClient.waitUntilVettingVisible(pkgs, defaultParticipantUid) _ = println("All packages vetted on all participants") // Run tests testDar = CompiledDar.read(testDarPath, defaultCompilerConfig) _ <- run( - clients, + scriptClients, QualifiedName.assertFromString(s"${testCase.name}:main"), inputValue = Some( mkInitialTestState( @@ -191,25 +190,6 @@ class UpgradesIT( ), ) } - - private def assertDepsVetted( - client: AdminLedgerClient, - deps: Seq[Dar], - ): Future[Unit] = { - client - .listVettedPackages() - .map(_.foreach { - case (participantId, packages) => { - val packageIds = packages.view.map(_.packageId).toSet - deps.foreach { dep => - if (!packageIds.contains(dep.mainPackageId)) - throw new Exception( - s"Couldn't find package ${dep.versionedName} on participant $participantId" - ) - } - } - }) - } } // Because the test cases are listed even before the canton fixture is initialized, diff --git a/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesMatrixIT.scala b/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesMatrixIT.scala index d6251b8a69b0..9fba56692408 100644 --- a/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesMatrixIT.scala +++ b/sdk/daml-script/test/src/main/scala/com/digitalasset/daml/lf/engine/script/test/UpgradesMatrixIT.scala @@ -12,10 +12,7 @@ import com.digitalasset.daml.lf.command.ApiCommand import com.digitalasset.daml.lf.data.Ref._ import com.digitalasset.daml.lf.data._ import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.ScriptLedgerClient.ReadablePackageId -import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.{ - AdminLedgerClient, - GrpcLedgerClient, -} +import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.grpcLedgerClient.GrpcLedgerClient import com.digitalasset.daml.lf.engine.script.v2.ledgerinteraction.{ScriptLedgerClient, SubmitError} import com.digitalasset.daml.lf.engine.{ UpgradesMatrix, @@ -128,19 +125,9 @@ abstract class UpgradesMatrixIntegration(upgradesMatrixCases: UpgradesMatrixCase _ <- Future.traverse( List(commonDefsDar, templateDefsV1Dar, templateDefsV2Dar, clientLocalDar, clientGlobalDar) )(dar => client.packageManagementClient.uploadDarFile(dar)) - adminClient <- { - import com.digitalasset.canton.ledger.client.configuration._ - AdminLedgerClient.singleHostWithUnknownParticipantId( - hostIp = "localhost", - port = ledgerPorts.head.adminPort.value, - token = None, - channelConfig = LedgerClientChannelConfiguration.InsecureDefaults, - ) - } scriptClient = new GrpcLedgerClient( client, Some(Ref.UserId.assertFromString("upgrade-test-matrix")), - Some(adminClient), cases.compiledPackages, ) } yield scriptClient, @@ -217,10 +204,11 @@ abstract class UpgradesMatrixIntegration(upgradesMatrixCases: UpgradesMatrixCase else for { _ <- scriptClient.unvetPackages(packages) - _ <- scriptClient.waitUntilUnvettingVisible(packages, scriptClient.getParticipantUid) + participantUid <- scriptClient.getParticipantUid() + _ <- scriptClient.waitUntilUnvettingVisible(packages, participantUid) result <- action _ <- scriptClient.vetPackages(packages) - _ <- scriptClient.waitUntilVettingVisible(packages, scriptClient.getParticipantUid) + _ <- scriptClient.waitUntilVettingVisible(packages, participantUid) } yield result }