Skip to content

Commit

Permalink
Made not ready return own error
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbarrow committed May 27, 2021
1 parent 03a024b commit c99f418
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 11 deletions.
3 changes: 3 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ var (

// ErrInterrupted indicates that a sleep was interrupted
ErrInterrupted = errors.New("interrupted")

// ErrConnectionNotReady indicated that the network connection to the gRPC server is not ready
ErrConnectionNotReady = errors.New("gRPC connection not ready")
)
77 changes: 66 additions & 11 deletions node/grpcnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ func (n *GRPCNode) GetAddress() (url.URL, error) {
// SubmitTransaction submits a signed transaction
func (n *GRPCNode) SubmitTransaction(req *api.SubmitTransactionRequest) (response *api.SubmitTransactionResponse, err error) {
msg := "gRPC call failed: SubmitTransaction: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -74,11 +79,16 @@ func (n *GRPCNode) SubmitTransaction(req *api.SubmitTransactionRequest) (respons
// GetVegaTime gets the latest block header time from the node.
func (n *GRPCNode) GetVegaTime() (t time.Time, err error) {
msg := "gRPC call failed: GetVegaTime: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -100,11 +110,16 @@ func (n *GRPCNode) GetVegaTime() (t time.Time, err error) {
// MarketByID gets a Market from the node
func (n *GRPCNode) MarketByID(req *api.MarketByIDRequest) (response *api.MarketByIDResponse, err error) {
msg := "gRPC call failed: MarketByID: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -118,11 +133,16 @@ func (n *GRPCNode) MarketByID(req *api.MarketByIDRequest) (response *api.MarketB
// MarketDataByID gets market data from the node
func (n *GRPCNode) MarketDataByID(req *api.MarketDataByIDRequest) (response *api.MarketDataByIDResponse, err error) {
msg := "gRPC call failed: MarketDataByID: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -136,11 +156,16 @@ func (n *GRPCNode) MarketDataByID(req *api.MarketDataByIDRequest) (response *api
// LiquidityProvisions gets the liquidity provisions for a given market and party.
func (n *GRPCNode) LiquidityProvisions(req *api.LiquidityProvisionsRequest) (response *api.LiquidityProvisionsResponse, err error) {
msg := "gRPC call failed: LiquidityProvisions: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -154,11 +179,16 @@ func (n *GRPCNode) LiquidityProvisions(req *api.LiquidityProvisionsRequest) (res
// MarketDepth gets the depth for a market.
func (n *GRPCNode) MarketDepth(req *api.MarketDepthRequest) (response *api.MarketDepthResponse, err error) {
msg := "gRPC call failed: MarketDepth: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -172,11 +202,16 @@ func (n *GRPCNode) MarketDepth(req *api.MarketDepthRequest) (response *api.Marke
// PartyAccounts gets Accounts for a given partyID from the node
func (n *GRPCNode) PartyAccounts(req *api.PartyAccountsRequest) (response *api.PartyAccountsResponse, err error) {
msg := "gRPC call failed: PartyAccounts: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -190,11 +225,16 @@ func (n *GRPCNode) PartyAccounts(req *api.PartyAccountsRequest) (response *api.P
// PositionsByParty gets the positions for a party.
func (n *GRPCNode) PositionsByParty(req *api.PositionsByPartyRequest) (response *api.PositionsByPartyResponse, err error) {
msg := "gRPC call failed: PositionsByParty: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand All @@ -208,11 +248,16 @@ func (n *GRPCNode) PositionsByParty(req *api.PositionsByPartyRequest) (response
// ObserveEventBus starts a network connection to the node to sending event messages on
func (n *GRPCNode) ObserveEventBus() (stream api.TradingDataService_ObserveEventBusClient, err error) {
msg := "gRPC call failed: ObserveEventBus: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
stream, err = c.ObserveEventBus(context.Background())
if err != nil {
Expand All @@ -224,11 +269,16 @@ func (n *GRPCNode) ObserveEventBus() (stream api.TradingDataService_ObserveEvent
// PositionsSubscribe starts a network connection to receive the party position as it updates
func (n *GRPCNode) PositionsSubscribe(req *api.PositionsSubscribeRequest) (stream api.TradingDataService_PositionsSubscribeClient, err error) {
msg := "gRPC call failed: PositionsSubscribe: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
stream, err = c.PositionsSubscribe(context.Background(), req)
if err != nil {
Expand All @@ -240,11 +290,16 @@ func (n *GRPCNode) PositionsSubscribe(req *api.PositionsSubscribeRequest) (strea
// AssetByID returns information about a given asset
func (n *GRPCNode) AssetByID(assetID string) (response *api.AssetByIDResponse, err error) {
msg := "gRPC call failed: AssetByID: %w"
if n == nil || n.conn.GetState() != connectivity.Ready {
if n == nil {
err = fmt.Errorf(msg, e.ErrNil)
return
}

if n.conn.GetState() != connectivity.Ready {
err = fmt.Errorf(msg, e.ErrConnectionNotReady)
return
}

c := api.NewTradingDataServiceClient(n.conn)
ctx, cancel := context.WithTimeout(context.Background(), n.callTimeout)
defer cancel()
Expand Down

0 comments on commit c99f418

Please sign in to comment.