diff --git a/store/store.go b/gsistore/store.go similarity index 99% rename from store/store.go rename to gsistore/store.go index ee069d6..2763ac5 100644 --- a/store/store.go +++ b/gsistore/store.go @@ -1,4 +1,4 @@ -package store +package gsistore import ( "reflect" diff --git a/store/store_test.go b/gsistore/store_test.go similarity index 99% rename from store/store_test.go rename to gsistore/store_test.go index 55f5826..fd3d5c9 100644 --- a/store/store_test.go +++ b/gsistore/store_test.go @@ -1,4 +1,4 @@ -package store +package gsistore import ( "testing" @@ -66,7 +66,6 @@ func TestChannelStoreClose(t *testing.T) { func assertChannel(t *testing.T, channel chan *model.GameState, hasElement, hasMore bool) { element, more := <-channel - if hasElement { assert.NotNil(t, element) } else { diff --git a/model/gamestate.go b/model/gamestate.go index 4f9a67c..763346b 100644 --- a/model/gamestate.go +++ b/model/gamestate.go @@ -1,10 +1,11 @@ package model type GameState struct { - Auth *AuthState `json:"auth"` - Map *MapState `json:"map"` - Player *PlayerState `json:"player"` - Provider *ProviderState `json:"provider"` + Auth *AuthState `json:"auth"` + Map *MapState `json:"map"` + Player *PlayerState `json:"player"` + Provider *ProviderState `json:"provider"` + PreviousState *GameState `json:"previously"` } type AuthState struct { @@ -20,7 +21,9 @@ type ProviderState struct { } type MapState struct { - Name string `json:"name"` + Name string `json:"name"` + TeamCT *TeamState `json:"team_ct"` + TeamT *TeamState `json:"team_t"` } type PlayerState struct { @@ -37,3 +40,7 @@ type MatchStats struct { Mvps int `json:"mvps"` Score int `json:"score"` } + +type TeamState struct { + Timeouts *int `json:"timeouts_remaining"` +} diff --git a/model/serverstate.go b/model/serverstate.go new file mode 100644 index 0000000..aeb7ec8 --- /dev/null +++ b/model/serverstate.go @@ -0,0 +1,73 @@ +package model + +// Data structure sent by server, it is structured this way to economize bandwidth + +type ServerState struct { + ServerInfo ServerInfo `json:"serverInfo"` + PlayerInfo []PlayerInfo `json:"playerInfo"` +} + +type ServerInfo struct { + TimeStamp int `json:"timestamp"` + ServerName string `json:"servername"` + MapName string `json:"mapname"` + TimeoutsCTPrev int `json:"timeoutsCTprev"` + TimeoutsTPrev int `json:"timeoutsTprev"` + TimeoutsCT int `json:"timeoutsCT"` + TimeoutsT int `json:"timeoutsT"` + Global int `json:"global"` +} + +type PlayerInfo struct { + AuthKey string `json:"authkey"` + SteamId int64 `json:"steamid,string"` + Clan string `json:"clan"` // SteamID, clan and name already exists in the gsi data, do we need to send this again? + Name string `json:"name"` + TimeInServer float64 `json:"timeinserver"` // Need a better name + KZData KZData `json:"KZData"` +} + +type KZData struct { + Global bool `json:"global"` + Course int `json:"course"` + Time float64 `json:"time"` + Checkpoints int `json:"checkpoints"` + Teleports int `json:"teleports"` +} + +// Stored data structure that will be communicated to bot +type FullPlayerInfo struct { + TimeStamp int `json:"timestamp"` + AuthKey string `json:"authkey"` + TimeoutsCTPrev int `json:"timeoutsCTprev"` + TimeoutsTPrev int `json:"timeoutsTprev"` + TimeoutsCT int `json:"timeoutsCT"` + TimeoutsT int `json:"timeoutsT"` + ServerName string `json:"servername"` + MapName string `json:"mapname"` + ServerGlobal int `json:"serverglobal"` + SteamId int64 `json:"steamid,string"` + Clan string `json:"clan"` + Name string `json:"name"` + TimeInServer float64 `json:"timeinserver"` // Need a better name + KZData KZData `json:"KZData"` +} + +func New(sInfo *ServerInfo, pInfo *PlayerInfo) *FullPlayerInfo { + return &FullPlayerInfo{ + sInfo.TimeStamp, + pInfo.AuthKey, + sInfo.TimeoutsCTPrev, + sInfo.TimeoutsTPrev, + sInfo.TimeoutsCT, + sInfo.TimeoutsT, + sInfo.ServerName, + sInfo.MapName, + sInfo.Global, + pInfo.SteamId, + pInfo.Clan, + pInfo.Name, + pInfo.TimeInServer, + pInfo.KZData, + } +} diff --git a/server/server.go b/server/server.go index 2f4a2fb..c39e187 100644 --- a/server/server.go +++ b/server/server.go @@ -13,9 +13,9 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" - + "gitlab.com/prestrafe/prestrafe-gsi/gsistore" "gitlab.com/prestrafe/prestrafe-gsi/model" - "gitlab.com/prestrafe/prestrafe-gsi/store" + "gitlab.com/prestrafe/prestrafe-gsi/smstore" ) // Defines the public API for the Game State Integration server. The server acts as a rely between the CSGO GSI API, @@ -34,7 +34,8 @@ type server struct { port int filter TokenFilter logger *log.Logger - store store.Store + gsiStore gsistore.Store + smStore smstore.Store httpServer *http.Server upgrader *websocket.Upgrader } @@ -47,7 +48,8 @@ func New(addr string, port, ttl int, filter TokenFilter) Server { port, filter, log.New(os.Stdout, "GSI-Server > ", log.LstdFlags), - store.New(time.Duration(ttl) * time.Second), + gsistore.New(time.Duration(ttl) * time.Second), + smstore.New(time.Duration(ttl) * time.Second), nil, nil, } @@ -61,9 +63,15 @@ func (s *server) Start() error { // router.Path("/").Methods("GET").HandlerFunc(s.handleGet) // router.Path("/").Methods("POST").HandlerFunc(s.handlePost) - router.Path("/get").Methods("GET").HandlerFunc(s.handleGet) - router.Path("/update").Methods("POST").HandlerFunc(s.handlePost) + // GSI Handlers + router.Path("/gsi/get").Methods("GET").HandlerFunc(s.handleGSIGet) + router.Path("/gsi/update").Methods("POST").HandlerFunc(s.handleGSIPost) + router.Path("/websocket").Methods("GET").HandlerFunc(s.handleWebsocket) + + // SM Handlers + router.Path("/sm/update").Methods("POST").HandlerFunc(s.handleServerPost) + router.Path("/sm/get").Methods("GET").HandlerFunc(s.handleServerGet) router.NotFoundHandler = http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { s.logger.Printf("Unmatched request: %s %s\n", request.Method, request.URL) writer.WriteHeader(http.StatusNotFound) @@ -91,11 +99,12 @@ func (s *server) Start() error { func (s *server) Stop() error { s.logger.Printf("Stopping GSI server on %s:%d\n", s.addr, s.port) - s.store.Close() + s.gsiStore.Close() + s.smStore.Close() return s.httpServer.Shutdown(context.Background()) } -func (s *server) handleGet(writer http.ResponseWriter, request *http.Request) { +func (s *server) handleGSIGet(writer http.ResponseWriter, request *http.Request) { if !strings.HasPrefix(request.Header.Get("Authorization"), "GSI ") { s.logger.Printf("%s - Unauthorized GSI read (no token)\n", request.RemoteAddr) writer.WriteHeader(http.StatusUnauthorized) @@ -109,7 +118,7 @@ func (s *server) handleGet(writer http.ResponseWriter, request *http.Request) { return } - gameState, hasGameState := s.store.Get(authToken) + gameState, hasGameState := s.gsiStore.Get(authToken) if !hasGameState { s.logger.Printf("%s - Unknown GSI read to %s\n", request.RemoteAddr, authToken) writer.WriteHeader(http.StatusNotFound) @@ -133,7 +142,7 @@ func (s *server) handleGet(writer http.ResponseWriter, request *http.Request) { } } -func (s *server) handlePost(writer http.ResponseWriter, request *http.Request) { +func (s *server) handleGSIPost(writer http.ResponseWriter, request *http.Request) { body, ioError := ioutil.ReadAll(request.Body) if ioError != nil || body == nil || len(body) <= 0 { s.logger.Printf("%s - Empty GSI update received: %s\n", request.RemoteAddr, ioError) @@ -143,7 +152,11 @@ func (s *server) handlePost(writer http.ResponseWriter, request *http.Request) { gameState := new(model.GameState) if jsonError := json.Unmarshal(body, gameState); jsonError != nil { - s.logger.Printf("%s - Could not de-serialize game state: %s\n", request.RemoteAddr, jsonError) + if jsonError.Error() != "json: cannot unmarshal bool into Go struct field GameState.previously.map of type model.MapState" { + // Upon map change, instead of returning a map object the GSI client return a bool. + // It's not necessary to log this error; we send 400 anyway to mark that the game state is not updated. + s.logger.Printf("%s - Could not de-serialize game state: %s\n", request.RemoteAddr, jsonError) + } writer.WriteHeader(http.StatusBadRequest) return } @@ -164,9 +177,74 @@ func (s *server) handlePost(writer http.ResponseWriter, request *http.Request) { } if gameState.Provider != nil { - s.store.Put(authToken, gameState) + s.gsiStore.Put(authToken, gameState) } else { - s.store.Remove(authToken) + s.gsiStore.Remove(authToken) + } + + writer.WriteHeader(http.StatusOK) +} + +func (s *server) handleServerGet(writer http.ResponseWriter, request *http.Request) { + if !strings.HasPrefix(request.Header.Get("Authorization"), "SM ") { + s.logger.Printf("%s - Unauthorized SM read (no token)\n", request.RemoteAddr) + writer.WriteHeader(http.StatusUnauthorized) + return + } + + authToken := request.Header.Get("Authorization")[3:] + if !s.filter.Accept(authToken) { + s.logger.Printf("%s - Unauthorized SM read (rejected token)\n", request.RemoteAddr) + writer.WriteHeader(http.StatusUnauthorized) + return + } + + fullPlayerState, hasFullPlayerState := s.smStore.Get(authToken) + if !hasFullPlayerState { + s.logger.Printf("%s - Unknown SM read to %s\n", request.RemoteAddr, authToken) + writer.WriteHeader(http.StatusNotFound) + return + } + + response, jsonError := json.Marshal(fullPlayerState) + if jsonError != nil { + s.logger.Printf("%s - Could not serialize game state %s: %s\n", request.RemoteAddr, authToken, jsonError) + writer.WriteHeader(http.StatusInternalServerError) + return + } + + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(http.StatusOK) + + if _, ioError := writer.Write(response); ioError != nil { + s.logger.Printf("%s - Could not write game state %s: %s\n", request.RemoteAddr, authToken, ioError) + writer.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (s *server) handleServerPost(writer http.ResponseWriter, request *http.Request) { + body, ioError := ioutil.ReadAll(request.Body) + if ioError != nil || body == nil || len(body) <= 0 { + s.logger.Printf("%s - Empty SM update received: %s\n", request.RemoteAddr, ioError) + writer.WriteHeader(http.StatusBadRequest) + return + } + + serverState := new(model.ServerState) + if jsonError := json.Unmarshal(body, serverState); jsonError != nil { + s.logger.Printf("%s - Could not de-serialize server state: %s\n", request.RemoteAddr, jsonError) + writer.WriteHeader(http.StatusBadRequest) + return + } + serverInfo := serverState.ServerInfo + + playerInfos := serverState.PlayerInfo + + for _, player := range playerInfos { + if player.AuthKey != "" { + s.smStore.Put(&serverInfo, &player) + } } writer.WriteHeader(http.StatusOK) @@ -195,7 +273,7 @@ func (s *server) handleWebsocket(writer http.ResponseWriter, request *http.Reque return } - channel := s.store.GetChannel(authToken) + channel := s.gsiStore.GetChannel(authToken) for { gameState, more := <-channel @@ -204,8 +282,9 @@ func (s *server) handleWebsocket(writer http.ResponseWriter, request *http.Reque s.logger.Printf("%s - Could not serialize game state %s: %s\n", request.RemoteAddr, authToken, ioError) } _ = conn.Close() - s.store.ReleaseChannel(authToken) + s.gsiStore.ReleaseChannel(authToken) return } + } } diff --git a/smstore/store.go b/smstore/store.go new file mode 100644 index 0000000..81822b5 --- /dev/null +++ b/smstore/store.go @@ -0,0 +1,164 @@ +package smstore + +import ( + "reflect" + "sync" + "time" + + "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "gitlab.com/prestrafe/prestrafe-gsi/model" +) + +const ( + channelBufferSize = 10 +) + +var ( + operationsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "prestrafe", + Subsystem: "sm", + Name: "operations", + Help: "Counts the number of operations on the SM backend per token", + }, []string{"token", "operation"}) +) + +// Defines the public API for the SM store. The store is responsible for saving game states and evicting them once they +// go stale. Additional the store provides a channel object, that can be used to get notified, if a game state updates. +type Store interface { + // Returns a channel that is filled with updates of the game state for the given auth token. Calling this method + // also means that the caller needs to call ReleaseChannel(authKey), once he is done with using the channel. + GetChannel(authKey string) chan *model.FullPlayerInfo + + // Releases a channel that was previously acquired by GetChannel(authKey). + ReleaseChannel(authKey string) + + // Returns a game state for the given auth token, if one is present. + Get(authKey string) (playerState *model.FullPlayerInfo, present bool) + + // Puts a newStore game state for the given auth token, if none is already present. Otherwise the existing game state + // will be updated with the passed one. + Put(serverInfo *model.ServerInfo, playerInfo *model.PlayerInfo) + + // Removes a game state for the given auth token, if one is present. + Remove(authKey string) + + // Closes the store and releases all resources held by it. + Close() +} + +type store struct { + channels map[string]*channelContainer + internalCache *cache.Cache + locker sync.Locker +} + +type channelContainer struct { + channel chan *model.FullPlayerInfo + clients int +} + +// Creates a newStore store, with a given TTL. The TTL is the duration for game states, before they are considered stale. +func New(ttl time.Duration) Store { + return newStore(ttl) +} + +func newStore(ttl time.Duration) *store { + internalCache := cache.New(ttl, ttl*10) + channels := make(map[string]*channelContainer) + store := &store{channels, internalCache, &sync.Mutex{}} + + internalCache.OnEvicted(func(authKey string, item interface{}) { + store.pushUpdate(authKey, nil) + }) + + return store +} + +func (s *store) GetChannel(authKey string) chan *model.FullPlayerInfo { + operationsCounter.WithLabelValues(authKey, "channel_get").Inc() + + s.locker.Lock() + + if _, present := s.channels[authKey]; !present { + playerState, _ := s.Get(authKey) + + s.channels[authKey] = &channelContainer{make(chan *model.FullPlayerInfo, channelBufferSize), 0} + s.channels[authKey].channel <- playerState + } + + container := s.channels[authKey] + container.clients++ + + s.locker.Unlock() + + return container.channel +} + +func (s *store) ReleaseChannel(authKey string) { + operationsCounter.WithLabelValues(authKey, "channel_release").Inc() + + if _, present := s.channels[authKey]; present { + s.locker.Lock() + + if container, present := s.channels[authKey]; present { + container.clients-- + if container.clients < 1 { + delete(s.channels, authKey) + close(container.channel) + } + } + + s.locker.Unlock() + } +} + +func (s *store) Get(authKey string) (gameState *model.FullPlayerInfo, present bool) { + operationsCounter.WithLabelValues(authKey, "get").Inc() + + if cached, isCached := s.internalCache.Get(authKey); isCached { + gameState = cached.(*model.FullPlayerInfo) + present = isCached + } + return +} + +func (s *store) Put(serverInfo *model.ServerInfo, playerInfo *model.PlayerInfo) { + operationsCounter.WithLabelValues(playerInfo.AuthKey, "put").Inc() + + previousFullPlayerInfo, _ := s.internalCache.Get(playerInfo.AuthKey) + fullPlayerInfo := model.New(serverInfo, playerInfo) + s.internalCache.Set(playerInfo.AuthKey, fullPlayerInfo, cache.DefaultExpiration) + + if !reflect.DeepEqual(previousFullPlayerInfo, fullPlayerInfo) { + s.pushUpdate(playerInfo.AuthKey, fullPlayerInfo) + } +} + +// Unneeded, delete later +func (s *store) Remove(authKey string) { + operationsCounter.WithLabelValues(authKey, "remove").Inc() + + s.internalCache.Delete(authKey) +} + +func (s *store) Close() { + for authKey, channelContainer := range s.channels { + delete(s.channels, authKey) + close(channelContainer.channel) + } +} + +func (s *store) pushUpdate(authKey string, gameState *model.FullPlayerInfo) { + if _, present := s.channels[authKey]; present { + s.locker.Lock() + + if channel, present := s.channels[authKey]; present { + channel.channel <- gameState + } + + s.locker.Unlock() + } +}