Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ include (SubDirList)

enable_testing()

project(SAE)

# binary output path
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${SAE_BINARY_DIR}")

Expand Down Expand Up @@ -45,6 +47,7 @@ add_subdirectory (storage)
add_subdirectory (serialization)
add_subdirectory (zrpc)
add_subdirectory (computing)
add_subdirectory (streaming)

# add toolkits
add_all_subdirectories ("${CMAKE_CURRENT_SOURCE_DIR}/toolkit")
22 changes: 22 additions & 0 deletions streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Build rules for the streaming graph engine and the bundled edge_counter program.
project(streaming)

# Core library: the engine, the graph interface and every graph format reader.
file(GLOB STREAMING_SOURCES "streaming.hpp" "engine.*" "sgraph.*" "sgraph_format_*.cpp")
add_library(streaming ${STREAMING_SOURCES})
target_link_libraries(streaming gflags google-glog)

# Shared program driver (presumably provides main() and calls sgraph_main() -- confirm in sgraph_main.cpp).
add_library(sgraph_main "sgraph_main.cpp")

# NOTE that it's important to include the whole archive.
# Otherwise the graph formats won't get registered.
# TODO add windows specific flags.
# TODO how to disable all_load after sgraph_main?
if (APPLE)
  set(SGRAPH_BASE -Wl,-all_load sgraph_main streaming)
elseif (UNIX)
  set(SGRAPH_BASE -Wl,-whole-archive sgraph_main streaming -Wl,-no-whole-archive)
else()
  # Without this branch SGRAPH_BASE would be empty and programs would link
  # against nothing at all. Link the libraries normally and warn that the
  # self-registering graph formats may be discarded by the linker.
  message(WARNING "streaming: no whole-archive flags known for this platform; graph formats may not get registered.")
  set(SGRAPH_BASE sgraph_main streaming)
endif()
# Set the second time to expose to the parent scope
set(SGRAPH_BASE ${SGRAPH_BASE} PARENT_SCOPE)

# Sample program: per-vertex in/out edge counts.
add_executable(edge_counter "edge_counter.cpp")
target_link_libraries(edge_counter ${SGRAPH_BASE})
34 changes: 34 additions & 0 deletions streaming/edge_counter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Simple edge counting program
#include <iostream>

#include "streaming.hpp"

using namespace std;
using namespace sae::streaming;

// Vertex program that tallies, for one vertex, the edges pointing at it and
// the edges leaving it.
struct InOutEdgeCounter {
    eid_t in_edges;
    eid_t out_edges;

    // Resets both tallies for this vertex.
    void init(const Context<InOutEdgeCounter>& context, const Vertex& v) {
        out_edges = 0;
        in_edges = 0;
    }

    // Accounts for edge `e` as seen from vertex `id`. The source endpoint is
    // tested first, so an edge whose source is `id` is always counted as outgoing.
    void edge(const Context<InOutEdgeCounter>& context, vid_t id, const Edge& e) {
        if (id == e.source) {
            ++out_edges;
            return;
        }
        if (id == e.target) {
            ++in_edges;
        }
    }

    // Writes one line: "<id> <in_edges> <out_edges>".
    void output(const Context<InOutEdgeCounter>& context, vid_t id, std::ostream& os) const {
        os << id;
        os << " " << in_edges;
        os << " " << out_edges;
        os << "\n";
    }
};

// Program body: streams graph `g` once through InOutEdgeCounter, then prints
// every vertex's counts to standard output. Always returns 0.
int sgraph_main(StreamingGraph* g) {
    Context<InOutEdgeCounter> counters;
    SinglePassRun(counters, g);
    Output(counters, std::cout);
    return 0;
}
10 changes: 10 additions & 0 deletions streaming/engine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "engine.hpp"

namespace sae {
namespace streaming {

// Definitions of the placeholder aliases declared `extern` in engine.hpp.
// Each alias is a reference bound to the matching std::placeholders object,
// so handing the alias to std::bind is the same as handing it the placeholder.

// Alias of std::placeholders::_1.
decltype(std::placeholders::_1)& _vertex_id = std::placeholders::_1;
// Alias of std::placeholders::_2.
decltype(std::placeholders::_2)& _vertex_program = std::placeholders::_2;

}
}
122 changes: 122 additions & 0 deletions streaming/engine.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#pragma once

#include <atomic>
#include <functional>
#include <iostream>
#include <thread>
#include <typeinfo>

#include "glog/logging.h"
#include "sgraph.hpp"

namespace sae {
namespace streaming {

// Aliasing placeholders for better readability.
extern decltype(std::placeholders::_1)& _vertex_id;
extern decltype(std::placeholders::_2)& _vertex_program;

// Bit flags for edge direction; ALL_EDGES is the union of IN_EDGES and OUT_EDGES.
enum EdgeType {
    NO_EDGES  = 0x0,
    IN_EDGES  = 1 << 0,
    OUT_EDGES = 1 << 1,
    ALL_EDGES = IN_EDGES | OUT_EDGES
};

// Allows a callable of the form `std::ostream& (std::ostream&)` to be inserted
// into a stream with `<<`, the way a manipulator is: the callable is invoked
// on the stream and whatever stream it returns is the result of the expression.
inline std::ostream& operator<<(std::ostream& stream,
                                const std::function<std::ostream& (std::ostream&)>& func) {
    std::ostream& result = func(stream);
    return result;
}

template<class Program>
struct Context {
typedef Program UserProgram;

int iteration;
std::vector<Program> vertices;

// Handy helper for running through all vertices.
// The function will be binded to the vertex program with provided args.
// You can optionally pass _vertex_id as an argument to receive the vertex id.
// _vertex_program is used for referring to the vertex.
template<typename M, typename... Args>
void run(std::string job_name, M m, Args&&... args) {
LOG(INFO) << "Started running " << job_name;
auto func = std::bind(m, _vertex_program, std::forward<Args>(args)...);
for (vid_t i = 0; i < vertices.size(); i++) {
LOG_EVERY_N(INFO, vertices.size() / 100) << "Running " << job_name << " Progress: " << google::COUNTER << "/" << vertices.size();
DLOG(INFO) << "Vertex " << i << " running " << job_name;
func(i, vertices[i]);
}
LOG(INFO) << "Finished " << job_name;
}

// Exactly the same with `run`, except that it runs parallelly.
// TODO a dynamic scheduler
template<typename M, typename... Args>
void run_parallel(std::string job_name, size_t threads, M m, Args&&... args) {
if (threads > vertices.size()) {
threads = vertices.size();
}
auto func = std::bind(m, _vertex_program, std::forward<Args>(args)...);
LOG(INFO) << "Started running " << job_name << " with " << threads << " threads.";

vid_t progress_interval = vertices.size() / 100;
std::atomic<vid_t> counter;
auto worker = [&](vid_t begin, vid_t end) {
LOG(INFO) << "Worker started: " << begin << ", " << end;
for (vid_t i = begin; i < end; i++) {
if (counter.load() % progress_interval == 0) {
LOG(INFO) << "Running " << job_name << " Progress: " << counter.load() << "/" << vertices.size();
}
DLOG(INFO) << "Vertex " << i << " running " << job_name;
func(i, vertices[i]);
counter++;
}
LOG(INFO) << "Worker finished: " << begin << ", " << end;
};

std::vector<std::thread> pool;
vid_t shard_size = vertices.size() / threads;
for (size_t i = 0; i < threads; i++) {
vid_t begin = shard_size * i;
vid_t end = std::min(vid_t(shard_size * (i + 1)), vid_t(vertices.size()));
pool.emplace_back(std::thread(worker, begin, end));
}
for (auto& t : pool) {
t.join();
}
LOG(INFO) << "Finished " << job_name;
}
};

template<class ProgramContext>
void SinglePassRun(ProgramContext& context, StreamingGraph* g) {
std::string job_name = typeid(typename ProgramContext::UserProgram).name();
LOG(INFO) << "Single Pass Runner for " << job_name << " started.";
Graph graph;
g->process([&](const Graph& g) {
CHECK(g.n > 0) << "Vertices number must be positive.";
LOG(INFO) << "Graph infomation: n=" << g.n << ", m=" << g.m;
context.vertices.resize(g.n);
graph = g;
}, [&](const Vertex& v) {
LOG_EVERY_N(INFO, graph.n / 100) << "Processing vertex " << v.id << ", Progress: " << google::COUNTER << "/" << graph.n;
DLOG(INFO) << "Processing vertex: " << v.id;
context.vertices[v.id].init(context, v);
}, [&](const Edge& e) {
LOG_EVERY_N(INFO, graph.m == 0 ? 100000 : graph.m / 100) << "Processing edge " << e.id << ", Progress: " << google::COUNTER << "/" << graph.m;
DLOG(INFO) << "Processing edge: " << e.id;
context.vertices[e.source].edge(context, e.source, e);
context.vertices[e.target].edge(context, e.target, e);
});

LOG(INFO) << "Single Pass Runner for " << job_name << " finished.";
}

// Runs the "output" job over all vertices of `context`: for each vertex the
// user program's output(context, vertex_id, os) member is invoked, with
// `context` and `os` passed by reference.
template<class ProgramContext>
void Output(ProgramContext& context, std::ostream& os) {
    typedef typename ProgramContext::UserProgram Program;
    auto context_ref = std::ref(context);
    auto stream_ref = std::ref(os);
    context.run("output", &Program::output, context_ref, _vertex_id, stream_ref);
}

}}
22 changes: 22 additions & 0 deletions streaming/sgraph.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include <iostream>
#include <mutex>
#include <utility>

#include "sgraph.hpp"

namespace sae {
namespace streaming {

// Registry of graph formats, keyed by format name. It is a pointer that is
// allocated on the first registration and never freed.
// NOTE(review): presumably a lazily created pointer is used so that
// registerers constructed during static initialization of other translation
// units work regardless of initialization order -- confirm.
GraphFormatMap *graph_format_map = nullptr;
// Serializes registrations.
static std::mutex graph_format_map_lock;

// Registers `creator` as the factory for the graph format called `name`.
// If a format with the same name is already registered, the existing entry
// is kept and `creator` is dropped (std::map::emplace semantics).
GraphFormatRegisterer::GraphFormatRegisterer(const char *name, StreamingGraphCreator&& creator) {
    // RAII guard: the mutex is released even if allocation or insertion throws.
    std::lock_guard<std::mutex> guard(graph_format_map_lock);
    if (graph_format_map == nullptr) {
        graph_format_map = new GraphFormatMap;
    }
    // `creator` is a named rvalue reference, i.e. an lvalue here; move it
    // explicitly so the std::function is not copied.
    graph_format_map->emplace(name, std::move(creator));
}

}
}
98 changes: 98 additions & 0 deletions streaming/sgraph.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// The Streaming Graph Interface

#pragma once

#include <cstdint>
#include <string>
#include <vector>
#include <functional>
#include <map>
#include <iostream>

#include "trigger.hpp"

namespace sae {
namespace streaming {

// Vertex identifier type (Vertex::id, Edge::source, Edge::target); also used for the counts in Graph.
using vid_t = int64_t;
// Edge identifier type (Edge::id).
using eid_t = int64_t;
// Type tag carried by vertices and edges (Vertex::type, Edge::type).
using tid_t = uint8_t;
// Payload attached to vertices and edges (Vertex::data, Edge::data).
using data_t = std::string;

// Graph-level counts delivered through the graph callback.
struct Graph {
    vid_t n;  // number of vertices
    vid_t m;  // number of edges
};

// Prints a Graph as "graph{n=<n>, m=<m>}".
inline std::ostream& operator<<(std::ostream& os, const Graph& g) {
    return os << "graph{n=" << g.n << ", m=" << g.m << "}";
}

// A vertex record. Ordering and equality consider the id only.
struct Vertex {
    vid_t id;
    tid_t type;
    data_t data;

    // Orders vertices by id.
    bool operator<(const Vertex& v) const {
        return v.id > id;
    }

    // Two vertices are equal when their ids are equal.
    bool operator==(const Vertex& v) const {
        return v.id == id;
    }
};

// Prints a Vertex as "vertex{id=<id>, type=<type as integer>, data=<data>}".
inline std::ostream& operator<<(std::ostream& os, const Vertex& v) {
    os << "vertex{id=" << v.id;
    // tid_t is a character type; convert so that a number is printed.
    os << ", type=" << static_cast<int>(v.type);
    os << ", data=" << v.data;
    return os << "}";
}

// An edge record from `source` to `target`. Ordering and equality consider the id only.
struct Edge {
    eid_t id;
    vid_t source;
    vid_t target;
    tid_t type;
    data_t data;

    // Orders edges by id.
    bool operator<(const Edge& e) const {
        return e.id > id;
    }

    // Two edges are equal when their ids are equal.
    bool operator==(const Edge& e) const {
        return e.id == id;
    }
};

// Prints an Edge as
// "edge{id=<id>, source=<source>, target=<target>, type=<type as integer>, data=<data>}".
inline std::ostream& operator<<(std::ostream& os, const Edge& e) {
    os << "edge{id=" << e.id;
    os << ", source=" << e.source;
    os << ", target=" << e.target;
    // tid_t is a character type; convert so that a number is printed.
    os << ", type=" << static_cast<int>(e.type);
    os << ", data=" << e.data;
    return os << "}";
}

// Abstract source of a graph that is delivered through callbacks.
// Implementations should guarantee that the callbacks are in the specific order: graph, vertex, edge.
struct StreamingGraph {
    virtual ~StreamingGraph() = default;

    // Streams the graph into the three callbacks.
    virtual void process(const Trigger<Graph>& onGraph,
                         const Trigger<Vertex>& onVertex,
                         const Trigger<Edge>& onEdge) = 0;
};

// The following code is for automatic graph format registration.

// Factory that builds a StreamingGraph reading from the given input stream.
// NOTE(review): the result is a raw pointer; ownership presumably passes to the caller -- confirm.
using StreamingGraphCreator = std::function<StreamingGraph*(std::istream&)>;
// Maps a format name to the factory for that format.
using GraphFormatMap = std::map<std::string, StreamingGraphCreator>;
// Global registry, defined in sgraph.cpp. It is null until the first format is registered.
extern GraphFormatMap *graph_format_map;

// Constructing one of these adds (name, creator) to graph_format_map.
struct GraphFormatRegisterer {
GraphFormatRegisterer(const char *, StreamingGraphCreator&&);
};

} // namespace streaming
} // namespace sae

// Registers class `klass` as the graph format called `name`.
// Expands to a namespace-scope GraphFormatRegisterer object named
// graph_format_registerer_<name> in sae::streaming, whose creator builds the
// reader with `new klass(is)`. Use it at global scope, with `klass` fully
// qualified; `klass` must be constructible from a std::istream&.
#define REGISTER_GRAPH_FORMAT(name, klass) \
    namespace sae { namespace streaming { \
        GraphFormatRegisterer graph_format_registerer_##name(#name, [](std::istream& is) { \
            return new klass(is); \
        }); \
    }}

// Original, misspelled name of REGISTER_GRAPH_FORMAT; kept so that existing
// users keep compiling.
#define REGSITER_GRAPH_FORMAT(name, klass) REGISTER_GRAPH_FORMAT(name, klass)
44 changes: 44 additions & 0 deletions streaming/sgraph_format_csv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// A reader for CSV
// source,target,weight

#include <algorithm>
#include <limits>
#include <string>
#include <iostream>

#include "gflags/gflags.h"
#include "glog/logging.h"
#include "sgraph.hpp"

DEFINE_int64(csv_n, 0, "number of vertices");
DEFINE_int64(csv_m, 0, "number of edges (optional)");

namespace sae {
namespace streaming {

struct CSV : public StreamingGraph {
std::istream& is;

CSV(std::istream& is) : is(is) {
}

void process(const Trigger<Graph>& onGraph, const Trigger<Vertex>& onVertex, const Trigger<Edge>& onEdge) {
onGraph(Graph{FLAGS_csv_n, FLAGS_csv_m});
for (vid_t i = 0; i < FLAGS_csv_n; i++) {
onVertex(Vertex{i, 0, ""});
}
vid_t source, target;
eid_t eid = 0;
int weight;
char c;
while (is >> source >> c >> target >> c >> weight) {
onEdge(Edge{eid, source, target, 0, std::to_string(weight)});
eid++;
}
}
};

} // namespace streaming
} // namespace sae

REGSITER_GRAPH_FORMAT(csv, sae::streaming::CSV);
Loading