Skip to content

Commit

Permalink
done (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong authored May 23, 2024
1 parent c72fd8f commit b7068a7
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 206 deletions.
2 changes: 1 addition & 1 deletion bench/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Report) Process(work chan struct{}) {
func (r *Report) WaitAll() {
<-r.waitQuit
//TODO 处理错误
r.outputReport() //输出最终报表
_ = r.outputReport() //输出最终报表
}

func (r *Report) calBody(resp *http.Response, bodySize uint64) {
Expand Down
82 changes: 12 additions & 70 deletions dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package dataflow
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"time"
Expand All @@ -13,10 +11,10 @@ import (
"github.com/guonaihong/gout/decode"
"github.com/guonaihong/gout/encode"
"github.com/guonaihong/gout/enjson"
"github.com/guonaihong/gout/hcutil"
"github.com/guonaihong/gout/middler"
"github.com/guonaihong/gout/middleware/rsp/autodecodebody"
"github.com/guonaihong/gout/setting"
"golang.org/x/net/proxy"
)

const (
Expand All @@ -38,43 +36,43 @@ type DataFlow struct {

// GET send HTTP GET method
func (df *DataFlow) GET(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(get, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(get, cleanPaths(url), df.out, urlStruct...)
return df
}

// POST send HTTP POST method
func (df *DataFlow) POST(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(post, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(post, cleanPaths(url), df.out, urlStruct...)
return df
}

// PUT send HTTP PUT method
func (df *DataFlow) PUT(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(put, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(put, cleanPaths(url), df.out, urlStruct...)
return df
}

// DELETE send HTTP DELETE method
func (df *DataFlow) DELETE(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(delete2, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(delete2, cleanPaths(url), df.out, urlStruct...)
return df
}

// PATCH send HTTP PATCH method
func (df *DataFlow) PATCH(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(patch, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(patch, cleanPaths(url), df.out, urlStruct...)
return df
}

// HEAD send HTTP HEAD method
func (df *DataFlow) HEAD(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(head, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(head, cleanPaths(url), df.out, urlStruct...)
return df
}

// OPTIONS send HTTP OPTIONS method
func (df *DataFlow) OPTIONS(url string, urlStruct ...interface{}) *DataFlow {
df.Req = reqDef(options, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(options, cleanPaths(url), df.out, urlStruct...)
return df
}

Expand Down Expand Up @@ -135,7 +133,7 @@ func (df *DataFlow) SetURL(url string, urlStruct ...interface{}) *DataFlow {
}

if df.Req.url == "" && df.Req.req == nil {
df.Req = reqDef(df.method, cleanPaths(url), df.out, urlStruct...)
df.Req, df.Err = reqDef(df.method, cleanPaths(url), df.out, urlStruct...)
return df
}

Expand Down Expand Up @@ -224,81 +222,27 @@ func (df *DataFlow) SetProtoBuf(obj interface{}) *DataFlow {
return df
}

func (df *DataFlow) initTransport() {
if df.out.Client.Transport == nil {
df.out.Client.Transport = &http.Transport{}
}
}

func (df *DataFlow) getTransport() (*http.Transport, bool) {
// 直接return df.out.Client.Transport.(*http.Transport) 等于下面的写法
// ts := df.out.Client.Transport.(*http.Transport)
// return ts 编译会报错
ts, ok := df.out.Client.Transport.(*http.Transport)
return ts, ok
}

// UnixSocket 函数会修改Transport, 请像对待全局变量一样对待UnixSocket
// 对于全局变量的解释可看下面的链接
// https://github.com/guonaihong/gout/issues/373
func (df *DataFlow) UnixSocket(path string) *DataFlow {
df.initTransport()

transport, ok := df.getTransport()
if !ok {
df.Req.Err = fmt.Errorf("UnixSocket:not found http.transport:%T", df.out.Client.Transport)
return df
}

transport.Dial = func(proto, addr string) (conn net.Conn, err error) {
return net.Dial("unix", path)
}

df.Err = hcutil.UnixSocket(df.out.Client, path)
return df
}

// SetProxy 函数会修改Transport,请像对待全局变量一样对待SetProxy
// 对于全局变量的解释可看下面的链接
// https://github.com/guonaihong/gout/issues/373
func (df *DataFlow) SetProxy(proxyURL string) *DataFlow {
proxy, err := url.Parse(modifyURL(proxyURL))
if err != nil {
df.Req.Err = err
return df
}

df.initTransport()

transport, ok := df.getTransport()
if !ok {
df.Req.Err = fmt.Errorf("SetProxy:not found http.transport:%T", df.out.Client.Transport)
return df
}

transport.Proxy = http.ProxyURL(proxy)

df.Err = hcutil.SetProxy(df.out.Client, proxyURL)
return df
}

// SetSOCKS5 函数会修改Transport,请像对待全局变量一样对待SetSOCKS5
// 对于全局变量的解释可看下面的链接
// https://github.com/guonaihong/gout/issues/373
func (df *DataFlow) SetSOCKS5(addr string) *DataFlow {
dialer, err := proxy.SOCKS5("tcp", addr, nil, proxy.Direct)
if err != nil {
df.Req.Err = err
return df
}

df.initTransport()

transport, ok := df.getTransport()
if !ok {
df.Req.Err = fmt.Errorf("SetSOCKS5:not found http.transport:%T", df.out.Client.Transport)
return df
}

transport.Dial = dialer.Dial
df.Err = hcutil.SetSOCKS5(df.out.Client, addr)
return df
}

Expand Down Expand Up @@ -391,8 +335,6 @@ func (df *DataFlow) SetTimeout(d time.Duration) *DataFlow {

// WithContext set context, and SetTimeout are mutually exclusive functions
func (df *DataFlow) WithContext(c context.Context) *DataFlow {
df.Req.Index++
df.Req.ctxIndex = df.Req.Index
df.Req.c = c
return df
}
Expand Down
10 changes: 8 additions & 2 deletions dataflow/dataflow_auto_decode_body_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func create_AutoDecodeBody() *httptest.Server {
c.Header("Content-Encoding", "br")
var buf bytes.Buffer
w := brotli.NewWriter(&buf)
w.Write([]byte(test_autoDecodeBody_data))
_, err := w.Write([]byte(test_autoDecodeBody_data))
if err != nil {
log.Fatal(err)
}
w.Flush()
w.Close()
c.String(200, buf.String())
Expand All @@ -56,7 +59,10 @@ func create_AutoDecodeBody() *httptest.Server {

var buf bytes.Buffer
w := zlib.NewWriter(&buf)
w.Write([]byte(test_autoDecodeBody_data))
_, err := w.Write([]byte(test_autoDecodeBody_data))
if err != nil {
log.Fatal(err)
}
w.Close()
c.Header("Content-Encoding", "deflate")
c.String(200, buf.String())
Expand Down
6 changes: 4 additions & 2 deletions dataflow/dataflow_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ func Test_SetJSONNotEscape(t *testing.T) {
fmt.Println("url", ts.URL)
var buf bytes.Buffer
//POST(ts.URL).Debug(true).SetJSONNotEscape(map[string]any{"url": "http://www.com?a=b&c=d"}).Do()
POST(ts.URL).Debug(debug.ToWriter(&buf, false)).SetJSONNotEscape(map[string]interface{}{"url": "http://www.com?a=b&c=d"}).Do()
err := POST(ts.URL).Debug(debug.ToWriter(&buf, false)).SetJSONNotEscape(map[string]interface{}{"url": "http://www.com?a=b&c=d"}).Do()
assert.NoError(t, err)
assert.True(t, bytes.Contains(buf.Bytes(), []byte("&")), buf.String())
buf.Reset()
//POST(ts.URL).Debug(true).SetJSON(map[string]any{"url": "http://www.com?a=b&c=d"}).Do()
POST(ts.URL).Debug(debug.ToWriter(&buf, false)).SetJSON(map[string]interface{}{"url": "http://www.com?a=b&c=d"}).Do()
err = POST(ts.URL).Debug(debug.ToWriter(&buf, false)).SetJSON(map[string]interface{}{"url": "http://www.com?a=b&c=d"}).Do()
assert.NoError(t, err)
assert.False(t, bytes.Contains(buf.Bytes(), []byte("&")), buf.String())
}
6 changes: 3 additions & 3 deletions dataflow/dataflow_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package dataflow

import (
"bytes"
"errors"
"fmt"
"github.com/guonaihong/gout/json"
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/guonaihong/gout/json"

core "github.com/guonaihong/gout/core"

"github.com/guonaihong/gout/middler"
Expand Down Expand Up @@ -58,7 +58,7 @@ func (d *demoResponseMiddler) ModifyResponse(response *http.Response) error {

// Go中json中的数字经过反序列化会成为float64类型
if float64(200) != code {
return errors.New(fmt.Sprintf("请求失败, code %d msg %s", code, msg))
return fmt.Errorf("请求失败, code %d msg %s", code, msg)
} else {
byt, _ := json.Marshal(&data)
response.Body = ioutil.NopCloser(bytes.NewReader(byt))
Expand Down
11 changes: 4 additions & 7 deletions dataflow/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,6 @@ func testWithContextCancel(t *testing.T, ts *httptest.Server) {
assert.Error(t, err)
}

//
func TestWithContext(t *testing.T) {
router := setupContext(t)
ts := httptest.NewServer(http.HandlerFunc(router.ServeHTTP))
Expand Down Expand Up @@ -566,8 +565,7 @@ func Test_DataFlow_Timeout(t *testing.T) {
Do()
assert.Error(t, err)

// 使用互斥api的原则,后面的覆盖前面的
// 这里是SetTimeout生效, 超时时间200ms
//
ctx, cancel := context.WithTimeout(context.Background(), longTimeout*time.Millisecond)
defer cancel()

Expand All @@ -579,10 +577,9 @@ func Test_DataFlow_Timeout(t *testing.T) {

assert.Error(t, err)

assert.LessOrEqual(t, int(time.Since(s)), int(middleTimeout*time.Millisecond))
assert.LessOrEqual(t, time.Since(s), shortTimeout*time.Millisecond+time.Millisecond*50)

// 使用互斥api的原则,后面的覆盖前面的
// 这里是WithContext生效, 超时时间400ms
//
ctx, cancel = context.WithTimeout(context.Background(), longTimeout*time.Millisecond)
defer cancel()
s = time.Now()
Expand All @@ -592,7 +589,7 @@ func Test_DataFlow_Timeout(t *testing.T) {
Do()

assert.Error(t, err)
assert.GreaterOrEqual(t, int(time.Since(s)), int(middleTimeout*time.Millisecond))
assert.GreaterOrEqual(t, int(time.Since(s)), int(shortTimeout*time.Millisecond))
}

func Test_DataFlow_SetURL(t *testing.T) {
Expand Down
16 changes: 11 additions & 5 deletions dataflow/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,11 @@ func clearHeader(header http.Header) {

// retry模块需要context.Context,所以这里也返回context.Context
func (r *Req) GetContext() context.Context {
if r.Timeout > 0 && r.TimeoutIndex > r.ctxIndex {
r.c, r.cancel = context.WithTimeout(context.Background(), r.Timeout)
if r.Timeout > 0 {
if r.c == nil {
r.c = context.TODO()
}
r.c, r.cancel = context.WithTimeout(r.c, r.Timeout)
}
return r.c
}
Expand Down Expand Up @@ -588,19 +591,22 @@ func modifyURL(url string) string {
return fmt.Sprintf("http://%s", url)
}

func reqDef(method string, url string, g *Gout, urlStruct ...interface{}) Req {
func reqDef(method string, url string, g *Gout, urlStruct ...interface{}) (Req, error) {
if len(urlStruct) > 0 {
var out strings.Builder
tpl := template.Must(template.New(url).Parse(url))
tpl.Execute(&out, urlStruct[0])
err := tpl.Execute(&out, urlStruct[0])
if err != nil {
return Req{}, err
}
url = out.String()
}

r := Req{method: method, url: modifyURL(url), g: g}

r.Setting = GlobalSetting

return r
return r, nil
}

// ReadAll returns the whole response body as bytes.
Expand Down
5 changes: 4 additions & 1 deletion dataflow/req_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ func Benchmark_URL_Template(b *testing.B) {

for n := 0; n < b.N; n++ {
code := 0
New().GET("{{.Host}}/{{.Method}}", tc).Code(&code).Do()
err := New().GET("{{.Host}}/{{.Method}}", tc).Code(&code).Do()
if err != nil {
panic(err)
}
if code != 200 {
panic("code != 200")
}
Expand Down
24 changes: 16 additions & 8 deletions dataflow/req_url_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,33 @@ func Test_URL_Template(t *testing.T) {
code := 0
switch tc.Method {
case "get":
New().GET("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body).Code(&code).Do()
err := New().GET("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
case "post":
New().POST("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
err := New().POST("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
case "put":
New().PUT("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
err := New().PUT("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
case "patch":
New().PATCH("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
err := New().PATCH("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
case "options":
New().OPTIONS("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
err := New().OPTIONS("{{.Host}}/{{.Method}}", tc).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
case "head":
code := 0
New().HEAD("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body).Code(&code).Do()
New().SetMethod(strings.ToUpper(tc.Method)).SetURL("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body2).Code(&code).Do()
err := New().HEAD("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body).Code(&code).Do()
assert.NoError(t, err)
err = New().SetMethod(strings.ToUpper(tc.Method)).SetURL("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body2).Code(&code).Do()
assert.NoError(t, err)
assert.Equal(t, code, 200)
continue
}
assert.Equal(t, code, 200)

New().SetMethod(strings.ToUpper(tc.Method)).SetURL("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body2).Do()
err := New().SetMethod(strings.ToUpper(tc.Method)).SetURL("{{.Host}}/{{.Method}}", tc).Debug(true).BindBody(&body2).Do()
assert.NoError(t, err)
assert.Equal(t, body, tc.Method)
b := assert.Equal(t, body2, tc.Method)
if !b {
Expand Down
Loading

0 comments on commit b7068a7

Please sign in to comment.