This repository was archived by the owner on Sep 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 618
/
Copy pathcopy_executor.cpp
276 lines (240 loc) · 8.91 KB
/
copy_executor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
//===----------------------------------------------------------------------===//
//
// Peloton
//
// copy_executor.cpp
//
// Identification: src/executor/copy_executor.cpp
//
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//
#include "executor/copy_executor.h"
#include <sys/stat.h>
#include <sys/mman.h>
#include "common/logger.h"
#include "catalog/catalog.h"
#include "concurrency/transaction_manager_factory.h"
#include "executor/executor_context.h"
#include "executor/logical_tile_factory.h"
#include "planner/export_external_file_plan.h"
#include "storage/table_factory.h"
#include "common/exception.h"
#include "common/macros.h"
#include "network/marshal.h"
namespace peloton {
namespace executor {
/**
 * @brief Constructor for Copy executor.
 *
 * Does no work of its own; both arguments are forwarded to the
 * AbstractExecutor base class.
 *
 * @param node Plan node corresponding to this executor. DInit() retrieves it
 *             as a planner::ExportExternalFilePlan.
 * @param executor_context Executor context, forwarded unchanged to
 *                         AbstractExecutor.
 */
CopyExecutor::CopyExecutor(const planner::AbstractPlan *node,
ExecutorContext *executor_context)
: AbstractExecutor(node, executor_context) {}
/**
 * @brief Defaulted destructor; no explicit cleanup is written here.
 */
CopyExecutor::~CopyExecutor() = default;
/**
 * @brief Opens the export target file named by the plan node.
 *
 * Exactly one child executor is expected (checked by assertion). The file
 * name is taken from the planner::ExportExternalFilePlan node and the file is
 * opened in "w" mode through InitFileHandle().
 *
 * @return true once the output file has been opened.
 * @throws ExecutorException if InitFileHandle() reports failure.
 */
bool CopyExecutor::DInit() {
  PELOTON_ASSERT(children_.size() == 1);

  // The plan node carries the path of the file we export into
  const auto &plan = GetPlanNode<planner::ExportExternalFilePlan>();

  if (!InitFileHandle(plan.GetFileName().c_str(), "w")) {
    throw ExecutorException("Failed to create file " + plan.GetFileName() +
                            ". Try absolute path and make sure you have the "
                            "permission to access this file.");
  }

  LOG_DEBUG("Created target copy output file: %s", plan.GetFileName().c_str());
  return true;
}
/**
 * @brief Opens a file and records its stream and descriptor in file_handle_.
 *
 * file_handle_ is only written once both the stream and its descriptor have
 * been obtained, so a failed call never leaves it half-initialized. If the
 * descriptor lookup fails, the freshly opened stream is closed again instead
 * of being leaked.
 *
 * @param name Path of the file to open; passed straight to fopen().
 * @param mode fopen() mode string (DInit() passes "w").
 * @return true on success; false if fopen() returned a null stream or
 *         fileno() returned INVALID_FILE_DESCRIPTOR.
 */
bool CopyExecutor::InitFileHandle(const char *name, const char *mode) {
  auto *file = fopen(name, mode);
  if (file == nullptr) {
    LOG_ERROR("File is NULL");
    return false;
  }

  // get the descriptor
  const auto fd = fileno(file);
  if (fd == INVALID_FILE_DESCRIPTOR) {
    LOG_ERROR("file fd is -1");
    // Do not leak the stream we just opened
    fclose(file);
    return false;
  }

  // Publish the handle only after every step succeeded
  file_handle_.file = file;
  file_handle_.fd = fd;
  file_handle_.size = 0;
  return true;
}
/**
* Write buffer data to the file. Although fwrite() is not a system call,
* calling fwrite frequently with small byte is not efficient because fwrite
* does many sanity checks. We use local buffer to amortize that.
*/
void CopyExecutor::FlushBuffer() {
PELOTON_ASSERT(buff_ptr < COPY_BUFFER_SIZE);
PELOTON_ASSERT(buff_size + buff_ptr <= COPY_BUFFER_SIZE);
while (buff_size > 0) {
size_t bytes_written =
fwrite(buff + buff_ptr, sizeof(char), buff_size, file_handle_.file);
// Book keeping
buff_ptr += bytes_written;
buff_size -= bytes_written;
total_bytes_written += bytes_written;
LOG_TRACE("fwrite %d bytes", (int)bytes_written);
}
buff_ptr = 0;
}
/**
 * @brief Flushes the output stream and then syncs its file descriptor.
 *
 * Failures of fflush() or fsync() are logged but not propagated; a failed
 * fflush() does not prevent the fsync() attempt. Does nothing when the stored
 * descriptor is -1.
 */
void CopyExecutor::FFlushFsync() {
  PELOTON_ASSERT(file_handle_.fd != -1);
  if (file_handle_.fd == -1) {
    return;
  }

  // Step 1: push stdio-buffered data to the descriptor
  if (fflush(file_handle_.file) != 0) {
    LOG_ERROR("Error occurred in fflush(%s)", strerror(errno));
  }

  // Step 2: ask the OS to sync the descriptor
  if (fsync(file_handle_.fd) != 0) {
    LOG_ERROR("Error occurred in fsync(%s)", strerror(errno));
  }
}
/**
 * @brief Appends one field to the local buffer, escaping special characters,
 *        followed by a column or line separator.
 *
 * Each occurrence of `delimiter` in the input is preceded by two backslashes
 * and each occurrence of `new_line` by one backslash. After the field, either
 * `delimiter` (end_of_line == false) or `new_line` (end_of_line == true) is
 * appended.
 *
 * Space is checked per character, so fields of any length are handled: the
 * buffer is flushed through FlushBuffer() whenever the next (possibly escaped)
 * character or the trailing separator might not fit. A non-positive len
 * writes only the separator.
 *
 * @param data Pointer to the field bytes; must be readable for len bytes.
 * @param len Number of bytes of data to copy.
 * @param end_of_line Whether this field is the last one of the output line.
 */
void CopyExecutor::Copy(const char *data, int len, bool end_of_line) {
  // Worst case a single input character becomes three output bytes:
  // two backslashes followed by the delimiter itself
  constexpr size_t kMaxEscapedLen = 3;

  // Now copy the string to local buffer and escape delimiters
  // TODO A better way is to search for delimiter once and perform copy
  for (int i = 0; i < len; i++) {
    // FlushBuffer() leaves the buffer empty, so one flush always makes room
    if (buff_size + kMaxEscapedLen > COPY_BUFFER_SIZE) {
      FlushBuffer();
    }
    const char ch = data[i];
    // Check delimiter
    if (ch == delimiter) {
      buff[buff_size++] = '\\';
      buff[buff_size++] = '\\';
    } else if (ch == new_line) {
      buff[buff_size++] = '\\';
    }
    buff[buff_size++] = ch;
  }

  // Make sure the trailing separator fits as well
  if (buff_size + 1 > COPY_BUFFER_SIZE) {
    FlushBuffer();
  }
  // Append col delimiter or new line delimiter
  buff[buff_size++] = end_of_line ? new_line : delimiter;
  PELOTON_ASSERT(buff_size <= COPY_BUFFER_SIZE);
}
/**
 * @brief Drains the child executor and writes every tuple to the output file.
 *
 * For each logical tile produced by the child, all values are fetched as
 * strings and written column by column through Copy(). Columns are
 * distinguished by their origin column id:
 *  - num_param_col_id: parsed as the parameter count and written as-is.
 *  - param_type_col_id: decoded into parameter types, each written as a field.
 *  - param_format_col_id: decoded into parameter formats (nothing written).
 *  - param_val_col_id: decoded into parameter values, each written as a field.
 *  - any other column: written as-is; the last column of the tile ends the
 *    line.
 * After the child is exhausted the buffer is flushed, the file is synced and
 * closed, and the stored file handle is reset.
 *
 * @return true after all tiles have been written and the file closed; false
 *         if this executor had already finished on an earlier call.
 */
bool CopyExecutor::DExecute() {
  // skip if we're done
  if (done) {
    return false;
  }

  while (children_[0]->Execute() == true) {
    // Get input a tile
    std::unique_ptr<LogicalTile> logical_tile(children_[0]->GetOutput());
    LOG_DEBUG("Looping over the output tile..");

    // Get physical schema of the tile
    std::unique_ptr<catalog::Schema> output_schema(
        logical_tile->GetPhysicalSchema());

    // vectors for prepared statement parameters
    int num_params = 0;
    std::vector<std::pair<type::TypeId, std::string>> bind_parameters;
    std::vector<type::Value> param_values;
    std::vector<int16_t> formats;
    std::vector<int32_t> types;

    // Construct result format as varchar
    auto col_count = output_schema->GetColumnCount();
    std::vector<int> result_format(col_count, 0);
    std::vector<std::vector<std::string>> answer_tuples =
        logical_tile->GetAllValuesAsStrings(result_format, true);

    // Loop over the returned results
    for (auto &tuple : answer_tuples) {
      // Loop over the columns
      for (unsigned int col_index = 0; col_index < col_count; col_index++) {
        // Bind by reference: the cell is only read, so no per-cell copy
        auto &val = tuple[col_index];
        auto origin_col_id =
            logical_tile->GetColumnInfo(col_index).origin_column_id;
        int len = val.length();

        if (origin_col_id == num_param_col_id) {
          // num_param column
          num_params = std::stoi(val);
          Copy(val.c_str(), val.length(), false);
        } else if (origin_col_id == param_type_col_id) {
          // param_types column
          PELOTON_ASSERT(output_schema->GetColumn(col_index).GetType() ==
                         type::TypeId::VARBINARY);
          network::InputPacket packet(len, val);

          // Read param types
          types.resize(num_params);
          // TODO: Instead of passing a packet to the executor, a more generic
          // data structure is needed
          network::OldReadParamType(&packet, num_params, types);

          // Write all the types to output file
          for (int i = 0; i < num_params; i++) {
            std::string type_str = std::to_string(types[i]);
            Copy(type_str.c_str(), type_str.length(), false);
          }
        } else if (origin_col_id == param_format_col_id) {
          // param_formats column
          PELOTON_ASSERT(output_schema->GetColumn(col_index).GetType() ==
                         type::TypeId::VARBINARY);
          network::InputPacket packet(len, val);

          // Read param formats
          formats.resize(num_params);
          // TODO: Instead of passing a packet to the executor, a more generic
          // data structure is needed
          network::OldReadParamFormat(&packet, num_params, formats);
        } else if (origin_col_id == param_val_col_id) {
          // param_values column
          PELOTON_ASSERT(output_schema->GetColumn(col_index).GetType() ==
                         type::TypeId::VARBINARY);
          network::InputPacket packet(len, val);
          bind_parameters.resize(num_params);
          param_values.resize(num_params);
          // TODO: Instead of passing a packet to the executor, a more generic
          // data structure is needed
          network::OldReadParamValue(&packet, num_params, types,
                                     bind_parameters, param_values, formats);

          // Write all the values to output file
          for (int i = 0; i < num_params; i++) {
            // Reference, not copy: the value is only inspected and written
            auto &param_value = param_values[i];
            LOG_TRACE("param_value.GetTypeId(): %s",
                      TypeIdToString(param_value.GetTypeId()).c_str());
            // Avoid extra copy for varlen types
            if (param_value.GetTypeId() == type::TypeId::VARBINARY) {
              const char *data = param_value.GetData();
              Copy(data, param_value.GetLength(), false);
            } else if (param_value.GetTypeId() == type::TypeId::VARCHAR) {
              const char *data = param_value.GetData();
              // Don't write the NULL character for varchar
              Copy(data, param_value.GetLength() - 1, false);
            } else {
              // Convert integer / double types to string before copying
              auto param_str = param_value.ToString();
              Copy(param_str.c_str(), param_str.length(), false);
            }
          }
        } else {
          // For other columns, just copy the content to local buffer
          bool end_of_line = col_index == col_count - 1;
          Copy(val.c_str(), val.length(), end_of_line);
        }
      }
    }
    LOG_DEBUG("Done writing to csv file for this tile");
  }
  LOG_INFO("Done copying all logical tiles");

  FlushBuffer();
  FFlushFsync();

  // Sync and close; fclose() can still report a deferred write error
  if (fclose(file_handle_.file) != 0) {
    LOG_ERROR("Error occurred in fclose(%s)", strerror(errno));
  }
  // The stream is gone now; do not keep a dangling handle around
  file_handle_.file = nullptr;
  file_handle_.fd = INVALID_FILE_DESCRIPTOR;

  done = true;
  return true;
}
} // namespace executor
} // namespace peloton