state.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. """Related to CSGO Gamestate"""
  2. import asyncio
  3. import json
  4. from threading import Lock, Thread
  5. from http.server import BaseHTTPRequestHandler, HTTPServer
  6. import config
  7. class PlayerState:
  8. def __init__(self, json, sounds):
  9. self.valid = False
  10. self.sounds = sounds
  11. provider = json.get("provider", {})
  12. if not provider:
  13. # Not ingame
  14. return
  15. player = json.get("player", {})
  16. if not player:
  17. # Invalid gamestate
  18. return
  19. # Is the GameState tracking local player or spectated player
  20. self.steamid = provider["steamid"]
  21. self.playerid = player["steamid"]
  22. self.is_local_player = self.steamid == self.playerid
  23. self.is_ingame = player["activity"] != "menu"
  24. # NOTE : this is modified in compare()
  25. self.play_timeout = False
  26. if self.is_ingame:
  27. sounds.playerid = self.playerid
  28. else:
  29. sounds.playerid = None
  30. return
  31. try:
  32. map = json.get("map", {})
  33. round = json.get("round", {})
  34. match_stats = player["match_stats"]
  35. state = player["state"]
  36. self.current_round = map["round"]
  37. except KeyError as err:
  38. print("Invalid json :")
  39. print(err)
  40. print(json)
  41. return
  42. self.flash_opacity = state["flashed"]
  43. self.is_knife_active = False
  44. for weapon in player["weapons"]:
  45. weapon = player["weapons"][weapon]
  46. # Taser has no 'type' so we have to check for its name
  47. if weapon["name"] == "weapon_taser":
  48. self.is_knife_active = weapon["state"] == "active"
  49. elif weapon["type"] == "Knife":
  50. self.is_knife_active = weapon["state"] == "active"
  51. self.mvps = match_stats["mvps"]
  52. self.phase = round["phase"] if round else "unknown"
  53. self.remaining_timeouts = (
  54. map["team_ct"]["timeouts_remaining"] + map["team_t"]["timeouts_remaining"]
  55. )
  56. self.round_kills = state["round_kills"]
  57. self.round_headshots = state["round_killhs"]
  58. self.total_deaths = match_stats["deaths"]
  59. self.total_kills = match_stats["kills"]
  60. # ------------------------------------------------------------
  61. # Below, only states that can't be compared to previous states
  62. # ------------------------------------------------------------
  63. # Updates only at round end
  64. if self.phase == "over":
  65. try:
  66. self.won_round = round["win_team"] == player["team"]
  67. except KeyError:
  68. # Player has not yet joined a team
  69. self.won_round = False
  70. # ------------------------------------------------------------
  71. self.valid = True
  72. async def compare(self, old_state) -> None:
  73. # Init state without playing sounds
  74. if not old_state or not old_state.valid:
  75. return
  76. if not self.is_ingame or not self.valid:
  77. return
  78. # Ignore warmup
  79. if self.phase == "warmup":
  80. print("[*] New match")
  81. return
  82. # Reset state after warmup
  83. if self.phase != "unknown" and old_state.phase == "unknown":
  84. print("[*] End of warmup")
  85. return
  86. # Check if we should play timeout
  87. if not self.play_timeout:
  88. self.play_timeout = old_state.play_timeout
  89. if self.remaining_timeouts == old_state.remaining_timeouts - 1:
  90. self.play_timeout = True
  91. print("[*] Timeout sound queued for next freezetime")
  92. # Play timeout music
  93. if self.phase == "freezetime" and self.play_timeout:
  94. self.sounds.play("Timeout")
  95. self.play_timeout = False
  96. # Reset state when switching players (used for MVPs)
  97. if self.playerid != old_state.playerid:
  98. print("[*] Different player")
  99. return
  100. # Play round start, win, lose, MVP
  101. if self.is_local_player and self.mvps == old_state.mvps + 1:
  102. self.sounds.play("MVP")
  103. elif self.phase != old_state.phase:
  104. if self.phase == "over" and self.mvps == old_state.mvps:
  105. self.sounds.play("Round win" if self.won_round else "Round lose")
  106. elif self.phase == "live":
  107. self.sounds.play("Round start")
  108. # Don't play player-triggered sounds below this ##########
  109. if not self.is_local_player:
  110. return
  111. ##########################################################
  112. # Lost kills - either teamkilled or suicided
  113. if self.total_kills < old_state.total_kills:
  114. if self.total_deaths == old_state.total_deaths + 1:
  115. self.sounds.play("Suicide")
  116. elif self.total_deaths == old_state.total_deaths:
  117. self.sounds.play("Teamkill")
  118. # Didn't suicide or teamkill -> check if player just died
  119. elif self.total_deaths == old_state.total_deaths + 1:
  120. self.sounds.play("Death")
  121. # Player got flashed
  122. if self.flash_opacity > 150 and self.flash_opacity > old_state.flash_opacity:
  123. self.sounds.play("Flashed")
  124. # Player killed someone
  125. if self.round_kills == old_state.round_kills + 1:
  126. # Kill with knife equipped
  127. if self.is_knife_active:
  128. self.sounds.play("Unusual kill")
  129. # Kill with weapon equipped
  130. else:
  131. # Prefer playing "Headshot" over "x kills"
  132. prefer_headshots = config.config["Sounds"].getboolean( # type: ignore
  133. "PreferHeadshots", False
  134. )
  135. if self.round_headshots == old_state.round_headshots + 1:
  136. if prefer_headshots:
  137. self.sounds.play("Headshot")
  138. else:
  139. self.sounds.play(
  140. f"{self.round_kills} kills"
  141. ) or self.sounds.play("Headshot")
  142. else:
  143. self.sounds.play(f"{self.round_kills} kills")
  144. # Player killed multiple players
  145. elif self.round_kills > old_state.round_kills:
  146. self.sounds.play("Collateral")
  147. class CSGOState:
  148. """Follows the CSGO state via gamestate integration"""
  149. def __init__(self, client):
  150. self.lock = Lock()
  151. self.old_state = None
  152. self.client = client
  153. server = HTTPServer(("127.0.0.1", 3000), PostHandler)
  154. server.RequestHandlerClass.state = self
  155. Thread(target=server.serve_forever, daemon=True).start()
  156. def is_ingame(self):
  157. return self.old_state is not None and self.old_state.is_ingame is True
  158. def is_alive(self):
  159. with self.lock:
  160. if not self.is_ingame():
  161. return False
  162. if self.old_state.phase != "live":
  163. return False
  164. if self.old_state.steamid != self.old_state.playerid:
  165. return False
  166. return True
  167. async def update(self, json):
  168. """Update the entire game state"""
  169. with self.lock:
  170. newstate = PlayerState(json, self.client.sounds)
  171. await newstate.compare(self.old_state)
  172. self.old_state = newstate
  173. class PostHandler(BaseHTTPRequestHandler):
  174. def do_POST(self):
  175. content_len = int(self.headers["Content-Length"])
  176. body = self.rfile.read(content_len)
  177. self.send_response(200)
  178. self.end_headers()
  179. asyncio.run(self.state.update(json.loads(body)))
  180. return
  181. def log_message(self, format, *args):
  182. # Do not spam the console with POSTs
  183. return