mit6824
mapreduceMapReduce (2004)
介绍及例子
用户需要指定map
以及reduce
函数,map函数负责建立中间键值的映射关系,reduce将所有的值合并到相同的键上。(如wordcount程序)
1 | reduce (k2,list(v2)) → list(v2) |
输入键和值是从与输出键和值不同的域中绘制的。此外,中间键和值与输出键和值来自相同的域。
分布式Grep
map :在匹配到一行时发射一行
reduce:将中间数据复制到输出
url访问频率统计
map:输出 <URL, 1>
reduce: 对来自相同url的相加然后发送<URL, TOTAL count>
实现
Map调用通过将输入数据自动分割为 M 段 分布在多个机器上。
Map函数执行完之后,会通过分区函数将中间键划分为R个互不重叠的分区。(默认分区函数可以是hash函数,这样可以将相同的键放到同一个分区,即将该类键都分到某一个具体的分区中,由某一个任务执行)

任务粒度
当用户提供的map以及reduce操作都是确定性的(相同输入必定产生相同输出),那么mapreduce的分布式执行结果也会类似于单机顺序执行的结果。
M和R的数量可以尽可能比worker数量大,可以实现负载均衡,出错时可以可以快速failback。
还是有些具体限制的,比如master主机必须做出O(M+R)调度决策,以及在内存保存O(M*R)中状态
备份执行
多台worker之间存在掉队者现象,由于一些原因导致该worker虽然能正常运行,但是执行速度相当慢,这会导致整个MapReduce的时间非常长。解决办法是,在MapReduce将要完成时,将还在执行的任务备份到另一个worker进行执行,这两个worker任意一个执行完成都会导致该任务执行完成。
拓展功能
分区函数
数据在Map之后会被分区,默认分区函数时hash函数,当有特殊需求时,可以提供特殊的分区函数。
顺序保证
每个分区中,中间键值对会按照键递增顺序处理,这可以使得每个分区生成有序的生成文件。
组合函数
组合函数与Reduce函数的功能相同,但是组合函数是在执行Map任务的机器上执行的。(先组合再进行网络传输,可以少传输一点数据)
输入输出格式
可以将输入数据按照多种格式读入。
- 文本格式将每一行是为一个键值对,唔讲的偏移是键,而该行作为值。
读取器不一定从文件读取数据,额可以定义读取器从数据库读取。
副作用
跳过坏记录
通过信号处理器函数捕获异常,多次发现错误时,master 指示跳过。
本地执行
便于调试。
状态信息
主节点提供http页面供观察
计数器
Lecture01
使用分布式系统的两个原因:性能与容错。
该课程依赖的基础架构:存储,通信,计算。
能否提供些抽象的接口,将分布式特性隐藏在整个系统内。
构建分布系统时,使用了很多的工具:
- RPC, 目标就是掩盖我们正在不可靠网络上通信的事实。
- 线程,
- 并发控制,比如锁
可拓展性
一个系统跑在多个计算机上就会有多倍的性能。(系统拓展了,性能也会对应的拓展,这种可拓展性在一段范围内是生效的,当达到系统瓶颈时,将失去可拓展性,这需要我们再次优化架构设计)
可用性
单台计算机大概率是可靠的,但是如果一个服务器集群,很可能会出现错误。
容错:发生某些错误时,系统仍然能够正常运行,像没有错误一样。
可用(Availability)系统是指,在特定的故障范围内,系统仍然能够提供服务。(比如一个有两个拷贝的系统,一台有故障,那么系统仍然是可用的)
可恢复性(Recoverablity),一个更弱的容错特性,指的是发生会导致服务中止的错误时,系统会停止工作,不在响应请求,等待修复完成后,系统可以正常运行。(比如系统需要通过日志将数据/状态写入硬盘,这样发生错误——断电时,系统停止工作,重启后,会从磁盘读取数据正常运行)
一个好的可用的系统,也应该是可恢复的。为了实现这些特性,需要很多工具,比如:
- 硬盘,为了实现容错需要频繁写入,为了性能应该减少写入
- 复制,如果管理多系统中的副本是个问题,他们可能轻易的偏离同步的状态,而不再是副本。
一致性
很多问题,比如分布式kv数据库,有两台主机,向一台主机put了数据,这台主机可能无法及时同步到另一台主机上,造成不一致。
- 强一致,get请求会返回最近一次完成的put请求写入的值,
- 弱一致,不会做上面的保证,所有弱一致系统的get可能返回一个旧数据。
虽然强一致很完美,但是弱一致也让人感兴趣。因为强一致的代价太大,用户需要向所有的主机get数据,使用最新的数据作为get返回的数据,带来了很大的资源开销。当服务器相隔很远时,人们会构建弱一致系统,当然,为了让弱一致更有意义,还会定义很多的规则。
Map & Reduce
以wordcount
为例:
Map函数使用一个key以及value作为参数,key通常是输入文件的名字(或者索引),通常被忽略,值包含了要统计的文本。Map函数对于每一个单词,通过emit函数传到MarReduce库,比如emit('w', 1)
。
Reduce函数
Reduce函数的入参是某一个key的所有实例,比如参数为,(w, [1, 1, 1, 1,1])
。该函数也可以使用emit函数,该函数需要一个参数value,这个value会作为传入参数key的最终输出。
编码过程中,遇到了问题,是cpu跑满了,但是执行速度仍然十分慢,io速度也很慢:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 filename := fmt.Sprintf("mr-%v-%v", reply.MIndex, ihash(kvs[first].Key)%reply.RNum)
file, err := os.Create(filename)
if err != nil {
fmt.Println("Cannot create file: ", err)
return nil, false
}
//利用缓冲区,避免每次调用encode都会触发系统调用write
wfile := bufio.NewWriterSize(file, 64*1024)
enc := json.NewEncoder(wfile)
for _, kv := range kvs[first:i] {
err := enc.Encode(&kv)
if err != nil {
fmt.Println("Encode to json failed: ", err)
return nil, false
}
}一开始猜测是缓冲区太小,导致每次通过encode时,都会触发io,速度很慢,
实际上,操作系统本来就具有缓冲区,写入量达到一定程度时才会写入磁盘。这里可能是因为每次使用encode都触发了write系统调用,虽然write只是将数据写到内存上的磁盘缓冲区中,但毕竟是系统调用,十分耗时间,导致这里十分慢。
这里可使用这两种内存缓冲区,
bufio.Writer
内部维护一个内存缓冲区(大小可指定,比如 64KB)。每次写入先写到缓冲区(无需系统调用),只有缓冲区满或者调用
Flush()
才会写到磁盘缓冲区中,进而写入磁盘。发现速度也很慢,后面查看火焰图,是json的encode方法占用了大量的cpu,可以从这里优化,
- 就是使用更快的json库:github.com/goccy/go-json,
- 每次写入读取keyvalue的切片,而非单个keyvalue
极其隐蔽的bug:
Go 的
net/rpc
包使用 反射机制 来序列化和反序列化结构体字段。
只有大写字母开头的字段(即导出字段)才会被反射访问到,小写字母开头的字段是未导出的,无法被外部RPC包访问,因此:小写字段会被忽略,不会参与编码传输。
容错虚拟机Fault-Tolerant Virtual Machines (2010)
默认使用共享磁盘,主虚拟机与备份虚拟机访问相同的虚拟磁盘。
两种备份方式:
- 全量状态复制,将主虚拟机的全部状态传输至备份端:cpu状态,内存数据,io设备的操作状态。
- 状态机复制,保证输入一致,然后同步所有输入请求(数据包,用户操作指令),以及一些其他的非确定性操作信息,比如中断时机。
对多核处理器上较难实现,因为涉及到共享内存,多个进程均可对一块内存区域进行访问。(那感觉没啥实际意义啊)
基本设计
为了检测 主、备份虚拟机是否有故障,使用了两种机制,
- 一是在服务器之间建立心跳连接,
- 二是对日志通道上的流量进行监控。
确定性重放
FT 协议
主虚拟机的输出信息也需要通过特殊的日志传输给备份虚拟机,备份虚拟机确认后再由主虚拟机发送。
但是备份虚拟机接管时,仍然不知道主虚拟机是否执行了最后的输出,
- 幸运的是网络基础设施,操作系统(TCP)等通过设计能处理数据包丢失以及重复的情况。(重复以及丢失感觉都行,最主要的是避免服务器状态的不一致,比如主服务器收到自增数据包,再备用服务器未确认LOG前就给客户端回发了包,如果LOG和主服务器同时崩了,那么备用服务器的内部状态就和主服务器不一致;只要主服务器没有输出,感觉不管丢失多少LOG日志都没有问题,只要外部客户端能重发消息就行了)
- 也可以是使用两阶段提交(以一种分布式一致性协议),保证多个参与者要么全部成功,要么全部失败。
检测以及响应错误
具有一致性语义的键值服务器
不管有多少服务器副本,有多少客户端在操作,看起来就像是所有人都在访问 同一份全局的单机 key/value 存储。
用 KV(键值存储)实现锁
键值存储只提供了两个方法:
1 | func (kvtest.IKVClerk) Put(string, string, rpc.Tversion) rpc.Err |
实现如下:
1 | func (lk *Lock) Acquire() { |
主要思路是lk.ver
存储了当前线程最可能获取到锁的version,初始为0,
- 获取成功则标记value,让其他线程知道锁被占用。
- 失败则获取最新的version,将其作为
lk.version
,然后判断当前锁是否被占用再执行对应的操作
貌似下面的方法更简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 func (lk *Lock) Acquire() {
for {
id, version, err := lk.ck.Get(lk.lockStr)
if id == lk.id {
return
}
if err == rpc.ErrNoKey || id == "" {
err := lk.ck.Put(lk.lockStr, lk.id, version)
if err == rpc.OK {
return
}
}
time.Sleep(10 * time.Millisecond)
}
}
func (lk *Lock) Release() {
id, version, err := lk.ck.Get(lk.lockStr)
if err == rpc.OK && id == lk.id {
lk.ck.Put(lk.lockStr, "", version)
}
}
分布式系统的线性化
需要保证线性一致性,线性化是一种强一致性模型,因此在线性化系统之上构建其他系统相对容易。比如如下的操作是符合线性一致性的,我们能够找到每一个操作的线性化点,这样系统仍然具有线性的:

Raft共识算法
Raft和Paxos等价,但更好理解。
介绍
共识算法能够让一组机器协同工作,形成一个统一的整体,即便部分机器出现故障,该整体仍能正常运行。(要满足基本目标一致性)
Raft算法分为三个部分:领导者选举、日志复制和安全性
Raft有很多新特性:
- 强领导者机制:日志条目只会从领导者流向其他服务器。
- 领导者选举:再选举领导者时,使用随机计时器。
- 集群成员变更
复制状态机
一致性算法来源自复制状态机。复制状态机通常通过复制日志来实现,每台服务器都会存储一个包含一系列命令的日志,其状态机会按顺序执行这些命令。所有日志都包含相同的命令,且命令顺序一致,因此每个状态机处理的命令序列完全相同。所以关键问题就是如何保证复制LOG的一致性。
服务器上的共识模块会接收客户端发送的命令并将其添加到本地日志中,同时与其他服务器的共识模块通信,以确保即便部分服务器发生故障,所有日志最终仍会包含相同的请求,且请求顺序完全一致。一旦命令完成正确复制,每台服务器的状态机就会按日志顺序执行这些命令,并将执行结果返回给客户端。最终,这些服务器对外呈现为一个单一且高可靠的状态机。

拜占庭故障:节点可能恶意撒谎、发送错误信息、或者不一致的消息。(感觉节点可能被人入侵了,有叛徒)
一致性算法通常有如下特性:
- 非拜占庭故障场景下,都能保证安全性(比如延迟,丢包,重复以及乱序,分区故障即整个网络分为了若干个互相无法联系的分区)
- 只要集群中任意多数派服务器处于正常运行状态,且能彼此通信并与客户端交互,共识算法就能保持完全可用(即具备可用性)
- 不依赖节点的本地时钟,来保证日志的一致性,坏消息/慢消息最多只会导致卡顿,但不会威胁到一致性(不管机器的物理时钟漂移有多严重,Raft 依旧能确保所有副本的日志保持一致。)
- 在常见场景下,只要集群中多数派服务器对一轮远程过程调用(RPC)做出响应,一条命令就能完成执行;少数响应缓慢的服务器不会影响系统的整体性能。
Paxos算法的优势与不足
提升算法可理解性的通用方法
- 问题拆分
- 减少需要考虑的状态数量,
Raft共识算法
Raft就是管理复制日志的算法。
Raft首先选举一个特定的领导者,这个领导者独占管理复制日志的权限。领导者从客户端接收日志条目,将其复制到其他服务器上,并且靠苏服务器什么时候可以安全的将日志应用到状态机(执行的意思吧)。
基于领导者机制,Raft可分为三个独立的子问题:
- 领导者选举
- 复制日志,领导者接收客户端的日志,并且将日志复制到集群,强制其他服务器的日志与自身一致
- 安全,状态机安全。若任一服务器已将某条特定日志条目应用到其状态机中,则其他所有服务器绝不能将不同的命令应用到相同的日志索引位置。(否则一致性就没了)