Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 15 additions & 23 deletions client.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <iostream>
#include <string>

#include "kvs_client.hpp"
#include "kvs_types.hpp"
Expand All @@ -7,27 +8,18 @@ using namespace std;

const string HOST = "localhost";
const int PORT = 9199;
const string NAME = "";

int main(int argc, char* argv[]){
jubatus::client::kvs c(HOST, PORT, NAME, 5);

cout << c.put("apple", "pomme") << endl;
cout << c.put("orange", "orangé") << endl;
cout << c.put("banana", "banane") << endl;
cout << c.put("strawberry", "fraise") << endl;
cout << c.put("unknown", "???") << endl;
cout << c.del("unknown") << endl;

try {
cout << c.get("apple") << endl;
cout << c.get("orange") << endl;
cout << c.get("banana") << endl;
cout << c.get("strawberry") << endl;
cout << c.get("unknown") << endl;
} catch (const runtime_error& e) {
cout << "Exception: " << e.what() << endl;
}

return 0;
const string NAME = "test";

int main(int argc, char* argv[]) {
jubatus::client::kvs c(HOST, PORT, NAME, 5);

cout << c.put("key1", 20) << endl;
cout << c.put("key2", -5) << endl;
cout << c.put("key3", 5) << endl;

cout << c.get("key2").value << endl;

cout << c.get_average() << endl;

return 0;
}
20 changes: 11 additions & 9 deletions kvs.idl
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
service kvs {
#@cht(2) #@update #@all_and
bool put(0: string key, 1: string value)
message entry {
0: string key
1: int value
}

#@cht(2) #@analysis #@pass
string get(0: string key)
service kvs {
#@cht(1) #@update #@pass
bool put(0: string key, 1: int value)

#@cht(2) #@update #@all_and
bool del(0: string key)
#@cht(1) #@analysis #@pass
entry get(0: string key)

#@broadcast #@update #@all_and
bool clear()
#@random #@analysis #@pass
float get_average()
}
156 changes: 156 additions & 0 deletions kvs_algorithm.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include "kvs_algorithm.hpp"

#include <jubatus/util/lang/cast.h>

#include <string>

using jubatus::util::lang::lexical_cast;

namespace jubatus {
namespace core {
namespace kvs {

/**
* Update: `put` operation adds the specified key-value data to the model.
* If the key already exists in the model, it will be overwritten.
*/
void kvs_algorithm::put(const std::string& key, int32_t value) {
/*
* Check if the key already exists in our model. If the key exists,
* preserve the old value to maintain `local_sum_`.
*/
int32_t old_value = 0;
if (local_data_.find(key) != local_data_.end()) {
old_value = local_data_[key];
}

/*
* Update the local model.
*/
local_data_[key] = value;
local_sum_ = local_sum_ - old_value + value;
}

/**
* Analyze: `get` operation returns the value for the specified key in the
* model. An exception will be thrown if the key does not exist in the model.
*/
int32_t kvs_algorithm::get(const std::string& key) const {
data_t::const_iterator it = local_data_.find(key);
if (it != local_data_.end()) {
return it->second;
}
throw std::out_of_range(key + " not found");
}

/**
* Analyze: `get_average` operation returns the cluster-wide average of every
* value.
*/
float kvs_algorithm::get_average() const {
return (float(global_sum_ + local_sum_) / (global_size_ + local_data_.size()));
}

/**
* MIX: At the beginning of the MIX session, MIX master server calls `get_diff`
* for every member servers in the cluster. This method will be invoked by the
* framework when `get_diff` RPC API is called.
* This method is expected to returns `diff` object, which is a "summarized"
* version of the local model that needs to be shared within the cluster.
*
* In this KVS, we share the sum of entry values and number of entries, instead
* of the actual raw data (`local_data_`), so that every server in the cluster
* can compute average (in `get_average`) without having all the data.
*/
void kvs_algorithm::get_diff(kvs_diff& diff) const {
diff.sum_ = local_sum_;
diff.size_ = local_data_.size();
}

/**
* MIX: `mix` is a static operation to aggregate two `diff` objects. MIX is
* is a fold-left operation; the master server aggregates `diff`s collected
* from member servers in the clusters by using this method.
*
* In this KVS, we just add two values to compute the "sum of sum" and
* "sum of size".
*/
void kvs_algorithm::mix(const kvs_diff& lhs, kvs_diff& rhs) const {
rhs.sum_ += lhs.sum_;
rhs.size_ += lhs.size_;
}

/**
* MIX: At the end of the MIX session, MIX master server calls `put_diff` for
* every member servers in the cluster. This method will be invoked by the
* framework when `put_diff` RPC API is called.
* This method is expected to take in the `mixed_diff` (which is a `diff` object
* created by aggregating `diff`s from every servers) to the local model.
*
* In this KVS, we update `global_sum_` and `global_size_` to keep up with the
* sum of entry values / number of entries from other cluster members.
*/
bool kvs_algorithm::put_diff(const kvs_diff& mixed_diff) {
global_sum_ = mixed_diff.sum_ - local_sum_;
global_size_ = mixed_diff.size_ - local_data_.size();

/*
* Returning `true` means `put_diff` succeeded. On failure, return false.
*/
return true;
}

/**
* MIX: `get_version` operation is expected to return a sequential revision
* of the model. This can be used to ensure that all cluster members have
* models with the same revision. If you need to do so, you need to write
* your own validation code in MIX code above. See the implementation of
* `jubatus::core::storage::local_storage_mixture` class in the framework
* for an example.
*
* In this KVS, MIX is an idempotent operation (i.e., even if some server
* failed to take part in the MIX session, next MIX session will correct
* the situation), so no revision checking is necessary. We just return
* the default model version.
*/
jubatus::core::storage::version kvs_algorithm::get_version() const {
return jubatus::core::storage::version();
}

/**
* This method returns KVS-specific status metrics.
*/
void kvs_algorithm::get_status(std::map<std::string, std::string>& status) const {
status.insert(std::make_pair(
"local_size", lexical_cast<std::string>(local_data_.size())));
status.insert(std::make_pair(
"global_size", lexical_cast<std::string>(global_size_)));
}

/**
* This method clears the model to the initial state.
*/
void kvs_algorithm::clear() {
local_data_ = data_t();
local_sum_ = 0;
global_sum_ = 0;
global_size_ = 0;
}

/**
* This method serializes the model as MessagePack format.
*/
void kvs_algorithm::pack(jubatus::core::framework::packer& pk) const {
pk.pack(*this);
}

/**
* This method deserializes the model in MessagePack format.
*/
void kvs_algorithm::unpack(msgpack::object o) {
o.convert(this);
}

} // namespace kvs
} // namespace core
} // namespace jubatus
113 changes: 113 additions & 0 deletions kvs_algorithm.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#ifndef KVS_ALGORITHM_HPP_
#define KVS_ALGORITHM_HPP_

#include <stdint.h>
#include <utility>
#include <string>

#include <msgpack.hpp>

#include <jubatus/core/framework/mixable_helper.hpp>

namespace jubatus {
namespace core {
namespace kvs {

/**
* `kvs_diff` is a `diff` object for this KVS service. See comments of
* `get_diff` implementation for detailed description.
*
* `diff` objects must be able to be serialized by MessagePack, so we
* list fields to be serialized in `MSGPACK_DEFINE`.
*/
struct kvs_diff {
int32_t sum_;
uint32_t size_;

MSGPACK_DEFINE(sum_, size_);
};

/**
* The class to implement the acutual algorithm - Update/Analyze/MIX.
* See the comments for each method implementation for details.
*
* This class also holds the model data structure as fields; see below.
*/
class kvs_algorithm {
public:
kvs_algorithm()
: local_sum_(0),
global_sum_(0),
global_size_(0) {};
virtual ~kvs_algorithm() {};

void put(const std::string& key, int32_t value);
int32_t get(const std::string& key) const;
float get_average() const;

// For MIX:
// (note: these 4 methods are requirement to use
// `linear_mixable_helper`; see below for the details)
void get_diff(kvs_diff& diff) const;
void mix(const kvs_diff& lhs, kvs_diff& rhs) const;
bool put_diff(const kvs_diff& mixed_diff);
jubatus::core::storage::version get_version() const;

// For `clear` API:
void clear();

// For `get_stauts` API:
void get_status(std::map<std::string, std::string>& status) const;

// For `save` / `load` API:
void pack(jubatus::core::framework::packer& pk) const;
void unpack(msgpack::object o);

private:
typedef std::map<std::string, int32_t> data_t;

/**
* `local_data_` holds key-value data added via `put` API.
*/
data_t local_data_;

/**
* `local_sum_` is a sum of values in `local_data_`. This will be calculated
* in `put` method. This value will be aggregated among cluster on MIX.
*/
int32_t local_sum_;

/**
* `global_sum_` and `global_size_` are sum of values / number of entries in
* the whole cluster (except mine.) These values will be updated on MIX.
*/
int32_t global_sum_;
uint32_t global_size_;

public:
MSGPACK_DEFINE(local_data_, local_sum_, global_sum_, global_size_);
};

/**
* This is an idiom to a automagically create "linear-mixer-capabile" version
* of `kvs_algorithm` class. `mixable_kvs` class will be instantiated in the
* driver layer.
*
* `linear_mixable_helper` is a helper class provided by the framework that
* makes easy to write `get_diff`, `mix` and `put_diff` operations. Instead
* of using the helper class, you can write your own raw MIX operations by
* defining class that inherits `linear_mixable`.
*
* Note that `linear_mixable_helper` only enables Linear Mixer. If you need
* to MIX your model by Push/Pull Mixer, you need to create a class that
* inherits `push_mixable`, which defines operations that is different from
* `linear_mixable`. You can also inherit both classes at the same time.
*/
typedef jubatus::core::framework::linear_mixable_helper<kvs_algorithm, kvs_diff>
mixable_kvs;
\
} // namespace kvs
} // namespace core
} // namespace jubatus

#endif // KVS_ALGORITHM_HPP_
19 changes: 7 additions & 12 deletions kvs_client.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is auto-generated from kvs.idl with jenerator version 0.6.4-146-g79178f8/develop
// This file is auto-generated from kvs.idl with jenerator version 0.8.1-11-g6aaff17/master
// *** DO NOT EDIT ***

#ifndef KVS_CLIENT_HPP_
Expand All @@ -21,24 +21,19 @@ class kvs : public jubatus::client::common::client {
: client(host, port, name, timeout_sec) {
}

bool put(const std::string& key, const std::string& value) {
bool put(const std::string& key, int32_t value) {
msgpack::rpc::future f = c_.call("put", name_, key, value);
return f.get<bool>();
}

std::string get(const std::string& key) {
entry get(const std::string& key) {
msgpack::rpc::future f = c_.call("get", name_, key);
return f.get<std::string>();
return f.get<entry>();
}

bool del(const std::string& key) {
msgpack::rpc::future f = c_.call("del", name_, key);
return f.get<bool>();
}

bool clear() {
msgpack::rpc::future f = c_.call("clear", name_);
return f.get<bool>();
float get_average() {
msgpack::rpc::future f = c_.call("get_average", name_);
return f.get<float>();
}
};

Expand Down
Loading