1. 概述

结构:通常框架包含一个scheduler node,若干worker node和server node。

  • scheduler:监控其他节点的生命周期,通常发送信号到其他节点并收集过程信息
  • worker:主要负责本地数据计算,如读取数据、计算梯度。通过push、pull操作与server节点交互,如push梯度,pull模型权重。
  • server:维护并更新模型的权重,每个节点维护部分权重。

流程:

  • server分发数据
  • 每个worker训练本地数据,统计需要的参数,并向server查询参数;利用参数进行训练并更新,然后上传server
  • server得到节点的局部更新,汇总后更新本地数据。

对比MapReduce:

  • worker之间没有实时同步,梯度没有汇总。

1.1 key-value vectors

  • value设置为向量或矩阵,避免为每一个值设置一个关键词
  • 基于区间push&pull

1.2 分布式优化,模型一致性

  • Sequential序列式的,完全同步
    • 需要schedule节点维护同步机制。
  • Eventual,完全异步
    • 每当server节点接收到worker节点的梯度值,它会对权重进行更新。可能会有延迟。
  • Bounded Delay,在限制次数内异步 worker工作方式一样,server需要额外的梯度累加过程,另外需要schedule节点维护同步机制。

1.3 vector clock

每个参数有一个时间戳,避免参数的重复更新和通信。

1.4 一致性hash(均匀,后插入的不影响前面的)

  • server和key插入到hash环上,key由顺时针最近的server控制。
  • 增加/删除server节点时,server的key range会改变,需要遍历所有节点重新分配key。
  • 增加worker节点时,scheduler需要分配新数据,其他节点释放部分数据;删除worker节点时,需要寻找替代品。

类关系

  • Postoffice:全局管理类,单例。
  • Van:负责通信,Postoffice的成员。
  • Customer:用于通信的类,有单独线程接受msg,条件是当msg.meta.customer_id等于当前的customer_id时。
  • SimpleApp:对一组int headstring body提供基本通信,内部包含指向Customer的指针。
  • KVServer:SimpleApp派生类,保存key-value
  • KVWorker:SimpleApp派生类,向server进行push/pull操作
  • KVPairs:封装了key-value
  • SArray:共享数据,接口类似vector
  • Node
  • Control
  • Meta
  • Message

类概述

Customer

用于通信的类,有单独线程接受msg,条件是当msg.meta.customer_id等于当前的customer_id时。

// app_id:全局,postoffice代表的应用id
// customer_id:本地,
// recv_handle:单独的线程,while死循环,处理recv_queue_中的消息,直到接收到一个terminate(Customer析构时会产生)
Customer(int app_id, int customer_id, const RecvHandle& recv_handle);

// 向recver提出新的请求,tracker_增加一个pair<recver节点数, 0>,返回时间戳
int NewRequest(int recver);

// 等待某个timestamp所代表的pair中int1==int2
void WaitRequest(int timestamp);

// 返回int2
int NumResponse(int timestamp);

// int2 += num
void AddResponse(int timestamp, int num = 1);

// 记录所有需要处理的请求,总长度表示系统的时间戳,每个pair包含请求的总id数和已处理id数
std::vector<std::pair<int, int>> tracker_;

// 消息队列,供recv_handle处理
ThreadsafeQueue<Message> recv_queue_;

SimpleApp

对一组int headstring body提供基本通信,内部包含指向Customer的指针。

// 创建新的Customer,使用Process成员处理消息
explicit SimpleApp(int app_id, int customer_id);

// 调用Customer::NewRequest向recv_id注册新的请求,将时间戳作为返回值
// 生成msg,通过van向recv_id的每个节点发送msg。
virtual inline int Request(int req_head, const std::string& req_body, int recv_id);

virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }

// 根据recv_req生成msg并通过van发送
virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");

virtual inline void set_request_handle(const Handle& request_handle)

virtual inline void set_response_handle(const Handle& response_handle)

// SimpleApp的消息处理函数,根据msg生成SimpleData,然后调用Request或Response
inline void SimpleApp::Process(const Message& msg)

KVWorker

继承自SimpleApp,内部包含指向Customer的指针。

// 创建新的Customer,使用Process成员处理消息,注册slice函数
explicit KVWorker(int app_id, int customer_id)

// 向服务器节点发送新的请求Customer::NewRequest,向callback_添加回调函数cb
// 将数据转换成KVPair,并切片,每一段转换成msg,通过van发送
int Push(const std::vector<Key>& keys, const std::vector<Val>& vals, const std::vector<int>& lens = {},
           int cmd = 0, const Callback& cb = nullptr)

// 增加回调函数,在得到某个时间戳的全部数据后,从缓存中读入vals
// 将数据转换成KVPair,并切片,每一段转换成msg,通过van发送(??得到的数据通过Process进入缓存,供回调函数读取)
int Pull(const std::vector<Key>& keys, std::vector<Val>* vals, std::vector<int>* lens = nullptr,
           int cmd = 0, const Callback& cb = nullptr)

// 读取msg中的KVPair并缓存;处理完某个时间戳,则调用回调函数
void KVWorker<Val>::Process(const Message& msg)

// 每个时间戳的数据缓存
std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_;
std::unordered_map<int, Callback> callbacks_;

KVServer

继承自SimpleApp,内部包含指向Customer的指针。

// 创建新的Customer,customer_id=app_id,使用Process成员处理消息
explicit KVServer(int app_id)

// 调用request_handle_处理msg
void KVServer<Val>::Process(const Message& msg)

// 如果是push,将数据缓存在store中;如果是pull,从store中读取数据
// 调用server的Response函数
struct KVServerDefaultHandle

// 根据req与res生成msg,通过van发送
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {

Postoffice

// 创建van,得到环境变量
Postoffice()

// 创建group到id的映射,van启动customer_id,Barrier????
void Start(int customer_id, const char* argv0, const bool do_barrier)

void Finalize(const int customer_id, const bool do_barrier = true);

void AddCustomer(Customer* customer);

void RemoveCustomer(Customer* customer);

Customer* GetCustomer(int app_id, int customer_id, int timeout = 0) const;

const std::vector<Range>& GetServerKeyRanges();

// ????
void Manage(const Message& recv)

// node_id -> last_time,可以得到死亡节点
std::unordered_map<int, time_t> heartbeats_
// group -> ids
std::unordered_map<int, std::vector<int>>
// app_id -> (customer_id -> customer pointer)
std::unordered_map<int, std::unordered_map<int, Customer*>> customers_;

ShengYg

Step after step the ladder is ascended.


Tags •