-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_controller.go
121 lines (115 loc) · 2.37 KB
/
stream_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package gogtp
import (
"bufio"
"context"
"errors"
"io"
"strings"
)
//StreamController 输入输入流控制
type StreamController struct {
StdIn io.WriteCloser
StdOut io.ReadCloser
StdErr io.ReadCloser
CancelContext context.Context
CancelFunc context.CancelFunc
}
//NewStreamController 创建输入输出流控制器
func NewStreamController(in io.WriteCloser, out,err io.ReadCloser) *StreamController {
return &StreamController{
StdIn: in,
StdOut: out,
StdErr:err,
}
}
//Wait 等待命令执行
func (sc *StreamController) Wait() {
for {
if sc.CancelContext != nil {
<-sc.CancelContext.Done()
}
break
}
}
//Stop 停止命令执行
func (sc *StreamController) Stop() {
if sc.CancelFunc!=nil{
sc.CancelFunc()
}
}
//SendCommand 发送命令
func (sc *StreamController) SendCommand(command cmdOptions, respFunc RespFunc) error {
if sc.CancelContext != nil {
<-sc.CancelContext.Done()
}
in := command.ToString()
if in == "" {
return errors.New("command is not null")
}
sc.CancelContext, sc.CancelFunc = context.WithCancel(context.TODO())
firstLine := true
content := strings.Builder{}
go func() {
reader := bufio.NewReader(sc.StdOut)
for {
if sc.CancelContext == nil {
break
}
select {
case <-sc.CancelContext.Done():
sc.CancelContext = nil
sc.CancelFunc = nil
break
default:
line, err := reader.ReadString('\n')
if io.EOF == err {
break
} else if err != nil {
respFunc(Response{
Command: in,
Result: "",
Error: err,
})
sc.CancelFunc()
}
if firstLine && (len(line) == 0 || (!strings.Contains(line, "=") && !strings.Contains(line, "?"))) {
continue
}
firstLine = false
content.WriteString(line + "\n")
if command.End {
respFunc(Response{
Command: in,
Result: content.String(),
Error: nil,
})
sc.CancelFunc()
continue
}
respFunc(Response{
Command: in,
Result: content.String(),
Error: nil,
})
}
}
}()
_, err := sc.StdIn.Write([]byte(in+"\n"))
if err != nil {
return err
}
return nil
}
//ListenStdErr std err 输出监听
func (sc *StreamController) ListenStdErr(sub func(string)) {
buffer := bufio.NewReader(sc.StdErr)
go func() {
for {
line, err := buffer.ReadString('\n')
if err != nil || io.EOF == err {
return
}
sub(line)
}
}()
}