6.5840 Lab1

Misc

  • ls -a 查看隐藏文件

  • 将本地主机的 pub 公钥存放在远程主机的 ~/.ssh/authorized_keys 中即可免密登录

  • Ubuntu 一行安装 Go 最新版本 apt install golang。想起之前安装某些环境时需要手动 export 路径,但是这次没有。查看 ~/.bashrc 或者 ~/.profile 文件中都没有 go 相关的路径导出。发现是将 go 的可执行文件直接放在了 /usr/bin 中,这也相当于在 $PATH 中了。可使用 which go 查到

  • 设置 goproxy,否则 VSCode 无法正常下载 Go 有关的插件

    1
    2
    3
    4
    vim ~/.bashrc
    # 添加 export GOPROXY=https://goproxy.io,direct
    source ~./bashrc
    go env GOPROXY

Go

  • 样例程序的脚本解释

    1
    2
    3
    go build -buildmode=plugin ../mrapps/wc.go
    rm mr-out*
    go run mrsequential.go wc.so pg*.txt
    • go build 是编译命令,其中 -buildmode=plugin 指定以「插件」模式编译。插件是一种特殊的共享库,可以在运行时被 Go 程序动态加载。这在实现插件化设计或热插拔功能时非常有用。编译后的插件的后缀名为 .so
    • rm mr-out* 使用正则表达式删除前面运行可能生成的文件
    • go run 是编译运行命令。后面的插件名和文件名为运行的命令行参数
  • 需要指出的是 main 包内的诸多演示文件定义了同名的函数,这在 Go 中是不允许的。Go 中一个包下的所有源代码在编译时被视为一个整体,因此一个源代码文件中的全局标识符(如函数、全局常量、全局变量)必须和其他的源代码文件中的上述标识符不同

  • 要对一个类的切片类型调用 sort.Sort() 方法,要求这个切片类型实现 sort.Interface,也即实现三个方法(Go 中实现接口不必显式声明)

    • Len() int
    • Less(i, j int) bool
    • Swap(i, j int)

    示例如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    type KeyValue struct {
    Key string
    Value string
    }

    type ByKey []KeyValue

    func (a ByKey) Len() int {
    return len(a)
    }

    func (a ByKey) Less(i, j int) bool {
    return a[i].Key < a[j].Key
    }

    func (a ByKey) Swap(i, j int) {
    a[i], a[j] = a[j], a[i]
    }
  • sync.Map

    1
    2
    3
    4
    5
    var m sync.Map
    m.Store("key", "value")
    val, ok := m.Load("key") // 第二个返回值返回键是否存在
    // 注意 val 是一个 any 类型的值,需要类型断言
    m.delete("key")

MapReduce 论文阅读

编程模型

  • Map 函数的 input:键值对(实际上只需要用到值);Map 函数的 Output:键值对(Emit 出,这个键值对也叫中间键值对)
  • Reduce 函数的 input:具有相同 key 的中间键值对的 value 构成的集合;Reduce 函数的 Output:一个有意义的导出数据

Lab1 中提供的一个简单的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

//
// a word-count application "plugin" for MapReduce.
//
// go build -buildmode=plugin wc.go
//

import "6.5840/mr"
import "unicode"
import "strings"
import "strconv"

// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

实现

MapReduce概览

任务

  • 填充 mr/rpc.go, mr/worker.go, mr/coordinator.go
  • 一个输入文件运行一次 map,一次 map 输出 10 个 中间文件,一个 reduce 输出一个输出文件
  • coordinator 与 worker 之间通过 RPC 进行通信(可以理解为传递一个 JSON)

思路

  • 容错机制 Fault Tolerence

    主节点故障则整个任务失败(这是 MapReduce 论文中提到的处理方法。事实上,本实验中不会出现主节点失败的情形)

    从节点故障,有以下几种情况

    • 做 map 时故障

      主要是在主节点分配 map 任务后注册一个 10s 后执行的函数,查看当前这个任务是否真的执行完成了。如果没有执行完成,那么将这个任务从 working 队列撤销,重新加入 waiting 队列。不过这样的实现也会带来一定的问题,比如一个从节点不是真的宕掉而是运行的很慢,即使主节点在 10s 后选择重做这个任务,这个被认为是宕掉的节点也能读写中间文件或者是读写结果文件。不过考虑到它生成的结果与其他从节点是一致的,因此没有特别处理这一点。

      考虑到故障的存在,所以需要一个额外的 working 队列。理由如下:在我的实现中,通过 cntMap 变量是否为 0 来判断所有 map 任务是否做完;而每个 worker 做完一个 map 任务后,通过 CallMapTaskDone 告知 coordinator。在一个有缺陷的实现中,coordinator 每处理一次 MapTaskDone,就将 cntMap 减一。但是,考虑故障的存在,可能存在对于同一个 map 任务的来自不同 worker 的成功通告,如果不加考虑地直接将 cntMap 减一,cntMap 其实会提前为 0 或者变为负数,这是不正确的实现。那么,设置一个 working 队列,只有当通告成功的 map task 还在 working 队列时,才将 cntMap 减一

      以上的收到重复确认的场景,在软件架构中称为「幂等性」,这本身是离散数学关系运算中的一个名词,指的是 ;幂等性(Idempotence)是软件设计中的一个重要概念,特别是在分布式系统、网络通信和 API 设计中。一个操作是幂等的,意味着无论这个操作进行多少次,都会产生相同的结果,至少在最终状态上是这样。也就是说,执行一次和多次效果是相同的

    • 做 reduce 时故障

      同上

    • 写文件时故障,基于实验手册的 Hints 实现

      To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

  • 并发性能

    改进的实现中,对于 map 任务的等待队列换为了通道去实现,因为这个队列的大小其实是未知的。并且在这个地方踩了一个坑。因为通道的初始时实际上是必须指定最大容量的!如果缺省容量,那么容量就为 1,这也称作不带缓冲的通道。这样的话将导致后续对通道的写全部阻塞,整个程序进入死循环。代码中将通道的容量取一个较大的数 100

    其他的队列,由于实际上可以预先知道其长度,因此没有使用通道实现,而是使用了 map 和相应的 sync.Mutex

  • 退出机制

    当所有任务做完,coordinator 与 worker 应该及时退出(虽然不考虑这个环节,评测脚本也能顺利通过)。在 worker 端,可以利用 call() 函数的返回值来判断 coordinator 能否正常连通,如果不能连通,可以认为整个任务已经结束了,worker 可以退出。另外,在 worker 所有任务做完以后也应该退出。在 coordinator 端,判断 cntReduce 是否为 0 来判断所有的任务是否做完。值得注意的是,为了使得所有 worker 先于 coordinator 退出,coordinator 在退出之前应该先等一会。如果 coordinator 先退出了,可能会出现 RPC 通信失败的报错

结果

Lab1测试结果
Lab1测试结果

备注

  • 在进行 crash test 的时候十分慢,目前还未知道原因,也许是本实验中的并发控制过于粗糙的原因

后记

  • 在用测试脚本进行某几个测试的时候出现了找不到文件的错误,而手动运行时是没问题的。排查后可知是执行路径出错了。手动运行(使用 go run)的执行路径是 src/main,而脚本的执行路径(直接执行编译后的二进制文件)是 src/main/mr-tmp。这一点在以往的编程实践中没有注意到(或许是我忘了...)

  • 关于并发控制的实现较为粗糙,也许这也是造成性能瓶颈的一个原因。可以考虑将一些数据结构换为通道去实现,因为通道是天生并发安全的

  • 本次实验收获特别多。首先是第一次体会到了并发编程以及使用 Go 进行并发编程的优雅。其次是学习了许多操作系统相关的 API,理解了几个 shell 脚本的函数

  • 十分喜欢 Go 语言的多返回值特性,其他语言中只能用传引用/指针或者是定义一个类的方式来实现了

  • 除了上述提到的路径的错误之外,本地简单测试以后运行脚本就得到了通过。这让我深深怀疑我的编码质量以及测试脚本的覆盖性。另外本实验中的编码风格也十分不好,后续打算参考一些别人的代码

  • GPT4 果然是效率神器!这里感谢一位友人提供的 Team 版本


6.5840 Lab1
https://balddemian.github.io/6.5840-Lab1/
作者
Peiyang He
发布于
2024年4月12日
许可协议