前言: 本人比较菜,go零基础入门、越读论文、独立做完Lab1,大概用时10天,但每天投入的时间不是很多,拖拖拉拉的。其中大部分时间都在构建思路,真正写代码也就一天写完的,一次性全部PASS,看到结果的时候比发论文还有成就感。建议后面的同学集中时间做,不然每次开始是都要回顾之前的思路。个人感觉如果只做实验,不记笔记的话就像蜻蜓点水,过几天就忘了,想回顾也很难再开始,于是便有了本文。
前置知识
-
go 语法
① 看B站视频 8小时转职Golang工程师,跟着敲一遍感觉就能做实验了;
② 遇到有问题的语法可以看 官方文档 和 go语言圣经,有一说一,go的文档写的简单易懂。 -
MapReduce
① MapReduce论文 必看;
② 如果看英文笔记困难的话,论文的 中文翻译版本 有助于理解;
③ B站简短的 MapReduce辅助理解视频 也有助于加深理解。
实验分析
Lab1的主要任务就是实现一个如图所示的MapReduce框架,其分为Map阶段和Reduce阶段。在Map阶段,Master节点将一组map任务分配给一组Worker节点来执行。在Map阶段结束后进入Reduce阶段,Master节点将一组reduce任务分配给这组Worker节点执行。其中,Master节点负责任务的分配,Worker节点负责执行具体的任务,具体如下:
-
Master
① 初始化Master要维护的信息;
② 不断地检测所有任务是否完成,若完成则退出;
③ 提供RPC接口:响应Worker申请任务的请求,选择一个尚未完成的map/reduce任务,通过RPC传递给Worker节点;
④ 提供RPC接口:响应Worker修改任务状态的请求,需要根据收到的任务标识将该任务的状态修改为“已完成”; -
Worker
① 若该Worker节点处于空闲状态,则通过RPC向Master申请一个任务了。若所有任务执行结束后,Master会先退出,导致RPC请求失败,从而Worker被迫结束;
② 根据Master返回的任务信息执行相应的map/reduce操作;
③ 当一个map/reduce任务完成后,通过RPC将该任务的标识传回给Master节点,告诉Master该任务已完成。
实现
定义数据结构
分析了Master节点和Worker节点各自需要完成的工作,我们就开始定义相关的数据结构。
type Master struct {
// Your definitions here.
TaskList []TaskInfo //任务列表
ReduceNum int // reduce 任务的个数
Phase string // 任务的阶段(map、reduce、done)
}
// Add your RPC definitions here.
// 任务信息
type TaskInfo struct {
Phase string // 任务阶段:map、reduce
TaskId int // 任务ID,用在Master任务列表中的下标表示
InputFile string // 任务的文件名,仅在map任务中使用
TaskStatus string // 任务状态:waiting、allocated、finished
StartTime time.Time // 任务的开始时间,用于超时判断
ReduceNum int // 记录reduce任务的数量,用于文件命名
MapNum int // 记录map任务的数量,用于文件命名
}
// worker 通过 RPC 请求任务所传的参数,为空即可
type TaskArgs struct{}
// worker 通过 RPC 请求任务,Master 返回一个任务
type TaskReply struct {
Task TaskInfo
}
初始化Master
刚开始,任务处于”map“阶段。根据输入的n个文件路径创建n个map任务,放入任务列表TaskList中。在创建任务时,任务等待被分配,因此任务的状态TaskInfo.TaskStatus设置为"waiting"。此处任务的开始时间TaskInfo.StartTime随意赋个值即可,后面在分配任务时会重新赋值。
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
// Your code here.
m.initMaster(files, nReduce)
m.server()
return &m
}
// 根据输入的 files 和 nReduce 生成一组 map 任务,并初始化 Master
func (m *Master) initMaster(files []string, nReduce int) {
m.Phase = "map"
m.ReduceNum = nReduce
for index, file := range files {
task := TaskInfo{"map", index, file, "waiting", time.Now(), nReduce, len(m.TaskList)}
m.TaskList = append(m.TaskList, task)
}
}
Worker请求任务
一个Worker节点若处于空闲状态,需要不断地向Master要一个任务,拿到任务后需要根据任务所处的阶段执行相应的map/reduce操作。
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
for {
task := AskTask()
switch task.Phase {
case "map":
DoMapTask(task, mapf)
case "reduce":
DoReduceTask(task, reducef)
}
}
// uncomment to send the Example RPC to the master.
// CallExample()
}
在通过RPC请求任务的时候,需要防止多个Worker的相互竞争,为了保证数据一致性,需要定义一个全局锁。如何定义RPC接口参照worker.go中的CallExample()和master.go中的Example()。
// 全局锁,避免多个 Worker 同时向 Master 申请任务
var mu_worker sync.Mutex
// 通过RPC向Master请求一个任务
func AskTask() TaskInfo {
mu_worker.Lock()
defer mu_worker.Unlock()
args := TaskArgs{}
reply := TaskReply{}
ok := call("Master.AllocateTask", &args, &reply)
if !ok {
fmt.Println("RPC Master.AllocateTask err......")
}
return reply.Task
}
Master返回一个任务
Worker只发送一个要任务的请求,而具体分配哪一个任务、修改任务状态、判断任务是否分配完,这些细节都是Master需要考虑的。
首先,我们在任务列表中选择一个待分配(waiting)状态的任务进行分配,分配的时候修改该任务的状态为已分配(allocated),同时修改该任务的开始时间为系统时间。若任务列表中没有处在waiting状态的任务,并且程序还没有结束,说明有任务处于allocated状态。因此需要检测该任务是正在执行还是已经超时,若是该任务已经分配10s以上,但还没运行结束,说明Worker结点出现了问题,需要将该任务的状态修改为"waiting",以便重新分配给一个空闲的Worker节点。
// Your code here -- RPC handlers for the worker to call.
// RPC: 选择一个任务来响应 Worker 的请求
func (m *Master) AllocateTask(args *TaskArgs, reply *TaskReply) error {
taskId := m.choiceTask()
// 该阶段的所有任务均已经完成
if taskId == -1 {
m.dealTaskFinised()
// 若是map阶段结束,则把任务列表里面的第一个 reduce 任务返回
taskId = 0
}
m.TaskList[taskId].StartTime = time.Now() // 设置任务开始时间为被分配的时间
reply.Task = m.TaskList[taskId]
// fmt.Println("worker 来请求任务!!!")
return nil
}
// 选择一个处于 waiting 状态的任务,返回其 TaskId,
// 若所有任务均已完成,返回 -1
func (m *Master) choiceTask() int {
for {
var allFinished = true
for i := 0; i < len(m.TaskList); i++ {
if m.TaskList[i].TaskStatus == "waiting" {
m.TaskList[i].TaskStatus = "allocated"
allFinished = false
return i
} else if m.TaskList[i].TaskStatus == "allocated" { // 若有任务没有完成,且没有任务处于waiting状态,则循环等待allocated的任务结束或超时
m.moniterTimeOut()
allFinished = false
}
}
if allFinished {
return -1
}
}
}
// 检测任务的状态,若已被分配的任务超过10秒还没执行结束,
// 则将该任务的状态设置为waiting,以便重新分配
func (m *Master) moniterTimeOut() {
for i := 0; i < len(m.TaskList); i++ {
if m.TaskList[i].TaskStatus == "allocated" && time.Since(m.TaskList[i].StartTime) > time.Second*10 {
m.TaskList[i].TaskStatus = "waiting"
}
}
}
如果在分配任务时发现任务列表里面所有的任务均已经完成,则需要根据当前任务所处的map/reduce阶段采取不同的处理方式。
如果是处在map阶段,而所有任务都已经完成,说明该进入到reduce阶段了。所以将任务阶段Master.Phase设置为”reduce“,同时清空任务列表,生成一组reduce任务放入任务列表中。
如果是处在reduce阶段,而所有任务都已经完成,说明该MapReduce框架执行结束。只需将任务节点Master.Phase设置为”done“,并用一个无限循环将Master阻塞在这里。由于Master每隔1s会调用Done()来判断该框架是否执行结束,因此我们在Done()的实现中只需要判断一下任务阶段是否为”done“即可。若是,Master节点会自动结束运行。
// 所有任务均已经完成,根据所处的阶段执行不同的操作
func (m *Master) dealTaskFinised() {
switch m.Phase {
// 如果 map 阶段结束,则进入 reduce 阶段
case "map":
m.Phase = "reduce"
// 生成一组reduce任务
m.generateReduceTasks()
// 如果 reduce 阶段结束,则 Master 停止运行
case "reduce":
m.Phase = "done"
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(time.Millisecond * 100)
}
}
}
}
// map阶段结束后需要生成一组reduce任务,覆盖掉原来任务列表里面的任务
func (m *Master) generateReduceTasks() {
taskInfo := []TaskInfo{}
for i := 0; i < m.ReduceNum; i++ {
task := TaskInfo{"reduce", i, "", "waiting", time.Now(), m.ReduceNum, len(m.TaskList)}
taskInfo = append(taskInfo, task)
}
m.TaskList = taskInfo
}
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
func (m *Master) Done() bool {
ret := false
// Your code here.
if m.Phase == "done" {
fmt.Println("------------ 所有任务执行结束,退出 --------")
return true
}
return ret
}
Worker执行map/reduce任务
Worker节点通过RPC调用得到一个任务后,若该任务是map任务,则根据文件名读取文件,仿照mrsequential.go中的写法执行map任务。此后,将执行的结果转化为json个数,输出到”mr-X-Y“格式的文件中,其中X是map任务号,Y是通过ihash()函数求得的reduce任务号,以此来标记该文件用于后续第Y个reduce任务的输入。一个map任务对应多个输出文件。
在执行完该任务后,需要通过RPC通知Master该任务已执行完成,修改该任务的状态。
// 执行指定的 map 任务
func DoMapTask(task TaskInfo, mapf func(string, string) []KeyValue) {
// 读取数据文件,调用mapf(),得到结果reduceData
// 此处代码省略
.........
// 将 map 任务的结果转化为 json 格式,输出到文件
GenerateFile(nReduce, reduceData, task.TaskId)
// fmt.Println(">>>>>>>> map 任务 ", task.TaskId, " 执行成功!!!")
//通过 RPC 调用,通知 Master 该 map 任务已完成,修改其状态
ModifyTaskStatus(task.TaskId)
}
Worker执行reduce任务和map任务类似,只是任务的具体操作不同,仿照mrsequential.go中的写法即可。此处要求结果文件命名为”mr-out-Y“,其中Y是reduce任务的编号,一个reduce任务对应一个输出文件。
// 执行 reduce 任务,输出结果到文件中
func DoReduceTask(task TaskInfo, reducef func(string, []string) string) {
// 根据任务号获取到用于该reduce任务的那组文件,
// 读取数据文件,调用reducef(),将结果输出到文件。
// 此处代码省略
.........
//通过 RPC 调用,通知 Master 该 reduce 任务已完成,修改其状态
ModifyTaskStatus(task.TaskId)
}
通知该任务已完成
在map/reduce任务完成后,Worker需要通过RPC要求Master修改该任务的状态为”finished“。
// 修改指定任务的状态
func ModifyTaskStatus(taskId int) {
mu_worker.Lock()
defer mu_worker.Unlock()
ok := call("Master.ModifyTask", &taskId, nil)
if !ok {
fmt.Println("RPC Master.ModifyTask err......")
}
}
为此,Master需要提供相应的RPC接口。
// RPC: worker完成任务后,修改该任务的状态为finished
func (m *Master) ModifyTask(args *int, reply *TaskReply) error {
m.TaskList[*args].TaskStatus = "finished"
return nil
}
至此Lab1要求实现的MapReduce框架已基本完成!
实验结果
- 进入
6.824/src/main
目录。 - 在每次修改worker.go和master.go文件后都需要重新生成一下插件。
go build -buildmode=plugin ../mrapps/wc.go
- 启动Master节点。
go run mrmaster.go pg-*.txt
- 启动Worker节点。
go run mrworker.go wc.so
- 执行测试程序。
sh ./test-mr.sh
-
map阶段执行结束后会生成一组中间文件,如下图所示。
-
reduce阶段执行结束会生成一组最终的结果文件,如下图所示。
-
测试程序执行结果如下所示。
总结
- 经过学习go,看论文,做实验这整个过程,收获很大,无论是知识还是学习方法的层面。以前从来没有做过这种类型的实验,从看懂那一点原有代码,到整个实验完成,还是很有成就感的。
- 要集中时间来做才比较高效,不然每次都得回顾之前的思路。
- 思考比写代码更重要!之前每次有一点思路就想马上写代码,结果后面不断推翻自己之间写的东西。在整个流程想清楚,在草稿纸上画画流程图,把调用关系理清楚后,真正用在写代码上的时间其实是很少的。
- 最后完整代码以及输出的结果文件放在GitHub仓库中了,虽然MIT的教授不建议开源,但GitHub上开源的也不少。如果真想自己独立完成的伙伴我想也不会一上来就找源码吧。本人代码功底较差,很多模块都独立得不够好,代码也比较臃肿,仅供参考。
评论区