From a26f11350f5c80560fdde548d6ad21333823d265 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 15 Jun 2022 13:27:25 +0200 Subject: [PATCH 1/5] Feat: started coding sth --- packages/manaverse/priorityqueue.go | 1 + 1 file changed, 1 insertion(+) create mode 100644 packages/manaverse/priorityqueue.go diff --git a/packages/manaverse/priorityqueue.go b/packages/manaverse/priorityqueue.go new file mode 100644 index 0000000000..59cd47df55 --- /dev/null +++ b/packages/manaverse/priorityqueue.go @@ -0,0 +1 @@ +package manaverse From bb083f50ecb20a672867f1d59e8f7333f18b4be7 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 17 Jun 2022 14:44:41 +0200 Subject: [PATCH 2/5] Feat: started to implement scheduler / WIP --- go.mod | 2 +- go.sum | 4 +- packages/manaverse/bucket.go | 46 +++++++++++ packages/manaverse/manaverse_test.go | 16 ++++ packages/manaverse/priorityqueue.go | 1 - packages/manaverse/scheduler.go | 113 ++++++++++++++++++++++++++ packages/pow/pow.go | 11 ++- packages/tangle/message.go | 51 +++++++++++- packages/tangle/payload/payload.go | 2 +- tools/integration-tests/tester/go.mod | 2 +- tools/integration-tests/tester/go.sum | 4 +- 11 files changed, 238 insertions(+), 14 deletions(-) create mode 100644 packages/manaverse/bucket.go create mode 100644 packages/manaverse/manaverse_test.go delete mode 100644 packages/manaverse/priorityqueue.go create mode 100644 packages/manaverse/scheduler.go diff --git a/go.mod b/go.mod index 0fb5e807eb..a67b81730d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gin-gonic/gin v1.7.0 github.com/go-resty/resty/v2 v2.6.0 github.com/gorilla/websocket v1.5.0 - github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175 + github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 github.com/libp2p/go-libp2p v0.15.0 diff --git a/go.sum b/go.sum index f3a49db149..206f8a730a 100644 --- a/go.sum +++ b/go.sum @@ -473,8 +473,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175 h1:IgXxiPx51WJglOL5EtIurlMbujnrLP4vLYQqyfmR0zg= -github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 h1:gLiEPbxzrnzNOI7JD+kKZqfG5EjKKXRBfYcBO/hgCUY= +github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/packages/manaverse/bucket.go b/packages/manaverse/bucket.go new file mode 100644 index 0000000000..20ce26fa1a --- /dev/null +++ b/packages/manaverse/bucket.go @@ -0,0 +1,46 @@ +package manaverse + +import ( + "sync" + + "github.com/iotaledger/hive.go/generics/priorityqueue" + + "github.com/iotaledger/goshimmer/packages/tangle" +) + +type Bucket struct { + mana uint64 + priorityQueue *priorityqueue.PriorityQueue[*tangle.Message] + + sync.RWMutex +} + +func NewManaBucket(mana uint64) *Bucket { + return &Bucket{ + mana: mana, + priorityQueue: priorityqueue.New[*tangle.Message](), + } +} + +func (f *Bucket) Mana() (mana uint64) { + return f.mana +} + +func (f *Bucket) Push(block *tangle.Message) { + f.Lock() + defer f.Unlock() + + f.priorityQueue.Push(block) +} + +func (f *Bucket) Compare(other *Bucket) int { + if f.Mana() < other.Mana() { + return -1 + } + + if f.Mana() > other.Mana() { + return 1 + } + + return 0 +} diff --git a/packages/manaverse/manaverse_test.go b/packages/manaverse/manaverse_test.go new file mode 100644 index 0000000000..ad7574ed5f --- /dev/null +++ b/packages/manaverse/manaverse_test.go @@ -0,0 +1,16 @@ +package manaverse + +import ( + "fmt" + "testing" + "time" +) + +func Test(t *testing.T) { + scheduler := NewScheduler() + scheduler.Start() + + time.Sleep(20 * time.Second) + + fmt.Println(scheduler.iterations) +} diff --git a/packages/manaverse/priorityqueue.go b/packages/manaverse/priorityqueue.go deleted file mode 100644 index 59cd47df55..0000000000 --- a/packages/manaverse/priorityqueue.go +++ /dev/null @@ -1 +0,0 @@ -package manaverse diff --git a/packages/manaverse/scheduler.go b/packages/manaverse/scheduler.go new file mode 100644 index 0000000000..b25926d986 --- /dev/null +++ b/packages/manaverse/scheduler.go @@ -0,0 +1,113 @@ +package manaverse + +import ( + "sync" + "time" + + "github.com/iotaledger/hive.go/generics/priorityqueue" + "github.com/iotaledger/hive.go/types" + + "github.com/iotaledger/goshimmer/packages/tangle" +) + +type Scheduler struct { + queue *priorityqueue.PriorityQueue[*Bucket] + bucketsByMana map[uint64]*Bucket + iterations int + sync.RWMutex +} + +func NewScheduler() *Scheduler { + return &Scheduler{ + queue: priorityqueue.New[*Bucket](), + } +} + +func (s *Scheduler) Push(block *tangle.Message) { + s.Lock() + defer s.Unlock() + + s.bucket(block.BurnedMana()).Push(block) +} + +func (s *Scheduler) Start() { + go func() { + tickerRate := 1 * time.Millisecond + tickerPrecision := 16 * time.Millisecond + + start := time.Now() + for true { + s.iterations++ + + if tickerOffset := start.Add(time.Duration(s.iterations) * tickerRate).Sub(time.Now()); tickerOffset > tickerPrecision { + time.Sleep(tickerOffset) + } + } + }() +} + +func (s *Scheduler) bucket(mana uint64) (bucket *Bucket) { + bucket, exists := s.bucketsByMana[mana] + if exists { + return bucket + } + + bucket = NewManaBucket(mana) + s.bucketsByMana[mana] = bucket + + return bucket +} + +type PrecisionTicker struct { + tickerFunc func() + iterations int + shutdownWG sync.WaitGroup + shutdownChan chan types.Empty +} + +func NewPrecisionTicker(tickerFunc func()) (precisionTicker *PrecisionTicker) { + precisionTicker = &PrecisionTicker{ + tickerFunc: tickerFunc, + shutdownChan: make(chan types.Empty, 1), + } + + go precisionTicker.run() + + return precisionTicker +} + +func (p *PrecisionTicker) Shutdown() { + close(p.shutdownChan) +} + +func (p *PrecisionTicker) WaitForShutdown() { + <-p.shutdownChan +} + +func (p *PrecisionTicker) WaitForGracefulShutdown() { + p.shutdownWG.Wait() +} + +func (p *PrecisionTicker) run() { + p.shutdownWG.Add(1) + defer p.shutdownWG.Done() + + tickerRate := 1 * time.Millisecond + tickerPrecision := 16 * time.Millisecond + + start := time.Now() + for true { + select { + case <-p.shutdownChan: + default: + p.iterations++ + + p.tickerFunc() + + if tickerOffset := start.Add(time.Duration(p.iterations) * tickerRate).Sub(time.Now()); tickerOffset > tickerPrecision { + time.Sleep(tickerOffset) + } + } + + } +} diff --git a/packages/pow/pow.go b/packages/pow/pow.go index 08dbccdac0..fecafa7a0f 100644 --- a/packages/pow/pow.go +++ b/packages/pow/pow.go @@ -98,9 +98,7 @@ func (w *Worker) Mine(ctx context.Context, msg []byte, target int) (uint64, erro // LeadingZeros returns the number of leading zeros in the digest of the given data. func (w *Worker) LeadingZeros(data []byte) (int, error) { - digest := blake2b.Sum512(data) - asAnInt := new(big.Int).SetBytes(digest[:]) - return 8*blake2b.Size - asAnInt.BitLen(), nil + return LeadingZeros(data) } // LeadingZerosWithNonce returns the number of leading zeros in the digest @@ -142,3 +140,10 @@ func (w *Worker) worker(msg []byte, startNonce uint64, target int, done *uint32, func putUint64(b []byte, v uint64) { binary.LittleEndian.PutUint64(b, v) } + +// LeadingZeros returns the number of leading zeros in the digest of the given data. +func LeadingZeros(data []byte) (int, error) { + digest := blake2b.Sum512(data) + asAnInt := new(big.Int).SetBytes(digest[:]) + return 8*blake2b.Size - asAnInt.BitLen(), nil +} diff --git a/packages/tangle/message.go b/packages/tangle/message.go index c09e77fc27..02a80a2d77 100644 --- a/packages/tangle/message.go +++ b/packages/tangle/message.go @@ -23,6 +23,7 @@ import ( "github.com/iotaledger/goshimmer/packages/ledger/utxo" "github.com/iotaledger/goshimmer/packages/ledger/vm/devnetvm" "github.com/iotaledger/goshimmer/packages/markers" + "github.com/iotaledger/goshimmer/packages/pow" "github.com/iotaledger/goshimmer/packages/tangle/payload" ) @@ -292,6 +293,7 @@ type Message struct { model.Storable[MessageID, Message, *Message, MessageModel] `serix:"0"` payload payload.Payload } + type MessageModel struct { // core properties (get sent over the wire) Version uint8 `serix:"0"` @@ -299,9 +301,10 @@ type MessageModel struct { IssuerPublicKey ed25519.PublicKey `serix:"2"` IssuingTime time.Time `serix:"3"` SequenceNumber uint64 `serix:"4"` - PayloadBytes []byte `serix:"5,lengthPrefixType=uint32"` - Nonce uint64 `serix:"6"` - Signature ed25519.Signature `serix:"7"` + BurnedMana uint64 `serix:"5"` + PayloadBytes []byte `serix:"6,lengthPrefixType=uint32"` + Nonce uint64 `serix:"7"` + Signature ed25519.Signature `serix:"8"` } // NewMessage creates a new message with the details provided by the issuer. @@ -424,6 +427,11 @@ func (m *Message) SequenceNumber() uint64 { return m.M.SequenceNumber } +// BurnedMana returns the mana burned by this message. +func (m *Message) BurnedMana() uint64 { + return m.M.BurnedMana +} + // Payload returns the Payload of the message. func (m *Message) Payload() payload.Payload { m.Lock() @@ -491,12 +499,49 @@ func (m *Message) String() string { builder.AddField(stringify.StructField("Issuer", m.IssuerPublicKey())) builder.AddField(stringify.StructField("IssuingTime", m.IssuingTime())) builder.AddField(stringify.StructField("SequenceNumber", m.SequenceNumber())) + builder.AddField(stringify.StructField("BurnedMana", m.BurnedMana())) builder.AddField(stringify.StructField("Payload", m.Payload())) builder.AddField(stringify.StructField("Nonce", m.Nonce())) builder.AddField(stringify.StructField("Signature", m.Signature())) return builder.String() } +// PoW returns the PoW of the Block. +func (m *Message) PoW() (leadingZeros int) { + serializedMessage, err := m.Bytes() + if err != nil { + return 0 + } + + leadingZeros, err = pow.LeadingZeros(serializedMessage) + if err != nil { + return 0 + } + + return leadingZeros +} + +// Compare is a comparator for Blocks. +func (m *Message) Compare(other *Message) int { + if m.BurnedMana() < other.BurnedMana() { + return -1 + } + + if m.BurnedMana() > other.BurnedMana() { + return 1 + } + + if m.PoW() < other.PoW() { + return -1 + } + + if m.PoW() > other.PoW() { + return 1 + } + + return 0 +} + // sorts given parents and returns a new slice with sorted parents func sortParents(parents MessageIDs) (sorted []MessageID) { sorted = parents.Slice() diff --git a/packages/tangle/payload/payload.go b/packages/tangle/payload/payload.go index 20bdd4c6e5..639818b68b 100644 --- a/packages/tangle/payload/payload.go +++ b/packages/tangle/payload/payload.go @@ -11,7 +11,7 @@ import ( // (version(1) + parentsBlocksCount(1) + 4 * (parentsType(1) + parentsCount(1) + 8 * reference(32)) + // issuerPK(32) + issuanceTime(8) + seqNum(8) + payloadLength(4) + nonce(8) + signature(64) // = MaxMessageSize - 1158 bytes = 64378 -const MaxSize = 64378 +const MaxSize = 64370 // Payload represents the generic interface for an object that can be embedded in Messages of the Tangle. type Payload interface { diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod index e3398286c8..919532353c 100644 --- a/tools/integration-tests/tester/go.mod +++ b/tools/integration-tests/tester/go.mod @@ -7,7 +7,7 @@ require ( github.com/docker/docker v1.13.1 github.com/docker/go-connections v0.4.0 github.com/iotaledger/goshimmer v0.1.3 - github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175 + github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 github.com/mr-tron/base58 v1.2.0 github.com/stretchr/testify v1.7.1 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum index ff87a649e5..a3c84e6872 100644 --- a/tools/integration-tests/tester/go.sum +++ b/tools/integration-tests/tester/go.sum @@ -461,8 +461,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175 h1:IgXxiPx51WJglOL5EtIurlMbujnrLP4vLYQqyfmR0zg= -github.com/iotaledger/hive.go v0.0.0-20220607150119-1be29e962175/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 h1:gLiEPbxzrnzNOI7JD+kKZqfG5EjKKXRBfYcBO/hgCUY= +github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= From d20389bf98a9d7a8aa03d3db4ddb36a54e1df52b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Sat, 18 Jun 2022 01:46:47 +0200 Subject: [PATCH 3/5] Feat: cleaned up some code --- go.mod | 2 +- go.sum | 4 +- packages/manaverse/bucket.go | 18 +++--- packages/manaverse/manaverse_test.go | 7 +-- packages/manaverse/scheduler.go | 88 +++++---------------------- tools/integration-tests/tester/go.mod | 2 +- tools/integration-tests/tester/go.sum | 4 +- 7 files changed, 32 insertions(+), 93 deletions(-) diff --git a/go.mod b/go.mod index a67b81730d..c06e63374d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gin-gonic/gin v1.7.0 github.com/go-resty/resty/v2 v2.6.0 github.com/gorilla/websocket v1.5.0 - github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 + github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 github.com/libp2p/go-libp2p v0.15.0 diff --git a/go.sum b/go.sum index 206f8a730a..0305a5f203 100644 --- a/go.sum +++ b/go.sum @@ -473,8 +473,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 h1:gLiEPbxzrnzNOI7JD+kKZqfG5EjKKXRBfYcBO/hgCUY= -github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 h1:DCruInBsJ6gMaHbes8MzaiO3EksiKzJKXftd9mAgl7I= +github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/packages/manaverse/bucket.go b/packages/manaverse/bucket.go index 20ce26fa1a..bc85143825 100644 --- a/packages/manaverse/bucket.go +++ b/packages/manaverse/bucket.go @@ -22,23 +22,23 @@ func NewManaBucket(mana uint64) *Bucket { } } -func (f *Bucket) Mana() (mana uint64) { - return f.mana +func (b *Bucket) Mana() (mana uint64) { + return b.mana } -func (f *Bucket) Push(block *tangle.Message) { - f.Lock() - defer f.Unlock() +func (b *Bucket) Push(block *tangle.Message) { + b.Lock() + defer b.Unlock() - f.priorityQueue.Push(block) + b.priorityQueue.Push(block) } -func (f *Bucket) Compare(other *Bucket) int { - if f.Mana() < other.Mana() { +func (b *Bucket) Compare(other *Bucket) int { + if b.Mana() < other.Mana() { return -1 } - if f.Mana() > other.Mana() { + if b.Mana() > other.Mana() { return 1 } diff --git a/packages/manaverse/manaverse_test.go b/packages/manaverse/manaverse_test.go index ad7574ed5f..cd070b4c02 100644 --- a/packages/manaverse/manaverse_test.go +++ b/packages/manaverse/manaverse_test.go @@ -1,16 +1,15 @@ package manaverse import ( - "fmt" "testing" "time" ) func Test(t *testing.T) { scheduler := NewScheduler() - scheduler.Start() + if false { + scheduler.Push(nil) + } time.Sleep(20 * time.Second) - - fmt.Println(scheduler.iterations) } diff --git a/packages/manaverse/scheduler.go b/packages/manaverse/scheduler.go index b25926d986..66d555579d 100644 --- a/packages/manaverse/scheduler.go +++ b/packages/manaverse/scheduler.go @@ -1,26 +1,32 @@ package manaverse import ( + "fmt" "sync" "time" "github.com/iotaledger/hive.go/generics/priorityqueue" - "github.com/iotaledger/hive.go/types" + "github.com/iotaledger/hive.go/timeutil" "github.com/iotaledger/goshimmer/packages/tangle" ) type Scheduler struct { - queue *priorityqueue.PriorityQueue[*Bucket] + priorityQueue *priorityqueue.PriorityQueue[*Bucket] bucketsByMana map[uint64]*Bucket - iterations int + ticker *timeutil.PrecisionTicker + sync.RWMutex } -func NewScheduler() *Scheduler { - return &Scheduler{ - queue: priorityqueue.New[*Bucket](), +func NewScheduler() (newScheduler *Scheduler) { + newScheduler = &Scheduler{ + priorityQueue: priorityqueue.New[*Bucket](), + bucketsByMana: make(map[uint64]*Bucket, 0), } + newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.scheduleMessages, time.Second) + + return newScheduler } func (s *Scheduler) Push(block *tangle.Message) { @@ -30,22 +36,6 @@ func (s *Scheduler) Push(block *tangle.Message) { s.bucket(block.BurnedMana()).Push(block) } -func (s *Scheduler) Start() { - go func() { - tickerRate := 1 * time.Millisecond - tickerPrecision := 16 * time.Millisecond - - start := time.Now() - for true { - s.iterations++ - - if tickerOffset := start.Add(time.Duration(s.iterations) * tickerRate).Sub(time.Now()); tickerOffset > tickerPrecision { - time.Sleep(tickerOffset) - } - } - }() -} - func (s *Scheduler) bucket(mana uint64) (bucket *Bucket) { bucket, exists := s.bucketsByMana[mana] if exists { @@ -58,56 +48,6 @@ func (s *Scheduler) bucket(mana uint64) (bucket *Bucket) { return bucket } -type PrecisionTicker struct { - tickerFunc func() - iterations int - shutdownWG sync.WaitGroup - shutdownChan chan types.Empty -} - -func NewPrecisionTicker(tickerFunc func()) (precisionTicker *PrecisionTicker) { - precisionTicker = &PrecisionTicker{ - tickerFunc: tickerFunc, - shutdownChan: make(chan types.Empty, 1), - } - - go precisionTicker.run() - - return precisionTicker -} - -func (p *PrecisionTicker) Shutdown() { - close(p.shutdownChan) -} - -func (p *PrecisionTicker) WaitForShutdown() { - <-p.shutdownChan -} - -func (p *PrecisionTicker) WaitForGracefulShutdown() { - p.shutdownWG.Wait() -} - -func (p *PrecisionTicker) run() { - p.shutdownWG.Add(1) - defer p.shutdownWG.Done() - - tickerRate := 1 * time.Millisecond - tickerPrecision := 16 * time.Millisecond - - start := time.Now() - for true { - select { - case <-p.shutdownChan: - default: - p.iterations++ - - p.tickerFunc() - - if tickerOffset := start.Add(time.Duration(p.iterations) * tickerRate).Sub(time.Now()); tickerOffset > tickerPrecision { - time.Sleep(tickerOffset) - } - } - - } +func (s *Scheduler) scheduleMessages() { + fmt.Println("SEND") } diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod index 919532353c..b66eddd228 100644 --- a/tools/integration-tests/tester/go.mod +++ b/tools/integration-tests/tester/go.mod @@ -7,7 +7,7 @@ require ( github.com/docker/docker v1.13.1 github.com/docker/go-connections v0.4.0 github.com/iotaledger/goshimmer v0.1.3 - github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 + github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 github.com/mr-tron/base58 v1.2.0 github.com/stretchr/testify v1.7.1 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum index a3c84e6872..14f9983258 100644 --- a/tools/integration-tests/tester/go.sum +++ b/tools/integration-tests/tester/go.sum @@ -461,8 +461,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23 h1:gLiEPbxzrnzNOI7JD+kKZqfG5EjKKXRBfYcBO/hgCUY= -github.com/iotaledger/hive.go v0.0.0-20220615225152-812f42b43f23/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 h1:DCruInBsJ6gMaHbes8MzaiO3EksiKzJKXftd9mAgl7I= +github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= From a68ec1788decf63a2881126eaff777df8bb358cd Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 21 Jun 2022 11:43:13 +0200 Subject: [PATCH 4/5] Feat: almost done --- go.mod | 2 +- go.sum | 4 +- packages/manaverse/bucket.go | 33 ++---- packages/manaverse/events.go | 13 +++ packages/manaverse/interfaces.go | 10 ++ packages/manaverse/manaverse_test.go | 31 ++++- packages/manaverse/scheduler.go | 159 +++++++++++++++++++++++--- packages/manaverse/testframework.go | 38 ++++++ packages/tangle/message.go | 4 +- packages/tangle/testutils.go | 8 ++ tools/integration-tests/tester/go.mod | 2 +- tools/integration-tests/tester/go.sum | 4 +- 12 files changed, 256 insertions(+), 52 deletions(-) create mode 100644 packages/manaverse/events.go create mode 100644 packages/manaverse/interfaces.go create mode 100644 packages/manaverse/testframework.go diff --git a/go.mod b/go.mod index c06e63374d..adccdf25bc 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gin-gonic/gin v1.7.0 github.com/go-resty/resty/v2 v2.6.0 github.com/gorilla/websocket v1.5.0 - github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 + github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 github.com/libp2p/go-libp2p v0.15.0 diff --git a/go.sum b/go.sum index 0305a5f203..6925999cb8 100644 --- a/go.sum +++ b/go.sum @@ -473,8 +473,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 h1:DCruInBsJ6gMaHbes8MzaiO3EksiKzJKXftd9mAgl7I= -github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17 h1:VH6ZeNKdnuQ/TrRgZRscyL2jXQeI/pN4RfhNMUXHFL0= +github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/packages/manaverse/bucket.go b/packages/manaverse/bucket.go index bc85143825..1d9b4bb838 100644 --- a/packages/manaverse/bucket.go +++ b/packages/manaverse/bucket.go @@ -1,46 +1,31 @@ package manaverse import ( - "sync" - "github.com/iotaledger/hive.go/generics/priorityqueue" "github.com/iotaledger/goshimmer/packages/tangle" ) type Bucket struct { - mana uint64 - priorityQueue *priorityqueue.PriorityQueue[*tangle.Message] + mana int64 - sync.RWMutex + *priorityqueue.PriorityQueue[*tangle.Message] } -func NewManaBucket(mana uint64) *Bucket { +func NewManaBucket(mana int64) *Bucket { return &Bucket{ mana: mana, - priorityQueue: priorityqueue.New[*tangle.Message](), + PriorityQueue: priorityqueue.New[*tangle.Message](), } } -func (b *Bucket) Mana() (mana uint64) { - return b.mana -} - -func (b *Bucket) Push(block *tangle.Message) { - b.Lock() - defer b.Unlock() - - b.priorityQueue.Push(block) -} - func (b *Bucket) Compare(other *Bucket) int { - if b.Mana() < other.Mana() { + switch true { + case b.mana < other.mana: return -1 - } - - if b.Mana() > other.Mana() { + case b.mana > other.mana: return 1 + default: + return 0 } - - return 0 } diff --git a/packages/manaverse/events.go b/packages/manaverse/events.go new file mode 100644 index 0000000000..a0252ba85f --- /dev/null +++ b/packages/manaverse/events.go @@ -0,0 +1,13 @@ +package manaverse + +import ( + "github.com/iotaledger/hive.go/generics/event" + + "github.com/iotaledger/goshimmer/packages/tangle" +) + +type SchedulerEvents struct { + BlockQueued *event.Event[*tangle.Message] + BlockScheduled *event.Event[*tangle.Message] + BlockDropped *event.Event[*tangle.Message] +} diff --git a/packages/manaverse/interfaces.go b/packages/manaverse/interfaces.go new file mode 100644 index 0000000000..d9ffb19824 --- /dev/null +++ b/packages/manaverse/interfaces.go @@ -0,0 +1,10 @@ +package manaverse + +import ( + "github.com/iotaledger/hive.go/identity" +) + +type ManaLedger interface { + IncreaseMana(id identity.ID, mana int64) (newBalance int64) + DecreaseMana(id identity.ID, mana int64) (newBalance int64) +} diff --git a/packages/manaverse/manaverse_test.go b/packages/manaverse/manaverse_test.go index cd070b4c02..cd075f2d1c 100644 --- a/packages/manaverse/manaverse_test.go +++ b/packages/manaverse/manaverse_test.go @@ -1,15 +1,38 @@ package manaverse import ( + "fmt" "testing" "time" + + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/generics/event" + "github.com/iotaledger/hive.go/identity" + + "github.com/iotaledger/goshimmer/packages/tangle" ) func Test(t *testing.T) { - scheduler := NewScheduler() - if false { - scheduler.Push(nil) - } + identity1KeyPair := ed25519.GenerateKeyPair() + identity1 := identity.New(identity1KeyPair.PublicKey) + + manaLedger := NewMockedManaLedger() + manaLedger.IncreaseMana(identity1.ID(), 100) + + testTangle := tangle.NewTestTangle() + testFramework := tangle.NewMessageTestFramework(testTangle) + testFramework.CreateMessage("A", tangle.WithStrongParents("Genesis"), tangle.WithIssuer(identity1.PublicKey())) + testFramework.CreateMessage("B", tangle.WithStrongParents("A"), tangle.WithIssuer(identity1.PublicKey())) + testFramework.CreateMessage("C", tangle.WithStrongParents("B"), tangle.WithIssuer(identity1.PublicKey())) + + scheduler := NewScheduler(manaLedger) + scheduler.Events.BlockScheduled.Attach(event.NewClosure(func(block *tangle.Message) { + fmt.Println(time.Now(), "SCHEDULED", block.ID()) + })) + + scheduler.Push(testFramework.Message("A")) + scheduler.Push(testFramework.Message("B")) + scheduler.Push(testFramework.Message("C")) time.Sleep(20 * time.Second) } diff --git a/packages/manaverse/scheduler.go b/packages/manaverse/scheduler.go index 66d555579d..ccfdab8073 100644 --- a/packages/manaverse/scheduler.go +++ b/packages/manaverse/scheduler.go @@ -1,53 +1,180 @@ package manaverse import ( - "fmt" "sync" "time" + "github.com/cockroachdb/errors" + "github.com/iotaledger/hive.go/generics/event" + "github.com/iotaledger/hive.go/generics/lo" "github.com/iotaledger/hive.go/generics/priorityqueue" + "github.com/iotaledger/hive.go/identity" "github.com/iotaledger/hive.go/timeutil" "github.com/iotaledger/goshimmer/packages/tangle" ) type Scheduler struct { + Events *SchedulerEvents + + manaLedger ManaLedger priorityQueue *priorityqueue.PriorityQueue[*Bucket] - bucketsByMana map[uint64]*Bucket + bucketsByMana map[int64]*Bucket ticker *timeutil.PrecisionTicker + mutex sync.Mutex - sync.RWMutex + // internal cache + unscheduledParentsCounter map[tangle.MessageID]int + unscheduledBlockChildren map[tangle.MessageID][]*tangle.Message } -func NewScheduler() (newScheduler *Scheduler) { +func NewScheduler(manaLedger ManaLedger) (newScheduler *Scheduler) { newScheduler = &Scheduler{ + Events: &SchedulerEvents{ + BlockQueued: event.New[*tangle.Message](), + BlockScheduled: event.New[*tangle.Message](), + BlockDropped: event.New[*tangle.Message](), + }, + + manaLedger: manaLedger, priorityQueue: priorityqueue.New[*Bucket](), - bucketsByMana: make(map[uint64]*Bucket, 0), + bucketsByMana: make(map[int64]*Bucket, 0), + + unscheduledParentsCounter: make(map[tangle.MessageID]int), + unscheduledBlockChildren: make(map[tangle.MessageID][]*tangle.Message), } - newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.scheduleMessages, time.Second) + newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.scheduleNextBlock, 500*time.Millisecond) return newScheduler } +func (s *Scheduler) Setup() { + // TODO: trigger cleanupQueuedBlock on blocks that are confirmed as well + // TODO: trigger cleanupScheduledBlock on blocks that are confirmed as well + // s.tangle.Events.BlockConfirmed.Hook(event.NewClosure(s.cleanupQueuedBlock)) + // s.tangle.Events.BlockConfirmed.Hook(event.NewClosure(s.cleanupScheduledBlock)) +} + func (s *Scheduler) Push(block *tangle.Message) { - s.Lock() - defer s.Unlock() + s.mutex.Lock() + defer s.mutex.Unlock() + + if !s.hasUnscheduledParents(block) { + s.queueBlock(block) + } +} + +func (s *Scheduler) scheduleNextBlock() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if blockToSchedule := s.nextBlockToSchedule(); blockToSchedule != nil { + s.cleanupScheduledBlock(blockToSchedule) + + s.Events.BlockScheduled.Trigger(blockToSchedule) + } +} + +func (s *Scheduler) queueBlock(block *tangle.Message) { + s.cleanupQueuedBlock(block) s.bucket(block.BurnedMana()).Push(block) + + s.Events.BlockQueued.Trigger(block) } -func (s *Scheduler) bucket(mana uint64) (bucket *Bucket) { - bucket, exists := s.bucketsByMana[mana] - if exists { - return bucket +func (s *Scheduler) cleanupQueuedBlock(block *tangle.Message) { + delete(s.unscheduledParentsCounter, block.ID()) +} + +func (s *Scheduler) cleanupScheduledBlock(block *tangle.Message) { + s.decreaseUnscheduledParentsCounters(block.ID()) + delete(s.unscheduledBlockChildren, block.ID()) +} + +func (s *Scheduler) nextBlockToSchedule() (blockToSchedule *tangle.Message) { + for blockToSchedule = s.nextBlock(); blockToSchedule != nil && !s.issuerCanIssue(blockToSchedule); blockToSchedule = s.nextBlock() { + s.Events.BlockDropped.Trigger(blockToSchedule) + + // TODO: REMEMBER IT WAS DROPPED AN IGNORE FUTURE CONE? } - bucket = NewManaBucket(mana) - s.bucketsByMana[mana] = bucket + return blockToSchedule +} + +func (s *Scheduler) nextBlock() (block *tangle.Message) { + bucket, success := s.priorityQueue.Peek() + if !success { + return nil + } + + if block, success = bucket.Pop(); !success { + panic(errors.Errorf("bucket %v should never be empty", bucket)) + } + + if bucket.IsEmpty() { + s.dropBucket() + } + + return block +} + +func (s *Scheduler) dropBucket() { + bucket, exists := s.priorityQueue.Pop() + if !exists { + panic(errors.New("bucket should never be empty")) + } + + delete(s.bucketsByMana, bucket.mana) + + // TODO: UPDATE FEE ESTIMATION +} + +func (s *Scheduler) issuerCanIssue(block *tangle.Message) (canSchedule bool) { + return s.manaLedger.DecreaseMana(identity.NewID(block.IssuerPublicKey()), block.BurnedMana()) >= 0 +} + +func (s *Scheduler) hasUnscheduledParents(block *tangle.Message) (hasUnscheduledParents bool) { + s.unscheduledBlockChildren[block.ID()] = make([]*tangle.Message, 0) + + if unscheduledParents := s.unscheduledParents(block); unscheduledParents > 0 { + s.unscheduledParentsCounter[block.ID()] = unscheduledParents + + return true + } + + return false +} + +func (s *Scheduler) unscheduledParents(block *tangle.Message) (unscheduledParents int) { + for it := lo.Unique(block.Parents()).Iterator(); it.HasNext(); { + parentID := it.Next() + + if children, isUnscheduled := s.unscheduledBlockChildren[parentID]; isUnscheduled { + s.unscheduledBlockChildren[parentID] = append(children, block) + + unscheduledParents++ + } + } + + return unscheduledParents +} + +func (s *Scheduler) bucket(mana int64) (bucket *Bucket) { + bucket, exists := s.bucketsByMana[mana] + if !exists { + bucket = NewManaBucket(mana) + s.bucketsByMana[mana] = bucket + s.priorityQueue.Push(bucket) + } return bucket } -func (s *Scheduler) scheduleMessages() { - fmt.Println("SEND") +func (s *Scheduler) decreaseUnscheduledParentsCounters(blockID tangle.MessageID) { + for _, child := range s.unscheduledBlockChildren[blockID] { + if s.unscheduledParentsCounter[child.ID()]--; s.unscheduledParentsCounter[child.ID()] == 0 { + s.queueBlock(child) + } + } } diff --git a/packages/manaverse/testframework.go b/packages/manaverse/testframework.go new file mode 100644 index 0000000000..7cc87f950f --- /dev/null +++ b/packages/manaverse/testframework.go @@ -0,0 +1,38 @@ +package manaverse + +import ( + "sync" + + "github.com/iotaledger/hive.go/identity" +) + +type MockedManaLedger struct { + manaBalances map[identity.ID]int64 + manaBalancesMutex sync.Mutex +} + +func NewMockedManaLedger() (newMockedManaLedger *MockedManaLedger) { + return &MockedManaLedger{ + manaBalances: make(map[identity.ID]int64), + } +} + +func (m *MockedManaLedger) IncreaseMana(id identity.ID, mana int64) (newBalance int64) { + m.manaBalancesMutex.Lock() + defer m.manaBalancesMutex.Unlock() + + m.manaBalances[id] += mana + + return m.manaBalances[id] +} + +func (m *MockedManaLedger) DecreaseMana(id identity.ID, mana int64) (newBalance int64) { + m.manaBalancesMutex.Lock() + defer m.manaBalancesMutex.Unlock() + + m.manaBalances[id] -= mana + + return m.manaBalances[id] +} + +var _ ManaLedger = new(MockedManaLedger) diff --git a/packages/tangle/message.go b/packages/tangle/message.go index 02a80a2d77..952374861b 100644 --- a/packages/tangle/message.go +++ b/packages/tangle/message.go @@ -301,7 +301,7 @@ type MessageModel struct { IssuerPublicKey ed25519.PublicKey `serix:"2"` IssuingTime time.Time `serix:"3"` SequenceNumber uint64 `serix:"4"` - BurnedMana uint64 `serix:"5"` + BurnedMana int64 `serix:"5"` PayloadBytes []byte `serix:"6,lengthPrefixType=uint32"` Nonce uint64 `serix:"7"` Signature ed25519.Signature `serix:"8"` @@ -428,7 +428,7 @@ func (m *Message) SequenceNumber() uint64 { } // BurnedMana returns the mana burned by this message. -func (m *Message) BurnedMana() uint64 { +func (m *Message) BurnedMana() int64 { return m.M.BurnedMana } diff --git a/packages/tangle/testutils.go b/packages/tangle/testutils.go index 11ad4bd4a5..0a130cfebc 100644 --- a/packages/tangle/testutils.go +++ b/packages/tangle/testutils.go @@ -475,6 +475,7 @@ type MessageTestFrameworkMessageOptions struct { shallowDislikeParents map[string]types.Empty issuer ed25519.PublicKey issuingTime time.Time + burnedMana int64 reattachmentMessageAlias string sequenceNumber uint64 overrideSequenceNumber bool @@ -568,6 +569,13 @@ func WithIssuer(issuer ed25519.PublicKey) MessageOption { } } +// WithBurnedMana returns a MessageOption that is used to define the amount of burned mana. +func WithBurnedMana(burnedMana int64) MessageOption { + return func(options *MessageTestFrameworkMessageOptions) { + options.burnedMana = burnedMana + } +} + // WithIssuingTime returns a MessageOption that is used to set issuing time of the Message. func WithIssuingTime(issuingTime time.Time) MessageOption { return func(options *MessageTestFrameworkMessageOptions) { diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod index b66eddd228..ab6f707660 100644 --- a/tools/integration-tests/tester/go.mod +++ b/tools/integration-tests/tester/go.mod @@ -7,7 +7,7 @@ require ( github.com/docker/docker v1.13.1 github.com/docker/go-connections v0.4.0 github.com/iotaledger/goshimmer v0.1.3 - github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 + github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17 github.com/mr-tron/base58 v1.2.0 github.com/stretchr/testify v1.7.1 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum index 14f9983258..59c5afe511 100644 --- a/tools/integration-tests/tester/go.sum +++ b/tools/integration-tests/tester/go.sum @@ -461,8 +461,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9 h1:DCruInBsJ6gMaHbes8MzaiO3EksiKzJKXftd9mAgl7I= -github.com/iotaledger/hive.go v0.0.0-20220617234148-49535b2e12d9/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= +github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17 h1:VH6ZeNKdnuQ/TrRgZRscyL2jXQeI/pN4RfhNMUXHFL0= +github.com/iotaledger/hive.go v0.0.0-20220620133504-13cc326a3d17/go.mod h1:8f9U7qHFby0W3cxv/nKnz9LHn9BbwWU0tMsWDnfqzRI= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= From a819b6d41823a8a84ef565c2fab0284b1b659d9b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 22 Jun 2022 13:08:46 +0200 Subject: [PATCH 5/5] Refactor: refactored code --- packages/manaverse/bucket.go | 2 +- packages/manaverse/events.go | 58 +++++++++- packages/manaverse/manaverse_test.go | 18 ++- packages/manaverse/scheduler.go | 159 +++++++++++++++------------ 4 files changed, 155 insertions(+), 82 deletions(-) diff --git a/packages/manaverse/bucket.go b/packages/manaverse/bucket.go index 1d9b4bb838..489277eec6 100644 --- a/packages/manaverse/bucket.go +++ b/packages/manaverse/bucket.go @@ -12,7 +12,7 @@ type Bucket struct { *priorityqueue.PriorityQueue[*tangle.Message] } -func NewManaBucket(mana int64) *Bucket { +func newManaBucket(mana int64) *Bucket { return &Bucket{ mana: mana, PriorityQueue: priorityqueue.New[*tangle.Message](), diff --git a/packages/manaverse/events.go b/packages/manaverse/events.go index a0252ba85f..0925e5ffee 100644 --- a/packages/manaverse/events.go +++ b/packages/manaverse/events.go @@ -1,13 +1,65 @@ package manaverse import ( + "time" + "github.com/iotaledger/hive.go/generics/event" "github.com/iotaledger/goshimmer/packages/tangle" ) +// region SchedulerEvents ////////////////////////////////////////////////////////////////////////////////////////////// + type SchedulerEvents struct { - BlockQueued *event.Event[*tangle.Message] - BlockScheduled *event.Event[*tangle.Message] - BlockDropped *event.Event[*tangle.Message] + BlockQueued *event.Event[*SchedulerBlockEvent] + BlockScheduled *event.Event[*SchedulerBlockEvent] + BlockDropped *event.Event[*SchedulerBlockEvent] + BucketProcessingStarted *event.Event[*SchedulerBucketEvent] + BucketProcessingFinished *event.Event[*SchedulerBucketEvent] } + +func newSchedulerEvents() (newInstance *SchedulerEvents) { + return &SchedulerEvents{ + BlockQueued: event.New[*SchedulerBlockEvent](), + BlockScheduled: event.New[*SchedulerBlockEvent](), + BlockDropped: event.New[*SchedulerBlockEvent](), + BucketProcessingStarted: event.New[*SchedulerBucketEvent](), + BucketProcessingFinished: event.New[*SchedulerBucketEvent](), + } +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region SchedulerBlockEvent ////////////////////////////////////////////////////////////////////////////////////////// + +type SchedulerBlockEvent struct { + Block *tangle.Message + Bucket int64 + Time time.Time +} + +func newSchedulerBlockEvent(block *tangle.Message, bucket int64, time time.Time) (newInstance *SchedulerBlockEvent) { + return &SchedulerBlockEvent{ + Block: block, + Bucket: bucket, + Time: time, + } +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region SchedulerBucketEvent ///////////////////////////////////////////////////////////////////////////////////////// + +type SchedulerBucketEvent struct { + Bucket int64 + Time time.Time +} + +func newSchedulerBucketEvent(bucket int64, time time.Time) (newInstance *SchedulerBucketEvent) { + return &SchedulerBucketEvent{ + Bucket: bucket, + Time: time, + } +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/manaverse/manaverse_test.go b/packages/manaverse/manaverse_test.go index cd075f2d1c..b092e233a5 100644 --- a/packages/manaverse/manaverse_test.go +++ b/packages/manaverse/manaverse_test.go @@ -23,16 +23,24 @@ func Test(t *testing.T) { testFramework := tangle.NewMessageTestFramework(testTangle) testFramework.CreateMessage("A", tangle.WithStrongParents("Genesis"), tangle.WithIssuer(identity1.PublicKey())) testFramework.CreateMessage("B", tangle.WithStrongParents("A"), tangle.WithIssuer(identity1.PublicKey())) - testFramework.CreateMessage("C", tangle.WithStrongParents("B"), tangle.WithIssuer(identity1.PublicKey())) + testFramework.CreateMessage("C", tangle.WithStrongParents("A"), tangle.WithIssuer(identity1.PublicKey())) - scheduler := NewScheduler(manaLedger) - scheduler.Events.BlockScheduled.Attach(event.NewClosure(func(block *tangle.Message) { - fmt.Println(time.Now(), "SCHEDULED", block.ID()) + scheduler := NewScheduler(testTangle.ConfirmationOracle, manaLedger) + scheduler.Events.BlockScheduled.Hook(event.NewClosure(func(event *SchedulerBlockEvent) { + fmt.Println(event.Time, "BlockScheduled", event.Block.ID(), event.Bucket) + })) + + scheduler.Events.BucketProcessingStarted.Hook(event.NewClosure(func(event *SchedulerBucketEvent) { + fmt.Println(event.Time, "BucketProcessingStarted", event.Bucket) + })) + + scheduler.Events.BucketProcessingFinished.Hook(event.NewClosure(func(event *SchedulerBucketEvent) { + fmt.Println(event.Time, "BucketProcessingFinished", event.Bucket) })) scheduler.Push(testFramework.Message("A")) scheduler.Push(testFramework.Message("B")) scheduler.Push(testFramework.Message("C")) - time.Sleep(20 * time.Second) + time.Sleep(2 * time.Second) } diff --git a/packages/manaverse/scheduler.go b/packages/manaverse/scheduler.go index ccfdab8073..b43f10bbfb 100644 --- a/packages/manaverse/scheduler.go +++ b/packages/manaverse/scheduler.go @@ -17,42 +17,45 @@ import ( type Scheduler struct { Events *SchedulerEvents - manaLedger ManaLedger - priorityQueue *priorityqueue.PriorityQueue[*Bucket] - bucketsByMana map[int64]*Bucket - ticker *timeutil.PrecisionTicker - mutex sync.Mutex + confirmationOracle tangle.ConfirmationOracle + manaLedger ManaLedger + priorityQueue *priorityqueue.PriorityQueue[*Bucket] + bucketsByMana map[int64]*Bucket + currentBucket int64 + ticker *timeutil.PrecisionTicker + mutex sync.Mutex - // internal cache - unscheduledParentsCounter map[tangle.MessageID]int - unscheduledBlockChildren map[tangle.MessageID][]*tangle.Message + // unqueuedBlocks + unqueuedBlocks map[tangle.MessageID]int + unscheduledBlocks map[tangle.MessageID][]*tangle.Message } -func NewScheduler(manaLedger ManaLedger) (newScheduler *Scheduler) { +func NewScheduler(confirmationOracle tangle.ConfirmationOracle, manaLedger ManaLedger) (newScheduler *Scheduler) { newScheduler = &Scheduler{ - Events: &SchedulerEvents{ - BlockQueued: event.New[*tangle.Message](), - BlockScheduled: event.New[*tangle.Message](), - BlockDropped: event.New[*tangle.Message](), - }, - - manaLedger: manaLedger, - priorityQueue: priorityqueue.New[*Bucket](), - bucketsByMana: make(map[int64]*Bucket, 0), - - unscheduledParentsCounter: make(map[tangle.MessageID]int), - unscheduledBlockChildren: make(map[tangle.MessageID][]*tangle.Message), + Events: newSchedulerEvents(), + confirmationOracle: confirmationOracle, + manaLedger: manaLedger, + priorityQueue: priorityqueue.New[*Bucket](), + bucketsByMana: make(map[int64]*Bucket, 0), + currentBucket: -1, + unqueuedBlocks: make(map[tangle.MessageID]int), + unscheduledBlocks: make(map[tangle.MessageID][]*tangle.Message), } - newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.scheduleNextBlock, 500*time.Millisecond) + newScheduler.ticker = timeutil.NewPrecisionTicker(newScheduler.ScheduleBlock, 500*time.Millisecond) return newScheduler } func (s *Scheduler) Setup() { - // TODO: trigger cleanupQueuedBlock on blocks that are confirmed as well - // TODO: trigger cleanupScheduledBlock on blocks that are confirmed as well - // s.tangle.Events.BlockConfirmed.Hook(event.NewClosure(s.cleanupQueuedBlock)) - // s.tangle.Events.BlockConfirmed.Hook(event.NewClosure(s.cleanupScheduledBlock)) + s.confirmationOracle.Events().MessageConfirmed.Attach(event.NewClosure(s.onMessageConfirmed)) +} + +func (s *Scheduler) onMessageConfirmed(event *tangle.MessageConfirmedEvent) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.markBlockQueued(event.Message) + s.markBlockScheduled(event.Message, time.Now()) } func (s *Scheduler) Push(block *tangle.Message) { @@ -60,85 +63,95 @@ func (s *Scheduler) Push(block *tangle.Message) { defer s.mutex.Unlock() if !s.hasUnscheduledParents(block) { - s.queueBlock(block) + s.queueBlock(block, time.Now()) } } -func (s *Scheduler) scheduleNextBlock() { +func (s *Scheduler) ScheduleBlock() { s.mutex.Lock() defer s.mutex.Unlock() - if blockToSchedule := s.nextBlockToSchedule(); blockToSchedule != nil { - s.cleanupScheduledBlock(blockToSchedule) - - s.Events.BlockScheduled.Trigger(blockToSchedule) - } + s.scheduleNextBlock(time.Now()) } -func (s *Scheduler) queueBlock(block *tangle.Message) { - s.cleanupQueuedBlock(block) +func (s *Scheduler) queueBlock(block *tangle.Message, now time.Time) { + bucket := s.bucket(block.BurnedMana()) + bucket.Push(block) - s.bucket(block.BurnedMana()).Push(block) + s.markBlockQueued(block) - s.Events.BlockQueued.Trigger(block) + s.Events.BlockQueued.Trigger(newSchedulerBlockEvent(block, bucket.mana, now)) } -func (s *Scheduler) cleanupQueuedBlock(block *tangle.Message) { - delete(s.unscheduledParentsCounter, block.ID()) +func (s *Scheduler) markBlockQueued(block *tangle.Message) { + delete(s.unqueuedBlocks, block.ID()) } -func (s *Scheduler) cleanupScheduledBlock(block *tangle.Message) { - s.decreaseUnscheduledParentsCounters(block.ID()) - delete(s.unscheduledBlockChildren, block.ID()) +func (s *Scheduler) markBlockScheduled(block *tangle.Message, now time.Time) { + s.decreaseUnscheduledParentCountersOfChildren(block.ID(), now) + + delete(s.unscheduledBlocks, block.ID()) } -func (s *Scheduler) nextBlockToSchedule() (blockToSchedule *tangle.Message) { - for blockToSchedule = s.nextBlock(); blockToSchedule != nil && !s.issuerCanIssue(blockToSchedule); blockToSchedule = s.nextBlock() { - s.Events.BlockDropped.Trigger(blockToSchedule) +func (s *Scheduler) dropBucket() { + bucket, exists := s.priorityQueue.Pop() + if !exists { + panic(errors.New("bucket should never be empty")) + } - // TODO: REMEMBER IT WAS DROPPED AN IGNORE FUTURE CONE? + delete(s.bucketsByMana, bucket.mana) + s.currentBucket = -1 +} + +func (s *Scheduler) scheduleNextBlock(now time.Time) { + blockToSchedule, bucket := s.nextBlock(now) + for ; blockToSchedule != nil && !s.issuerHasEnoughMana(blockToSchedule); blockToSchedule, bucket = s.nextBlock(now) { + s.Events.BlockDropped.Trigger(newSchedulerBlockEvent(blockToSchedule, bucket, now)) + + } + + if blockToSchedule == nil { + return } - return blockToSchedule + s.markBlockScheduled(blockToSchedule, now) + + s.Events.BlockScheduled.Trigger(newSchedulerBlockEvent(blockToSchedule, bucket, now)) } -func (s *Scheduler) nextBlock() (block *tangle.Message) { - bucket, success := s.priorityQueue.Peek() +func (s *Scheduler) nextBlock(now time.Time) (block *tangle.Message, bucket int64) { + firstBucket, success := s.priorityQueue.Peek() if !success { - return nil + return nil, 0 } - if block, success = bucket.Pop(); !success { - panic(errors.Errorf("bucket %v should never be empty", bucket)) + if s.currentBucket != firstBucket.mana { + s.currentBucket = firstBucket.mana + s.Events.BucketProcessingStarted.Trigger(newSchedulerBucketEvent(firstBucket.mana, now)) } - if bucket.IsEmpty() { - s.dropBucket() + if block, success = firstBucket.Pop(); !success { + panic(errors.Errorf("bucket %v should never be empty", firstBucket)) } - return block -} + if firstBucket.IsEmpty() { + s.Events.BucketProcessingFinished.Trigger(newSchedulerBucketEvent(firstBucket.mana, now)) -func (s *Scheduler) dropBucket() { - bucket, exists := s.priorityQueue.Pop() - if !exists { - panic(errors.New("bucket should never be empty")) + s.dropBucket() } - delete(s.bucketsByMana, bucket.mana) - - // TODO: UPDATE FEE ESTIMATION + return block, firstBucket.mana } -func (s *Scheduler) issuerCanIssue(block *tangle.Message) (canSchedule bool) { +func (s *Scheduler) issuerHasEnoughMana(block *tangle.Message) (canSchedule bool) { return s.manaLedger.DecreaseMana(identity.NewID(block.IssuerPublicKey()), block.BurnedMana()) >= 0 } func (s *Scheduler) hasUnscheduledParents(block *tangle.Message) (hasUnscheduledParents bool) { - s.unscheduledBlockChildren[block.ID()] = make([]*tangle.Message, 0) + s.unscheduledBlocks[block.ID()] = make([]*tangle.Message, 0) if unscheduledParents := s.unscheduledParents(block); unscheduledParents > 0 { - s.unscheduledParentsCounter[block.ID()] = unscheduledParents + s.unqueuedBlocks[block.ID()] = unscheduledParents return true } @@ -150,8 +163,8 @@ func (s *Scheduler) unscheduledParents(block *tangle.Message) (unscheduledParent for it := lo.Unique(block.Parents()).Iterator(); it.HasNext(); { parentID := it.Next() - if children, isUnscheduled := s.unscheduledBlockChildren[parentID]; isUnscheduled { - s.unscheduledBlockChildren[parentID] = append(children, block) + if children, isUnscheduled := s.unscheduledBlocks[parentID]; isUnscheduled { + s.unscheduledBlocks[parentID] = append(children, block) unscheduledParents++ } @@ -163,7 +176,7 @@ func (s *Scheduler) unscheduledParents(block *tangle.Message) (unscheduledParent func (s *Scheduler) bucket(mana int64) (bucket *Bucket) { bucket, exists := s.bucketsByMana[mana] if !exists { - bucket = NewManaBucket(mana) + bucket = newManaBucket(mana) s.bucketsByMana[mana] = bucket s.priorityQueue.Push(bucket) } @@ -171,10 +184,10 @@ func (s *Scheduler) bucket(mana int64) (bucket *Bucket) { return bucket } -func (s *Scheduler) decreaseUnscheduledParentsCounters(blockID tangle.MessageID) { - for _, child := range s.unscheduledBlockChildren[blockID] { - if s.unscheduledParentsCounter[child.ID()]--; s.unscheduledParentsCounter[child.ID()] == 0 { - s.queueBlock(child) +func (s *Scheduler) decreaseUnscheduledParentCountersOfChildren(blockID tangle.MessageID, now time.Time) { + for _, child := range s.unscheduledBlocks[blockID] { + if s.unqueuedBlocks[child.ID()]--; s.unqueuedBlocks[child.ID()] == 0 { + s.queueBlock(child, now) } } }