Skip to content

Commit

Permalink
pkg/types, pkg/exchange: add ExchangeTimeRangeProvider and use it as …
Browse files Browse the repository at this point in the history
…jumpIfEmpty param
  • Loading branch information
bailantaotao committed Oct 26, 2023
1 parent c4f1af0 commit d9474bb
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 24 deletions.
14 changes: 8 additions & 6 deletions pkg/exchange/batch/closedorders.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ type ClosedOrderBatchQuery struct {
types.ExchangeTradeHistoryService
}

func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64, opts ...Option) (c chan types.Order, errC chan error) {
func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) {
jump := 30 * 24 * time.Hour
timeRangeProvider, ok := q.ExchangeTradeHistoryService.(types.ExchangeTimeRangeProvider)
if ok {
jump = timeRangeProvider.GetMaxOrderHistoryTimeRange()
}

query := &AsyncTimeRangedBatchQuery{
Type: types.Order{},
Q: func(startTime, endTime time.Time) (interface{}, error) {
Expand All @@ -29,11 +35,7 @@ func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startT
}
return strconv.FormatUint(order.OrderID, 10)
},
JumpIfEmpty: 30 * 24 * time.Hour,
}

for _, opt := range opts {
opt(query)
JumpIfEmpty: jump,
}

c = make(chan types.Order, 100)
Expand Down
12 changes: 0 additions & 12 deletions pkg/exchange/batch/option.go

This file was deleted.

14 changes: 8 additions & 6 deletions pkg/exchange/batch/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ type TradeBatchQuery struct {
types.ExchangeTradeHistoryService
}

func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions, opts ...Option) (c chan types.Trade, errC chan error) {
func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions) (c chan types.Trade, errC chan error) {
if options.EndTime == nil {
now := time.Now()
options.EndTime = &now
}

jump := 3 * 24 * time.Hour
timeRangeProvider, ok := e.ExchangeTradeHistoryService.(types.ExchangeTimeRangeProvider)
if ok {
jump = timeRangeProvider.GetMaxTradeHistoryTimeRange()
}

startTime := *options.StartTime
endTime := *options.EndTime
query := &AsyncTimeRangedBatchQuery{
Expand All @@ -42,11 +48,7 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type
}
return trade.Key().String()
},
JumpIfEmpty: 24 * time.Hour,
}

for _, opt := range opts {
opt(query)
JumpIfEmpty: jump,
}

c = make(chan types.Trade, 100)
Expand Down
5 changes: 5 additions & 0 deletions pkg/types/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ type ExchangeTradeHistoryService interface {
QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []Order, err error)
}

type ExchangeTimeRangeProvider interface {
GetMaxTradeHistoryTimeRange() time.Duration
GetMaxOrderHistoryTimeRange() time.Duration
}

type ExchangeMarketDataService interface {
NewStream() Stream

Expand Down

0 comments on commit d9474bb

Please sign in to comment.