-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlargefileclient.cpp
More file actions
161 lines (139 loc) · 4.93 KB
/
largefileclient.cpp
File metadata and controls
161 lines (139 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include <grpc/grpc.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <cstddef>
#include <fstream>
#include <ostream>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/initialize.h"
#include "largefiles.grpc.pb.h"
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
ABSL_FLAG(std::string, file, "", "File to upload");
ABSL_FLAG(size_t, buffer, 16 * 1024, "Size of file chunks to stream");
using grpc::CallbackServerContext;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerUnaryReactor;
using grpc::Status;
using largefiles::Chunk;
using largefiles::FileStreamBlock;
using largefiles::FileUploadResponse;
using largefiles::FileWriter;
namespace
{
// Sends requests as quickly as possible and times how long it takes to perform
// the write operation.
class UploadFileClientReactor final
: public grpc::ClientWriteReactor<largefiles::FileStreamBlock>
{
public:
explicit UploadFileClientReactor(std::string fileToUpload, size_t bufferSize)
: fileToUpload_(fileToUpload)
, bufferSize_(bufferSize) {}
void Start()
{
absl::MutexLock lock(&mu_);
StartCall();
Write();
}
void awaitDone() {
absl::MutexLock lock(&mu_);
mu_.Await(absl::Condition(+[](bool *done)
{ return *done; },
&done_));
}
~UploadFileClientReactor() override
{
awaitDone();
fileStream_.close();
}
void OnWriteDone(bool ok) override
{
absl::MutexLock lock(&mu_);
std::cout << "Writing took " << absl::Now() - *time_ << std::endl;
time_ = std::nullopt;
if (ok)
{
Write();
}
}
void OnDone(const grpc::Status &status) override
{
if (status.ok())
{
std::cout << "Done\n";
}
else
{
std::cout << "Done with error: [" << status.error_code() << "] "
<< status.error_message() << "\n";
}
absl::MutexLock lock(&mu_);
done_ = true;
}
private:
void Write() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_)
{
if (reqs_ == 0) {
req_.mutable_metadata()->set_name(fileToUpload_);
req_.mutable_metadata()->set_populated(true);
// Open file on first write
fileStream_.open(fileToUpload_, std::ios::in | std::ios::binary);
if (!fileStream_) {
std::cerr << "Failed to open file: " << fileToUpload_ << std::endl;
throw std::runtime_error("Failed to open file" + fileToUpload_);
}
} else {
req_.clear_metadata();
}
// Write next chunk
std::vector<char> buffer(bufferSize_);
if (fileStream_.read(buffer.data(), bufferSize_) || fileStream_.gcount() > 0) {
req_.mutable_filedata()->mutable_chunk()->assign(buffer.data(), fileStream_.gcount());
std::cout << "Writing chunk of size " << fileStream_.gcount() << std::endl;
reqs_++;
StartWrite(&req_);
time_ = absl::Now();
} else {
std::cout << "Closing file. All data sent." << std::endl;
fileStream_.close();
StartWritesDone();
}
}
std::string fileToUpload_;
size_t bufferSize_;
absl::Mutex mu_;
bool done_ ABSL_GUARDED_BY(&mu_) = false;
largefiles::FileStreamBlock req_;
size_t reqs_;
std::optional<absl::Time> time_ ABSL_GUARDED_BY(mu_);
std::ifstream fileStream_;
};
} // namespace
int main(int argc, char **argv)
{
absl::ParseCommandLine(argc, argv);
absl::InitializeLog();
if (absl::GetFlag(FLAGS_file).empty())
{
std::cerr << "Please specify a file to upload with --file=<filename>\n";
return 1;
}
grpc::ChannelArguments channel_arguments;
auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target),
grpc::InsecureChannelCredentials(),
channel_arguments);
auto stub = FileWriter::NewStub(channel);
UploadFileClientReactor reactor(absl::GetFlag(FLAGS_file), absl::GetFlag(FLAGS_buffer));
grpc::ClientContext context;
::largefiles::FileUploadResponse response;
stub->async()->UploadFile(&context, &response, &reactor);
reactor.Start();
reactor.awaitDone();
std::cout << "File upload completed with response size: " << response.readsize() << std::endl;
return 0;
}