11#include " et_feeder/et_feeder.h"
2+ #include < cassert>
23
34using namespace std ;
45using namespace Chakra ;
56
67ETFeeder::ETFeeder (string filename)
7- : trace_(filename), window_size_(4096 * 256 ), et_complete_(false ) {
8+ : trace_(filename), window_size_(4096 * 256 ), et_complete_(false ) {
89 readGlobalMetadata ();
910 readNextWindow ();
1011}
1112
12- ETFeeder::~ETFeeder () {
13- }
13+ ETFeeder::~ETFeeder () {}
1414
1515void ETFeeder::addNode (shared_ptr<ETFeederNode> node) {
1616 dep_graph_[node->getChakraNode ()->id ()] = node;
@@ -19,8 +19,7 @@ void ETFeeder::addNode(shared_ptr<ETFeederNode> node) {
1919void ETFeeder::removeNode (uint64_t node_id) {
2020 dep_graph_.erase (node_id);
2121
22- if (!et_complete_
23- && (dep_free_node_queue_.size () < window_size_)) {
22+ if (!et_complete_ && (dep_free_node_queue_.size () < window_size_)) {
2423 readNextWindow ();
2524 }
2625}
@@ -47,16 +46,18 @@ void ETFeeder::pushBackIssuableNode(uint64_t node_id) {
4746}
4847
4948shared_ptr<ETFeederNode> ETFeeder::lookupNode (uint64_t node_id) {
50- return dep_graph_[node_id];
49+ auto node = dep_graph_.find (node_id);
50+ assert (node != dep_graph_.end ());
51+ return node->second ;
5152}
5253
5354void ETFeeder::freeChildrenNodes (uint64_t node_id) {
5455 shared_ptr<ETFeederNode> node = dep_graph_[node_id];
55- for (auto child: node->getChildren ()) {
56+ for (auto child : node->getChildren ()) {
5657 auto child_chakra = child->getChakraNode ();
5758 for (auto it = child_chakra->mutable_data_deps ()->begin ();
58- it != child_chakra->mutable_data_deps ()->end ();
59- ++it) {
59+ it != child_chakra->mutable_data_deps ()->end ();
60+ ++it) {
6061 if (*it == node_id) {
6162 child_chakra->mutable_data_deps ()->erase (it);
6263 break ;
@@ -70,12 +71,14 @@ void ETFeeder::freeChildrenNodes(uint64_t node_id) {
7071}
7172
7273void ETFeeder::readGlobalMetadata () {
73- shared_ptr<ChakraProtoMsg::GlobalMetadata> pkt_msg = make_shared<ChakraProtoMsg::GlobalMetadata>();
74+ shared_ptr<ChakraProtoMsg::GlobalMetadata> pkt_msg =
75+ make_shared<ChakraProtoMsg::GlobalMetadata>();
7476 trace_.read (*pkt_msg);
7577}
7678
7779shared_ptr<ETFeederNode> ETFeeder::readNode () {
78- shared_ptr<ChakraProtoMsg::Node> pkt_msg = make_shared<ChakraProtoMsg::Node>();
80+ shared_ptr<ChakraProtoMsg::Node> pkt_msg =
81+ make_shared<ChakraProtoMsg::Node>();
7982 if (!trace_.read (*pkt_msg)) {
8083 return nullptr ;
8184 }
@@ -101,11 +104,12 @@ shared_ptr<ETFeederNode> ETFeeder::readNode() {
101104
102105void ETFeeder::resolveDep () {
103106 for (auto it = dep_unresolved_node_set_.begin ();
104- it != dep_unresolved_node_set_.end ();) {
107+ it != dep_unresolved_node_set_.end ();) {
105108 shared_ptr<ETFeederNode> node = *it;
106- vector<uint64_t > dep_unresolved_parent_ids = node->getDepUnresolvedParentIDs ();
109+ vector<uint64_t > dep_unresolved_parent_ids =
110+ node->getDepUnresolvedParentIDs ();
107111 for (auto inner_it = dep_unresolved_parent_ids.begin ();
108- inner_it != dep_unresolved_parent_ids.end ();) {
112+ inner_it != dep_unresolved_parent_ids.end ();) {
109113 auto parent_node = dep_graph_.find (*inner_it);
110114 if (parent_node != dep_graph_.end ()) {
111115 parent_node->second ->addChild (node);
@@ -126,24 +130,27 @@ void ETFeeder::resolveDep() {
126130void ETFeeder::readNextWindow () {
127131 uint32_t num_read = 0 ;
128132 do {
129- shared_ptr<ETFeederNode> new_node = readNode ();
130- if (new_node == nullptr ) {
131- et_complete_ = true ;
132- break ;
133+ if (this ->et_complete_ ) {
134+ // graph read finished, but still nodes unresolved
135+ // which means the graph is broken
136+ assert (false );
137+ }
138+ for (uint32_t num_read = 0 ; num_read < this ->window_size_ ; num_read++) {
139+ std::shared_ptr<ETFeederNode> new_node = readNode ();
140+ if (new_node == nullptr ) {
141+ et_complete_ = true ;
142+ break ;
143+ }
144+ addNode (new_node);
133145 }
134-
135- addNode (new_node);
136- ++num_read;
137-
138146 resolveDep ();
139- } while ((num_read < window_size_)
140- || (dep_unresolved_node_set_.size () != 0 ));
147+ } while (dep_unresolved_node_set_.size () != 0 );
141148
142- for (auto node_id_node: dep_graph_) {
149+ for (auto node_id_node : dep_graph_) {
143150 uint64_t node_id = node_id_node.first ;
144151 shared_ptr<ETFeederNode> node = node_id_node.second ;
145- if ((dep_free_node_id_set_.count (node_id) == 0 )
146- && (node->getChakraNode ()->data_deps ().size () == 0 )) {
152+ if ((dep_free_node_id_set_.count (node_id) == 0 ) &&
153+ (node->getChakraNode ()->data_deps ().size () == 0 )) {
147154 dep_free_node_id_set_.emplace (node_id);
148155 dep_free_node_queue_.emplace (node);
149156 }
0 commit comments