Pipeline

pipeline

在介绍 io.Pipe 之前,我们先来看一段非常简单的代码,如果现在需要向服务器发送一个 POST 请求,按流程一步步来,首先构建一个消息结构体,然后将该结构体编码成 json 格式的数据,并将该数据暂时存储在 buf 中,最后利用 http.Post 发送该请求,轻松愉快。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type message struct {
Name string
Age int
}

func main() {
m := &message{Name: "yiyangyi", Age: 23}
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(m)
if err != nil {
log.Fatal(err)
}

resp, err := http.Post("example.com", "application/json", &buf)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(b))
}

但是,上面的代码存在一个问题,就是我们需要分配一个缓存区( bytes.Buffer )存储编码后的数据,为了节省内存,能不能通过一个方法来避免此部分内存的分配呢,解决方法就是我们今天的主角——io.Pipe

io.Pipe 执行后会得到一个 PipeReaderPipeWriter ,当程序向PipeWriter 中写入数据时,PipeReader 可以同步地收到这部分消息,根据这个特性,我们将 PipeWriter 代替 bytes.Buffer 作为参数 (它们同为 io.Writer ) 传入 json.NewEncoder 中,当我们 Encode 数据时,会直接向 PipeWriter 写入数据,此时 Pipe 的另一边,PipeReader 可以马上收到数据,利用 io.Pipe 我们就避免了中间非必要缓存的分配。

由于使用 io.Pipe 发送和接收数据是一个同步的过程 (类似于 unbuffered channel ),所以需要让发送和接收在两个 goroutine 中进行,如果它们存在于一个 goroutine 中,会导致程序死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type message struct {
Name string
Age int
}

func main() {
pr, pw := io.Pipe()

go func() {
m := &message{Name: "yiyangyi", Age: 23}
err := json.NewEncoder(pw).Encode(m)
pw.CloseWithError(err)
}()

resp, err := http.Post("example.com", "application/json", pr)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(b))
}

io.TeeReader & io.MultiReader ❤️ io.Pipe

io.Pipe 经常与 io.TeeReaderio.MultiReader 一起使用,下面有两个这样的例子。

io.Pipe ❤️ io.TeeReader

1
tr  <--- r -----> pw ------ > pr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func upload(r io.Reader) {
pr, pw := io.Pipe()
tr := io.TeeReader(r, pw)

done := make(chan bool)
defer close(done)

go func() {
defer pw.Close()
uploadFile(tr)
done <- true
}()

go func() {
webmr := transcode(pr)
uploadFile(webmr)
done <- true
}()

for c := 0; c < 2; c++ {
<-done
}
}

io.Pipe ❤️ io.MultiReader

1
2
3
4
5
6
7
<- pr1 <-----> pw1 --\

<- pr2 <-----> pw2 ----
> multiwriter(pw1,pw2,pw3,pw4) <----- r
<- pr3 <-----> pw3 ----

<- pr4 <-----> pw4 --/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func TranscodeAll(r io.Reader) {
mp4R, mp4W := io.Pipe()
oggR, oggW := io.Pipe()
wavR, wavW := io.Pipe()
webmR, webmW := io.Pipe()

done := make(chan bool)
defer close(done)

go transcodeMP4(mp4R, done)
go transcodeOgg(oggR, done)
go transcodeWav(wavR, done)
go transcodeWebM(webmR, done)

go func() {
defer mp4W.Close()
defer oggW.Close()
defer wavW.Close()
defer webmW.Close()

mw := io.MultiWriter(mp4W, oggW, wavW, webmW)
io.Copy(mw, r)
}()

for c := 0; c < 4; c++ {
<-done
}
}
Pieces of Valuable Programming Knowledges