Skip to content

Commit 534b67a

Browse files
authored
Merge pull request #5 from Irval1337/added-bruh-format
Added columnar bruh format and some overall improvements
2 parents 17c79cb + 86813bb commit 534b67a

32 files changed

+1331
-267
lines changed

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ include(cmake/deps.cmake)
1313

1414
add_library(columnar_lib
1515
src/core/column.cpp
16+
src/csv/csv_batch_reader.cpp
17+
src/csv/csv_batch_writer.cpp
18+
src/csv/csv_row_reader.cpp
19+
src/csv/schema_manager.cpp
20+
src/bruh/bruh_batch_reader.cpp
21+
src/bruh/bruh_batch_writer.cpp
22+
src/util/parse.cpp
1623
)
1724

1825
target_include_directories(columnar_lib PUBLIC ${PROJECT_SOURCE_DIR}/include)

apps/converter/main.cpp

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,90 @@
11
#include <absl/flags/flag.h>
22
#include <absl/flags/parse.h>
3+
#include <absl/flags/usage.h>
4+
#include <csv/csv.h>
5+
#include <bruh/bruh.h>
36

7+
#include <fstream>
48
#include <iostream>
59

6-
// ABSL_FLAG(int, x, 1, "x");
7-
// ABSL_FLAG(int, y, 2, "y");
10+
ABSL_FLAG(std::string, mode, "", "Conversion mode: csv2bruh or bruh2csv");
11+
ABSL_FLAG(std::string, schema, "", "Path to .csv schema file");
12+
ABSL_FLAG(std::string, input, "", "Input file path");
13+
ABSL_FLAG(std::string, output, "", "Output file path");
14+
15+
using namespace columnar; // NOLINT
16+
17+
std::ifstream OpenInput(const std::string& path) {
18+
std::ifstream in(path, std::ios::binary);
19+
if (!in || !in.good()) {
20+
throw std::runtime_error("Cannot open file " + path);
21+
}
22+
return in;
23+
}
24+
25+
std::ofstream OpenOutput(const std::string& path) {
26+
std::ofstream out(path, std::ios::binary);
27+
if (!out || !out.good()) {
28+
throw std::runtime_error("Cannot open file " + path);
29+
}
30+
return out;
31+
}
32+
33+
template <typename Reader, typename Writer>
34+
void Convert(Reader& reader, Writer& writer) {
35+
while (auto batch = reader.ReadNext()) {
36+
writer.Write(*batch);
37+
}
38+
writer.Flush();
39+
}
840

941
int main(int argc, char** argv) {
42+
absl::SetProgramUsageMessage(
43+
"Amazing converter between CSV and BruhDB formats.\n\n"
44+
"Usage:\n"
45+
" converter --mode=csv2bruh --schema=schema.csv --input=data.csv --output=data.bruhdb\n"
46+
" converter --mode=bruh2csv --input=data.bruhdb --output=data.csv [--schema=schema.csv]");
1047
absl::ParseCommandLine(argc, argv);
11-
// int x = absl::GetFlag(FLAGS_x);
12-
// int y = absl::GetFlag(FLAGS_y);
13-
std::cout << "Hello" << "\n";
14-
return 0;
48+
49+
auto mode = absl::GetFlag(FLAGS_mode);
50+
auto input = absl::GetFlag(FLAGS_input);
51+
auto output = absl::GetFlag(FLAGS_output);
52+
auto schema_path = absl::GetFlag(FLAGS_schema);
53+
if (mode.empty() || input.empty() || output.empty()) {
54+
std::cerr << absl::ProgramUsageMessage();
55+
return 1;
56+
}
57+
58+
try {
59+
if (mode == "csv2bruh") {
60+
if (schema_path.empty()) {
61+
throw std::runtime_error("Schema required");
62+
}
63+
auto schema = csv::SchemaManager::ReadFromFile(schema_path);
64+
auto in = OpenInput(input);
65+
auto out = OpenOutput(output);
66+
csv::CSVBatchReader reader(in, schema, {});
67+
bruh::BruhBatchWriter writer(out, schema);
68+
Convert(reader, writer);
69+
} else if (mode == "bruh2csv") {
70+
auto in = OpenInput(input);
71+
bruh::BruhBatchReader reader(in);
72+
if (!schema_path.empty()) {
73+
csv::SchemaManager::WriteToFile(schema_path, reader.GetSchema());
74+
}
75+
auto out = OpenOutput(output);
76+
csv::CSVBatchWriter writer(out, {});
77+
Convert(reader, writer);
78+
} else {
79+
throw std::runtime_error("Unknown mode");
80+
}
81+
std::cout << "Done.\n";
82+
return 0;
83+
} catch (const std::exception& e) {
84+
std::cerr << "Error: " << e.what() << "\n";
85+
return 1;
86+
} catch (...) {
87+
std::cerr << "Unknown error\n";
88+
return 1;
89+
}
1590
}

benchmarks/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
add_executable(columnar_bench
22
bench_csv.cpp
3+
bench_bruh.cpp
34
)
45

56
target_link_libraries(columnar_bench

benchmarks/bench_bruh.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#include <benchmark/benchmark.h>
2+
#include <bruh/bruh.h>
3+
#include <core/schema.h>
4+
5+
#include <sstream>
6+
7+
using namespace columnar; // NOLINT
8+
9+
void BenchBruhBatchWriter(benchmark::State& state) {
10+
core::Schema schema({core::Field("id", core::DataType::Int64),
11+
core::Field("name", core::DataType::String),
12+
core::Field("value", core::DataType::Double)});
13+
core::Batch batch(schema);
14+
for (int i = 0; i < state.range(0); ++i) {
15+
batch.ColumnAt(0).AppendFromString(std::to_string(i));
16+
batch.ColumnAt(1).AppendFromString("value" + std::to_string(i));
17+
batch.ColumnAt(2).AppendFromString("0.123");
18+
}
19+
20+
for (auto s : state) {
21+
std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
22+
bruh::BruhBatchWriter writer(ss, schema);
23+
writer.Write(batch);
24+
writer.Flush();
25+
benchmark::DoNotOptimize(ss.str());
26+
}
27+
28+
state.SetItemsProcessed(state.iterations() * state.range(0));
29+
}
30+
BENCHMARK(BenchBruhBatchWriter)->Range(100, 10000);
31+
32+
void BenchBruhBatchReader(benchmark::State& state) {
33+
core::Schema schema({core::Field("id", core::DataType::Int64),
34+
core::Field("name", core::DataType::String),
35+
core::Field("value", core::DataType::Double)});
36+
core::Batch batch(schema);
37+
for (int i = 0; i < state.range(0); ++i) {
38+
batch.ColumnAt(0).AppendFromString(std::to_string(i));
39+
batch.ColumnAt(1).AppendFromString("value" + std::to_string(i));
40+
batch.ColumnAt(2).AppendFromString("0.123");
41+
}
42+
43+
std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
44+
{
45+
bruh::BruhBatchWriter writer(ss, schema);
46+
writer.Write(batch);
47+
writer.Flush();
48+
}
49+
std::string data = ss.str();
50+
51+
for (auto s : state) {
52+
std::istringstream in(data, std::ios::binary);
53+
bruh::BruhBatchReader reader(in);
54+
while (reader.ReadNext()) {
55+
}
56+
}
57+
58+
state.SetItemsProcessed(state.iterations() * state.range(0));
59+
}
60+
BENCHMARK(BenchBruhBatchReader)->Range(100, 10000);

cmake/deps.cmake

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ include(FetchContent)
33
FetchContent_Declare(
44
googletest
55
URL https://github.com/google/googletest/archive/refs/tags/v1.15.2.zip
6+
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
67
)
78
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
89
FetchContent_MakeAvailable(googletest)
910

1011
FetchContent_Declare(
1112
benchmark
1213
URL https://github.com/google/benchmark/archive/refs/tags/v1.9.1.zip
14+
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
1315
)
1416
set(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
1517
set(BENCHMARK_ENABLE_GTEST_TESTS OFF CACHE BOOL "" FORCE)
@@ -18,5 +20,7 @@ FetchContent_MakeAvailable(benchmark)
1820
FetchContent_Declare(
1921
absl
2022
URL https://github.com/abseil/abseil-cpp/archive/refs/tags/20240116.2.zip
23+
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
2124
)
22-
FetchContent_MakeAvailable(absl)
25+
set(ABSL_PROPAGATE_CXX_STD ON CACHE BOOL "" FORCE)
26+
FetchContent_MakeAvailable(absl)

include/bruh/bruh.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#pragma once
2+
3+
#include <bruh/format.h>
4+
#include <bruh/bruh_batch_reader.h>
5+
#include <bruh/bruh_batch_writer.h>

include/bruh/bruh_batch_reader.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#pragma once
2+
3+
#include <core/batch_reader.h>
4+
#include <bruh/format.h>
5+
#include <util/macro.h>
6+
7+
#include <fstream>
8+
#include <memory>
9+
#include <string>
10+
#include <cstring>
11+
12+
// (@Irval1337) TODO: Maybe implement file io using mmap?
13+
namespace columnar::bruh {
14+
class BruhBatchReader final : public core::BatchReader {
15+
public:
16+
BruhBatchReader(std::istream& is) : is_(is), curr_row_group_(0) {
17+
ReadMetaData();
18+
}
19+
20+
std::optional<core::Batch> ReadNext() override {
21+
if (curr_row_group_ >= metadata_.row_groups.size()) {
22+
return std::nullopt;
23+
}
24+
return ReadRowGroup(curr_row_group_++);
25+
}
26+
27+
core::Batch ReadRowGroup(std::size_t i);
28+
29+
std::size_t NumRowGroups() const {
30+
return metadata_.row_groups.size();
31+
}
32+
33+
const FileMetaData& GetMetaData() const {
34+
return metadata_;
35+
}
36+
37+
const core::Schema& GetSchema() const override {
38+
return metadata_.schema;
39+
}
40+
41+
private:
42+
void ReadMetaData();
43+
44+
// Warning: You must call these methods in the same order as below
45+
void EnsureBruhFormat();
46+
47+
void ReadSchema(uint32_t cols_count);
48+
49+
void ReadRowGroupsMetadata(uint32_t cols_count);
50+
51+
void ReadColumn(std::unique_ptr<core::Column>& col, const core::Field& field, std::size_t n);
52+
53+
std::istream& is_;
54+
FileMetaData metadata_;
55+
std::size_t curr_row_group_;
56+
};
57+
} // namespace columnar::bruh

include/bruh/bruh_batch_writer.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
3+
#include <core/batch_writer.h>
4+
#include <bruh/format.h>
5+
#include <util/macro.h>
6+
7+
#include <fstream>
8+
#include <memory>
9+
#include <ostream>
10+
#include <string>
11+
12+
namespace columnar::bruh {
13+
class BruhBatchWriter final : public core::BatchWriter {
14+
public:
15+
BruhBatchWriter(std::ostream& os, const core::Schema& schema) : os_(os), schema_(schema) {
16+
metadata_.version = kCurrentVersion;
17+
metadata_.schema = schema;
18+
WriteMagic();
19+
}
20+
21+
void Write(const core::Batch& batch) override;
22+
23+
void Flush() override;
24+
25+
private:
26+
void WriteMagic() {
27+
os_.write(reinterpret_cast<const char*>(kMagicBytes), sizeof(kMagicBytes));
28+
}
29+
30+
void WriteColumn(const core::Column& col, const core::Field& field);
31+
32+
void WriteFooter();
33+
34+
void WriteFields();
35+
36+
void WriteRowGroups();
37+
38+
std::ostream& os_;
39+
core::Schema schema_;
40+
FileMetaData metadata_;
41+
};
42+
} // namespace columnar::bruh

include/bruh/format.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#pragma once
2+
3+
#include <core/datatype.h>
4+
#include <core/schema.h>
5+
6+
#include <cstdint>
7+
#include <vector>
8+
9+
// File format was inspired by https://parquet.apache.org/docs/file-format/
10+
// Binary representation looks like this:
11+
// Magic BRUHDB bytes (8 bytes)
12+
// Row groups with data (variable count and size)
13+
// Footer (contains FileMetaData)
14+
// Footer size (4 bytes)
15+
// Magic BRUHDB bytes (8 bytes)
16+
// My amazing DB uses ONLY little-endian bytes ordering
17+
namespace columnar::bruh {
18+
constexpr uint8_t kMagicBytes[8] = {'B', 'R', 'U', 'H', 'D', 'B', 0x67, 0x67};
19+
constexpr int kCurrentVersion = 1;
20+
21+
struct ColumnChunkMetaData {
22+
uint64_t offset;
23+
uint64_t byte_size; // (@Irval1337) TODO: split this into compressed and decompressed sizes
24+
uint64_t values_count;
25+
};
26+
27+
struct RowGroupMetaData {
28+
uint64_t byte_size;
29+
uint64_t rows_count;
30+
std::vector<ColumnChunkMetaData> columns;
31+
};
32+
33+
struct FileMetaData {
34+
int version;
35+
core::Schema schema;
36+
uint64_t rows_count;
37+
std::vector<RowGroupMetaData> row_groups;
38+
};
39+
} // namespace columnar::bruh

0 commit comments

Comments
 (0)