Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Protocol Layer Refactor #1445

Open
wants to merge 60 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
f368aa2
Refactor I/O part out of connection handle and simplify state machine…
tli2 Jun 12, 2018
da0e307
Fix off-by-one and byte buf bug
tli2 Jun 12, 2018
8f11ebf
Merge branch 'master' into tianyu-refactor
tli2 Jun 13, 2018
5e18e1d
Run formatter on all code in network layer
tli2 Jun 13, 2018
964b034
Merge branch 'tianyu-refactor' of https://github.com/tli2/peloton int…
tli2 Jun 13, 2018
334f303
Fix ssl and write bugs
tli2 Jun 14, 2018
94e1cb5
Shorten travis build time
tli2 Jun 18, 2018
3735825
Merge branch 'master' of https://github.com/cmu-db/peloton into tiany…
tli2 Jun 19, 2018
b10e01c
Buffer reset and shutdown ordering
tli2 Jun 19, 2018
79bffba
Add in termination state
tli2 Jun 19, 2018
96dba94
Revert Travis changes
tli2 Jun 19, 2018
aa9cc36
Start new protocol layer
tli2 Jun 20, 2018
d7bf5ed
Restructure serialization logic to use builder-like structure.
tli2 Jun 21, 2018
c69f5d6
Start infrastructure for new protocol layer
tli2 Jun 22, 2018
01d63a0
Switch to templates for marshalling ints
tli2 Jun 22, 2018
4014dad
Move things around a bit more. Make layer separation clear
tli2 Jun 23, 2018
1de49c9
Hook I/O layer up with new code. Lay ground work to start porting old…
tli2 Jun 24, 2018
5e9baf4
Partially write some marshalling code
tli2 Jun 26, 2018
dac4597
remove hacky EXPLAIN and PREPARE/EXEC
Jun 26, 2018
7a82bc1
parser and statement cache push down
Jun 26, 2018
8307f5f
add in execution helper
Jun 26, 2018
f2dd0b5
Implement txn handle wrapper class
Jun 26, 2018
7d55f68
Add use cases in comments
Jun 26, 2018
f91d0c0
Add txn state handle into tcop
Jun 26, 2018
e37b19d
Merge branch 'master' of https://github.com/cmu-db/peloton into tiany…
tli2 Jun 26, 2018
fb54c5f
Merge branch 'tianyu-protocol-refactor' of https://github.com/tli2/pe…
tli2 Jun 26, 2018
43e59e0
Put types in order
tli2 Jun 26, 2018
87536fc
Refactor function signature of ExecuteStatement()
Jun 27, 2018
67d8016
minor code clean up
tli2 Jun 27, 2018
aaa37d1
Save work
tli2 Jun 28, 2018
3832d74
Finish porting protocol handler to new code
tli2 Jun 28, 2018
e8ecd6d
Merge branch 'tianyu-protocol-refactor-structure' of https://github.c…
tli2 Jun 28, 2018
66c5444
Finish porting over old code
tli2 Jun 29, 2018
deaa03b
Fix compilation
tli2 Jun 29, 2018
86fc600
Fix test cases. There are still some issues with test failure.
tli2 Jun 29, 2018
3ed77ce
Merge branch 'master' of https://github.com/cmu-db/peloton into tiany…
tli2 Jun 29, 2018
6fc5483
Remove stale traffic_cop.cpp
tli2 Jun 29, 2018
169338b
Fix some memory issues in unit tests.
tli2 Jul 1, 2018
763b5b9
Fix some tests
tli2 Jul 1, 2018
685b8bc
fix more tests
tli2 Jul 2, 2018
fa583f2
Fix Compile failure by importing library and using different function…
Jul 2, 2018
dc96b75
One line fix for initial value change.
Jul 2, 2018
c0a7fe4
Convert byte swaps to portable ones
tli2 Jul 2, 2018
bf8ebb1
fix catalog test
tli2 Jul 2, 2018
0e253c2
Remove stale enum class
tli2 Jul 3, 2018
016e367
Merge branch 'master' into tianyu-protocol-refactor-structure
tli2 Jul 3, 2018
f8d32c4
Fix zero denominator problem of row affected calculation
Jul 3, 2018
e72ed4e
Save work
tli2 Jul 3, 2018
d508fb3
Remove dead code
Jul 4, 2018
75b2a55
Fix bug of invalid state of Sync command by changing the initial stat…
Jul 4, 2018
0ba72d4
Restore Junit tests
tli2 Jul 4, 2018
d1b64cd
Merge branch 'tianyu-protocol-refactor-structure' of https://github.c…
tli2 Jul 4, 2018
a384dc3
Restore peloton startup in junit
tli2 Jul 4, 2018
7b84245
Merge branch 'master' into tianyu-protocol-refactor-structure
tli2 Jul 4, 2018
8f48f47
Merge branch 'master' into tianyu-protocol-refactor-structure
tli2 Jul 4, 2018
a4c17d8
Fix LOG_TRACE compilation
tli2 Jul 4, 2018
6d1d86b
Actually fix LOG_TRACE
tli2 Jul 4, 2018
cd16c71
Reset Optimizer instead of create a new one
ChTimTsubasa Jul 6, 2018
2dca58d
Revert to ConnectionHandleFactory to solve occasional memory leak on …
tli2 Jul 6, 2018
8e73f06
Merge branch 'master' into tianyu-protocol-refactor-structure
tli2 Jul 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/notifiable_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace peloton {

NotifiableTask::NotifiableTask(int task_id) : task_id_(task_id) {
NotifiableTask::NotifiableTask(size_t task_id) : task_id_(task_id) {
base_ = EventUtil::EventBaseNew();
// For exiting a loop
terminate_ = RegisterManualEvent([](int, short, void *arg) {
Expand Down
6 changes: 2 additions & 4 deletions src/common/portal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ namespace peloton {

Portal::Portal(const std::string& portal_name,
std::shared_ptr<Statement> statement,
std::vector<type::Value> bind_parameters,
std::shared_ptr<stats::QueryMetric::QueryParams> param_stat)
std::vector<type::Value> bind_parameters)
: portal_name_(portal_name),
statement_(statement),
bind_parameters_(std::move(bind_parameters)),
param_stat_(param_stat) {}
bind_parameters_(std::move(bind_parameters)) {}

Portal::~Portal() { statement_.reset(); }

Expand Down
4 changes: 2 additions & 2 deletions src/common/statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ std::string Statement::GetQueryTypeString() const { return query_type_string_; }

QueryType Statement::GetQueryType() const { return query_type_; }

void Statement::SetParamTypes(const std::vector<int32_t>& param_types) {
void Statement::SetParamTypes(const std::vector<PostgresValueType>& param_types) {
param_types_ = param_types;
}

std::vector<int32_t> Statement::GetParamTypes() const { return param_types_; }
std::vector<PostgresValueType> Statement::GetParamTypes() const { return param_types_; }

void Statement::SetTupleDescriptor(
const std::vector<FieldInfo>& tuple_descriptor) {
Expand Down
12 changes: 6 additions & 6 deletions src/executor/copy_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "executor/logical_tile_factory.h"
#include "planner/export_external_file_plan.h"
#include "storage/table_factory.h"
#include "network/postgres_protocol_handler.h"
#include "common/exception.h"
#include "common/macros.h"
#include "network/marshal.h"

namespace peloton {
namespace executor {
Expand Down Expand Up @@ -202,7 +202,7 @@ bool CopyExecutor::DExecute() {
// Read param types
types.resize(num_params);
//TODO: Instead of passing packet to executor, some data structure more generic is need
network::PostgresProtocolHandler::ReadParamType(&packet, num_params, types);
network::OldReadParamType(&packet, num_params, types);

// Write all the types to output file
for (int i = 0; i < num_params; i++) {
Expand All @@ -219,7 +219,7 @@ bool CopyExecutor::DExecute() {
// Read param formats
formats.resize(num_params);
//TODO: Instead of passing packet to executor, some data structure more generic is need
network::PostgresProtocolHandler::ReadParamFormat(&packet, num_params, formats);
network::OldReadParamFormat(&packet, num_params, formats);

} else if (origin_col_id == param_val_col_id) {
// param_values column
Expand All @@ -230,9 +230,9 @@ bool CopyExecutor::DExecute() {
bind_parameters.resize(num_params);
param_values.resize(num_params);
//TODO: Instead of passing packet to executor, some data structure more generic is need
network::PostgresProtocolHandler::ReadParamValue(&packet, num_params, types,
bind_parameters, param_values,
formats);
network::OldReadParamValue(&packet, num_params, types,
bind_parameters, param_values,
formats);

// Write all the values to output file
for (int i = 0; i < num_params; i++) {
Expand Down
34 changes: 18 additions & 16 deletions src/include/common/internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -862,10 +862,10 @@ enum class ResultType {
SUCCESS = 1,
FAILURE = 2,
ABORTED = 3, // aborted
NOOP = 4, // no op
UNKNOWN = 5,
QUEUING = 6,
TO_ABORT = 7,
NOOP = 4, // no op // TODO Remove this type
UNKNOWN = 5, // TODO Remove this type
QUEUING = 6, // TODO Remove this type
TO_ABORT = 7, // TODO Remove this type
};
std::string ResultTypeToString(ResultType type);
ResultType StringToResultType(const std::string &str);
Expand Down Expand Up @@ -1419,25 +1419,14 @@ typedef std::map<oid_t, std::pair<oid_t, oid_t>> column_map_type;
//===--------------------------------------------------------------------===//
// Wire protocol typedefs
//===--------------------------------------------------------------------===//
#define SOCKET_BUFFER_SIZE 8192
#define SOCKET_BUFFER_CAPACITY 8192

/* byte type */
typedef unsigned char uchar;

/* type for buffer of bytes */
typedef std::vector<uchar> ByteBuf;

//===--------------------------------------------------------------------===//
// Packet Manager: ProcessResult
//===--------------------------------------------------------------------===//
enum class ProcessResult {
COMPLETE,
TERMINATE,
PROCESSING,
MORE_DATA_REQUIRED,
NEED_SSL_HANDSHAKE,
};

enum class NetworkProtocolType {
POSTGRES_JDBC,
POSTGRES_PSQL,
Expand All @@ -1449,6 +1438,19 @@ enum class SSLLevel {
SSL_VERIIFY = 2,
};

using CallbackFunc = std::function<void(void)>;
using BindParameter = std::pair<type::TypeId, std::string>;

enum class PostgresDataFormat : int16_t {
TEXT = 0,
BINARY = 1
};

enum class PostgresNetworkObjectType : uchar {
PORTAL = 'P',
STATEMENT = 'S'
};

// Eigen/Matrix types used in brain
// TODO(saatvik): Generalize Eigen utilities across all types
typedef std::vector<std::vector<float>> matrix_t;
Expand Down
6 changes: 3 additions & 3 deletions src/include/common/notifiable_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class NotifiableTask {
* Constructs a new NotifiableTask instance.
* @param task_id a unique id assigned to this task
*/
explicit NotifiableTask(int task_id);
explicit NotifiableTask(size_t task_id);

/**
* Destructs this NotifiableTask. All events currently registered to its base
Expand All @@ -60,7 +60,7 @@ class NotifiableTask {
/**
* @return unique id assigned to this task
*/
inline int Id() const { return task_id_; }
inline size_t Id() const { return task_id_; }

/**
* @brief Register an event with the event base associated with this
Expand Down Expand Up @@ -183,7 +183,7 @@ class NotifiableTask {
inline void ExitLoop(int, short) { ExitLoop(); }

private:
const int task_id_;
const size_t task_id_;
struct event_base *base_;

// struct event and lifecycle management
Expand Down
10 changes: 1 addition & 9 deletions src/include/common/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,14 @@ class Portal {
Portal &operator=(Portal &&) = delete;

Portal(const std::string &portal_name, std::shared_ptr<Statement> statement,
std::vector<type::Value> bind_parameters,
std::shared_ptr<stats::QueryMetric::QueryParams> param_stat);
std::vector<type::Value> bind_parameters);

~Portal();

std::shared_ptr<Statement> GetStatement() const;

const std::vector<type::Value> &GetParameters() const;

inline std::shared_ptr<stats::QueryMetric::QueryParams> GetParamStat() const {
return param_stat_;
}

// Portal name
std::string portal_name_;

Expand All @@ -52,9 +47,6 @@ class Portal {

// Values bound to the statement of this portal
std::vector<type::Value> bind_parameters_;

// The serialized params for stats collection
std::shared_ptr<stats::QueryMetric::QueryParams> param_stat_;
};

} // namespace peloton
10 changes: 5 additions & 5 deletions src/include/common/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ class Statement : public Printable {

QueryType GetQueryType() const;

void SetParamTypes(const std::vector<int32_t> &param_types);
void SetParamTypes(const std::vector<PostgresValueType> &param_types);

std::vector<int32_t> GetParamTypes() const;
std::vector<PostgresValueType> GetParamTypes() const;

void SetTupleDescriptor(const std::vector<FieldInfo> &tuple_descriptor);

void SetReferencedTables(const std::set<oid_t> table_ids);
void SetReferencedTables(std::set<oid_t> table_ids);

const std::set<oid_t> GetReferencedTables() const;

void SetPlanTree(std::shared_ptr<planner::AbstractPlan> plan_tree);

const std::shared_ptr<planner::AbstractPlan> &GetPlanTree() const;

std::unique_ptr<parser::SQLStatementList> const &GetStmtParseTreeList() {
const std::unique_ptr<parser::SQLStatementList> &GetStmtParseTreeList() {
return sql_stmt_list_;
}

Expand Down Expand Up @@ -113,7 +113,7 @@ class Statement : public Printable {
std::string query_type_string_;

// format codes of the parameters
std::vector<int32_t> param_types_;
std::vector<PostgresValueType> param_types_;

// schema of result tuple
std::vector<FieldInfo> tuple_descriptor_;
Expand Down
2 changes: 1 addition & 1 deletion src/include/network/connection_dispatcher_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "common/notifiable_task.h"
#include "concurrency/epoch_manager_factory.h"
#include "connection_handler_task.h"
#include "network_state.h"
#include "network_types.h"

namespace peloton {
namespace network {
Expand Down
62 changes: 30 additions & 32 deletions src/include/network/connection_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
#include "marshal.h"
#include "network/connection_handler_task.h"
#include "network/network_io_wrappers.h"
#include "network_state.h"
#include "protocol_handler.h"
#include "network/network_types.h"
#include "network/protocol_interpreter.h"
#include "network/postgres_protocol_interpreter.h"

#include <openssl/err.h>
#include <openssl/ssl.h>
Expand All @@ -43,19 +44,23 @@ namespace peloton {
namespace network {

/**
* @brief A ConnectionHandle encapsulates all information about a client
* connection for its entire duration. This includes a state machine and the
* necessary libevent infrastructure for a handler to work on this connection.
* A ConnectionHandle encapsulates all information we need to do IO about
* a client connection for its entire duration. This includes a state machine
* and the necessary libevent infrastructure for a handler to work on this
* connection.
*/
class ConnectionHandle {
public:

/**
* Constructs a new ConnectionHandle
* @param sock_fd Client's connection fd
* @param handler The handler responsible for this handle
*/
ConnectionHandle(int sock_fd, ConnectionHandlerTask *handler);

DISALLOW_COPY_AND_MOVE(ConnectionHandle);

/**
* @brief Signal to libevent that this ConnectionHandle is ready to handle
* events
Expand All @@ -70,14 +75,6 @@ class ConnectionHandle {
workpool_event_ = conn_handler_->RegisterManualEvent(
METHOD_AS_CALLBACK(ConnectionHandle, HandleEvent), this);

// TODO(Tianyi): should put the initialization else where.. check
// correctness first.
tcop_.SetTaskCallback(
[](void *arg) {
struct event *event = static_cast<struct event *>(arg);
event_active(event, EV_WRITE, 0);
},
workpool_event_);

network_event_ = conn_handler_->RegisterEvent(
io_wrapper_->GetSocketFd(), EV_READ | EV_PERSIST,
Expand All @@ -94,8 +91,20 @@ class ConnectionHandle {
/* State Machine Actions */
// TODO(Tianyu): Write some documentation when feeling like it
inline Transition TryRead() { return io_wrapper_->FillReadBuffer(); }
Transition TryWrite();
Transition Process();

inline Transition TryWrite() {
if (io_wrapper_->ShouldFlush())
return io_wrapper_->FlushAllWrites();
return Transition::PROCEED;
}

inline Transition Process() {
return protocol_interpreter_->
Process(io_wrapper_->GetReadBuffer(),
io_wrapper_->GetWriteQueue(),
[=] { event_active(workpool_event_, EV_WRITE, 0); });
}

Transition GetResult();
Transition TrySslHandshake();
Transition TryCloseConnection();
Expand Down Expand Up @@ -173,26 +182,15 @@ class ConnectionHandle {
};

friend class StateMachine;
friend class NetworkIoWrapperFactory;

/**
* @brief: Determine if there is still responses in the buffer
* @return true if there is still responses to flush out in either wbuf or
* responses
*/
inline bool HasResponse() {
return (protocol_handler_->responses_.size() != 0) ||
(io_wrapper_->wbuf_->size_ != 0);
}
friend class ConnectionHandleFactory;

// A raw pointer is used here because references cannot be rebound.
ConnectionHandlerTask *conn_handler_;
std::shared_ptr<NetworkIoWrapper> io_wrapper_;
StateMachine state_machine_;
std::unique_ptr<NetworkIoWrapper> io_wrapper_;
// TODO(Tianyu): Probably use a factory for this
std::unique_ptr<ProtocolInterpreter> protocol_interpreter_;
StateMachine state_machine_{};
struct event *network_event_ = nullptr, *workpool_event_ = nullptr;
std::unique_ptr<ProtocolHandler> protocol_handler_ = nullptr;
tcop::TrafficCop tcop_;
// TODO(Tianyu): Put this into protocol handler in a later refactor
unsigned int next_response_ = 0;
};
} // namespace network
} // namespace peloton
Loading