braft

braft itself does not provide server functionality. You can integrate braft into any programming framework, including brpc. This document describes how to use braft inside a distributed server to build a highly available system; how to implement the concrete business server is beyond the scope of this document.

Example

server-side code of Counter

Register and start the Server

braft needs to run inside a concrete brpc server. You can let braft share the same port with your business services, or run braft on a different port.

brpc allows multiple logical Services to be registered on one port. If your Service also runs inside a brpc Server, you can manage that brpc Server yourself and call any of the following interfaces to add the braft-related Services into your Server. This lets braft and your business run on the same port and reduces operational complexity. If you are not very familiar with how to use a brpc Server, read the wiki page first. Note: if your service is exposed to external users, do not run braft on the same port.

// Attach raft services to |server|, this makes the raft services share the same
// listen address with the user services.
//
// NOTE: Now we only allow the backing Server to be started with a specific
// listen address, if the Server is going to be started from a range of ports, 
// the behavior is undefined.
// Returns 0 on success, -1 otherwise.
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* listen_addr);
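
For example, if your business service already runs inside a brpc Server, attaching the braft services to the same port can be sketched as follows; FLAGS_port and the commented-out business service are placeholders of this example:

brpc::Server server;
// Register your own business services first, e.g. a hypothetical CounterServiceImpl:
// server.AddService(&counter_service, brpc::SERVER_DOESNT_OWN_SERVICE);

// Attach the braft services to the same port before starting the server.
if (braft::add_service(&server, FLAGS_port) != 0) {
    LOG(ERROR) << "Fail to add raft service";
    return -1;
}
if (server.Start(FLAGS_port, NULL) != 0) {
    LOG(ERROR) << "Fail to start Server";
    return -1;
}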

实现业务状态机

You need to subclass braft::StateMachine and implement its interfaces.


#include <braft/raft.h>

// NOTE: All the interfaces are not required to be thread safe, and they are
// called sequentially, meaning that every single method will block all the
// following ones.
class YourStateMachineImpl : public braft::StateMachine {
protected:
    // on_apply *must* be implemented.
    // on_apply is invoked after one or more log entries have been persisted on a
    // majority of the nodes, to notify the user to apply the operations carried by
    // these log entries to the business state machine.
    // Through |iter| you can traverse all the committed but not yet applied log
    // entries. If your state machine supports batched updates, you can fetch
    // multiple entries at a time to improve the state machine's throughput.
    //
    void on_apply(braft::Iterator& iter) {
        // A batch of tasks are committed, which must be processed through 
        // |iter|
        for (; iter.valid(); iter.next()) {
            // This guard helps invoke iter.done()->Run() asynchronously to
            // avoid that callback blocks the StateMachine.
            braft::AsyncClosureGuard closure_guard(iter.done());
            // Parse operation from iter.data() and execute this operation
            // op = parse(iter.data());
            // result = process(op)
          
            // The purpose of following logs is to help you understand the way
            // this StateMachine works.
            // Remove these logs in performance-sensitive servers.
            LOG_IF(INFO, FLAGS_log_applied_task) 
                    << "Exeucted operation " << op
                    << " and the result is " << result
                    << " at log_index=" << iter.index();
        }
    }
    // on_shutdown is invoked after this braft node has been shut down and all
    // outstanding operations have finished, to notify the user that this state
    // machine is no longer in use.
    // At this point you can safely release related resources.
    virtual void on_shutdown() {
        // Cleanup resources you'd like
    }
};

iterator

Through braft::Iterator you can traverse all the tasks in the current batch:

class Iterator {
    // Move to the next task.
    void next();
    // Return a unique and monotonically increasing identifier of the current 
    // task:
    //  - Uniqueness guarantees that committed tasks in different peers with 
    //    the same index are always the same and kept unchanged.
    //  - Monotonicity guarantees that for any index pair i, j (i < j), task 
    //    at index |i| must be applied before task at index |j| in all the 
    //    peers from the group.
    int64_t index() const;
    // Return the term of the leader under which this task was applied.
    int64_t term() const;
    // Return the data whose content is the same as what was passed to
    // Node::apply in the leader node.
    const butil::IOBuf& data() const;
    // If done() is non-NULL, you must call done()->Run() after applying this
    // task no matter this operation succeeds or fails, otherwise the
    // corresponding resources would leak.
    //
    // If this task is proposed by this Node when it was the leader of this 
    // group and the leadership has not changed before this point, done() is 
    // exactly what was passed to Node::apply which may stand for some 
    // continuation (such as respond to the client) after updating the 
    // StateMachine with the given task. Otherwise done() must be NULL.
    Closure* done() const;
    // Return true if this iterator currently references a valid task; return false
    // otherwise, indicating that the iterator has reached the end of this
    // batch of tasks or some error has occurred.
    bool valid() const;
    // Invoked when some critical error occurred. And we will consider the last 
    // |ntail| tasks (starting from the last iterated one) as not applied. After
    // this point, no further changes on the StateMachine as well as the Node 
    // would be allowed and you should try to repair this replica or just drop 
    // it.
    //
    // If |st| is not NULL, it should describe the detail of the error.
    void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL);
};
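
As an illustration of the error-handling contract above, here is a minimal sketch of an on_apply body that aborts the batch when an entry cannot be parsed; MyOp, parse_op and apply_op are hypothetical helpers of your own, not braft APIs:

void on_apply(braft::Iterator& iter) {
    for (; iter.valid(); iter.next()) {
        MyOp op;                                   // hypothetical operation type
        if (!parse_op(iter.data(), &op)) {         // hypothetical parser
            // Treat this task (ntail = 1) and the following ones as not applied
            // and stop; the node enters an error state after this point and the
            // replica should be repaired or dropped.
            butil::Status st(EINVAL, "Fail to parse log entry");
            iter.set_error_and_rollback(1, &st);
            return;
        }
        braft::AsyncClosureGuard closure_guard(iter.done());
        apply_op(op);                              // hypothetical state-machine update
    }
}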

构造braft::Node

A Node represents a single Raft instance. A Node's identity consists of two parts, the GroupId and the PeerId:

Node(const GroupId& group_id, const PeerId& peer_id);

Start this node:

struct NodeOptions {
    // A follower would become a candidate if it doesn't receive any message 
    // from the leader in |election_timeout_ms| milliseconds
    // Default: 1000 (1s)
    int election_timeout_ms;

    // A snapshot saving would be triggered every |snapshot_interval_s| seconds
    // if it is set to a positive number.
    // If |snapshot_interval_s| <= 0, the time-based snapshot would be disabled.
    //
    // Default: 3600 (1 hour)
    int snapshot_interval_s;

    // We will regard an added peer as caught up if the margin between the
    // last_log_index of this peer and the last_log_index of the leader is less than
    // |catchup_margin|
    //
    // Default: 1000
    int catchup_margin;

    // If node is starting from an empty environment (both LogStorage and
    // SnapshotStorage are empty), it would use |initial_conf| as the
    // configuration of the group, otherwise it would load configuration from
    // the existing environment.
    //
    // Default: An empty group
    Configuration initial_conf;

    // The specific StateMachine implementing your business logic, which must be
    // a valid instance.
    StateMachine* fsm;

    // If |node_owns_fsm| is true, |fsm| would be destroyed when the backing
    // Node is no longer referenced.
    //
    // Default: false
    bool node_owns_fsm;

    // Describe a specific LogStorage in format ${type}://${parameters}
    std::string log_uri;

    // Describe a specific StableStorage in format ${type}://${parameters}
    std::string raft_meta_uri;

    // Describe a specific SnapshotStorage in format ${type}://${parameters}
    std::string snapshot_uri;
    
    // If enabled, duplicate files will be filtered out before copying a snapshot
    // from the remote node, to avoid useless transmission. A local file and a remote
    // file are considered duplicates only if they have the same filename and the same
    // checksum (stored in the file meta).
    // Default: false
    bool filter_before_copy_remote;
    
    // If true, RPCs through raft_cli will be denied.
    // Default: false
    bool disable_cli;
};
class Node {
    int init(const NodeOptions& options);
};
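
Putting it together, a minimal sketch of constructing and starting a node is shown below. The group name, the peer list, the listen_addr endpoint, the data paths and your_fsm are placeholders of this example, not fixed braft values:

braft::NodeOptions node_options;
// Initial members of the group, only used when starting from an empty environment.
if (node_options.initial_conf.parse_from(
        "192.168.0.1:8100:0,192.168.0.2:8100:0,192.168.0.3:8100:0") != 0) {
    LOG(ERROR) << "Fail to parse initial configuration";
    return -1;
}
node_options.fsm = your_fsm;                 // your StateMachine implementation
node_options.node_owns_fsm = false;
node_options.election_timeout_ms = 5000;
node_options.snapshot_interval_s = 3600;
node_options.log_uri = "local://./data/log";
node_options.raft_meta_uri = "local://./data/raft_meta";
node_options.snapshot_uri = "local://./data/snapshot";

// |listen_addr| is the butil::EndPoint this server listens on.
braft::Node* node = new braft::Node("my_group", braft::PeerId(listen_addr));
if (node->init(node_options) != 0) {
    LOG(ERROR) << "Fail to init raft node";
    delete node;
    return -1;
}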

将操作提交到复制组

You need to serialize your operation into an IOBuf, which is a non-contiguous, zero-copy buffer structure, then construct a Task and submit it to the braft::Node:

#include <braft/raft.h>

...
void function(op, callback) {
    butil::IOBuf data;
    serialize(op, &data);
    braft::Task task;
    task.data = &data;
    task.done = make_closure(callback);
    task.expected_term = expected_term;
    return _node->apply(task);
}

The detailed interface:

struct Task {
    Task() : data(NULL), done(NULL), expected_term(-1) {}

    // The data applied to StateMachine
    butil::IOBuf* data;

    // Continuation when the data is applied to StateMachine or error occurs.
    Closure* done;
 
    // Reject this task if |expected_term| doesn't match the current term of
    // this Node, unless the value is -1.
    // Default: -1
    int64_t expected_term;
};
    
// apply task to the replicated-state-machine
//
// About the ownership:
// |task.data|: for the performance consideration, we will take away the
//              content. If you want to keep the content, copy it before calling
//              this function.
// |task.done|: If the data is successfully committed to the raft group, we
//              will pass the ownership to StateMachine::on_apply; otherwise we
//              will set the error and invoke it.
//
void apply(const Task& task);

braft::Closure is a special subclass of protobuf::Closure, used to mark an asynchronous call as succeeded or failed. As with protobuf::Closure, you need to subclass it and implement the Run interface. Run is invoked by the framework when the asynchronous call actually finishes; at that point you can check status() to see whether the call succeeded or failed.

// Raft-specific closure which encloses a butil::Status to report if the
// operation was successful.
class Closure : public google::protobuf::Closure {
public:
    butil::Status& status() { return _st; }
    const butil::Status& status() const { return _st; }
};
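
For instance, a closure that responds to a client after the operation finishes could be sketched like this; YourResponse and send_response are assumptions of this example, not braft APIs:

class ApplyDone : public braft::Closure {
public:
    explicit ApplyDone(YourResponse* response) : _response(response) {}
    void Run() {
        // Run() is responsible for deleting this closure on every path.
        std::unique_ptr<ApplyDone> self_guard(this);
        if (status().ok()) {
            _response->set_success(true);
        } else {
            // The task was not committed, e.g. this node is no longer the leader;
            // typically you would tell the client to redirect to the leader here.
            _response->set_success(false);
        }
        send_response(_response);   // hypothetical helper replying to the client
    }
private:
    YourResponse* _response;
};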

Listen for braft::Node state changes

StateMachine also provides several other interfaces. By implementing them you can listen for state changes of the Node, and your system can run specific logic on these changes (for example, forwarding requests to the leader node).

class StateMachine {
...
    // Invoked once when the raft node is shut down. Corresponding resources are
    // safe to be cleared from then on.
    // Default: Do nothing
    virtual void on_shutdown();
    // Invoked when the belonging node becomes the leader of the group at |term|
    // Default: Do nothing
    virtual void on_leader_start(int64_t term);
    // Invoked when this node is no longer the leader of the belonging group.
    // |status| describes more details about the reason.
    virtual void on_leader_stop(const butil::Status& status);
    // Invoked when some critical error occurs, after which this Node stops
    // working.
    virtual void on_error(const ::braft::Error& e);
    // Invoked when a configuration has been committed to the group
    virtual void on_configuration_committed(const ::braft::Configuration& conf);
    // Invoked when a follower stops following a leader, in situations
    // including:
    // 1. The election timeout expires.
    // 2. A message is received from a node with a higher term.
    virtual void on_stop_following(const ::braft::LeaderChangeContext& ctx);
    // Invoked when this node starts to follow a new leader.
    virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);
...
};
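
As a sketch of how these callbacks are typically used, the snippet below records the current leadership so that request handlers can reject or redirect writes when this node is not the leader. The _leader_term member and is_leader() helper are conventions of this example, not part of braft:

class YourStateMachineImpl : public braft::StateMachine {
public:
    bool is_leader() const {
        return _leader_term.load(butil::memory_order_acquire) > 0;
    }
    void on_leader_start(int64_t term) {
        _leader_term.store(term, butil::memory_order_release);
        LOG(INFO) << "Node becomes leader at term=" << term;
    }
    void on_leader_stop(const butil::Status& status) {
        _leader_term.store(-1, butil::memory_order_release);
        LOG(INFO) << "Node stepped down : " << status;
    }
    void on_start_following(const ::braft::LeaderChangeContext& ctx) {
        // A request handler could use this notification to forward writes to
        // the new leader.
        LOG(INFO) << "Start following a new leader";
    }
    // ...
private:
    butil::atomic<int64_t> _leader_term;
};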

Implement Snapshot

In braft, a Snapshot is defined as a set of files in a specific persistent storage. The user serializes the state machine into one or more files, and any node can restore the state machine to that point in time from these files.

Snapshot serves two purposes: it compacts the log so that a restarted node can recover from the snapshot instead of replaying all historical logs, and it allows a new or far-behind peer to catch up by copying the snapshot instead of all the logs.

In braft, access to the corresponding Snapshot is controlled through SnapshotReader and SnapshotWriter.

class Snapshot : public butil::Status {
public:
    Snapshot() {}
    virtual ~Snapshot() {}

    // Get the path of the Snapshot
    virtual std::string get_path() = 0;

    // List all the existing files in the Snapshot currently
    virtual void list_files(std::vector<std::string> *files) = 0;

    // Get the implementation-defined file_meta
    virtual int get_file_meta(const std::string& filename, 
                              ::google::protobuf::Message* file_meta) {
        (void)filename;
        file_meta->Clear();
        return 0;
    }
};

class SnapshotWriter : public Snapshot {
public:
    SnapshotWriter() {}
    virtual ~SnapshotWriter() {}

    // Save the meta information of the snapshot which is used by the raft
    // framework.
    virtual int save_meta(const SnapshotMeta& meta) = 0;

    // Add a file to the snapshot.
    // |file_meta| is an implementation-defined protobuf message
    // All the implementation must handle the case that |file_meta| is NULL and
    // no error can be raised.
    // Note that whether the file will be created onto the backing storage is
    // implementation-defined.
    virtual int add_file(const std::string& filename) { 
        return add_file(filename, NULL);
    }

    virtual int add_file(const std::string& filename, 
                         const ::google::protobuf::Message* file_meta) = 0;

    // Remove a file from the snapshot
    // Note that whether the file will be removed from the backing storage is
    // implementation-defined.
    virtual int remove_file(const std::string& filename) = 0;
};

class SnapshotReader : public Snapshot {
public:
    SnapshotReader() {}
    virtual ~SnapshotReader() {}

    // Load the meta information of the snapshot
    virtual int load_meta(SnapshotMeta* meta) = 0;

    // Generate uri for other peers to copy this snapshot.
    // Return an empty string if some error has occurred
    virtual std::string generate_uri_for_copy() = 0;
};

Snapshots differ greatly between businesses, so SnapshotStorage does not abstract a concrete interface for reading and writing snapshot data. Instead it exposes SnapshotReader and SnapshotWriter and leaves the concrete snapshot creation and loading logic to the user.

Snapshot creation flow

Snapshot loading flow

libraft provides default file-list-based implementations, LocalSnapshotWriter and LocalSnapshotReader; a typical way to use them is sketched below.
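
A minimal sketch of wiring them into the StateMachine snapshot hooks (on_snapshot_save/on_snapshot_load) follows; the file name "data" and the save_state_to/load_state_from helpers are assumptions of this example:

void on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
    brpc::ClosureGuard done_guard(done);
    // Serialize the state machine into a file under the snapshot directory.
    std::string path = writer->get_path() + "/data";
    if (save_state_to(path) != 0) {                 // hypothetical helper
        done->status().set_error(EIO, "Fail to save state to %s", path.c_str());
        return;
    }
    // Register the file so it becomes part of the snapshot and can be copied
    // to other peers.
    if (writer->add_file("data") != 0) {
        done->status().set_error(EIO, "Fail to add file to snapshot writer");
    }
}

int on_snapshot_load(braft::SnapshotReader* reader) {
    std::string path = reader->get_path() + "/data";
    return load_state_from(path);                   // hypothetical helper
}

Since all StateMachine methods are called sequentially, a long synchronous on_snapshot_save would block subsequent on_apply calls; for a large state you would normally do the heavy copying in a separate thread and invoke |done| only when the files are ready.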

In practice, snapshots of the user's business state machine data are usually implemented in one of the following ways:

Most NewSQL systems in the industry use RocksDB-like LSM-tree storage engines that support MVCC. When taking a raft snapshot they follow approach 1 above: first create a snapshot of the db, then create an iterator, and traverse and persist the data. TiDB and CockroachDB use similar solutions.

Control this node

A braft::Node can be controlled either by calling its API or through braft_cli. This chapter mainly describes how to use the API.

Node configuration change

In a distributed system, machine failures, scaling out, and replica balancing are basic problems the management plane needs to solve. braft provides several methods:

// Add a new peer to the raft group. done->Run() would be invoked after this
// operation finishes, describing the detailed result.
void add_peer(const PeerId& peer, Closure* done);

// Remove the peer from the raft group. done->Run() would be invoked after
// this operation finishes, describing the detailed result.
void remove_peer(const PeerId& peer, Closure* done);

// Gracefully change the configuration of the raft group to |new_peers|, done->Run()
// would be invoked after this operation finishes, describing the detailed
// result.
void change_peers(const Configuration& new_peers, Closure* done);
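
For example, adding a replica with a completion callback could be sketched as follows; ConfigChangeDone and the peer address are illustrative, not braft APIs:

class ConfigChangeDone : public braft::Closure {
public:
    void Run() {
        std::unique_ptr<ConfigChangeDone> self_guard(this);
        if (status().ok()) {
            LOG(INFO) << "Configuration change succeeded";
        } else {
            LOG(ERROR) << "Configuration change failed: " << status();
        }
    }
};

braft::PeerId new_peer;
if (new_peer.parse("192.168.0.4:8100:0") != 0) {   // placeholder address
    LOG(ERROR) << "Fail to parse peer id";
    return;
}
node->add_peer(new_peer, new ConfigChangeDone());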

A node configuration change goes through several phases:

Node removal makes things a bit more complicated: because the number of nodes required to consider an entry committed becomes smaller, a later log entry may be judged committed while earlier entries have not yet been committed. In that case, to keep state machine operations ordered, we force the earlier entries to be treated as committed as well, even though they have not been committed yet.

For example:

Reset the peer list

When a majority of the nodes have failed, the configuration cannot be changed through add_peer/remove_peer/change_peers. The safe approach is to wait for the majority to recover, which guarantees data safety. If the business prefers availability over data safety, you can use reset_peers to forcibly set the Configuration of the replication group.

// Reset the configuration of this node individually, without any replication
// to other peers, before this node becomes the leader. This function is
// supposed to be invoked when the majority of the replication group is
// dead and you'd like to revive the service in the consideration of
// availability.
// Notice that neither consistency nor consensus is guaranteed in this
// case, BE CAREFUL when dealing with this method.
butil::Status reset_peers(const Configuration& new_peers);

After reset_peers, the nodes in the new Configuration will start a new leader election. Once a new leader is elected, it writes a log entry containing the new Configuration; only after this log entry is written successfully does reset_peers count as successful. If a failure occurs in between, the caller needs to pick a new set of peers and issue reset_peers again.

Using reset_peers is not recommended: it breaks raft's data consistency guarantee and may cause split brain. For example, take a replication group G formed by {A B C D E}. If {C D E} fail and {A B} are recovered into a group G' via reset_peers, and then {C D E} restart, they will also form a replication group G''. Two leaders may then exist for group G, and since {A B} are present in both configurations, a follower among them may accept AppendEntries from both leaders; currently only term and index are checked, so its data may become corrupted.

// Add a new peer to the raft group when the current configuration matches
// |old_peers|. done->Run() would be invoked after this operation finishes,
// describing the detailed result.
void add_peer(const std::vector<PeerId>& old_peers, const PeerId& peer, Closure* done);

Transfer leadership

// Try transferring leadership to |peer|.
// If peer is ANY_PEER, a proper follower will be chosen as the leader in
// the next term.
// Returns 0 on success, -1 otherwise.
int transfer_leadership_to(const PeerId& peer);
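
A short usage sketch, with the target address as a placeholder:

// Hand leadership over to a specific replica, or pass braft::ANY_PEER to let
// braft pick a proper follower.
braft::PeerId target;
target.parse("192.168.0.2:8100:0");
if (node->transfer_leadership_to(target) != 0) {
    LOG(ERROR) << "Fail to start transferring leadership";
}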

In some scenarios, you may need to force the leader to switch to another node from outside, for example:

braft implements a leader transfer algorithm, which consists of the following steps:

  1. The leader stops accepting writes; all apply calls will fail during this period.
  2. The leader keeps replicating log entries to all followers. When it finds that the target node's log is as up to date as its own, it sends a TimeoutNow RPC to that node.
  3. On receiving the TimeoutNowRequest, the node immediately becomes a Candidate, increases its term, and starts an election.
  4. On receiving the TimeoutNowResponse, the leader starts to step down.
  5. If the leader has not stepped down within election_timeout_ms, the leader transfer is cancelled and the leader starts accepting write requests again.

Inspect node status

After a Node starts, braft lists all the Nodes in the current process, together with each Node's internal state, at http://${your_server_endpoint}/raft_stat.

This includes:

Field                 Description
state                 Node state, one of LEADER/FOLLOWER/CANDIDATE
term                  Current term
conf_index            Log index at which the last Configuration was generated
peers                 Nodes in the current Configuration
leader                Leader of the current Configuration
election_timer        Election timer, enabled in the FOLLOWER state
vote_timer            Vote timer, enabled in the CANDIDATE state
stepdown_timer        Step-down timer, enabled in the LEADER state
snapshot_timer        Snapshot timer
storage               First and last log index in the log storage
disk_index            Last log index that has been persisted to disk
known_applied_index   Last log index that the fsm has applied
last_log_id           Information of the last in-memory log entry (logs are written to memory first and flushed to disk in batches)
state_machine         fsm state, one of IDLE/COMMITTED/SNAPSHOT_SAVE/SNAPSHOT_LOAD/LEADER_STOP/ERROR
last_committed_index  Largest log index that has been committed
last_snapshot_index   Last log index included in the last snapshot
last_snapshot_term    Term of the last log entry included in the last snapshot
snapshot_status       Snapshot state, one of LOADING/DOWNLOADING/SAVING/IDLE; for LOADING and DOWNLOADING the snapshot uri and snapshot meta are also shown

Flags configuration

raft has many gflags configuration options. At runtime they can be viewed at http://endpoint/flags; they are listed below:

Flag                           Description
raft_sync                      Whether to sync the log on write
raft_max_append_buffer_size    Size of the in-memory buffer in the log manager
raft_leader_batch              Maximum batch merged in the log manager
raft_max_entries_size          Maximum number of entries in one AppendEntries request
raft_max_body_size             Maximum body size of one AppendEntries request
raft_max_segment_size          Size of a single log segment
raft_max_byte_count_per_rpc    Bytes downloaded per rpc when copying a snapshot
raft_apply_batch               Maximum batch size when applying tasks
raft_election_heartbeat_factor Ratio between the election timeout and the heartbeat timeout
raft_sync_policy               Refined policy when raft_sync is true: 0 means sync on every write, 1 means sync once every raft_sync_per_bytes bytes written
raft_sync_per_bytes            Effective when raft_sync_policy is 1; number of bytes written between syncs