@@ -35,16 +35,145 @@ import io.grpc.StatusRuntimeException
3535import java .time .Instant
3636import scala .concurrent .{Await , Promise , TimeoutException }
3737
38- abstract class ParticipantRepairAdministration (
38+ class ParticipantRepairAdministration (
3939 val consoleEnvironment : ConsoleEnvironment ,
4040 runner : AdminCommandRunner ,
4141 val loggerFactory : NamedLoggerFactory ,
4242) extends FeatureFlagFilter
4343 with NoTracing
4444 with Helpful {
4545
46- protected def access [T ](handler : ParticipantNode => T ): T
46+ @ Help .Summary (" Migrate contracts from one domain to another one." )
47+ @ Help .Description (
48+ """ This method can be used to migrate all the contracts associated with a domain to a new domain connection.
49+ This method will register the new domain, connect to it and then re-associate all contracts on the source
50+ domain to the target domain. Please note that this migration needs to be done by all participants
51+ at the same time. The domain should only be used once all participants have finished their migration.
52+
53+ The arguments are:
54+ source: the domain alias of the source domain
55+ target: the configuration for the target domain
56+ """
57+ )
58+ def migrate_domain (
59+ source : DomainAlias ,
60+ target : DomainConnectionConfig ,
61+ ): Unit = {
62+ consoleEnvironment.run {
63+ runner.adminCommand(
64+ ParticipantAdminCommands .ParticipantRepairManagement .MigrateDomain (source, target)
65+ )
66+ }
67+ }
68+
69+ @ Help .Summary (" Download all contracts for the given set of parties to a file." )
70+ @ Help .Description (
71+ """ This command can be used to download the current active contract set of a given set of parties to a text file.
72+ |This is mainly interesting for recovery and operational purposes.
73+ |
74+ |The file will contain base64 encoded strings, one line per contract. The lines are written
75+ |sorted according to their domain and contract id. This allows to compare the contracts stored
76+ |by two participants using standard file comparison tools.
77+ |The domain-id is printed with the prefix domain-id before the block of contracts starts.
78+ |
79+ |This command may take a long time to complete and may require significant resources.
80+ |It will first load the contract ids of the active contract set into memory and then subsequently
81+ |load the contracts in batches and inspect their stakeholders. As this operation needs to traverse
82+ |the entire datastore, it might take a long time to complete.
83+ |
84+ |The command will return a map of domainId -> number of active contracts stored
85+ |
86+ The arguments are:
87+ - parties: identifying contracts having at least one stakeholder from the given set
88+ - outputFile: the output file name where to store the data. Use .gz as a suffix to get a compressed file (recommended)
89+ - filterDomainId: restrict the export to a given domain
90+ - timestamp: optionally a timestamp for which we should take the state (useful to reconcile states of a domain)
91+ - protocolVersion: optional the protocol version to use for the serialization. Defaults to the one of the domains.
92+ - chunkSize: size of the byte chunks to stream back: default 1024 * 1024 * 2 = (2MB)
93+ """
94+ )
95+ def download (
96+ parties : Set [PartyId ],
97+ outputFile : String = ParticipantRepairAdministration .defaultFile,
98+ filterDomainId : String = " " ,
99+ timestamp : Option [Instant ] = None ,
100+ protocolVersion : Option [ProtocolVersion ] = None ,
101+ chunkSize : Option [PositiveInt ] = None ,
102+ ): Unit = {
103+ consoleEnvironment.run {
104+ val target = File (outputFile)
105+ val requestComplete = Promise [String ]()
106+ val observer = new GrpcByteChunksToFileObserver [AcsSnapshotChunk ](
107+ target,
108+ requestComplete,
109+ )
110+ val timeout = consoleEnvironment.commandTimeouts.ledgerCommand
47111
112+ def call = consoleEnvironment.run {
113+ runner.adminCommand(
114+ ParticipantAdminCommands .ParticipantRepairManagement
115+ .Download (
116+ parties,
117+ filterDomainId,
118+ timestamp,
119+ protocolVersion,
120+ chunkSize,
121+ observer,
122+ target.toJava.getName.endsWith(" .gz" ),
123+ )
124+ )
125+ }
126+
127+ try {
128+ ResourceUtil .withResource(call) { _ =>
129+ CommandSuccessful (
130+ Await
131+ .result(
132+ requestComplete.future,
133+ timeout.duration,
134+ )
135+ .discard
136+ )
137+ }
138+ } catch {
139+ case sre : StatusRuntimeException =>
140+ GenericCommandError (
141+ GrpcError (" Generating acs snapshot file" , " download_acs_snapshot" , sre).toString
142+ )
143+ case _ : TimeoutException =>
144+ target.delete(swallowIOExceptions = true )
145+ CommandErrors .ConsoleTimeout .Error (timeout.asJavaApproximation)
146+ }
147+ }
148+ }
149+
150+ @ Help .Summary (" Import ACS snapshot" , FeatureFlag .Preview )
151+ @ Help .Description (""" Uploads a binary into the participant's ACS""" )
152+ def upload (
153+ inputFile : String = ParticipantRepairAdministration .defaultFile
154+ ): Unit = {
155+ val file = File (inputFile)
156+ consoleEnvironment.run {
157+ runner.adminCommand(
158+ ParticipantAdminCommands .ParticipantRepairManagement .Upload (
159+ ByteString .copyFrom(file.loadBytes)
160+ )
161+ )
162+ }
163+ }
164+ }
165+
166+ abstract class LocalParticipantRepairAdministration (
167+ override val consoleEnvironment : ConsoleEnvironment ,
168+ runner : AdminCommandRunner ,
169+ override val loggerFactory : NamedLoggerFactory ,
170+ ) extends ParticipantRepairAdministration (
171+ consoleEnvironment = consoleEnvironment,
172+ runner = runner,
173+ loggerFactory = loggerFactory,
174+ ) {
175+
176+ protected def access [T ](handler : ParticipantNode => T ): T
48177 @ Help .Summary (" Add specified contracts to specific domain on local participant." )
49178 @ Help .Description (
50179 """ This is a last resort command to recover from data corruption, e.g. in scenarios in which participant
@@ -153,29 +282,6 @@ abstract class ParticipantRepairAdministration(
153282 )
154283 )
155284
156- @ Help .Summary (" Migrate domain to a new version." )
157- @ Help .Description (
158- """ This method can be used to migrate all the contracts associated with a domain to a new domain connection.
159- This method will register the new domain, connect to it and then re-associate all contracts on the source
160- domain to the target domain. Please note that this migration needs to be done by all participants
161- at the same time. The domain should only be used once all participants have finished their migration.
162-
163- The arguments are:
164- source: the domain alias of the source domain
165- target: the configuration for the target domain
166- """
167- )
168- def migrate_domain (
169- source : DomainAlias ,
170- target : DomainConnectionConfig ,
171- ): Unit = {
172- consoleEnvironment.run {
173- runner.adminCommand(
174- ParticipantAdminCommands .ParticipantRepairManagement .MigrateDomain (source, target)
175- )
176- }
177- }
178-
179285 @ Help .Summary (" Mark sequenced events as ignored." )
180286 @ Help .Description (
181287 """ This is the last resort to ignore events that the participant is unable to process.
@@ -199,7 +305,9 @@ abstract class ParticipantRepairAdministration(
199305 force : Boolean = false ,
200306 ): Unit =
201307 runRepairCommand(tc =>
202- access { _.sync.repairService.ignoreEvents(domainId, from, to, force)(tc) }
308+ access {
309+ _.sync.repairService.ignoreEvents(domainId, from, to, force)(tc)
310+ }
203311 )
204312
205313 @ Help .Summary (" Remove the ignored status from sequenced events." )
@@ -222,105 +330,10 @@ abstract class ParticipantRepairAdministration(
222330 force : Boolean = false ,
223331 ): Unit =
224332 runRepairCommand(tc =>
225- access { _.sync.repairService.unignoreEvents(domainId, from, to, force)(tc) }
226- )
227-
228- @ Help .Summary (" Download all contracts for the given set of parties to a file." )
229- @ Help .Description (
230- """ This command can be used to download the current active contract set of a given set of parties to a text file.
231- |This is mainly interesting for recovery and operational purposes.
232- |
233- |The file will contain base64 encoded strings, one line per contract. The lines are written
234- |sorted according to their domain and contract id. This allows to compare the contracts stored
235- |by two participants using standard file comparison tools.
236- |The domain-id is printed with the prefix domain-id before the block of contracts starts.
237- |
238- |This command may take a long time to complete and may require significant resources.
239- |It will first load the contract ids of the active contract set into memory and then subsequently
240- |load the contracts in batches and inspect their stakeholders. As this operation needs to traverse
241- |the entire datastore, it might take a long time to complete.
242- |
243- |The command will return a map of domainId -> number of active contracts stored
244- |
245- The arguments are:
246- - parties: identifying contracts having at least one stakeholder from the given set
247- - outputFile: the output file name where to store the data. Use .gz as a suffix to get a compressed file (recommended)
248- - filterDomainId: restrict the export to a given domain
249- - timestamp: optionally a timestamp for which we should take the state (useful to reconcile states of a domain)
250- - protocolVersion: optional the protocol version to use for the serialization. Defaults to the one of the domains.
251- - chunkSize: size of the byte chunks to stream back: default 1024 * 1024 * 2 = (2MB)
252- """
253- )
254- def download (
255- parties : Set [PartyId ],
256- outputFile : String = ParticipantRepairAdministration .defaultFile,
257- filterDomainId : String = " " ,
258- timestamp : Option [Instant ] = None ,
259- protocolVersion : Option [ProtocolVersion ] = None ,
260- chunkSize : Option [PositiveInt ] = None ,
261- ): Unit = {
262- consoleEnvironment.run {
263- val target = File (outputFile)
264- val requestComplete = Promise [String ]()
265- val observer = new GrpcByteChunksToFileObserver [AcsSnapshotChunk ](
266- target,
267- requestComplete,
268- )
269- val timeout = consoleEnvironment.commandTimeouts.ledgerCommand
270-
271- def call = consoleEnvironment.run {
272- runner.adminCommand(
273- ParticipantAdminCommands .ParticipantRepairManagement
274- .Download (
275- parties,
276- filterDomainId,
277- timestamp,
278- protocolVersion,
279- chunkSize,
280- observer,
281- target.toJava.getName.endsWith(" .gz" ),
282- )
283- )
284- }
285-
286- try {
287- ResourceUtil .withResource(call) { _ =>
288- CommandSuccessful (
289- Await
290- .result(
291- requestComplete.future,
292- timeout.duration,
293- )
294- .discard
295- )
296- }
297- } catch {
298- case sre : StatusRuntimeException =>
299- GenericCommandError (
300- GrpcError (" Generating acs snapshot file" , " download_acs_snapshot" , sre).toString
301- )
302- case _ : TimeoutException =>
303- target.delete(swallowIOExceptions = true )
304- CommandErrors .ConsoleTimeout .Error (timeout.asJavaApproximation)
333+ access {
334+ _.sync.repairService.unignoreEvents(domainId, from, to, force)(tc)
305335 }
306- }
307- }
308-
309- @ Help .Summary (" Import ACS snapshot" , FeatureFlag .Preview )
310- @ Help .Description (""" Uploads a binary into the participant's ACS""" )
311- def upload (
312- inputFile : String = ParticipantRepairAdministration .defaultFile
313- ): Unit = {
314- val file = File (inputFile)
315- consoleEnvironment.run {
316- runner.adminCommand(
317- ParticipantAdminCommands .ParticipantRepairManagement .Upload (
318- ByteString .copyFrom(file.loadBytes)
319- )
320- )
321- }
322- }
323-
336+ )
324337}
325338
326339object ParticipantRepairAdministration {
0 commit comments