-
Notifications
You must be signed in to change notification settings - Fork 371
Feature: Add solidification stage to pipeline and only broadcast solid transactions #1646
base: dev
Are you sure you want to change the base?
Changes from 4 commits
53ea166
be7c132
6593388
33f193f
ff58495
1a0ea43
ad3a203
2283fb0
24e31a5
03590dd
06242eb
1ec3794
bc5897f
62a2330
9fc4cba
ccca691
f0c1955
fa72acc
c3b1417
1538b36
a34c5e8
14e784d
89ec550
8801ef2
26b9521
16170cf
7e34c91
778c833
0b61c89
1a2d950
864c2db
4bfb138
d623d1d
48258b1
1cde4bf
6543981
7b4d3c5
5e0a974
f86fe55
de08907
4086fce
b16c752
f5de8fe
30f226e
303648b
7e36e1c
eb53b2f
0409d3e
1b5294a
ab6e628
839d9cb
031180b
f61e6ff
6e0a383
4126903
ff4a13c
5bd3066
10baada
4327b10
092cfa5
cc220b0
82359a7
7adfdc5
d3baeee
ffea048
0f29416
0084b48
4664000
cc5e8b8
d488a1d
9ad4886
9ab2a7d
5663d30
b2cb269
31223b3
ea3bbf8
e4feaf6
059e7e4
7528b22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package com.iota.iri.network.pipeline; | ||
|
||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.BlockingQueue; | ||
|
||
/** | ||
* A queue for transactions intended to be submitted to the {@link BroadcastStage} | ||
* for processing | ||
*/ | ||
public class BroadcastQueue { | ||
|
||
/** A blocking queue to store transactions for broadcasting */ | ||
private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
DyrellC marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** An object to be used for synchronizing calls */ | ||
private final Object broadcastSync = new Object(); | ||
|
||
|
||
/** | ||
* Add transactions to the Broadcast Queue | ||
* @param context Transaction context to be passed to the {@link BroadcastStage} | ||
* @return True if added properly, False if not | ||
*/ | ||
public boolean add(ProcessingContext context) { | ||
synchronized (broadcastSync) { | ||
try { | ||
this.broadcastStageQueue.put(context); | ||
return true; | ||
} catch (Exception e) { | ||
return false; | ||
} | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Getter for the current Broadcast Queue | ||
* @return BlockingQueue of all transactions left to be broadcasted | ||
*/ | ||
public BlockingQueue<ProcessingContext> get(){ | ||
/** Call is synchronized to ensure all pending additions have completed before sending the state */ | ||
synchronized (broadcastSync) { | ||
return this.broadcastStageQueue; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,8 +66,8 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
private BlockingQueue<ProcessingContext> preProcessStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> validationStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> receivedStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> replyStageQueue = new ArrayBlockingQueue<>(100); | ||
private BroadcastQueue broadcastStageQueue; | ||
|
||
/** | ||
* Creates a {@link TransactionProcessingPipeline}. | ||
|
@@ -84,7 +84,7 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConfig config, | ||
TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
TransactionRequester transactionRequester) { | ||
TransactionRequester transactionRequester, BroadcastQueue broadcastStageQueue) { | ||
FIFOCache<Long, Hash> recentlySeenBytesCache = new FIFOCache<>(config.getCacheSizeBytes()); | ||
this.preProcessStage = new PreProcessStage(recentlySeenBytesCache); | ||
this.replyStage = new ReplyStage(neighborRouter, config, tangle, tipsViewModel, latestMilestoneTracker, | ||
|
@@ -94,6 +94,7 @@ public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConf | |
this.receivedStage = new ReceivedStage(tangle, txValidator, snapshotProvider, transactionRequester); | ||
this.batchedHasher = BatchedHasherFactory.create(BatchedHasherFactory.Type.BCTCURL81, 20); | ||
this.hashingStage = new HashingStage(batchedHasher); | ||
this.broadcastStageQueue = broadcastStageQueue; | ||
} | ||
|
||
@Override | ||
|
@@ -103,7 +104,7 @@ public void start() { | |
addStage("validation", validationStageQueue, validationStage); | ||
addStage("reply", replyStageQueue, replyStage); | ||
addStage("received", receivedStageQueue, receivedStage); | ||
addStage("broadcast", broadcastStageQueue, broadcastStage); | ||
addStage("broadcast", broadcastStageQueue.get(), broadcastStage); | ||
} | ||
|
||
/** | ||
|
@@ -118,7 +119,14 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
stagesThreadPool.submit(new Thread(() -> { | ||
try { | ||
while (!Thread.currentThread().isInterrupted()) { | ||
ProcessingContext ctx = stage.process(queue.take()); | ||
ProcessingContext queueTake; | ||
if(name.equals("broadcast")) { | ||
queueTake = broadcastStageQueue.get().take(); | ||
} else{ | ||
queueTake = queue.take(); | ||
} | ||
ProcessingContext ctx = stage.process(queueTake); | ||
|
||
switch (ctx.getNextStage()) { | ||
case REPLY: | ||
replyStageQueue.put(ctx); | ||
|
@@ -135,7 +143,7 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
receivedStageQueue.put(payload.getRight()); | ||
break; | ||
case BROADCAST: | ||
broadcastStageQueue.put(ctx); | ||
broadcastStageQueue.add(ctx); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if I understand correctly, you still add transactions that are not solid... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @galrogo Just to address this comment and subsequently issue #1512, currently transaction requests are dependent on broadcasting, so if you do not broadcast unsolid transactions, you will never send a transaction request. So it is necessary to pass on transactions before they are solid in the current framework. May be worth discussing a new method of transaction requesting from neighbours to avoid this dependency. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just broadcast random solid tips like before? I guess moving on to the new messages that were developed for hornet will solve this problem There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, you implemented my suggestion above? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you say before are you referring to before the refactor? Because as is, after a transaction is placed into the solidifier, the solidify stage will fetch a random solid tip and broadcast it. If the tip is not solid the pipeline goes straight to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @DyrellC, marking this as point to discuss before merge There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. never mind, I am good |
||
break; | ||
case ABORT: | ||
break; | ||
|
@@ -160,7 +168,7 @@ public BlockingQueue<ProcessingContext> getReceivedStageQueue() { | |
|
||
@Override | ||
public BlockingQueue<ProcessingContext> getBroadcastStageQueue() { | ||
return broadcastStageQueue; | ||
return broadcastStageQueue.get(); | ||
} | ||
|
||
@Override | ||
|
Uh oh!
There was an error while loading. Please reload this page.