Pipeline
Posted on
在介绍 io.Pipe
之前,我们先来看一段非常简单的代码,如果现在需要向服务器发送一个 POST
请求,按流程一步步来,首先构建一个消息结构体,然后将该结构体编码成 json
格式的数据,并将该数据暂时存储在 buf
中,最后利用 http.Post
发送该请求,轻松愉快。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| type message struct { Name string Age int }
func main() { m := &message{Name: "yiyangyi", Age: 23} var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(m) if err != nil { log.Fatal(err) }
resp, err := http.Post("example.com", "application/json", &buf) if err != nil { log.Fatal(err) } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(b)) }
|
但是,上面的代码存在一个问题,就是我们需要分配一个缓存区( bytes.Buffer
)存储编码后的数据,为了节省内存,能不能通过一个方法来避免此部分内存的分配呢,解决方法就是我们今天的主角——io.Pipe
。
io.Pipe
执行后会得到一个 PipeReader
和 PipeWriter
,当程序向PipeWriter
中写入数据时,PipeReader
可以同步地收到这部分消息,根据这个特性,我们将 PipeWriter
代替 bytes.Buffer
作为参数 (它们同为 io.Writer
) 传入 json.NewEncoder
中,当我们 Encode
数据时,会直接向 PipeWriter
写入数据,此时 Pipe
的另一边,PipeReader
可以马上收到数据,利用 io.Pipe
我们就避免了中间非必要缓存的分配。
由于使用 io.Pipe
发送和接收数据是一个同步的过程 (类似于 unbuffered channel
),所以需要让发送和接收在两个 goroutine
中进行,如果它们存在于一个 goroutine
中,会导致程序死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| type message struct { Name string Age int }
func main() { pr, pw := io.Pipe()
go func() { m := &message{Name: "yiyangyi", Age: 23} err := json.NewEncoder(pw).Encode(m) pw.CloseWithError(err) }()
resp, err := http.Post("example.com", "application/json", pr) if err != nil { log.Fatal(err) } defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(b)) }
|
io.TeeReader & io.MultiReader ❤️ io.Pipe
io.Pipe
经常与 io.TeeReader
和 io.MultiReader
一起使用,下面有两个这样的例子。
io.Pipe ❤️ io.TeeReader
1
| tr <--- r -----> pw ------ > pr
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func upload(r io.Reader) { pr, pw := io.Pipe() tr := io.TeeReader(r, pw)
done := make(chan bool) defer close(done)
go func() { defer pw.Close() uploadFile(tr) done <- true }()
go func() { webmr := transcode(pr) uploadFile(webmr) done <- true }()
for c := 0; c < 2; c++ { <-done } }
|
io.Pipe ❤️ io.MultiReader
1 2 3 4 5 6 7
| <- pr1 <-----> pw1 --\
<- pr2 <-----> pw2 ---- > multiwriter(pw1,pw2,pw3,pw4) <----- r <- pr3 <-----> pw3 ----
<- pr4 <-----> pw4 --/
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func TranscodeAll(r io.Reader) { mp4R, mp4W := io.Pipe() oggR, oggW := io.Pipe() wavR, wavW := io.Pipe() webmR, webmW := io.Pipe()
done := make(chan bool) defer close(done)
go transcodeMP4(mp4R, done) go transcodeOgg(oggR, done) go transcodeWav(wavR, done) go transcodeWebM(webmR, done)
go func() { defer mp4W.Close() defer oggW.Close() defer wavW.Close() defer webmW.Close()
mw := io.MultiWriter(mp4W, oggW, wavW, webmW) io.Copy(mw, r) }()
for c := 0; c < 4; c++ { <-done } }
|