braft itself does not provide any server functionality. You can integrate braft into any programming framework, including brpc. This document explains how to use braft inside a distributed server to build a highly available system; how to implement a specific business server is out of scope.
server-side code of Counter
braft needs to run inside a concrete brpc server. You can let braft share the same port with your business services, or start braft on a different port.
brpc allows multiple logical Services to be registered on one port. If your Service also runs inside a brpc Server and you manage that brpc Server, you can call any of the following interfaces to add the braft-related Services into your Server. This lets braft and your business run on the same port and reduces operational complexity. If you are not very familiar with how to use brpc Server, read its wiki page first. Note: if your service is exposed to external (public network) users, do not run braft on the same port.
// Attach raft services to |server|, this makes the raft services share the same
// listen address with the user services.
//
// NOTE: Now we only allow the backing Server to be started with a specific
// listen address, if the Server is going to be started from a range of ports,
// the behavior is undefined.
// Returns 0 on success, -1 otherwise.
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* listen_addr);
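For reference, a minimal sketch of sharing one port between your business services and braft; the port number 8100 and the main() skeleton are placeholders, not part of braft's API:

```cpp
#include <brpc/server.h>
#include <braft/raft.h>

int main(int argc, char* argv[]) {
    brpc::Server server;
    // Register your own business services on |server| here.

    // Register braft's internal services on the same port.
    // This must be done before the server starts.
    if (braft::add_service(&server, 8100) != 0) {
        LOG(ERROR) << "Fail to add raft service";
        return -1;
    }
    if (server.Start(8100, NULL) != 0) {
        LOG(ERROR) << "Fail to start server";
        return -1;
    }
    server.RunUntilAskedToQuit();
    return 0;
}
```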
You need to inherit braft::StateMachine and implement its interfaces:
#include <braft/raft.h>
// NOTE: All the interfaces are not required to be thread safe and they are
// called sequentially, meaning that every single method blocks all the
// following ones.
class YourStateMachineImple : public braft::StateMachine {
protected:
// on_apply is a *mandatory* interface to implement.
// on_apply is called after one or more log entries have been persisted on a
// majority of the nodes, to notify the user to apply the operations
// represented by these entries to the business state machine.
// Through |iter| you can iterate over all the committed but not yet applied
// entries. If your state machine supports batched updates, you can fetch
// multiple entries at a time to improve throughput.
//
void on_apply(braft::Iterator& iter) {
// A batch of tasks are committed, which must be processed through
// |iter|
for (; iter.valid(); iter.next()) {
// This guard helps invoke iter.done()->Run() asynchronously so that the
// callback does not block the StateMachine.
braft::AsyncClosureGuard closure_guard(iter.done());
// Parse operation from iter.data() and execute this operation
// op = parse(iter.data());
// result = process(op)
// The purpose of following logs is to help you understand the way
// this StateMachine works.
// Remove these logs in performance-sensitive servers.
LOG_IF(INFO, FLAGS_log_applied_task)
<< "Exeucted operation " << op
<< " and the result is " << result
<< " at log_index=" << iter.index();
}
}
// on_shutdown is called after this braft node has been shut down and all
// outstanding operations have finished, to notify the user that this state
// machine is no longer in use. At this point it is safe to release resources.
virtual void on_shutdown() {
// Cleanup resources you'd like
}
};
Through braft::Iterator you can iterate over all the tasks in the current batch:
class Iterator {
// Move to the next task.
void next();
// Return a unique and monotonically increasing identifier of the current
// task:
// - Uniqueness guarantees that committed tasks in different peers with
// the same index are always the same and kept unchanged.
// - Monotonicity guarantees that for any index pair i, j (i < j), task
// at index |i| must be applied before task at index |j| in all the
// peers from the group.
int64_t index() const;
// Return the term of the leader which this task was applied to.
int64_t term() const;
// Return the data whose content is the same as what was passed to
// Node::apply in the leader node.
const butil::IOBuf& data() const;
// If done() is non-NULL, you must call done()->Run() after applying this
// task no matter whether this operation succeeds or fails, otherwise the
// corresponding resources would leak.
//
// If this task is proposed by this Node when it was the leader of this
// group and the leadership has not changed before this point, done() is
// exactly what was passed to Node::apply which may stand for some
// continuation (such as respond to the client) after updating the
// StateMachine with the given task. Otherwise done() must be NULL.
Closure* done() const;
// Return true if this iterator currently references a valid task, false
// otherwise, indicating that the iterator has reached the end of this
// batch of tasks or some error has occurred
bool valid() const;
// Invoked when some critical error occurs; the last |ntail| tasks (starting
// from the last iterated one) are then considered as not applied. After
// this point, no further changes on the StateMachine as well as the Node
// would be allowed and you should try to repair this replica or just drop
// it.
//
// If |st| is not NULL, it should describe the detail of the error.
void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL);
};
A Node represents a single RAFT instance. A Node's ID consists of two parts, a GroupId and a PeerId:
Node(const GroupId& group_id, const PeerId& peer_id);
Start the node:
struct NodeOptions {
// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
// Default: 1000 (1s)
int election_timeout_ms;
// A snapshot saving would be triggered every |snapshot_interval_s| seconds
// if this is set to a positive number.
// If |snapshot_interval_s| <= 0, the time based snapshot would be disabled.
//
// Default: 3600 (1 hour)
int snapshot_interval_s;
// We will regard a peer being added as caught up if the margin between the
// last_log_index of this peer and the last_log_index of the leader is less than
// |catchup_margin|
//
// Default: 1000
int catchup_margin;
// If the node is starting from an empty environment (both LogStorage and
// SnapshotStorage are empty), it would use |initial_conf| as the
// configuration of the group, otherwise it would load configuration from
// the existing environment.
//
// Default: An empty group
Configuration initial_conf;
// The specific StateMachine implementing your business logic, which must be
// a valid instance.
StateMachine* fsm;
// If |node_owns_fsm| is true, |fsm| would be destroyed when the backing
// Node is no longer referenced.
//
// Default: false
bool node_owns_fsm;
// Describe a specific LogStorage in format ${type}://${parameters}
std::string log_uri;
// Describe a specific StableStorage in format ${type}://${parameters}
std::string raft_meta_uri;
// Describe a specific SnapshotStorage in format ${type}://${parameters}
std::string snapshot_uri;
// If enabled, duplicate files will be filtered out before copying the snapshot
// from the remote peer, to avoid useless transmission. A local file and a remote
// file are duplicates only if they have the same filename and the same checksum
// (stored in the file meta).
// Default: false
bool filter_before_copy_remote;
// If true, RPCs through raft_cli will be denied.
// Default: false
bool disable_cli;
};
class Node {
int init(const NodeOptions& options);
};
initial_conf only takes effect when the replication group is started from empty nodes; when the data in the snapshot or the log is not empty, the Configuration is recovered from them instead. initial_conf is only used to create the replication group: the first node puts itself into initial_conf and then calls add_peer to add the other nodes, while the other nodes leave initial_conf empty; alternatively, several empty nodes can be started at the same time, all configured with the same initial_conf (the ip:port of every node).
RAFT requires three different kinds of persistent storage: the log, the raft meta (stable storage), and the snapshot.
They are specified by three different URIs, and a default implementation based on the local file system is provided with type local. For example, local://data stores data in the data directory under the current directory, and local:///home/disk1/data stores data in /home/disk1/data. libraft ships with this default local:// implementation, and users can inherit and implement the corresponding Storage types as needed.
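Putting the pieces together, a sketch of initializing and starting a Node might look like the following; the group name, address, timeouts, and the local://./data prefix are placeholder values:

```cpp
#include <braft/raft.h>

// |fsm| is your braft::StateMachine implementation.
braft::Node* start_node(braft::StateMachine* fsm, const std::string& initial_conf_str) {
    butil::EndPoint addr(butil::my_ip(), 8100);
    braft::NodeOptions node_options;
    // e.g. "10.0.0.1:8100:0,10.0.0.2:8100:0,10.0.0.3:8100:0"; leave empty on
    // nodes that will be added later with add_peer.
    if (node_options.initial_conf.parse_from(initial_conf_str) != 0) {
        LOG(ERROR) << "Fail to parse initial configuration";
        return NULL;
    }
    node_options.election_timeout_ms = 5000;
    node_options.snapshot_interval_s = 30;
    node_options.fsm = fsm;
    node_options.node_owns_fsm = false;
    std::string prefix = "local://./data";
    node_options.log_uri = prefix + "/log";
    node_options.raft_meta_uri = prefix + "/raft_meta";
    node_options.snapshot_uri = prefix + "/snapshot";
    braft::Node* node = new braft::Node("YourGroup", braft::PeerId(addr));
    if (node->init(node_options) != 0) {
        LOG(ERROR) << "Fail to init raft node";
        delete node;
        return NULL;
    }
    return node;
}
```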
You need to serialize your operation into an IOBuf, a non-contiguous zero-copy buffer structure, then construct a Task and submit it to the braft::Node:
#include <braft/raft.h>
...
void function(op, callback) {
butil::IOBuf data;
serialize(op, &data);
braft::Task task;
task.data = &data;
task.done = make_closure(callback);
task.expected_term = expected_term;
return _node->apply(task);
}
The concrete interface:
struct Task {
Task() : data(NULL), done(NULL) {}
// The data applied to StateMachine
butil::IOBuf* data;
// Continuation when the data is applied to StateMachine or error occurs.
Closure* done;
// Reject this task if expected_term is not -1 and doesn't match the current
// term of this Node.
// Default: -1
int64_t expected_term;
};
// apply task to the replicated-state-machine
//
// About the ownership:
// |task.data|: for performance consideration, we will take away the
//              content. If you want to keep the content, copy it before
//              calling this function.
// |task.done|: If the data is successfully committed to the raft group, we
//              will pass the ownership to StateMachine::on_apply.
//              Otherwise we will specify the error and invoke it.
//
void apply(const Task& task);
Thread safety: apply is thread-safe and its implementation is essentially wait-free, which means you can submit WAL entries to the same Node from multiple threads.
apply does not always succeed; on failure, the status inside done is set and the callback is invoked. Everything that reaches on_apply has definitely been committed, but the result reported for apply can be a false negative when leadership changes: the framework may report that this WAL write failed while a log entry with the same content is eventually committed by the new leader and delivered to the StateMachine. In that situation the client usually retries (timeouts are generally handled the same way), so you normally need to make sure the operation represented by the log entry is idempotent.
The results of different log entries are independent. If one thread submits two entries A and B in sequence, any combination of success and failure of A and B may occur.
When both A and B succeed, their order in the log is strictly guaranteed to match the submission order.
Because apply is asynchronous, the following can happen: a node is the leader in term 1 and applies a log entry, a leadership change occurs, and within a very short time the node becomes the leader again in term 3, and only then does the previously applied entry start to be processed. To implement a strict replicated state machine in this situation you need to solve this ABA problem, which can be done by setting the leader's term at the time of apply.
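One way to guard against this, loosely following the pattern in braft's counter example, is to record the term in on_leader_start() and attach it to each Task as expected_term; the member names below are placeholders:

```cpp
class YourStateMachineImple : public braft::StateMachine {
public:
    void on_leader_start(int64_t term) {
        _leader_term.store(term, butil::memory_order_release);
    }
    void on_leader_stop(const butil::Status& /*status*/) {
        _leader_term.store(-1, butil::memory_order_release);
    }
    // Called from your RPC handler; |done| is a braft::Closure subclass
    // (see the Closure section below).
    void apply_operation(const butil::IOBuf& serialized_op, braft::Closure* done) {
        const int64_t term = _leader_term.load(butil::memory_order_relaxed);
        // If term < 0, this node is not the leader: redirect the request instead.
        braft::Task task;
        butil::IOBuf data = serialized_op;
        task.data = &data;
        task.expected_term = term;  // rejected if the term has changed meanwhile
        task.done = done;
        _node->apply(task);
    }
private:
    braft::Node* _node;
    butil::atomic<int64_t> _leader_term;
};
```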
braft::Closure is a special subclass of protobuf::Closure that is used to mark an asynchronous call as succeeded or failed. As with protobuf::Closure, you need to inherit this class and implement the Run interface. When the asynchronous call actually finishes, the framework invokes Run, and you can check status() to find out whether the call succeeded or failed.
// Raft-specific closure which encloses a butil::Status to report whether the
// operation was successful.
class Closure : public google::protobuf::Closure {
public:
    butil::Status& status() { return _st; }
    const butil::Status& status() const { return _st; }
};
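For illustration, a sketch of a braft::Closure subclass that finishes the original brpc RPC after the task has been applied; MyResponse is a placeholder protobuf message, not part of braft:

```cpp
#include <memory>
#include <brpc/server.h>
#include <braft/raft.h>

class ApplyDone : public braft::Closure {
public:
    ApplyDone(MyResponse* response, google::protobuf::Closure* rpc_done)
        : _response(response), _rpc_done(rpc_done) {}
    void Run() {
        // Delete this closure and finish the RPC when leaving Run().
        std::unique_ptr<ApplyDone> self_guard(this);
        brpc::ClosureGuard rpc_guard(_rpc_done);
        if (status().ok()) {
            _response->set_success(true);
        } else {
            _response->set_success(false);
            LOG(WARNING) << "Task failed: " << status();
        }
    }
private:
    MyResponse* _response;
    google::protobuf::Closure* _rpc_done;
};
```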
StateMachine also provides a number of other interfaces. By implementing them you can observe changes of the Node's state, and your system can implement specific logic for those state changes (for example, forwarding requests to the leader node).
class StateMachine {
...
// Invoked once when the raft node was shut down. The corresponding resources
// are safe to be cleared after this point.
// Default: do nothing
virtual void on_shutdown();
// Invoked when the belonging node becomes the leader of the group at |term|
// Default: Do nothing
virtual void on_leader_start(int64_t term);
// Invoked when this node is no longer the leader of the belonging group.
// |status| describes more details about the reason.
virtual void on_leader_stop(const butil::Status& status);
// Invoked when some critical error occurred and this Node stops working
// ever after.
virtual void on_error(const ::braft::Error& e);
// Invoked when a configuration has been committed to the group
virtual void on_configuration_committed(const ::braft::Configuration& conf);
// Invoked when a follower stops following a leader
// situations including:
// 1. The election timeout has expired.
// 2. A message was received from a node with a higher term.
virtual void on_stop_following(const ::braft::LeaderChangeContext& ctx);
// Invoked when this node starts to follow a new leader.
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);
...
};
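For example, a state machine can remember the current leader in these callbacks so that non-leader nodes can redirect client requests. A sketch, where _known_leader is a placeholder member:

```cpp
void on_start_following(const ::braft::LeaderChangeContext& ctx) {
    LOG(INFO) << "Start following leader " << ctx.leader_id()
              << " at term " << ctx.term();
    _known_leader = ctx.leader_id();
}
void on_stop_following(const ::braft::LeaderChangeContext& ctx) {
    LOG(INFO) << "Stop following " << ctx.leader_id()
              << " because of " << ctx.status();
    _known_leader = braft::PeerId();  // leader unknown until the next change
}
```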
In braft, a Snapshot is defined as a set of files in a specific persistent storage: the user serializes the state machine into one or more files, and any node can restore the state machine to that state from these files.
A Snapshot serves two purposes: it compacts the log so that storage and restart replay time stay bounded, and it lets a new or far-behind node catch up by copying the snapshot instead of replaying all the logs.
In braft, the corresponding Snapshot is accessed through SnapshotReader and SnapshotWriter.
class Snapshot : public butil::Status {
public:
Snapshot() {}
virtual ~Snapshot() {}
// Get the path of the Snapshot
virtual std::string get_path() = 0;
// List all the existing files in the Snapshot currently
virtual void list_files(std::vector<std::string> *files) = 0;
// Get the implementation-defined file_meta
virtual int get_file_meta(const std::string& filename,
::google::protobuf::Message* file_meta) {
(void)filename;
file_meta->Clear();
return 0;
}
};
class SnapshotWriter : public Snapshot {
public:
SnapshotWriter() {}
virtual ~SnapshotWriter() {}
// Save the meta information of the snapshot which is used by the raft
// framework.
virtual int save_meta(const SnapshotMeta& meta) = 0;
// Add a file to the snapshot.
// |file_meta| is an implementation-defined protobuf message
// All the implementation must handle the case that |file_meta| is NULL and
// no error can be raised.
// Note that whether the file will be created onto the backing storage is
// implementation-defined.
virtual int add_file(const std::string& filename) {
return add_file(filename, NULL);
}
virtual int add_file(const std::string& filename,
const ::google::protobuf::Message* file_meta) = 0;
// Remove a file from the snapshot
// Note that whether the file will be removed from the backing storage is
// implementation-defined.
virtual int remove_file(const std::string& filename) = 0;
};
class SnapshotReader : public Snapshot {
public:
SnapshotReader() {}
virtual ~SnapshotReader() {}
// Load the meta information of this snapshot.
virtual int load_meta(SnapshotMeta* meta) = 0;
// Generate uri for other peers to copy this snapshot.
// Return an empty string if some error has occurred
virtual std::string generate_uri_for_copy() = 0;
};
Snapshots differ greatly between businesses, so SnapshotStorage does not abstract concrete interfaces for reading and writing snapshot content; instead it abstracts SnapshotReader and SnapshotWriter and leaves the concrete snapshot creation and loading logic to be extended by the user.
Snapshot creation flow:
Snapshot loading flow:
libraft provides default LocalSnapshotWriter and LocalSnapshotReader implementations based on a file list; the concrete usage is as follows:
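As an illustration of that flow, here is a sketch of the on_snapshot_save/on_snapshot_load hooks of braft::StateMachine following the pattern of braft's counter example; save_state_to and load_state_from are placeholder helpers that serialize/deserialize your own state:

```cpp
void on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
    // Heavy serialization is usually offloaded to a background bthread so
    // that on_apply is not blocked; it is done inline here for brevity.
    brpc::ClosureGuard done_guard(done);
    std::string path = writer->get_path() + "/data";
    if (save_state_to(path) != 0) {                 // placeholder helper
        done->status().set_error(EIO, "Fail to save snapshot to %s", path.c_str());
        return;
    }
    // Register the file so that the framework will ship it to other peers.
    if (writer->add_file("data") != 0) {
        done->status().set_error(EIO, "Fail to add file to writer");
    }
}

int on_snapshot_load(braft::SnapshotReader* reader) {
    std::string path = reader->get_path() + "/data";
    return load_state_from(path);                   // placeholder helper
}
```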
In practice, the snapshot of the user's business state-machine data can be implemented in several ways:
Some NewSQL systems in the industry mostly use RocksDB-like LSM-tree storage engines that support MVCC. When taking a raft snapshot, they use approach 1 above: first create a snapshot of the db, then create an iterator, and traverse and persist the data. TiDB and CockroachDB use similar solutions.
A braft::Node can be controlled either through API calls or through braft_cli; this chapter mainly explains how to use the API.
In a distributed system, machine failures, scaling out, and replica balancing are basic problems that the management plane has to solve. braft provides several methods:
// Add a new peer to the raft group. done->Run() would be invoked after this
// operation finishes, describing the detailed result.
void add_peer(const PeerId& peer, Closure* done);
// Remove the peer from the raft group. done->Run() would be invoked after
// this operation finishes, describing the detailed result.
void remove_peer(const PeerId& peer, Closure* done);
// Gracefully change the configuration of the raft group to |new_peers|. done->Run()
// would be invoked after this operation finishes, describing the detailed
// result.
void change_peers(const Configuration& new_peers, Closure* done);
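A minimal sketch of issuing such a change and observing its result through the closure; the peer address is a placeholder:

```cpp
class ChangePeerDone : public braft::Closure {
public:
    void Run() {
        std::unique_ptr<ChangePeerDone> self_guard(this);
        if (status().ok()) {
            LOG(INFO) << "Configuration change succeeded";
        } else {
            LOG(WARNING) << "Configuration change failed: " << status();
        }
    }
};

braft::PeerId new_peer;
if (new_peer.parse("10.0.0.4:8100") == 0) {
    node->add_peer(new_peer, new ChangePeerDone);
}
```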
A membership change goes through several stages:
When removing nodes is considered, things get a bit more complicated. Since the number of nodes required to judge a successful commit becomes smaller, later log entries may be judged as committed while earlier ones have not been committed yet. In that case, to keep the state machine's operations ordered, we forcibly regard the earlier entries as committed even though they have not been.
For example:
- The current cluster is (A, B, C, D), where C and D have failed. Because a majority of the nodes are down, there are 10 log entries that have not been committed (written on A and B, not on C and D). Now an operation is issued to remove D from the cluster. The commit condition for this entry becomes a majority of (A, B, C), so once A and B have written it, the entry is considered committed, even though the 10 earlier entries are still uncommitted. At this point we forcibly regard those 10 earlier entries as committed as well.
- This case is quite extreme. Normally in such a situation the leader would step down and the cluster would become leaderless; at least one of C and D would need to be repaired before the cluster could serve normally again.
When a majority of the nodes have failed, membership cannot be changed through add_peer/remove_peer/change_peers. The safe approach is to wait for a majority of the nodes to recover, which guarantees data safety. If the business prioritizes service availability at the cost of data safety, reset_peers can be used to forcibly set the replication group's Configuration.
// Reset the configuration of this node individually, without any replication
// to other peers, before this node becomes the leader. This function is
// supposed to be invoked when the majority of the replication group is
// dead and you'd like to revive the service in the consideration of
// availability.
// Notice that neither consistency nor consensus is guaranteed in this
// case, BE CAREFUL when dealing with this method.
butil::Status reset_peers(const Configuration& new_peers);
After reset_peers, the nodes in the new Configuration start a new leader election. Once a new leader is elected, it writes a log entry carrying the new Configuration; only after this entry is written successfully is reset_peers considered successful. If a failure occurs in between, the caller has to pick a new set of peers and issue reset_peers again.
Using reset_peers is not recommended: it breaks raft's guarantee of data consistency and may cause split brain. For example, take a replication group G formed by {A B C D E} in which {C D E} fail. Calling reset_peers on {A B} successfully restores a replication group G'; if {C D E} later restart, they will also form a replication group G'', so group G ends up with two leaders. Since {A B} are present in both groups' configurations, the followers among them will receive AppendEntries from both leaders; currently only term and index are checked, so their data may become corrupted.
// Add a new peer to the raft group when the current configuration matches
// |old_peers|. done->Run() would be invoked after this operation finishes,
// describing the detailed result.
void add_peer(const std::vector<PeerId>& old_peers, const PeerId& peer, Closure* done);
// Try transferring leadership to |peer|.
// If peer is ANY_PEER, a proper follower will be chosen as the leader for
// the next term.
// Returns 0 on success, -1 otherwise.
int transfer_leadership_to(const PeerId& peer);
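A sketch of asking the current leader to hand over leadership; the target address is a placeholder:

```cpp
braft::PeerId target;
if (target.parse("10.0.0.2:8100") == 0 &&
    node->transfer_leadership_to(target) != 0) {
    LOG(WARNING) << "Fail to transfer leadership; this node may not be the leader";
}
```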
In some scenarios we need to force the leader to be switched to another node from the outside, for example:
braft implements a leadership-transfer algorithm, which consists of the following steps:
After a Node starts, braft lists all the Nodes in the current process at http://${your_server_endpoint}/raft_stat, together with the internal state of each Node.
This includes:
Field | Description |
---|---|
state | Node state, one of LEADER/FOLLOWER/CANDIDATE |
term | Current term |
conf_index | Log index at which the last Configuration was generated |
peers | List of nodes in the current Configuration |
leader | Leader node of the current Configuration |
election_timer | Election timer, enabled in the FOLLOWER state |
vote_timer | Vote timer, enabled in the CANDIDATE state |
stepdown_timer | Leader step-down timer, enabled in the LEADER state |
snapshot_timer | Snapshot timer |
storage | first log index and last log index in the log storage |
disk_index | Last log index that has been persisted to disk |
known_applied_index | Last log index that the fsm has applied |
last_log_id | Info of the last in-memory log entry (logs are written to memory first and flushed to disk in batches) |
state_machine | fsm state, one of IDLE/COMMITTED/SNAPSHOT_SAVE/SNAPSHOT_LOAD/LEADER_STOP/ERROR |
last_committed_index | Largest log index that has been committed |
last_snapshot_index | Last log index included in the last snapshot |
last_snapshot_term | Term of the last log entry included in the last snapshot |
snapshot_status | Snapshot status, one of LOADING/DOWNLOADING/SAVING/IDLE; for LOADING and DOWNLOADING, the snapshot uri and snapshot meta are also shown |
raft has many flag configuration options. While the process is running they can be viewed at http://endpoint/flags. Specifically:
Flag | Description |
---|---|
raft_sync | Whether to enable sync |
raft_max_append_buffer_size | Size of the in-memory buffer in the log manager |
raft_leader_batch | Maximum batch merged in the log manager |
raft_max_entries_size | Maximum number of entries in one AppendEntries request |
raft_max_body_size | Maximum body size of one AppendEntries request |
raft_max_segment_size | Size of a single log segment |
raft_max_byte_count_per_rpc | Bytes downloaded per RPC when copying a snapshot |
raft_apply_batch | Maximum batch size when applying |
raft_election_heartbeat_factor | Ratio between the election timeout and the heartbeat timeout |
raft_sync_policy | Refined policy when raft_sync is true: 0 means sync on every write, 1 means sync once every given number of bytes written |
raft_sync_per_bytes | Effective when raft_sync_policy is 1: sync after this many bytes have been written |