Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ consensus-level : 0
dump-prefix :

# daemonize [yes | no].
daemonize : no
daemonize : yes

# The directory to stored dump files that generated by command "bgsave".
dump-path : ./dump/
Expand Down
186 changes: 128 additions & 58 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ class PikaMigrate {

class SlotsMgrtTagSlotCmd : public Cmd {
public:
SlotsMgrtTagSlotCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsMgrtTagSlotCmd(*this); }

SlotsMgrtTagSlotCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsMgrtTagSlotCmd(*this); }
private:
std::string dest_ip_;
int64_t dest_port_;
Expand All @@ -98,18 +97,17 @@ class SlotsMgrtTagSlotCmd : public Cmd {
std::basic_string<char, std::char_traits<char>, std::allocator<char>> key_;
char key_type_;

virtual void DoInitial();
int SlotKeyPop(std::shared_ptr<Slot> slot);
void DoInitial() override;
int SlotKeyPop(std::shared_ptr<Slot>slot);
};

class SlotsMgrtTagSlotAsyncCmd : public Cmd {
public:
SlotsMgrtTagSlotAsyncCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); }

SlotsMgrtTagSlotAsyncCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge(){};
Cmd* Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); }
private:
std::string dest_ip_;
int64_t dest_port_;
Expand All @@ -119,89 +117,161 @@ class SlotsMgrtTagSlotAsyncCmd : public Cmd {
int64_t slot_num_;
int64_t keys_num_;

virtual void DoInitial();
void DoInitial() override;
};

class SlotsMgrtTagOneCmd : public Cmd {
public:
SlotsMgrtTagOneCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsMgrtTagOneCmd(*this); }

SlotsMgrtTagOneCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsMgrtTagOneCmd(*this); }
private:
std::string dest_ip_;
int64_t dest_port_;
int64_t timeout_ms_;
std::string key_;
int64_t slot_num_;
int64_t slot_id_;
char key_type_;
virtual void DoInitial();
int KeyTypeCheck(std::shared_ptr<Slot> slot);
int SlotKeyRemCheck(std::shared_ptr<Slot> slot);
void DoInitial() override;
int SlotKeyRemCheck(std::shared_ptr<Slot>slot);
int KeyTypeCheck(std::shared_ptr<Slot>slot);
};

class SlotsMgrtAsyncStatusCmd : public Cmd {
public:
SlotsMgrtAsyncStatusCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot = nullptr);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsMgrtAsyncStatusCmd(*this); }
SlotsMgrtAsyncStatusCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsMgrtAsyncStatusCmd(*this); }

private:
virtual void DoInitial() override;
void DoInitial() override;
};

class SlotsInfoCmd : public Cmd {
public:
SlotsInfoCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsInfoCmd(*this); }

SlotsInfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsInfoCmd(*this); }
private:
virtual void DoInitial();
void DoInitial() override;

int64_t begin_;
int64_t end_ = 1024;
};

class SlotsMgrtAsyncCancelCmd : public Cmd {
public:
SlotsMgrtAsyncCancelCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); }

SlotsMgrtAsyncCancelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); }
private:
virtual void DoInitial();
void DoInitial() override;
};

class SlotsDelCmd : public Cmd {
public:
SlotsDelCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsDelCmd(*this); }

SlotsDelCmd(const std::string& name, int arity, uint16_t flag):Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsDelCmd(*this); }
private:
std::vector<std::string> slots_;
virtual void DoInitial();
void DoInitial() override;
};

class SlotsHashKeyCmd : public Cmd {
public:
SlotsHashKeyCmd(const std::string &name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot> slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys &hint_keys){};
virtual void Merge(){};
virtual Cmd *Clone() override { return new SlotsHashKeyCmd(*this); }

SlotsHashKeyCmd(const std::string& name, int arity, uint16_t flag):Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsHashKeyCmd(*this); }
private:
std::vector<std::string> keys_;
void DoInitial() override;
};

class SlotsScanCmd : public Cmd {
public:
SlotsScanCmd(const std::string& name, int arity, uint16_t flag):Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot>slot) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new SlotsScanCmd(*this); }
private:
std::string key_, pattern_;
int64_t cursor_, count_;
void DoInitial() override;
void Clear() override {
pattern_ = "*";
count_ = 10;
}
};

/* *
* SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...]
* SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$key1 $arg1 ...]
* SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$key1 $arg1 ...] [$key2 $arg2 ...]
* */
class SlotsMgrtExecWrapperCmd : public Cmd {
public:
SlotsMgrtExecWrapperCmd(const std::string& name, int arity, uint16_t flag):Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Slot>slot);
virtual void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys){};
virtual void Merge(){};
virtual Cmd* Clone() override { return new SlotsMgrtExecWrapperCmd(*this); }
private:
std::string key_;
std::vector<std::string> args;
virtual void DoInitial();
};


class SlotsMgrtSenderThread: public net::Thread {
public:
SlotsMgrtSenderThread();
virtual ~SlotsMgrtSenderThread();
int SlotsMigrateOne(const std::string &key, std::shared_ptr<Slot>slot);
bool SlotsMigrateBatch(const std::string &ip, int64_t port, int64_t time_out, int64_t slots, int64_t keys_num, std::shared_ptr<Slot>slot);
bool GetSlotsMigrateResult(int64_t *moved, int64_t *remained);
void GetSlotsMgrtSenderStatus(std::string *ip, int64_t *port, int64_t *slot, bool *migrating, int64_t *moved, int64_t *remained);
bool SlotsMigrateAsyncCancel();
private:
std::string dest_ip_;
int64_t dest_port_;
int64_t timeout_ms_;
int64_t slot_num_;
int64_t keys_num_;
int64_t moved_keys_num_; // during one batch moved
int64_t moved_keys_all_; // all keys moved in the slot
int64_t remained_keys_num_;
bool error_;
std::vector<std::pair<const char, std::string>> migrating_batch_;
std::vector<std::pair<const char, std::string>> migrating_ones_;
net::NetCli *cli_;
pstd::Mutex rwlock_;
pstd::Mutex rwlock_db_;
pstd::Mutex rwlock_batch_;
pstd::Mutex rwlock_ones_;
pstd::Mutex slotsmgrt_cond_mutex_;
pstd::Mutex mutex_;
std::atomic<bool> is_migrating_ = false;
pstd::CondVar slotsmgrt_cond_;
std::shared_ptr<Slot>slot_;

void* ThreadMain() override;

bool ElectMigrateKeys(std::shared_ptr<Slot>slot);
};


#endif
6 changes: 6 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsDel, std::move(slotsdelptr)));
std::unique_ptr<Cmd> slotshashkeyptr = std::make_unique<SlotsHashKeyCmd>(kCmdNameSlotsHashKey, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsHashKey, std::move(slotshashkeyptr)));
std::unique_ptr<Cmd> slotsscanptr = std::make_unique<SlotsScanCmd>(kCmdNameSlotsScan, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsScan, std::move(slotsscanptr)));
std::unique_ptr<Cmd> slotsmgrtexecwrapper =
std::make_unique<SlotsMgrtExecWrapperCmd>(kCmdNameSlotsMgrtExecWrapper, -3, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSlotsMgrtExecWrapper, std::move(slotsmgrtexecwrapper)));

// Kv
////SetCmd
std::unique_ptr<Cmd> setptr = std::make_unique<SetCmd>(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv);
Expand Down
4 changes: 2 additions & 2 deletions src/pika_migrate_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ static int migrateKeyTTl(net::NetCli *cli, const std::string key, storage::DataT
return 0;
}

if (0 > doMigrate(cli, send_str)) {
if (doMigrate(cli, send_str) < 0) {
return -1;
}

Expand Down Expand Up @@ -743,7 +743,7 @@ void PikaMigrateThread::GetMigrateStatus(std::string *ip, int64_t *port, int64_t
*moved = moved_num_;
std::unique_lock lq(mgrtkeys_queue_mutex_);
int64_t migrating_keys_num = mgrtkeys_queue_.size();
std::string slotKey = GetSlotKey(slot_id_); // SlotKeyPrefix + std::to_string(slot_id_);
std::string slotKey = GetSlotKey(slot_id_);
int32_t slot_size = 0;
rocksdb::Status s = slot_->db()->SCard(slotKey, &slot_size);
if (s.ok()) {
Expand Down
Loading