找回密码
 立即注册
首页 业界区 安全 MIT6.824 lab1 实验反思

MIT6.824 lab1 实验反思

决台 2025-9-28 18:34:40
MIT6.824 lab1 实验反思

0. 实验文档

6.5840 Lab 1: MapReduce
1. 环境搭建

实验文档中并没有指出应该在哪个操作系统的环境下进行实验,但是需要使用到 Go 的插件功能,考虑到插件功能在 Windows 上支持有限,所以应该选择类 Unix 系统。恰好手里有一台Linux的物理机,索性就用 VsCode + ssh 的环境进行实验了。
1.1 配置 Go SDK

以下摘自 6.5840 Go:
Depending on your Linux distribution, you might be able to get an up-to-date version of Go from the package repository, e.g. by running apt install golang. Otherwise, you can manually install a binary from Go's website. First, make sure that you're running a 64-bit kernel (uname -a should mention "x86_64 GNU/Linux"), and then run:
  1. $ wget -qO- https://go.dev/dl/go1.25.0.linux-amd64.tar.gz | sudo tar xz -C /usr/local
复制代码
You'll need to make sure /usr/local/go/bin is on your PATH. You can do this by adding export PATH=$PATH:/usr/local/go/bin to your shell's init file ( commonly this is one of .bashrc, .bash_profile or .zshrc)
本次实验采用的 Go 版本是 1.25.0 。
1.2 VsCode 插件安装

VsCode这边大概要安装一个Go语言的插件,Code Runner 随意吧
1.3 VsCode 编码体验优化

执行命令:
  1. $ go install -v github.com/sqs/goreturns@latest
复制代码

  • 该命令会在你的 $GOPATH/bin 目录下(或者如果你设置了 $GOBIN,则在 $GOBIN 目录下)生成一个名为 goreturns 的可执行文件。
  • goreturns 是一个用于格式化 Go 源代码的工具,特别适用于在保存文件时自动格式化。它在 gofmt 的基础上增加了额外的功能,比如自动添加或移除导入语句(类似 goimports),并且会针对返回语句(return)进行特殊处理(例如在函数返回多个值且类型复杂时,自动拆分成多行以增强可读性),因此得名 “goreturns”。
1.4 VsCode debug 工具配置(重要)

在项目根目录下(当然也可以自己选择一个工作目录,feel free)新建 .vscode/launch.json,编写好基本的配置:
1.png

关键配置详解:

  • configurations.program:  要debug的程序入口文件
  • configurations.cwd: 程序工作目录
  • configurations.args:  参数,这里是编译好的插件和输入文件
  • configurations.bulldFlags: 开启竞态条件检测
这里的 debug 工具默认使用 $GOPATH/bin 路径下的 delve 工具。通常来说,delve 工具支持的 Go SDK 版本一定要和当前 Go SDK 版本是兼容的才行。本机环境是 Go 1.25.0,采用的 delve 版本是 1.25.1、
在配置好这些以后启动调试,可能会出现一个匪夷所思的问题:
2.png

这里显示无法加载 wc.go 编译来的插件。
如果我们使用实验文档给出来的编译命令的确是会出现这个问题。原因是通常来说,编译器对源码进行编译的时候多少会带点优化手段,比如调整一些指令顺序,以达到较好的性能。但这样对于 debug 肯定是不利的。
解决办法就是,禁止编译器进行优化。所以如果要使用 delve 对代码进行 debug 的话,需要把编译插件的命令改成这样:
  1. $ go build -race -buildmode=plugin -gcflags="all=-N -l" -o wc.so ../mrapps/wc.go
复制代码
参数详解:

  • race:开启竞态条件检测
  • buildmode=plugin:输出是共享库文件
  • gcflags=

    • all:表示这些选项应用于当前模块的所有包(包括依赖包)
    • N:禁止编译器优化
    • -l:禁止函数内联

做完wg.so插件的一些琐碎的工作以后,运行 debug,会发现还有一个问题:
3.png

这是因为 go build 命令不支持通配符的使用,解决方案是直接把文件名写死在 debug 配置文件里:
4.png

做好上面的一切以后,就可以开始快乐 debug 了。Have fun~
2. lab——,启动——

2.1 需求描述


  • 简单复刻 MapReduce 论文中描述的分布式数据处理系统——其中包含一个 master (实验中采用了 Coordinator 这个术语)和若干个 worker。master 负责调度分配任务,监测 worker 是否可能下线;worker 则负责执行具体的 Map 阶段任务和 Reduce 阶段任务
  • 主要需求如下:

    • 协调者(coordinator)

      • 跟踪所有 Map 和 Reduce 任务的状态(待分配、进行中、已完成)。
      • 响应工作者的任务请求,分配任务(Map任务或Reduce任务)。
      • 当Map任务全部完成后,才能开始分配Reduce任务。
      • 监控任务超时(10秒),将超时任务重新分配。
      • 当所有任务完成后,协调者退出(通过Done()方法返回true)。

    • 工作者(worker)

      • 循环向协调者请求任务。
      • 根据任务类型(Map/Reduce)读取输入文件,调用对应的Map/Reduce函数。
      • 将Map任务的输出写入中间文件(按照nReduce分桶),格式为mr-X-Y,其中X是Map任务编号,Y是Reduce任务编号。
      • 将Reduce任务的输出写入最终文件mr-out-X(每个Reduce任务一个输出文件)。
      • 如果协调者没有任务可分配(例如所有任务已完成),工作者应退出。

    • 文件处理

      • Map任务:输入是多个文件(每个文件对应一个Map任务),输出是nReduce个中间文件(每个中间文件对应一个Reduce任务桶)。
      • Reduce任务:从多个Map任务产生的中间文件(同一个Reduce桶编号)中读取数据,执行Reduce函数,输出到一个最终文件。
      • 使用JSON格式写入和读取中间文件(键值对)。
      • 最终输出文件mr-out-X的格式:每行是"%v %v",即键和值。

    • 容错

      • 工作者可能在执行任务时崩溃,协调者等待10秒后重新分配该任务给其他工作者。
      • 确保最终结果正确,不能因为任务重复执行而出现错误(Map和Reduce函数必须是幂等的)。

    • 并发

      • 多个Map任务和Reduce任务可以并行执行。
      • 协调者需要处理并发的RPC请求,注意锁的使用。

    • 测试

      • 通过test-mr.sh测试,包括:单词计数(wc)、索引生成(indexer)、并行性测试、任务计数、提前退出测试和崩溃测试。


2.2 详细实现方案

2.2.1 任务及其状态

定义Map任务和Reduce任务的结构:
  1. type Task struct {
  2.         TimeStamp time.Time // time stamp that give the task to any worker
  3.         Filenames []string  // input files
  4.         TaskID    int       // task ID
  5.         TaskType  int       // task type, 0 map task, 1 reduce task
  6.         Status    int       // task status, 0 init, 1 ready, 2 in process, 3 completed, 4 failed
  7. }
复制代码
这里我们要记录每个任务的开始时间戳、输入文件列表、任务ID、任务类型、任务状态。其中任务包含3个状态,分别是就绪、处理中和已完成:
  1. const (
  2.         TASK_READY = iota // ready
  3.         TASK_IN_PROCESS   // in process
  4.         TASK_COMPLETED    // completed
  5. )
复制代码
其有限状态机如下:
5.png

任务创建的时候,应该是赋成 ready 状态。这个时候的时间戳没有含义,所以也应该赋零值;在任务分发出去以后,应该进入 in process 状态,表示任务正在处理中。这期间有3种情况:成功、失败、超时。如果任务处理成功了,自然标记成 completed 状态,表示任务完成;如果任务处理失败,或者是超时,在 Coordinator 这边都只能感知到 worker 进程有一段时间没有发送任务完成的通知了,所以就把任务初始化,然后分配给下一个来寻求新任务的 worker。
2.2.2 Coordinator 实现细节

Coordinator 结构

对于 Coordinator来说,需要记录所有的输入文件的信息,所有的 Map 任务和 Reduce 任务的信息,同时考虑到这些任务信息都是并发的读写操作,所以需要引入互斥锁对临界资源进行保护。所以定义 Coordinator 结构如下:
  1. type Coordinator struct {
  2.         // Your definitions here.
  3.         mu      *sync.Mutex // mutex lock, to protect 2 maps below
  4.         nReduce int
  5.         tasks   map[int][]*Task // all the tasks, key = 0 -> map tasks, key = 1 -> reduce tasks
  6. }
复制代码
这里我选择了 map 这个数据结构去存放两种类型的任务,每个 key 对应一个任务列表,列表中的就是尚未完成的任务。
Coordinator 需要给 Worker 提供的 RPC

GetTask
  1. func (c *Coordinator) GetTask(args *GetTaskRequest, response *GetTaskResponse) (err error) {
  2.         c.mu.Lock()
  3.         defer c.mu.Unlock()
  4.         mapTaskRem := len(c.tasks[TASK_TYPE_MAP])
  5.         reduceTaskRem := len(c.tasks[TASK_TYPE_REDUCE])
  6.         if mapTaskRem > 0 {
  7.                 err = c.getMapTask(response)
  8.                 if err != nil {
  9.                         return err
  10.                 }
  11.                 return nil
  12.         } else if reduceTaskRem > 0 {
  13.                 err = c.getReduceTask(response)
  14.                 if err != nil {
  15.                         return err
  16.                 }
  17.                 return nil
  18.         }
  19.         response.TaskType = TASK_DONE
  20.         return nil
  21. }
复制代码
根据实验文档的要求,要先完成所有 Map 任务,再处理 Reduce 任务。显然 GetTask 需要实现这一点。大致逻辑是,先加好锁,然后获取 Map 任务剩余数量和 Reduce 任务剩余数量。根据剩余任务数量判断应该获取 Map 任务还是 Reduce 任务。如果两个任务都没有剩余,就可以返回 TASK_DONE 这个任务类型,表示没有任务可做,这时 Worker 就可以退出了。
这里的 TASK_DONE 这个类型是必须的。如果不从这里获取没有任务执行的信息,而单从 response 有没有记录具体任务来判断是否有任务执行的话,会出现一种情况:当前所有任务都在处理中和没有任务处理两种状态是无法区分的,这个时候程序的行为就不符合预期了。
ReportTask
  1. func (c *Coordinator) ReportTask(args *ReportTaskArgs, response *struct{}) error {
  2.         taskID, taskType := args.TaskId, args.TaskType
  3.         c.mu.Lock()
  4.         defer c.mu.Unlock()
  5.         for _, task := range c.tasks[taskType] {
  6.                 if task.TaskID != taskID {
  7.                         continue
  8.                 }
  9.                 if time.Since(task.TimeStamp) < TASK_TIMEOUT { // report in time, and mark the task as completed
  10.                         task.Status = TASK_COMPLETED
  11.                         log.Printf("Coordinator.ReportTask: task %d completed.", taskID)
  12.                 } else { // report timeout, and mark the task as failed
  13.                         task.Status = TASK_READY
  14.                         task.TimeStamp = time.Time{}
  15.                         if task.TaskType == TASK_TYPE_REDUCE {
  16.                                 task.Filenames = nil
  17.                         }
  18.                         log.Printf("Coordinator.ReportTask: task %d timeout.", taskID)
  19.                 }
  20.         }
  21.         return nil
  22. }
复制代码
当 Worker 执行完某个任务以后,就要调用 ReportTask 向 Coordinator 通告当前任务已经完成。这个时候 Coordinator 需要做的就是判断通告到达的时间,如果超出了 TASK_TIMEOUT(10s),就要重置任务状态,期待其他 Worker 重新获取这个任务并执行;否则就标记为已完成状态,等待后台 goroutine 将已完成的任务从任务列表中清除。不过通常来说,这里不会出现 TASK_TIMEOUT,因为 Coordinator 也会在后台跑一个 goroutine,周期性地检查并重置超时任务。
2.2.3 Worker 实现细节

相对于 Coordinator 来说,Worker 就简单很多了,它只要循环地获取任务、执行任务、通告任务完成就好了。
但是也有一点需要注意,比如当前 Worker 实际上已经执行一段时间了,触发了 TASK_TIMEOUT。在 Coordinator 端,发现了这个超时任务,并分配给了其他正在请求任务的 Worker。但是当前的 Worker 并不知道 Coordinator 做的这个事情,它一直执行,直到算出了所有的结果——这个时候它要把结果写入到对应的中间文件中。这里如果处理不当,可能在中间文件中会看到两份一模一样的结果,这样就会影响到最终结果的生成。
根据实验文档的提示,可以使用临时文件+原子性重命名操作来规避这个问题。当 Worker 正在计算结果的时候,要将结果写入临时文件中,当结果计算完之后,就要原子性重命名这个临时文件,将它重命名成当前阶段任务的输出结果文件。当然如果在重命名之前就发现这个输出结果文件存在的话,就不必再进行重命名操作了,因为这个时候有其他 Worker 接替当前 Worker 完成了任务。
  1. func runMapTask(res *GetTaskResponse, mapf func(string, string) []KeyValue) error {
  2.         intermediate := []KeyValue{}
  3.         filenames := res.Filenames
  4.         for _, filename := range filenames {
  5.                 file, err := os.Open(filename)
  6.                 if err != nil {
  7.                         return err
  8.                 }
  9.                 defer file.Close()
  10.                 content, err := ioutil.ReadAll(file)
  11.                 if err != nil {
  12.                         return err
  13.                 }
  14.                 kva := mapf(filename, string(content))
  15.                 intermediate = append(intermediate, kva...)
  16.         }
  17.         // write the kvs into intermediate file
  18.         finalFilenames := make([]string, res.NReduce)
  19.         outFiles := make([]*os.File, res.NReduce)
  20.         outEncoders := make([]*json.Encoder, res.NReduce)
  21.         for i := 0; i < res.NReduce; i++ {
  22.                 filename := fmt.Sprintf("mr-%d-%d", res.TaskId, i)
  23.                 tmpFilename := fmt.Sprintf("tmp-%s", filename)
  24.                 outFiles[i], _ = os.CreateTemp(".", tmpFilename)
  25.                 outEncoders[i] = json.NewEncoder(outFiles[i])
  26.                 finalFilenames[i] = filename
  27.         }
  28.         for _, kv := range intermediate {
  29.                 reduceID := ihash(kv.Key) % res.NReduce
  30.                 outEncoders[reduceID].Encode(&kv)
  31.         }
  32.         for i, tmpFile := range outFiles {
  33.                 if err := tmpFile.Close(); err != nil {
  34.                         return fmt.Errorf("close temp file %s failed: %w", tmpFile.Name(), err)
  35.                 }
  36.                 tmpPath := tmpFile.Name()
  37.                 finalPath := finalFilenames[i]
  38.                 if _, err := os.Stat(finalPath); err == nil { // exist, ignore this rename operation
  39.                         continue
  40.                 }
  41.                 if err := os.Rename(tmpPath, finalPath); err != nil {
  42.                         return fmt.Errorf("rename temp file %s to %s failed: %w", tmpPath, finalPath, err)
  43.                 }
  44.         }
  45.         return nil
  46. }
复制代码
当这个任务完成以后,要向 Coordinator 通告当前任务 ID,表示这个任务已经完成了,通知 Coordinator 后台 goroutine 将其删除。
2.2.4 验证


  • 可以参照实验文档,跑一遍这个分布式计算系统,然后通过命令:
    1. $  cat mr-out-* | LC_ALL=C sort | more
    复制代码
    查看计算结果。
  • 运行 test-mr-many.sh 脚本若干次,如果一直稳定通过,就表示当前实现可以通过实验。
6.png

3. 总结

本实验旨在实现一个简单的分布式计算框架,引出分布式系统的知识。
对于本实验来说,有以下三个难点:

  • 理清 Coordinator 和 Worker 各自的职责
  • 哪些信息需要进行通告
  • 提供一个简单的容错机制保证任务能够及时处理
在阅读过 MapReduce 论文 以后,其实可以发现这个分布式计算框架仍然有很多值得改进的地方。但是对于分布式系统的启蒙作用,不管是于当时的技术人员,还是于当下初步认识分布式系统的我们,都有着极高的历史地位。所以,还是对 Google 的技术人员们说一声:Respect!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册