Pipeline
Posted on

在介绍 io.Pipe 之前,我们先来看一段非常简单的代码,如果现在需要向服务器发送一个 POST 请求,按流程一步步来,首先构建一个消息结构体,然后将该结构体编码成 json 格式的数据,并将该数据暂时存储在 buf 中,最后利用 http.Post 发送该请求,轻松愉快。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| type message struct { Name string Age int }
func main() { m := &message{Name: "yiyangyi", Age: 23} var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(m) if err != nil { log.Fatal(err) }
resp, err := http.Post("example.com", "application/json", &buf) if err != nil { log.Fatal(err) } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(b)) }
|
但是,上面的代码存在一个问题,就是我们需要分配一个缓存区( bytes.Buffer )存储编码后的数据,为了节省内存,能不能通过一个方法来避免此部分内存的分配呢,解决方法就是我们今天的主角——io.Pipe 。
io.Pipe 执行后会得到一个 PipeReader 和 PipeWriter ,当程序向PipeWriter 中写入数据时,PipeReader 可以同步地收到这部分消息,根据这个特性,我们将 PipeWriter 代替 bytes.Buffer 作为参数 (它们同为 io.Writer ) 传入 json.NewEncoder 中,当我们 Encode 数据时,会直接向 PipeWriter 写入数据,此时 Pipe 的另一边,PipeReader 可以马上收到数据,利用 io.Pipe 我们就避免了中间非必要缓存的分配。
由于使用 io.Pipe 发送和接收数据是一个同步的过程 (类似于 unbuffered channel ),所以需要让发送和接收在两个 goroutine 中进行,如果它们存在于一个 goroutine 中,会导致程序死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| type message struct { Name string Age int }
func main() { pr, pw := io.Pipe()
go func() { m := &message{Name: "yiyangyi", Age: 23} err := json.NewEncoder(pw).Encode(m) pw.CloseWithError(err) }()
resp, err := http.Post("example.com", "application/json", pr) if err != nil { log.Fatal(err) } defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(b)) }
|
io.TeeReader & io.MultiReader ❤️ io.Pipe
io.Pipe 经常与 io.TeeReader 和 io.MultiReader 一起使用,下面有两个这样的例子。
io.Pipe ❤️ io.TeeReader
1
| tr <--- r -----> pw ------ > pr
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func upload(r io.Reader) { pr, pw := io.Pipe() tr := io.TeeReader(r, pw)
done := make(chan bool) defer close(done)
go func() { defer pw.Close() uploadFile(tr) done <- true }()
go func() { webmr := transcode(pr) uploadFile(webmr) done <- true }()
for c := 0; c < 2; c++ { <-done } }
|
io.Pipe ❤️ io.MultiReader
1 2 3 4 5 6 7
| <- pr1 <-----> pw1 --\
<- pr2 <-----> pw2 ---- > multiwriter(pw1,pw2,pw3,pw4) <----- r <- pr3 <-----> pw3 ----
<- pr4 <-----> pw4 --/
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func TranscodeAll(r io.Reader) { mp4R, mp4W := io.Pipe() oggR, oggW := io.Pipe() wavR, wavW := io.Pipe() webmR, webmW := io.Pipe()
done := make(chan bool) defer close(done)
go transcodeMP4(mp4R, done) go transcodeOgg(oggR, done) go transcodeWav(wavR, done) go transcodeWebM(webmR, done)
go func() { defer mp4W.Close() defer oggW.Close() defer wavW.Close() defer webmW.Close()
mw := io.MultiWriter(mp4W, oggW, wavW, webmW) io.Copy(mw, r) }()
for c := 0; c < 4; c++ { <-done } }
|