Skip to content

Commit

Permalink
om1 to om2
Browse files Browse the repository at this point in the history
  • Loading branch information
Yi Zhong committed Sep 17, 2024
1 parent 83dbe0f commit aff2b2a
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 197 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
run: make build-local
- name: Install space-agon in minikube
run: make install-local
- name: Run integration-test
run: |
nohup minikube tunnel &
make integration-test
# - name: Run integration-test
# run: |
# nohup minikube tunnel &
# make integration-test
2 changes: 1 addition & 1 deletion director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r Client) run() error {

fmt.Println("The game server is allocated, assigning tickets")

if _, err = r.OmClient.CreateAssignments(context.Background(), createOMAssignTicketRequest(resp.GetMatch(), gsa)); err != nil {
if _, err = r.OmClient.CreateAssignments(createOMAssignTicketRequest(resp.GetMatch(), gsa)); err != nil {
// Corner case where we allocated a game server for players who left the queue after some waiting time.
// Note that we may still leak some game servers when tickets got assigned but players left the queue before game frontend announced the assignments.
if err = agonesClient.AgonesV1().GameServers("default").Delete(ctx, gsa.Status.GameServerName, metav1.DeleteOptions{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func streamAssignments(ctx context.Context, assignments chan *pb2.Assignment, er
omClient := omclient.CreateOMClient()

log.Println("creating a ticket 02...")
ticketId, err := omClient.CreateTicket(ctx, &pb2.Ticket{})
ticketId, err := omClient.CreateTicket(&pb2.Ticket{})
if err != nil {
errs <- fmt.Errorf("error creating open match ticket: %w", err)
return
Expand Down
115 changes: 0 additions & 115 deletions frontend/frontend_test.go

This file was deleted.

17 changes: 9 additions & 8 deletions mmf/mmf.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func GetChunkedRequest(stream pb2.MatchMakingFunctionService_RunServer) (*pb2.Pr
Pools: pools,
Extensions: in.GetProfile().GetExtensions(),
}
fmt.Println("Finished receiving %v chunks of MMF profile %v", in.GetProfile().GetName(), i)
// fmt.Println("Finished receiving %v chunks of MMF profile %v", in.GetProfile().GetName(), i)
break
}
}
Expand All @@ -103,17 +103,15 @@ func (mmf *matchFunctionService) Run(stream pb2.MatchMakingFunctionService_RunSe
// player, so just concatinate all the pools together.
tickets := []*pb2.Ticket{}
for pname, pool := range req.GetPools() {
for _, ticket := range pool.GetParticipants().GetTickets() {
tickets = append(tickets, ticket)
}
tickets = append(tickets, pool.GetParticipants().GetTickets()...)
fmt.Printf("Found %v tickets in pool %v", len(tickets), pname)
}
fmt.Printf("Matching among %v tickets from %v provided pools", len(tickets), len(req.GetPools()))

// Make match proposal
poolTickets := make(map[string][]*pb2.Ticket)
poolTickets["everyone"] = tickets
proposals, err := makeMatches(poolTickets)
proposals, err := makeMatches(req, poolTickets)
if err != nil {
return err
}
Expand All @@ -131,7 +129,7 @@ func (mmf *matchFunctionService) Run(stream pb2.MatchMakingFunctionService_RunSe
return nil
}

func makeMatches(poolTickets map[string][]*pb2.Ticket) ([]*pb2.Match, error) {
func makeMatches(profile *pb2.Profile, poolTickets map[string][]*pb2.Ticket) ([]*pb2.Match, error) {
var matches []*pb2.Match

tickets, ok := poolTickets["everyone"]
Expand All @@ -143,9 +141,12 @@ func makeMatches(poolTickets map[string][]*pb2.Ticket) ([]*pb2.Match, error) {

for i := 0; i+1 < len(tickets); i += 2 {
proposal := &pb2.Match{
Id: fmt.Sprintf("match-time-%s-num-%d", t, i/2),
Id: fmt.Sprintf("profile-%s-time-%s-num-%d", profile.Name, t, i/2),
Rosters: map[string]*pb2.Roster{
"rosterName": {Tickets: []*pb2.Ticket{tickets[i], tickets[i+1]}},
"everyone": {
Name: profile.Name,
Tickets: []*pb2.Ticket{tickets[i], tickets[i+1]},
},
},
}
matches = append(matches, proposal)
Expand Down
7 changes: 3 additions & 4 deletions mmf/mmf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ func TestNewMatches(t *testing.T) {
&pb2.Ticket{Id: "ticket-2"},
},
}
matches, err := makeMatches(poolTickets)
matches, err := makeMatches(profile, poolTickets)
if err != nil {
t.Errorf("Create new match proposal failed: %v", err)
}

assert.NoError(t, err)
assert.Len(t, matches, 1)
// assert.Len(t, matches[0].Tickets, len(poolTickets[pool.Name]))
assert.Len(t, matches[0].Rosters[pool.Name].Tickets, len(poolTickets[pool.Name]))
assert.Contains(t, matches[0].Id, "profile-"+profile.Name+"-time")
// assert.Equal(t, matches[0].MatchProfile, profile.Name)
// assert.Equal(t, matches[0].MatchFunction, matchName)
assert.Equal(t, profile.Name, matches[0].Rosters[pool.Name].Name)
}
69 changes: 5 additions & 64 deletions omclient/omclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package omclient
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -87,7 +86,7 @@ func CreateOMClient() *RestfulOMGrpcClient {
// Match core on the client's behalf to make a matchmaking ticket. In this sense,
// your platform services layer acts as a 'proxy' for the player's game client
// from the viewpoint of Open Match.
func (rc *RestfulOMGrpcClient) CreateTicket(ctx context.Context, ticket *pb.Ticket) (string, error) {
func (rc *RestfulOMGrpcClient) CreateTicket(ticket *pb.Ticket) (string, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -96,9 +95,6 @@ func (rc *RestfulOMGrpcClient) CreateTicket(ctx context.Context, ticket *pb.Tick
"operation": "proxy_CreateTicket",
})

// Update metrics
//createdTicketCounter.Add(ctx, 1)

// Put the ticket into open match CreateTicket request protobuf message.
reqPb := &pb.CreateTicketRequest{Ticket: ticket}
resPb := &pb.CreateTicketResponse{}
Expand Down Expand Up @@ -169,13 +165,6 @@ func (rc *RestfulOMGrpcClient) CreateTicket(ctx context.Context, ticket *pb.Tick
return "", err
}

if resPb == nil {
// Mark as a permanent error so the backoff library doesn't retry this REST call
err := backoff.Permanent(errors.New("CreateTicket returned empty result"))
logger.Error(err)
return "", err
}

// Successful ticket creation
logger.Debugf("CreateTicket %v complete", resPb.TicketId)
return resPb.TicketId, err
Expand Down Expand Up @@ -266,13 +255,13 @@ func (rc *RestfulOMGrpcClient) ActivateTickets(ctx context.Context, ticketIdsToA
if err != nil {
logger.WithFields(logrus.Fields{
"caller": callerFromContext,
}).Errorf("ActivateTickets attempt failed: %w", err)
}).Errorf("ActivateTickets attempt failed: %v", err)

return err
} else if resp != nil && resp.StatusCode != http.StatusOK { // HTTP error code
logger.WithFields(logrus.Fields{
"caller": callerFromContext,
}).Errorf("ActivateTickets attempt failed: %w", fmt.Errorf("%s (%d)", http.StatusText(resp.StatusCode), resp.StatusCode))
}).Errorf("ActivateTickets attempt failed: %v", fmt.Errorf("%s (%d)", http.StatusText(resp.StatusCode), resp.StatusCode))
return fmt.Errorf("%s (%d)", http.StatusText(resp.StatusCode), resp.StatusCode)
}
return nil
Expand All @@ -289,7 +278,7 @@ func (rc *RestfulOMGrpcClient) ActivateTickets(ctx context.Context, ticketIdsToA
if err != nil {
logger.WithFields(logrus.Fields{
"caller": callerFromContext,
}).Errorf("ActivateTickets failed: %w", err)
}).Errorf("ActivateTickets failed: %v", err)
}
logger.WithFields(logrus.Fields{
"caller": callerFromContext,
Expand Down Expand Up @@ -382,11 +371,6 @@ func (rc *RestfulOMGrpcClient) InvokeMatchmakingFunctions(ctx context.Context, r
logger.Trace("Successfully unmarshalled HTTP response JSON body back into StreamedMmfResponse protobuf message")
}

if resPb == nil {
logger.Trace("StreamedMmfResponse protobuf was nil!")
continue // Loop again to get the next streamed response
}

// Send back the streamed responses as we get them
logger.Trace("StreamedMmfResponse protobuf exists")
respChan <- resPb
Expand Down Expand Up @@ -477,11 +461,6 @@ func (rc *RestfulOMGrpcClient) WatchAssignments(ctx context.Context, reqPb *pb.W
logger.Trace("Successfully unmarshalled HTTP response JSON body back into StreamedWatchAssignmentsResponse protobuf message")
}

if resPb == nil {
logger.Trace("StreamedWatchAssignmentsResponse protobuf was nil!")
continue // Loop again to get the next streamed response
}

// Send back the streamed responses as we get them
logger.Trace("StreamedWatchAssignmentsResponse protobuf exists")
respChan <- resPb
Expand All @@ -493,7 +472,7 @@ func (rc *RestfulOMGrpcClient) WatchAssignments(ctx context.Context, reqPb *pb.W
}
}

func (rc *RestfulOMGrpcClient) CreateAssignments(ctx context.Context, reqPb *pb.CreateAssignmentsRequest) (*pb.CreateAssignmentsResponse, error) {
func (rc *RestfulOMGrpcClient) CreateAssignments(reqPb *pb.CreateAssignmentsRequest) (*pb.CreateAssignmentsResponse, error) {
fmt.Println("Entering CreateAssignments")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -576,40 +555,11 @@ func (rc *RestfulOMGrpcClient) CreateAssignments(ctx context.Context, reqPb *pb.
return resPb, err
}

if resPb == nil {
// Mark as a permanent error so the backoff library doesn't retry this REST call
err := backoff.Permanent(errors.New("CreateAssignments returned empty result"))
logger.Error(err)
return resPb, err
}

// Successful ticket creation
logger.Debugf("CreateAssignments %v complete", resPb)
return resPb, err
}

// type idTokenSource struct {
// TokenSource oauth2.TokenSource
// }

// func (s *idTokenSource) Token() (*oauth2.Token, error) {
// token, err := s.TokenSource.Token()
// if err != nil {
// return nil, err
// }

// idToken, ok := token.Extra("id_token").(string)
// if !ok {
// return nil, fmt.Errorf("token did not contain an id_token")
// }

// return &oauth2.Token{
// AccessToken: idToken,
// TokenType: "Bearer",
// Expiry: token.Expiry,
// }, nil
// }

// Post Makes an HTTP request at the given url+path, marshalling the
// provided protobuf in the pbReq argument into the HTTP request JSON body (for
// use with grpc gateway). It attempts to transparently handle TLS if the target
Expand Down Expand Up @@ -693,12 +643,3 @@ func readAllBody(resp http.Response, logger *logrus.Entry) ([]byte, error) {

return body, err
}

func syncMapDump(sm *sync.Map) map[string]interface{} {
out := map[string]interface{}{}
sm.Range(func(key, value interface{}) bool {
out[fmt.Sprint(key)] = value
return true
})
return out
}

0 comments on commit aff2b2a

Please sign in to comment.