19
19
*/
20
20
21
21
#include < stdio.h>
22
+ #include < stdlib.h>
23
+ #include < memory>
24
+ #include < vector>
22
25
23
26
#include " kinetic/kinetic.h"
24
27
@@ -37,15 +40,18 @@ using std::make_pair;
37
40
38
41
kinetic::P2PPushRequest prepare_request (const vector<kinetic::P2PPushOperation>& operations, const vector<pair<string, int >>& destinations, size_t currentDestination) {
39
42
kinetic::P2PPushRequest request;
40
- if (currentDestination < destinations.size () - 1 ) {
41
- request.requests .push_back (prepare_request (operations, destinations, currentDestination + 1 ));
42
- }
43
43
44
44
request.host = destinations[currentDestination].first ;
45
45
request.port = destinations[currentDestination].second ;
46
46
47
47
request.operations = operations;
48
48
49
+ if (currentDestination < destinations.size () - 1 ) {
50
+ // Add the pipleline request onto this request's first operation
51
+ request.operations [0 ].request = make_shared<::kinetic::P2PPushRequest>(
52
+ prepare_request (operations, destinations, currentDestination + 1 ));
53
+ }
54
+
49
55
return request;
50
56
}
51
57
@@ -55,8 +61,9 @@ void dispatch_request(shared_ptr<kinetic::BlockingKineticConnection> connection,
55
61
kinetic::P2PPushRequest request = prepare_request (operations, destinations, 0 );
56
62
57
63
unique_ptr<vector<kinetic::KineticStatus>> statuses (new vector<kinetic::KineticStatus>());
58
- if (!connection->P2PPush (request, statuses).ok ()) {
59
- printf (" Error pushing\n " );
64
+ auto status = connection->P2PPush (request, statuses);
65
+ if (!status.ok ()) {
66
+ printf (" Error pushing: %d, %s\n " , status.statusCode (), status.message ().c_str ());
60
67
exit (1 );
61
68
}
62
69
@@ -100,7 +107,7 @@ int main(int argc, char* argv[]) {
100
107
kinetic::KineticConnectionFactory kinetic_connection_factory = kinetic::NewKineticConnectionFactory ();
101
108
102
109
shared_ptr<kinetic::BlockingKineticConnection> blocking_connection;
103
- if (!kinetic_connection_factory.NewBlockingConnection (options, blocking_connection, 5 ).ok ()){
110
+ if (!kinetic_connection_factory.NewBlockingConnection (options, blocking_connection, 20 ).ok ()){
104
111
printf (" Unable to connect\n " );
105
112
return 1 ;
106
113
}
0 commit comments