diff --git a/build.sbt b/build.sbt index a0343b70..cd85dce1 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.0.1-M3", + version := "1.2.0-M1", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/resources/application.conf b/modules/amm-executor/src/main/resources/application.conf index e4dbe859..429d7e64 100644 --- a/modules/amm-executor/src/main/resources/application.conf +++ b/modules/amm-executor/src/main/resources/application.conf @@ -1,5 +1,3 @@ -rotation.retry-delay = 120s - exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy" execution.order-lifetime = 300s @@ -18,12 +16,9 @@ consumers.unconfirmed-orders.group-id = "ergo" consumers.unconfirmed-orders.client-id = "ergo" consumers.unconfirmed-orders.topic-id = "dex.amm.cfmm.unconfirmed.orders" -consumers.orders-retry.group-id = "ergo" -consumers.orders-retry.client-id = "ergo-retry" -consumers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry" - -producers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry" -producers.orders-retry.parallelism = 3 +consumers.evaluated-orders.group-id = "ergo" +consumers.evaluated-orders.client-id = "ergo" +consumers.evaluated-orders.topic-id = "dex.cfmm.history.orders" kafka.bootstrap-servers = ["kafka1:9092"] diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index ada1e081..170016ef 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -1,6 +1,5 @@ package org.ergoplatform.dex.executor.amm -import cats.Id import cats.effect.{Blocker, Resource} import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ @@ -8,14 +7,15 @@ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.streaming._ import org.ergoplatform.dex.configs.ConsumerConfig -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} import org.ergoplatform.dex.executor.amm.config.ConfigBundle import org.ergoplatform.dex.executor.amm.context.AppContext import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter} -import org.ergoplatform.dex.executor.amm.processes.Executor +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.processes.{BacklogCleaner, Executor, Registerer} import org.ergoplatform.dex.executor.amm.repositories.CFMMPools import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMOrders, CFMMOrdersGen, EvaluatedCFMMOrders} import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM} import org.ergoplatform.ergo.modules.ErgoNetwork import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming} @@ -23,11 +23,9 @@ import org.ergoplatform.ergo.services.node.ErgoNode import org.ergoplatform.ergo.state.{Confirmed, Unconfirmed} import sttp.capabilities.fs2.Fs2Streams import sttp.client3.SttpBackend -import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend import tofu.WithRun import tofu.fs2Instances._ -import tofu.lift.IsoK import tofu.syntax.unlift._ import zio.interop.catz._ import zio.{ExitCode, URIO, ZEnv} @@ -35,29 +33,26 @@ import zio.{ExitCode, URIO, ZEnv} object App extends EnvApp[AppContext] { def run(args: List[String]): URIO[ZEnv, ExitCode] = - init(args.headOption).use { case (executor, ctx) => - val appF = executor.run.compile.drain + init(args.headOption).use { case (executor, registerer, cleaner, ctx) => + val appF = fs2.Stream(executor.run, registerer.run, cleaner.run).parJoinUnbounded.compile.drain appF.run(ctx) as ExitCode.success }.orDie - private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] = + private def init(configPathOpt: Option[String]) = for { blocker <- Blocker[InitF] configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker)) ctx = AppContext.init(configs) - implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) - implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) = + implicit0(confirmedOrders: CFMMOrdersGen[StreamF, RunF, Confirmed]) = makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) - implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) = + implicit0(unconfirmedOrders: CFMMOrdersGen[StreamF, RunF, Unconfirmed]) = makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders) - implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) = - makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry) - implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) = + implicit0(orders: CFMMOrders[StreamF, RunF]) = Consumer.combine2(confirmedOrders, unconfirmedOrders)(_.entity, _.entity) - implicit0(producerRetries: CFMMProducerRetries[StreamF]) <- - Producer.make[InitF, StreamF, RunF, OrderId, Delayed[CFMMOrder]](configs.producers.ordersRetry) - implicit0(consumer: CFMMCircuit[StreamF, RunF]) = StreamingCircuit.make[StreamF, RunF, OrderId, CFMMOrder] + implicit0(evaluatedOrders: EvaluatedCFMMOrders[StreamF, RunF]) = + makeConsumer[OrderId, EvaluatedCFMMOrder.Any](configs.consumers.evaluatedOrders) + implicit0(backlog: CFMMBacklog[RunF]) <- Resource.eval(CFMMBacklog.make[InitF, RunF]) implicit0(backend: SttpBackend[RunF, Fs2Streams[RunF]]) <- makeBackend(ctx, blocker) implicit0(explorer: ErgoExplorer[RunF]) = ErgoExplorerStreaming.make[StreamF, RunF] implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF]) @@ -68,7 +63,9 @@ object App extends EnvApp[AppContext] { implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF] implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) - } yield executor -> ctx + registerer <- Resource.eval(Registerer.make[InitF, StreamF, RunF]) + cleaner <- Resource.eval(BacklogCleaner.make[InitF, StreamF, RunF]) + } yield (executor, registerer, cleaner, ctx) private def makeBackend( ctx: AppContext, diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala index 6a534573..7c187de6 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala @@ -11,13 +11,11 @@ import tofu.optics.macros.{promote, ClassyOptics} @derive(pureconfigReader, loggable) @ClassyOptics final case class ConfigBundle( - @promote rotation: RotationConfig, @promote exchange: ExchangeConfig, @promote execution: ExecutionConfig, @promote monetary: MonetaryConfig, @promote protocol: ProtocolConfig, consumers: Consumers, - producers: Producers, @promote kafka: KafkaConfig, @promote network: NetworkConfig, @promote resolver: ResolverConfig diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala index a28ff859..e81e93c2 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala @@ -9,5 +9,5 @@ import tofu.logging.derivation.loggable final case class Consumers( confirmedOrders: ConsumerConfig, unconfirmedOrders: ConsumerConfig, - ordersRetry: ConsumerConfig + evaluatedOrders: ConsumerConfig ) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index 41a413ce..4c33a57f 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -1,6 +1,15 @@ package org.ergoplatform.dex.executor.amm.modules +import cats.Monad +import cats.effect.{Sync, Timer} +import cats.effect.concurrent.Ref import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import tofu.concurrent.MakeRef +import tofu.generate.GenRandom +import tofu.syntax.monadic._ + +import scala.collection.immutable.{HashSet, TreeSet} +import scala.concurrent.duration.DurationInt trait CFMMBacklog[F[_]] { @@ -8,11 +17,70 @@ trait CFMMBacklog[F[_]] { */ def put(order: CFMMOrder): F[Unit] - /** Get candidate order for execution. Blocks until an order is available. + def putLowPriority(order: CFMMOrder): F[Unit] + + /** Pop a candidate order for execution. Blocks until an order is available. */ - def get: F[CFMMOrder] + def pop: F[CFMMOrder] /** Put an order from the backlog. */ - def drop(id: OrderId): F[Unit] + def drop(id: OrderId): F[Boolean] +} + +object CFMMBacklog { + + private val PollInterval = 200.millis + private val PriorityThreshold = 9 + private val PrioritySpace = 99 + + def make[I[_]: Sync, F[_]: Sync: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = + for { + implicit0(rnd: GenRandom[F]) <- GenRandom.instance[I, F]() + candidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) + lpCandidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) + survivorsR <- makeRef.refOf(HashSet.empty[OrderId]) + } yield new EphemeralCFMMBacklog(candidatesR, lpCandidatesR, survivorsR) + + // In-memory orders backlog. + // Note: Not thread safe. + final class EphemeralCFMMBacklog[F[_]: Monad: GenRandom]( + candidatesR: Ref[F, TreeSet[CFMMOrder]], + lowPriorityCandidatesR: Ref[F, TreeSet[CFMMOrder]], + survivorsR: Ref[F, HashSet[OrderId]] + )(implicit T: Timer[F]) + extends CFMMBacklog[F] { + + def put(order: CFMMOrder): F[Unit] = + candidatesR.update(_ + order) >> survivorsR.update(_ + order.id) + + def putLowPriority(order: CFMMOrder): F[Unit] = + lowPriorityCandidatesR.update(_ + order) >> survivorsR.update(_ + order.id) + + def pop: F[CFMMOrder] = { + def tryPop: F[CFMMOrder] = + for { + rnd <- GenRandom.nextInt(PrioritySpace) + lpc <- lowPriorityCandidatesR.get.map(_.headOption) + maybeWinner <- lpc match { + case Some(c) if rnd <= PriorityThreshold => Left(c).pure + case _ => candidatesR.get.map(xs => Right(xs.headOption)) + } + winner <- maybeWinner match { + case Right(Some(order)) => candidatesR.update(_ - order) as order + case Left(order) => lowPriorityCandidatesR.update(_ - order) as order + case _ => T.sleep(PollInterval) >> tryPop + } + } yield winner + for { + c <- tryPop + res <- survivorsR.get + .map(_.contains(c.id)) + .ifM(survivorsR.update(_ - c.id) as c, pop) + } yield res + } + + def drop(id: OrderId): F[Boolean] = + survivorsR.get.map(_.contains(id)).ifM(survivorsR.update(_ - id) as true, false.pure) + } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala new file mode 100644 index 00000000..227dffb0 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala @@ -0,0 +1,49 @@ +package org.ergoplatform.dex.executor.amm.processes + +import cats.{Functor, Monad} +import derevo.derive +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.streaming.EvaluatedCFMMOrders +import tofu.Catches +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.streams.Evals +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.handle._ +import tofu.syntax.streams.all._ + +@derive(representableK) +trait BacklogCleaner[F[_]] { + + def run: F[Unit] +} + +object BacklogCleaner { + + def make[ + I[_]: Functor, + F[_]: Monad: Evals[*[_], G]: Catches, + G[_]: Monad + ](implicit + orders: EvaluatedCFMMOrders[F, G], + backlog: CFMMBacklog[G], + logs: Logs[I, G] + ): I[BacklogCleaner[F]] = + logs.forService[BacklogCleaner[F]].map(implicit l => new Live[F, G]) + + final private class Live[ + F[_]: Monad: Evals[*[_], G]: Catches, + G[_]: Monad: Logging + ](implicit + orders: EvaluatedCFMMOrders[F, G], + backlog: CFMMBacklog[G] + ) extends BacklogCleaner[F] { + + def run: F[Unit] = + orders.stream + .evalTap(rec => backlog.drop(rec.message.order.id).ifM(debug"Order ${rec.message} is evicted", unit[G])) + .evalMap(_.commit) + .handleWith[Throwable](e => eval(warnCause"BacklogCleaner failed. Restarting .." (e)) >> run) + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala index 747ee2cb..bf69d61b 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala @@ -2,13 +2,14 @@ package org.ergoplatform.dex.executor.amm.processes import cats.effect.Clock import cats.syntax.option._ -import cats.{Functor, Monad} +import cats.{Defer, Functor, Monad, SemigroupK} import derevo.derive import mouse.any._ import org.ergoplatform.common.TraceId import org.ergoplatform.common.streaming.syntax._ import org.ergoplatform.dex.domain.amm.CFMMOrder import org.ergoplatform.dex.executor.amm.config.ExecutionConfig +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog import org.ergoplatform.dex.executor.amm.services.Execution import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser @@ -34,10 +35,10 @@ object Executor { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has, + F[_]: Monad: SemigroupK: Defer: Evals[*[_], G]: ExecutionConfig.Has, G[_]: Monad: TraceId.Local: Clock: Catches ](implicit - orders: CFMMCircuit[F, G], + backlog: CFMMBacklog[G], service: Execution[G], logs: Logs[I, G] ): I[Executor[F]] = @@ -48,34 +49,32 @@ object Executor { } final private class Live[ - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: SemigroupK: Defer: Evals[*[_], G], G[_]: Monad: Logging: TraceId.Local: Clock: Catches ](conf: ExecutionConfig)(implicit - orders: CFMMCircuit[F, G], + backlog: CFMMBacklog[G], service: Execution[G], errParser: TxSubmissionErrorParser ) extends Executor[F] { def run: F[Unit] = - orders.stream - .evalMap { rec => + eval(backlog.pop).repeat + .evalMap { order => service - .executeAttempt(rec.message) + .executeAttempt(order) .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) - .local(_ => TraceId.fromString(rec.message.id.value)) - .tupleLeft(rec) + .local(_ => TraceId.fromString(order.id.value)) + .tupleLeft(order) } - .flatTap { - case (_, None) => unit[F] + .evalMap { + case (_, None) => unit[G] case (_, Some(order)) => - eval(now.millis) >>= { + now.millis >>= { case ts if ts - order.timestamp < conf.orderLifetime.toMillis => - eval(warn"Failed to execute $order. Going to retry.") >> - orders.retry((order.id -> order).pure[F]) + warn"Failed to execute $order. Going to retry." >> backlog.put(order) case _ => - eval(warn"Failed to execute $order. Order expired.") + warn"Failed to execute $order. Order expired." } } - .evalMap { case (rec, _) => rec.commit } } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala new file mode 100644 index 00000000..11ec3a98 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala @@ -0,0 +1,50 @@ +package org.ergoplatform.dex.executor.amm.processes + +import cats.{Functor, Monad} +import derevo.derive +import org.ergoplatform.common.TraceId +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.streaming.CFMMOrders +import tofu.Catches +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.streams.Evals +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.handle._ +import tofu.syntax.streams.all._ + +@derive(representableK) +trait Registerer[F[_]] { + + def run: F[Unit] +} + +object Registerer { + + def make[ + I[_]: Functor, + F[_]: Monad: Evals[*[_], G]: Catches, + G[_]: Monad: TraceId.Local + ](implicit + orders: CFMMOrders[F, G], + backlog: CFMMBacklog[G], + logs: Logs[I, G] + ): I[Registerer[F]] = + logs.forService[Registerer[F]].map(implicit l => new Live[F, G]) + + final private class Live[ + F[_]: Monad: Evals[*[_], G]: Catches, + G[_]: Monad: Logging: TraceId.Local + ](implicit + orders: CFMMOrders[F, G], + backlog: CFMMBacklog[G] + ) extends Registerer[F] { + + def run: F[Unit] = + orders.stream + .evalTap(rec => debug"Registered ${rec.message}" >> backlog.put(rec.message)) + .evalMap(_.commit) + .handleWith[Throwable](e => eval(warnCause"Registerer failed. Restarting .." (e)) >> run) + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala index 8ee7e122..8fa3c154 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala @@ -1,14 +1,15 @@ package org.ergoplatform.dex.executor.amm +import cats.Id import fs2.kafka.types.KafkaOffset import org.ergoplatform.common.streaming._ -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} object streaming { - type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] - type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] - type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] + type CFMMOrdersGen[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] + type CFMMOrders[F[_], G[_]] = CFMMOrdersGen[F, G, Id] + type EvaluatedCFMMOrders[F[_], G[_]] = Consumer.Aux[OrderId, EvaluatedCFMMOrder.Any, KafkaOffset, F, G] type CFMMCircuit[F[_], G[_]] = StreamingCircuit[OrderId, CFMMOrder, F, G] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala index ebccdfd3..78334e78 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala @@ -3,10 +3,8 @@ package org.ergoplatform.common.streaming import cats.effect.Timer import cats.{FlatMap, Monad} import fs2.kafka.types.KafkaOffset -import org.ergoplatform.ergo.state.{Confirmed, Unconfirmed} import tofu.higherKind.Embed import tofu.streams.{Evals, ParFlatten} -import tofu.syntax.embed._ import tofu.syntax.monadic._ import tofu.syntax.streams.all._ import tofu.syntax.time.now diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala index e1755256..de16c839 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala @@ -24,4 +24,18 @@ object syntax { commit.traverse_(_(batch.foldLeft(Chain.empty[O])(_ append _.offset))) } } + + implicit final class CommittableBatchOps[S[_], C[_], F[_], K, V, O](private val fa: S[C[Committable[K, V, O, F]]]) + extends AnyVal { + + def commitBatch(implicit + S: Evals[S, F], + F: Applicative[F], + C: Foldable[C] + ): S[Unit] = + fa.evalMap { batch => + val commit = batch.get(0).map(_.commitBatch) + commit.traverse_(_(batch.foldLeft(Chain.empty[O])(_ append _.offset))) + } + } } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala index d85ef88c..2a8fe583 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala @@ -15,6 +15,18 @@ sealed trait CFMMOrder { def id: OrderId = OrderId.fromBoxId(box.boxId) } +object CFMMOrder { + + implicit val orderingByFee: Ordering[CFMMOrder] = Ordering[(Long, Long)] + .on[CFMMOrder] { + case Deposit(_, _, timestamp, params, _) => (-params.dexFee, timestamp) + case Redeem(_, _, timestamp, params, _) => (-params.dexFee, timestamp) + case Swap(_, _, timestamp, params, _) => + val minFee = params.dexFeePerTokenNum * params.minOutput.value / params.dexFeePerTokenDenom + (-minFee, timestamp) + } +} + @derive(encoder, decoder, loggable) final case class Deposit(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: DepositParams, box: Output) extends CFMMOrder diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala index d3815c83..67e704a8 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala @@ -10,6 +10,7 @@ import org.ergoplatform.dex.index.db.Extract.syntax.ExtractOps import org.ergoplatform.dex.index.db.models.{DBDeposit, DBRedeem, DBSwap} import org.ergoplatform.dex.index.repositories.RepoBundle import org.ergoplatform.dex.index.streaming.CFMMHistConsumer +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -52,11 +53,9 @@ object HistoryIndexing { ) extends HistoryIndexing[S] { def run: S[Unit] = - orders.stream.chunks - .map(_.map(r => r.message).toList) - .evalTap(xs => warn"[${xs.count(_.isEmpty)}] records discarded.") - .evalMap { rs => - val orders = rs.flatten + orders.stream.chunks.evalTap { rs => + val ordersIn = rs.map(_.message).toList + val orders = ordersIn.flatten val (swaps, others) = orders.partitionEither { case EvaluatedCFMMOrder(o: Swap, Some(ev: SwapEvaluation), p) => Left(EvaluatedCFMMOrder(o, Some(ev), p).extract[DBSwap]) @@ -80,7 +79,8 @@ object HistoryIndexing { ds <- insertNel(deposits)(repos.deposits.insert) rs <- insertNel(redeems)(repos.redeems.insert) } yield ss + ds + rs - txr.trans(insert) >>= (n => info"[$n] orders indexed") - } + txr.trans(insert) >>= + (n => warn"[${ordersIn.count(_.isEmpty)}] records discarded." >> info"[$n] orders indexed") + }.commitBatch } } diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala index 5a0c3883..84e1b9b6 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala @@ -7,6 +7,7 @@ import org.ergoplatform.dex.index.db.Extract.syntax.ExtractOps import org.ergoplatform.dex.index.db.models.DBLiquidityLock import org.ergoplatform.dex.index.repositories.RepoBundle import org.ergoplatform.dex.index.streaming.LqLocksConsumer +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -49,7 +50,7 @@ object LocksIndexing { ) extends LocksIndexing[S] { def run: S[Unit] = - locks.stream.chunks.evalMap { rs => + locks.stream.chunks.evalTap { rs => val locks = rs.map(r => r.message.entity).toList def insertNel[A](xs: List[A])(insert: NonEmptyList[A] => D[Int]) = NonEmptyList.fromList(xs).fold(0.pure[D])(insert) @@ -58,6 +59,6 @@ object LocksIndexing { txr.trans(insert) >>= { ls => info"[$ls] locks indexed" } - } + }.commitBatch } } diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala index 9512f008..db97a5b0 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala @@ -12,6 +12,7 @@ import org.ergoplatform.dex.protocol.constants.ErgoAssetId import org.ergoplatform.ergo.TokenId import org.ergoplatform.ergo.services.explorer.ErgoExplorer import org.ergoplatform.ergo.services.explorer.models.TokenInfo.ErgoTokenInfo +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -56,7 +57,7 @@ object PoolsIndexing { ) extends PoolsIndexing[S] { def run: S[Unit] = - pools.stream.chunks.evalMap { rs => + pools.stream.chunks.evalTap { rs => val poolSnapshots = rs.map(r => r.message).toList val assets = poolSnapshots.flatMap(p => List(p.entity.lp.id, p.entity.x.id, p.entity.y.id)).distinct @@ -82,6 +83,6 @@ object PoolsIndexing { (pn, an) <- txr.trans(insert) _ <- info"[$pn] pool snapshots indexed" >> info"[$an] assets indexed" } yield () - } + }.commitBatch } }