-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipe.go
146 lines (120 loc) · 3.12 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//
// pipe.go
//
// Created by Frederic DELBOS - [email protected] on Apr 26 2015.
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.
//
// Package pipe is a simple stream processing library that works like Unix pipes.
// This library has no external dependencies and is fully asynchronous.
// Create a Pipe from a Reader, add some transformation functions and
// get the result on an io.Writer.
package pipe
import (
"io"
)
// Pipe object
type Pipe struct {
reader *io.PipeReader
errors []chan error
errorWriter chan error
// Total number of bytes read at the origin of the Pipe.
TotalIn int64
// Total number of bytes written at the end of the Pipe.
TotalOut int64
}
// Filter are functions to transform the stream
// and add them to the Pipe.
type Filter func(io.Reader, io.Writer) error
// New create a new Pipe that reads from reader.
func New(reader io.Reader) *Pipe {
r, w := io.Pipe()
p := &Pipe{
reader: r,
errors: make([]chan error, 1),
}
p.errors[0] = make(chan error, 1)
p.errorWriter = make(chan error, 1)
go func(errCh chan error) {
total, err := io.Copy(w, reader)
w.Close()
p.TotalIn = total
errCh <- err
}(p.errors[0])
return p
}
// Push appends a function to the Pipe.
// Note that you can add as many functions as you like at once or
// separatly. They will be processed in order.
func (p *Pipe) Push(procs ...Filter) *Pipe {
for _, proc := range procs {
if proc == nil {
continue
}
err := make(chan error, 1)
p.errors = append(p.errors, err)
r, w := io.Pipe()
go func(p Filter, r io.Reader, w *io.PipeWriter, err chan error) {
err <- p(r, w)
w.Close()
}(proc, p.reader, w, err)
p.reader = r
}
return p
}
// To writes the ouptut of the Pipe in w.
func (p *Pipe) To(w io.Writer) *Pipe {
go func() {
total, err := io.Copy(w, p.reader)
p.TotalOut = total
p.errorWriter <- err
}()
return p
}
// ToCloser writes the ouptut of the Pipe in io.WriteCloser w and close at the end.
func (p *Pipe) ToCloser(w io.WriteCloser) *Pipe {
go func() {
total, err := io.Copy(w, p.reader)
p.TotalOut = total
if err == nil {
err = w.Close()
}
p.errorWriter <- err
}()
return p
}
// Exec waits for the Pipe to complete and returns an error if any
// of the functions failed.
func (p *Pipe) Exec() error {
defer p.reader.Close()
for i := range p.errors {
if err := <-p.errors[i]; err != nil {
close(p.errors[i])
return err
}
close(p.errors[i])
}
defer close(p.errorWriter)
return <-p.errorWriter
}
// Tee creates a new Pipe to duplicate the stream.
// The stream will pass through all previously pushed functions
// before going through the tee Pipe.
// Functions pushed to the original Pipe after a call to Tee will
// not alter the new Tee Pipe.
func (p *Pipe) Tee() *Pipe {
tR, tW := io.Pipe()
reader := io.TeeReader(p.reader, tW)
newR, newW := io.Pipe()
err := make(chan error, 1)
p.errors = append(p.errors, err)
go func(errCh chan error) {
_, err := io.Copy(newW, reader)
errCh <- err
newW.Close()
tW.Close()
}(err)
newPipe := New(tR)
p.reader = newR
return newPipe
}