#!/usr/bin/python3
# -*- coding: utf-8 -*-
import asyncio
import logging

import requests

from .Constants import *

session = requests.Session()

# Race-timer backend queried to derive an access level from a player's rank.
_RACETIMER_PLAYER_URL = "https://racebackend.unloze.com/racetimer_endpoints-1.0/api/timers/player/{0}"
# Upper bound (seconds) for one race-timer request, so a hung backend cannot
# stall the lookup forever.
_RACETIMER_TIMEOUT = 5.0


def usteamid_to_steamid(usteamid):
    """Convert a bracketed, colon-separated ID into a ``STEAM_0:Y:Z`` string.

    All ``[`` and ``]`` characters are removed, the text is split on ``:`` and
    the third field is parsed as an integer ``n``.  The result is
    ``STEAM_0:<n % 2>:<n // 2>``.

    Raises:
        IndexError: if the input has fewer than three ``:``-separated fields.
        ValueError: if the third field is not an integer.
    """
    for bracket in ('[', ']'):
        usteamid = usteamid.replace(bracket, '')
    account_number = int(usteamid.split(':')[2])
    return 'STEAM_0:{0}:{1}'.format(account_number % 2, account_number // 2)


def _fetch_racetimer_rank(steam2):
    """Return the numeric ``Rank`` the race-timer backend reports, or None.

    This function blocks on network I/O; call it from a worker thread, not
    directly from a coroutine.  None is returned when the request fails, the
    response is not a JSON object, or it carries no numeric ``Rank``.
    """
    try:
        response = session.get(_RACETIMER_PLAYER_URL.format(steam2), timeout=_RACETIMER_TIMEOUT)
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a body that is not valid JSON.
        logging.getLogger(__name__).warning("Racetimer lookup failed for %s: %s", steam2, exc)
        return None

    if not isinstance(payload, dict):
        return None
    rank = payload.get("Rank")
    if isinstance(rank, bool) or not isinstance(rank, (int, float)):
        return None
    return rank


class PlayerManager():
    """Tracks connected players in a slot list indexed 1..MAXPLAYERS.

    The instance registers itself for game events and forwards on the
    Torchlight object and keeps one ``Player`` per occupied slot.
    """

    def __init__(self, master):
        """Create the slot table and hook all player-related events.

        Args:
            master: callable returning the Torchlight instance (it is invoked
                as ``self.Torchlight()`` everywhere) -- presumably a weak
                reference; confirm against the caller.
        """
        self.Logger = logging.getLogger(__class__.__name__)
        self.Torchlight = master

        # Slot 0 is never populated by the code in this file.
        self.Players = [None] * (MAXPLAYERS + 1)
        self.Storage = self.StorageManager(self)

        self.Torchlight().GameEvents.HookEx("player_connect", self.Event_PlayerConnect)
        self.Torchlight().GameEvents.HookEx("player_activate", self.Event_PlayerActivate)
        self.Torchlight().Forwards.HookEx("OnClientPostAdminCheck", self.OnClientPostAdminCheck)
        self.Torchlight().GameEvents.HookEx("player_info", self.Event_PlayerInfo)
        self.Torchlight().GameEvents.HookEx("player_disconnect", self.Event_PlayerDisconnect)
        self.Torchlight().GameEvents.HookEx("server_spawn", self.Event_ServerSpawn)

    def Event_PlayerConnect(self, name, index, userid, networkid, address, bot):
        """Create the Player for a connecting client.

        ``index`` arrives zero-based and is shifted by one to address the
        slot table.  An occupied slot is overwritten after logging an error.
        """
        index += 1
        self.Logger.info("OnConnect(name={0}, index={1}, userid={2}, networkid={3}, address={4}, bot={5})"
            .format(name, index, userid, networkid, address, bot))
        if self.Players[index] is not None:
            self.Logger.error("!!! Player already exists, overwriting !!!")

        self.Players[index] = self.Player(self, index, userid, networkid, address, name)
        self.Players[index].OnConnect()

    def Event_PlayerActivate(self, userid):
        """Mark the player with the given userid as active."""
        self.Logger.info("Pre_OnActivate(userid={0})".format(userid))
        player = self.FindUserID(userid)
        if player is None:
            # Previously this raised AttributeError on None.
            self.Logger.warning("OnActivate: no player with userid={0}".format(userid))
            return

        index = player.Index
        self.Logger.info("OnActivate(index={0}, userid={1})".format(index, userid))
        self.Players[index].OnActivate()

    def OnClientPostAdminCheck(self, client):
        """Schedule the player's asynchronous post-admin-check handler.

        Args:
            client: slot index into ``self.Players`` (used as-is, no shift).
        """
        self.Logger.info("OnClientPostAdminCheck(client={0})".format(client))
        player = self.Players[client]
        if player is None:
            # Previously this raised AttributeError on an empty slot.
            self.Logger.warning("OnClientPostAdminCheck: no player in slot {0}".format(client))
            return

        asyncio.ensure_future(player.OnClientPostAdminCheck())

    def Event_PlayerInfo(self, name, index, userid, networkid, bot):
        """Update a known player's name, or emulate a connect for an unknown one."""
        index += 1
        self.Logger.info("OnInfo(name={0}, index={1}, userid={2}, networkid={3}, bot={4})"
            .format(name, index, userid, networkid, bot))

        # We've connected to the server and receive info events about the already connected players
        # Emulate connect message
        if not self.Players[index]:
            # player_info carries no address.  The call used to omit that
            # argument entirely, which raised TypeError; an empty address is
            # passed instead.  Event_PlayerConnect re-applies the +1 shift,
            # hence index - 1.
            self.Event_PlayerConnect(name, index - 1, userid, networkid, "", bot)
        else:
            self.Players[index].OnInfo(name)

    def Event_PlayerDisconnect(self, userid, reason, name, networkid, bot):
        """Notify the Player of the disconnect and free its slot."""
        player = self.FindUserID(userid)
        if player is None:
            # Previously this raised AttributeError on None.
            self.Logger.warning("OnDisconnect: no player with userid={0} (name={1}, reason={2})"
                .format(userid, name, reason))
            return

        index = player.Index
        self.Logger.info("OnDisconnect(index={0}, userid={1}, reason={2}, name={3}, networkid={4}, bot={5})"
            .format(index, userid, reason, name, networkid, bot))

        self.Players[index].OnDisconnect(reason)
        self.Players[index] = None

    def Event_ServerSpawn(self, hostname, address, ip, port, game, mapname, maxplayers, os, dedicated, password):
        """Reset storage on server spawn and cycle every player through disconnect/connect."""
        self.Logger.info("ServerSpawn(mapname={0})"
            .format(mapname))

        self.Storage.Reset()

        # Players stay in their slots; they are told to disconnect with the
        # reason "mapchange" and then re-run OnConnect against the fresh storage.
        for player in self.Players[1:]:
            if player:
                player.OnDisconnect("mapchange")
                player.OnConnect()

    def FindUniqueID(self, uniqueid):
        """Return the first player whose UniqueID equals ``uniqueid``, else None."""
        for player in self.Players:
            if player and player.UniqueID == uniqueid:
                return player
        return None

    def FindUserID(self, userid):
        """Return the first player whose UserID equals ``userid``, else None."""
        for player in self.Players:
            if player and player.UserID == userid:
                return player
        return None

    def FindName(self, name):
        """Return the first player whose Name equals ``name``, else None."""
        for player in self.Players:
            if player and player.Name == name:
                return player
        return None

    def __len__(self):
        """Number of occupied slots, ignoring slot 0."""
        return sum(1 for player in self.Players[1:] if player)

    def __setitem__(self, key, value):
        """Store ``value`` in slot ``key``; keys outside 1..MAXPLAYERS are ignored."""
        if 0 < key <= MAXPLAYERS:
            self.Players[key] = value

    def __getitem__(self, key):
        """Return slot ``key``, or None for keys outside 1..MAXPLAYERS."""
        if 0 < key <= MAXPLAYERS:
            return self.Players[key]
        return None

    def __iter__(self):
        """Yield every occupied slot in index order, ignoring slot 0."""
        for player in self.Players[1:]:
            if player:
                yield player

    class StorageManager():
        """Per-key dictionaries, created on first access and dropped by Reset()."""

        def __init__(self, master):
            self.PlayerManager = master
            self.Storage = dict()

        def Reset(self):
            """Discard all stored dictionaries."""
            self.Storage = dict()

        def __getitem__(self, key):
            """Return the dict stored under ``key``, creating an empty one if absent."""
            return self.Storage.setdefault(key, dict())

    class Admin():
        """Flag-bit holder; each method returns ``_FlagBits`` masked with one ADMFLAG_* constant."""

        def __init__(self):
            self._FlagBits = 0

        def FlagBits(self):
            return self._FlagBits

        def Reservation(self):
            return (self._FlagBits & ADMFLAG_RESERVATION)

        def Generic(self):
            return (self._FlagBits & ADMFLAG_GENERIC)

        def Kick(self):
            return (self._FlagBits & ADMFLAG_KICK)

        def Ban(self):
            return (self._FlagBits & ADMFLAG_BAN)

        def Unban(self):
            return (self._FlagBits & ADMFLAG_UNBAN)

        def Slay(self):
            return (self._FlagBits & ADMFLAG_SLAY)

        def Changemap(self):
            return (self._FlagBits & ADMFLAG_CHANGEMAP)

        def Convars(self):
            return (self._FlagBits & ADMFLAG_CONVARS)

        def Config(self):
            return (self._FlagBits & ADMFLAG_CONFIG)

        def Chat(self):
            return (self._FlagBits & ADMFLAG_CHAT)

        def Vote(self):
            return (self._FlagBits & ADMFLAG_VOTE)

        def Password(self):
            return (self._FlagBits & ADMFLAG_PASSWORD)

        def RCON(self):
            return (self._FlagBits & ADMFLAG_RCON)

        def Cheats(self):
            return (self._FlagBits & ADMFLAG_CHEATS)

        def Root(self):
            return (self._FlagBits & ADMFLAG_ROOT)

        def Custom1(self):
            return (self._FlagBits & ADMFLAG_CUSTOM1)

        def Custom2(self):
            return (self._FlagBits & ADMFLAG_CUSTOM2)

        def Custom3(self):
            return (self._FlagBits & ADMFLAG_CUSTOM3)

        def Custom4(self):
            return (self._FlagBits & ADMFLAG_CUSTOM4)

        def Custom5(self):
            return (self._FlagBits & ADMFLAG_CUSTOM5)

        def Custom6(self):
            return (self._FlagBits & ADMFLAG_CUSTOM6)

    class Player():
        """State kept for one connected client."""

        def __init__(self, master, index, userid, uniqueid, address, name):
            self.PlayerManager = master
            self.Torchlight = self.PlayerManager.Torchlight
            self.Index = index
            self.UserID = userid
            self.UniqueID = uniqueid
            self.Address = address
            self.Name = name
            self.Access = None
            self.Admin = self.PlayerManager.Admin()
            self.Storage = None
            self.Active = False
            self.ChatCooldown = 0

        def OnConnect(self):
            """Attach per-UniqueID storage, seed audio counters, and load configured access."""
            self.Storage = self.PlayerManager.Storage[self.UniqueID]

            if "Audio" not in self.Storage:
                self.Storage["Audio"] = dict({"Uses": 0, "LastUse": 0.0, "LastUseLength": 0.0, "TimeUsed": 0.0})

            self.Access = self.Torchlight().Access[self.UniqueID]

        def OnActivate(self):
            """Flag the player as active."""
            self.Active = True

        async def OnClientPostAdminCheck(self):
            """Fetch the player's admin flag bits and derive an access entry.

            A player with no access entry receives one from the first match
            of: RCON flag, Generic flag, Custom1 flag, race-timer rank
            (skipped when UniqueID is 'BOT').  Afterwards an existing access
            entry below the configured DefaultLevel is replaced by a
            "Default" entry at that level.
            """
            self.Admin._FlagBits = (await self.Torchlight().API.GetUserFlagBits(self.Index))["result"]
            self.PlayerManager.Logger.info("#{0} \"{1}\"({2}) FlagBits: {3}".format(self.UserID, self.Name, self.UniqueID, self.Admin._FlagBits))

            if not self.Access:
                if self.Admin.RCON():
                    self.Access = dict({"level": 9, "name": "SAdmin"})
                elif self.Admin.Generic():
                    self.Access = dict({"level": 6, "name": "Admin"})
                elif self.Admin.Custom1():
                    self.Access = dict({"level": 5, "name": "VIP"})
                elif self.UniqueID != 'BOT':
                    # 2024-05-20: give a level based on the racetimer rank.
                    rank_access = await self._RaceRankAccess()
                    if rank_access:
                        self.Access = rank_access

            # NOTE(review): the source reached review with its indentation
            # stripped; this check is assumed to sit at method level rather
            # than inside "if not self.Access" -- confirm.
            default_level = self.PlayerManager.Torchlight().Config["DefaultLevel"]
            if default_level:
                if self.Access and self.Access["level"] < default_level:
                    self.Access = dict({"level": default_level, "name": "Default"})

        async def _RaceRankAccess(self):
            """Return an access dict for the player's race-timer rank, or None.

            The HTTP request runs in the event loop's default executor so the
            loop is not blocked while waiting for the backend.
            """
            try:
                # Convert the steam3-style UniqueID to the steam2 form the API expects.
                steam2 = usteamid_to_steamid(self.UniqueID)
            except (IndexError, ValueError):
                self.PlayerManager.Logger.warning(
                    "Cannot convert UniqueID {0!r} for racetimer lookup".format(self.UniqueID))
                return None

            # NOTE(review): the module-level requests session is now used from
            # executor threads; assumed acceptable for plain GETs -- confirm.
            loop = asyncio.get_event_loop()
            rank = await loop.run_in_executor(None, _fetch_racetimer_rank, steam2)
            if rank is None:
                return None

            if rank <= 250:
                return dict({"level": 4, "name": "top 250"})
            if rank <= 500:
                return dict({"level": 3, "name": "top 500"})
            if rank <= 1000:
                return dict({"level": 2, "name": "top 1000"})
            return None

        def OnInfo(self, name):
            """Record the player's new name."""
            self.Name = name

        def OnDisconnect(self, message):
            """Deactivate the player, drop its storage reference, and notify the AudioManager."""
            self.Active = False
            self.Storage = None
            self.Torchlight().AudioManager.OnDisconnect(self)