侧边栏壁纸
博主头像
如此肤浅博主等级

但行好事,莫问前程!

  • 累计撰写 24 篇文章
  • 累计创建 12 个标签
  • 累计收到 6 条评论

目 录CONTENT

文章目录

MIT6.824_Lab1_2020版

如此肤浅
2023-01-05 / 0 评论 / 0 点赞 / 551 阅读 / 7,601 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-01-20,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

前言: 本人比较菜,go零基础入门、越读论文、独立做完Lab1,大概用时10天,但每天投入的时间不是很多,拖拖拉拉的。其中大部分时间都在构建思路,真正写代码也就一天写完的,一次性全部PASS,看到结果的时候比发论文还有成就感。建议后面的同学集中时间做,不然每次开始是都要回顾之前的思路。个人感觉如果只做实验,不记笔记的话就像蜻蜓点水,过几天就忘了,想回顾也很难再开始,于是便有了本文。

前置知识

实验分析

image-1673306588712

Lab1的主要任务就是实现一个如图所示的MapReduce框架,其分为Map阶段和Reduce阶段。在Map阶段,Master节点将一组map任务分配给一组Worker节点来执行。在Map阶段结束后进入Reduce阶段,Master节点将一组reduce任务分配给这组Worker节点执行。其中,Master节点负责任务的分配,Worker节点负责执行具体的任务,具体如下:

  • Master
    ① 初始化Master要维护的信息;
    ② 不断地检测所有任务是否完成,若完成则退出;
    ③ 提供RPC接口:响应Worker申请任务的请求,选择一个尚未完成的map/reduce任务,通过RPC传递给Worker节点;
    ④ 提供RPC接口:响应Worker修改任务状态的请求,需要根据收到的任务标识将该任务的状态修改为“已完成”;

  • Worker
    ① 若该Worker节点处于空闲状态,则通过RPC向Master申请一个任务了。若所有任务执行结束后,Master会先退出,导致RPC请求失败,从而Worker被迫结束;
    ② 根据Master返回的任务信息执行相应的map/reduce操作;
    ③ 当一个map/reduce任务完成后,通过RPC将该任务的标识传回给Master节点,告诉Master该任务已完成。

    MIT6.824 Lab1 MapReduce流程图

实现

定义数据结构

分析了Master节点和Worker节点各自需要完成的工作,我们就开始定义相关的数据结构。

type Master struct {
	// Your definitions here.
	TaskList  []TaskInfo //任务列表
	ReduceNum int        // reduce 任务的个数
	Phase     string     // 任务的阶段(map、reduce、done)
}
// Add your RPC definitions here.

// 任务信息
type TaskInfo struct {
	Phase      string    // 任务阶段:map、reduce
	TaskId     int       // 任务ID,用在Master任务列表中的下标表示
	InputFile  string    // 任务的文件名,仅在map任务中使用
	TaskStatus string    // 任务状态:waiting、allocated、finished
	StartTime  time.Time // 任务的开始时间,用于超时判断
	ReduceNum  int       // 记录reduce任务的数量,用于文件命名
	MapNum     int       // 记录map任务的数量,用于文件命名
}

// worker 通过 RPC 请求任务所传的参数,为空即可
type TaskArgs struct{}

// worker 通过 RPC 请求任务,Master 返回一个任务
type TaskReply struct {
	Task TaskInfo
}

初始化Master

刚开始,任务处于”map“阶段。根据输入的n个文件路径创建n个map任务,放入任务列表TaskList中。在创建任务时,任务等待被分配,因此任务的状态TaskInfo.TaskStatus设置为"waiting"。此处任务的开始时间TaskInfo.StartTime随意赋个值即可,后面在分配任务时会重新赋值。

// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{}

	// Your code here.
	m.initMaster(files, nReduce)

	m.server()

	return &m
}

// 根据输入的 files 和 nReduce 生成一组 map 任务,并初始化 Master
func (m *Master) initMaster(files []string, nReduce int) {
	m.Phase = "map"
	m.ReduceNum = nReduce

	for index, file := range files {
		task := TaskInfo{"map", index, file, "waiting", time.Now(), nReduce, len(m.TaskList)}
		m.TaskList = append(m.TaskList, task)
	}
}

Worker请求任务

一个Worker节点若处于空闲状态,需要不断地向Master要一个任务,拿到任务后需要根据任务所处的阶段执行相应的map/reduce操作。

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	for {
		task := AskTask()
		switch task.Phase {
		case "map":
			DoMapTask(task, mapf)
		case "reduce":
			DoReduceTask(task, reducef)
		}
	}
	// uncomment to send the Example RPC to the master.
	// CallExample()
}

在通过RPC请求任务的时候,需要防止多个Worker的相互竞争,为了保证数据一致性,需要定义一个全局锁。如何定义RPC接口参照worker.go中的CallExample()和master.go中的Example()。

// 全局锁,避免多个 Worker 同时向 Master 申请任务
var mu_worker sync.Mutex

// 通过RPC向Master请求一个任务
func AskTask() TaskInfo {
	mu_worker.Lock()
	defer mu_worker.Unlock()

	args := TaskArgs{}
	reply := TaskReply{}

	ok := call("Master.AllocateTask", &args, &reply)
	if !ok {
		fmt.Println("RPC Master.AllocateTask err......")
	}

	return reply.Task
}

Master返回一个任务

Worker只发送一个要任务的请求,而具体分配哪一个任务、修改任务状态、判断任务是否分配完,这些细节都是Master需要考虑的。

首先,我们在任务列表中选择一个待分配(waiting)状态的任务进行分配,分配的时候修改该任务的状态为已分配(allocated),同时修改该任务的开始时间为系统时间。若任务列表中没有处在waiting状态的任务,并且程序还没有结束,说明有任务处于allocated状态。因此需要检测该任务是正在执行还是已经超时,若是该任务已经分配10s以上,但还没运行结束,说明Worker结点出现了问题,需要将该任务的状态修改为"waiting",以便重新分配给一个空闲的Worker节点。

// Your code here -- RPC handlers for the worker to call.
// RPC: 选择一个任务来响应 Worker 的请求
func (m *Master) AllocateTask(args *TaskArgs, reply *TaskReply) error {
	taskId := m.choiceTask()

	// 该阶段的所有任务均已经完成
	if taskId == -1 {
		m.dealTaskFinised()

		// 若是map阶段结束,则把任务列表里面的第一个 reduce 任务返回
		taskId = 0

	}

	m.TaskList[taskId].StartTime = time.Now() // 设置任务开始时间为被分配的时间
	reply.Task = m.TaskList[taskId]
	// fmt.Println("worker 来请求任务!!!")

	return nil
}

// 选择一个处于 waiting 状态的任务,返回其 TaskId,
// 若所有任务均已完成,返回 -1
func (m *Master) choiceTask() int {
	for {
		var allFinished = true
		for i := 0; i < len(m.TaskList); i++ {
			if m.TaskList[i].TaskStatus == "waiting" {
				m.TaskList[i].TaskStatus = "allocated"
				allFinished = false
				return i
			} else if m.TaskList[i].TaskStatus == "allocated" { // 若有任务没有完成,且没有任务处于waiting状态,则循环等待allocated的任务结束或超时
				m.moniterTimeOut()
				allFinished = false
			}
		}

		if allFinished {
			return -1
		}
	}
}

// 检测任务的状态,若已被分配的任务超过10秒还没执行结束,
// 则将该任务的状态设置为waiting,以便重新分配
func (m *Master) moniterTimeOut() {
	for i := 0; i < len(m.TaskList); i++ {
		if m.TaskList[i].TaskStatus == "allocated" && time.Since(m.TaskList[i].StartTime) > time.Second*10 {
			m.TaskList[i].TaskStatus = "waiting"
		}
	}
}

如果在分配任务时发现任务列表里面所有的任务均已经完成,则需要根据当前任务所处的map/reduce阶段采取不同的处理方式。

如果是处在map阶段,而所有任务都已经完成,说明该进入到reduce阶段了。所以将任务阶段Master.Phase设置为”reduce“,同时清空任务列表,生成一组reduce任务放入任务列表中。

如果是处在reduce阶段,而所有任务都已经完成,说明该MapReduce框架执行结束。只需将任务节点Master.Phase设置为”done“,并用一个无限循环将Master阻塞在这里。由于Master每隔1s会调用Done()来判断该框架是否执行结束,因此我们在Done()的实现中只需要判断一下任务阶段是否为”done“即可。若是,Master节点会自动结束运行。

// 所有任务均已经完成,根据所处的阶段执行不同的操作
func (m *Master) dealTaskFinised() {
	switch m.Phase {
	// 如果 map 阶段结束,则进入 reduce 阶段
	case "map":
		m.Phase = "reduce"

		// 生成一组reduce任务
		m.generateReduceTasks()

	// 如果 reduce 阶段结束,则 Master 停止运行
	case "reduce":
		m.Phase = "done"
		for {
			for _, r := range `-\|/` {
				fmt.Printf("\r%c", r)
				time.Sleep(time.Millisecond * 100)
			}
		}
	}
}

// map阶段结束后需要生成一组reduce任务,覆盖掉原来任务列表里面的任务
func (m *Master) generateReduceTasks() {
	taskInfo := []TaskInfo{}
	for i := 0; i < m.ReduceNum; i++ {
		task := TaskInfo{"reduce", i, "", "waiting", time.Now(), m.ReduceNum, len(m.TaskList)}
		taskInfo = append(taskInfo, task)
	}
	m.TaskList = taskInfo
}

// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
func (m *Master) Done() bool {
	ret := false

	// Your code here.
	if m.Phase == "done" {
		fmt.Println("------------ 所有任务执行结束,退出 --------")
		return true
	}

	return ret
}

Worker执行map/reduce任务

Worker节点通过RPC调用得到一个任务后,若该任务是map任务,则根据文件名读取文件,仿照mrsequential.go中的写法执行map任务。此后,将执行的结果转化为json个数,输出到”mr-X-Y“格式的文件中,其中X是map任务号,Y是通过ihash()函数求得的reduce任务号,以此来标记该文件用于后续第Y个reduce任务的输入。一个map任务对应多个输出文件。

在执行完该任务后,需要通过RPC通知Master该任务已执行完成,修改该任务的状态。

// 执行指定的 map 任务
func DoMapTask(task TaskInfo, mapf func(string, string) []KeyValue) {
	// 读取数据文件,调用mapf(),得到结果reduceData
    // 此处代码省略
    .........

	// 将 map 任务的结果转化为 json 格式,输出到文件
	GenerateFile(nReduce, reduceData, task.TaskId)

	// fmt.Println(">>>>>>>> map 任务 ", task.TaskId, " 执行成功!!!")

	//通过 RPC 调用,通知 Master 该 map 任务已完成,修改其状态
	ModifyTaskStatus(task.TaskId)
}

Worker执行reduce任务和map任务类似,只是任务的具体操作不同,仿照mrsequential.go中的写法即可。此处要求结果文件命名为”mr-out-Y“,其中Y是reduce任务的编号,一个reduce任务对应一个输出文件。

// 执行 reduce 任务,输出结果到文件中
func DoReduceTask(task TaskInfo, reducef func(string, []string) string) {
	// 根据任务号获取到用于该reduce任务的那组文件,
    // 读取数据文件,调用reducef(),将结果输出到文件。
    // 此处代码省略
    .........

	//通过 RPC 调用,通知 Master 该 reduce 任务已完成,修改其状态
	ModifyTaskStatus(task.TaskId)
}

通知该任务已完成

在map/reduce任务完成后,Worker需要通过RPC要求Master修改该任务的状态为”finished“。

// 修改指定任务的状态
func ModifyTaskStatus(taskId int) {
	mu_worker.Lock()
	defer mu_worker.Unlock()

	ok := call("Master.ModifyTask", &taskId, nil)
	if !ok {
		fmt.Println("RPC Master.ModifyTask err......")
	}
}

为此,Master需要提供相应的RPC接口。

// RPC: worker完成任务后,修改该任务的状态为finished
func (m *Master) ModifyTask(args *int, reply *TaskReply) error {
	m.TaskList[*args].TaskStatus = "finished"

	return nil
}

至此Lab1要求实现的MapReduce框架已基本完成!

实验结果

  1. 进入6.824/src/main目录。
  2. 在每次修改worker.go和master.go文件后都需要重新生成一下插件。
    go build -buildmode=plugin ../mrapps/wc.go
    
  3. 启动Master节点。
    go run mrmaster.go pg-*.txt
    
  4. 启动Worker节点。
    go run mrworker.go wc.so
    
  5. 执行测试程序。
    sh ./test-mr.sh
    
  • map阶段执行结束后会生成一组中间文件,如下图所示。
    image-1673581835870

  • reduce阶段执行结束会生成一组最终的结果文件,如下图所示。
    image-1673582150097

  • 测试程序执行结果如下所示。
    image-1673582308125

    image-1673582396388

    image-1673582408325

    image-1673582419820

    image-1673582439208

总结

  1. 经过学习go,看论文,做实验这整个过程,收获很大,无论是知识还是学习方法的层面。以前从来没有做过这种类型的实验,从看懂那一点原有代码,到整个实验完成,还是很有成就感的。
  2. 要集中时间来做才比较高效,不然每次都得回顾之前的思路。
  3. 思考比写代码更重要!之前每次有一点思路就想马上写代码,结果后面不断推翻自己之间写的东西。在整个流程想清楚,在草稿纸上画画流程图,把调用关系理清楚后,真正用在写代码上的时间其实是很少的。
  4. 最后完整代码以及输出的结果文件放在GitHub仓库中了,虽然MIT的教授不建议开源,但GitHub上开源的也不少。如果真想自己独立完成的伙伴我想也不会一上来就找源码吧。本人代码功底较差,很多模块都独立得不够好,代码也比较臃肿,仅供参考。
0

评论区