-
Notifications
You must be signed in to change notification settings - Fork 89
Expand file tree
/
Copy pathArchiveReader.hpp
More file actions
290 lines (249 loc) · 9.92 KB
/
ArchiveReader.hpp
File metadata and controls
290 lines (249 loc) · 9.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
#ifndef CLP_S_ARCHIVEREADER_HPP
#define CLP_S_ARCHIVEREADER_HPP
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <nlohmann/json_fwd.hpp>
#include <ystdlib/error_handling/Result.hpp>

#include <clp_s/ArchiveReaderAdaptor.hpp>
#include <clp_s/DictionaryEntry.hpp>
#include <clp_s/DictionaryReader.hpp>
#include <clp_s/InputConfig.hpp>
#include <clp_s/PackedStreamReader.hpp>
#include <clp_s/ReaderUtils.hpp>
#include <clp_s/SchemaReader.hpp>
#include <clp_s/search/Projection.hpp>
#include <clp_s/SingleFileArchiveDefs.hpp>
#include <clp_s/TimestampDictionaryReader.hpp>
namespace clp_s {
/**
 * Reader for an archive. Exposes the archive's dictionaries, schema tree, schema map, per-schema
 * tables, and metadata through the methods below.
 */
class ArchiveReader {
public:
    /**
     * Exception type thrown by `ArchiveReader` operations that fail.
     */
    class OperationFailed : public TraceableException {
    public:
        // Constructors
        OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
                : TraceableException(error_code, filename, line_number) {}
    };

    // Constructor
    ArchiveReader() : m_is_open(false) {}

    /**
     * Opens an archive for reading.
     * @param archive_path
     * @param network_auth
     */
    void open(Path const& archive_path, NetworkAuthOption const& network_auth);

    /**
     * Opens a single-file archive for reading from an already open `clp::ReaderInterface`.
     * @param single_file_archive_reader The already opened archive reader
     * @param archive_id The unique name or identifier for the archive
     */
    auto open(
            std::shared_ptr<clp::ReaderInterface> single_file_archive_reader,
            std::string_view archive_id
    ) -> void;

    /**
     * Reads the dictionaries and metadata.
     * @throws OperationFailed if reading or decompressing metadata fails.
     */
    void read_dictionaries_and_metadata();

    /**
     * Opens packed streams for reading.
     */
    void open_packed_streams();

    /**
     * Reads the variable dictionary from the archive.
     *
     * NOTE(review): `m_var_dict` is dereferenced unconditionally; presumably the archive must
     * have been opened first -- confirm against `open`'s implementation.
     * @param lazy Forwarded to the dictionary reader's `read_entries`.
     * @return the variable dictionary reader
     */
    std::shared_ptr<VariableDictionaryReader> read_variable_dictionary(bool lazy = false) {
        m_var_dict->read_entries(lazy);
        return m_var_dict;
    }

    /**
     * Reads the log type dictionary from the archive.
     *
     * NOTE(review): `m_log_dict` is dereferenced unconditionally; presumably the archive must
     * have been opened first -- confirm against `open`'s implementation.
     * @param lazy Forwarded to the dictionary reader's `read_entries`.
     * @return the log type dictionary reader
     */
    std::shared_ptr<LogTypeDictionaryReader> read_log_type_dictionary(bool lazy = false) {
        m_log_dict->read_entries(lazy);
        return m_log_dict;
    }

    /**
     * Reads the array dictionary from the archive.
     *
     * NOTE(review): `m_array_dict` is dereferenced unconditionally; presumably the archive must
     * have been opened first -- confirm against `open`'s implementation.
     * @param lazy Forwarded to the dictionary reader's `read_entries`.
     * @return the array dictionary reader
     */
    std::shared_ptr<LogTypeDictionaryReader> read_array_dictionary(bool lazy = false) {
        m_array_dict->read_entries(lazy);
        return m_array_dict;
    }

    /**
     * Reads the metadata from the archive.
     * @return A void result on success, or an error code indicating the failure:
     * - Forwards `ArchiveReader::read_single_schema_metadata`'s return values on failure.
     * - Forwards `PackedStreamReader::read_metadata`'s return values on failure.
     * @throws OperationFailed if archive metadata is empty or corrupt.
     * @throws OperationFailed if archive metadata stream offset is not strictly incremental.
     * @throws OperationFailed if reading or decompressing metadata fails.
     */
    [[nodiscard]] auto read_metadata() -> ystdlib::error_handling::Result<void>;

    /**
     * Reads a table from the archive.
     * @param schema_id
     * @param should_extract_timestamp
     * @param should_marshal_records
     * @return the schema reader
     */
    SchemaReader& read_schema_table(
            int32_t schema_id,
            bool should_extract_timestamp,
            bool should_marshal_records
    );

    /**
     * Loads all of the tables in the archive and returns SchemaReaders for them.
     * @return the schema readers for every table in the archive
     */
    std::vector<std::shared_ptr<SchemaReader>> read_all_tables();

    /**
     * @return A view of the stored archive ID. The view refers to a member of this reader, so it
     * must not be used after the reader is destroyed or the stored ID is modified.
     */
    [[nodiscard]] std::string_view get_archive_id() const { return m_archive_id; }

    /**
     * @return The variable dictionary reader held by this archive reader.
     */
    [[nodiscard]] std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() const {
        return m_var_dict;
    }

    /**
     * @return The log type dictionary reader held by this archive reader.
     */
    [[nodiscard]] std::shared_ptr<LogTypeDictionaryReader> get_log_type_dictionary() const {
        return m_log_dict;
    }

    /**
     * @return The array dictionary reader held by this archive reader.
     */
    [[nodiscard]] std::shared_ptr<LogTypeDictionaryReader> get_array_dictionary() const {
        return m_array_dict;
    }

    /**
     * @return The timestamp dictionary reader, as provided by the archive reader adaptor.
     */
    [[nodiscard]] std::shared_ptr<TimestampDictionaryReader> get_timestamp_dictionary() const {
        return m_archive_reader_adaptor->get_timestamp_dictionary();
    }

    /**
     * @return The schema tree held by this archive reader.
     */
    [[nodiscard]] std::shared_ptr<SchemaTree> get_schema_tree() const { return m_schema_tree; }

    /**
     * @return The schema map held by this archive reader.
     */
    [[nodiscard]] std::shared_ptr<ReaderUtils::SchemaMap> get_schema_map() const {
        return m_schema_map;
    }

    /**
     * @return The range index, as provided by the archive reader adaptor.
     */
    [[nodiscard]] auto get_range_index() const -> std::vector<RangeIndexEntry> const& {
        return m_archive_reader_adaptor->get_range_index();
    }

    /**
     * @return The archive header, as provided by the archive reader adaptor.
     */
    [[nodiscard]] auto get_header() const -> ArchiveHeader const& {
        return m_archive_reader_adaptor->get_header();
    }

    /**
     * Writes decoded messages to a file.
     * @param writer
     */
    void store(FileWriter& writer);

    /**
     * Closes the archive.
     */
    void close();

    /**
     * @return The schema ids in the archive. It also defines the order that tables should be
     * read in to avoid seeking backwards.
     */
    [[nodiscard]] std::vector<int32_t> const& get_schema_ids() const { return m_schema_ids; }

    /**
     * Replaces the projection stored by this reader. Until this is called, the reader holds a
     * projection constructed with `search::ProjectionMode::ReturnAllColumns`.
     * @param projection
     */
    void set_projection(std::shared_ptr<search::Projection> projection) {
        // The parameter is a by-value sink, so move it into place rather than copying it (a copy
        // would cost an extra atomic reference-count increment and decrement).
        m_projection = std::move(projection);
    }

    /**
     * @return true if this archive has log ordering information, and false otherwise.
     */
    [[nodiscard]] auto has_log_order() const -> bool { return m_log_event_idx_column_id >= 0; }

    /**
     * @return Whether this archive can contain columns with the deprecated DateString timestamp
     * format.
     */
    [[nodiscard]] auto has_deprecated_timestamp_format() const -> bool {
        return get_header().has_deprecated_timestamp_format();
    }

    /**
     * @param log_event_idx
     * @return The file-level metadata associated with the record at `log_event_idx`.
     * @throws ArchiveReaderAdaptor::OperationFailed when `log_event_idx` cannot be mapped to
     * any metadata.
     */
    [[nodiscard]] auto get_metadata_for_log_event(int64_t log_event_idx) -> nlohmann::json const& {
        return m_archive_reader_adaptor->get_metadata_for_log_event(log_event_idx);
    }

private:
    /**
     * Reads archive metadata and prepares the archive reader for subsequent archive reads.
     */
    auto initialize_archive_reader() -> void;

    /**
     * Reads a single schema table entry from the table metadata stream.
     * @return A result containing a pair:
     * - The schema ID.
     * - The schema metadata with `uncompressed_size` not yet computed.
     * on success, or an error code indicating the failure:
     * - std::errc::io_error if reading from the metadata stream fails.
     * - std::errc::illegal_byte_sequence if the stream offset exceeds the stream size.
     * - Forwards `ReaderUtils::try_uint64_to_size_t`'s return values on failure.
     */
    [[nodiscard]] auto read_single_schema_metadata()
            -> ystdlib::error_handling::Result<std::pair<int32_t, SchemaReader::SchemaMetadata>>;

    /**
     * Initializes a schema reader passed by reference to become a reader for a given schema.
     * @param reader
     * @param schema_id
     * @param should_extract_timestamp
     * @param should_marshal_records
     */
    void initialize_schema_reader(
            SchemaReader& reader,
            int32_t schema_id,
            bool should_extract_timestamp,
            bool should_marshal_records
    );

    /**
     * Appends a column to the schema reader.
     * @param reader
     * @param column_id
     * @return a pointer to the newly appended column reader or nullptr if no column reader was
     * created
     */
    BaseColumnReader* append_reader_column(SchemaReader& reader, int32_t column_id);

    /**
     * Appends columns for the entire schema of an unordered object.
     * @param reader
     * @param mst_subtree_root_node_id
     * @param schema_ids
     * @param should_marshal_records
     */
    void append_unordered_reader_columns(
            SchemaReader& reader,
            int32_t mst_subtree_root_node_id,
            std::span<int32_t> schema_ids,
            bool should_marshal_records
    );

    /**
     * Reads a table with given ID from the packed stream reader. If read_stream is called
     * multiple times in a row for the same stream_id a cached buffer is returned. This function
     * allows the caller to ask for the same buffer to be reused to read multiple different
     * tables: this can save memory allocations, but can only be used when tables are read one
     * at a time.
     * @param stream_id
     * @param reuse_buffer when true the same buffer is reused across invocations, overwriting
     * data returned by previous calls to read_stream
     * @return a buffer containing the decompressed stream identified by stream_id
     */
    std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer);

    bool m_is_open;
    std::string m_archive_id;
    std::shared_ptr<VariableDictionaryReader> m_var_dict;
    std::shared_ptr<LogTypeDictionaryReader> m_log_dict;
    std::shared_ptr<LogTypeDictionaryReader> m_array_dict;
    std::shared_ptr<ArchiveReaderAdaptor> m_archive_reader_adaptor;

    std::shared_ptr<SchemaTree> m_schema_tree;
    std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
    std::vector<int32_t> m_schema_ids;
    std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;
    // Defaults to a projection that returns all columns until `set_projection` is called.
    std::shared_ptr<search::Projection> m_projection{
            std::make_shared<search::Projection>(search::ProjectionMode::ReturnAllColumns)
    };

    PackedStreamReader m_stream_reader;
    ZstdDecompressor m_table_metadata_decompressor;
    SchemaReader m_schema_reader;
    // State backing `read_stream`'s buffer caching/reuse.
    std::shared_ptr<char[]> m_stream_buffer{};
    size_t m_stream_buffer_size{0ULL};
    size_t m_cur_stream_id{0ULL};
    // Negative (the default) means the archive has no log ordering information; see
    // `has_log_order`.
    int32_t m_log_event_idx_column_id{-1};
};
} // namespace clp_s
#endif // CLP_S_ARCHIVEREADER_HPP