"""Drive a Counter-Strike: Source client through a cfg file and its console log.

Commands are written to ``looptest.cfg``; the autoexec alias shown at the
bottom of this file makes the game exec that cfg in a loop.  Every command
batch contains a random marker token, and the script waits until that token
shows up in ``console.log`` before it continues.
"""
import atexit
import os
import random
import re
import string
import subprocess
import time
from threading import Timer

looptestPath = '/home/nonroot/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/cfg/looptest.cfg'
consolelogPath = '/home/nonroot/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/console.log'

# Seconds to sleep between two polls of the console log, so the waiting
# loops do not spin a CPU core at 100%.
_POLL_INTERVAL = 0.05


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a random string of ``size`` characters drawn from ``chars``."""
    return ''.join(random.choice(chars) for _ in range(size))


def _schedule(delay, callback):
    """Run ``callback`` after ``delay`` seconds on a daemon timer thread.

    The timer is a daemon so a pending timer never keeps the process alive
    when the program exits.  Returns the started ``Timer``.
    """
    timer = Timer(delay, callback)
    timer.daemon = True
    timer.start()
    return timer


def clearconsolelog():
    """Truncate the console log and schedule the next truncation in 20 seconds."""
    with open(consolelogPath, 'w'):
        pass
    _schedule(20.0, clearconsolelog)


def writeCfgInput(Input):
    """Overwrite the looped cfg file with the command string ``Input``."""
    with open(looptestPath, 'w') as f:
        f.write(Input)


def checkConsoleOutput(input):
    """Block until some line of the console log contains ``input``.

    The log is re-read from the start on every attempt, with a short sleep
    between attempts.  Undecodable bytes are replaced instead of raising, so
    odd characters in the log cannot abort the wait.
    """
    while True:
        with open(consolelogPath, 'r', errors='replace') as f:
            if any(input in line for line in f):
                return
        time.sleep(_POLL_INTERVAL)


def _send_and_wait(command_template):
    """Write ``command_template`` with a fresh marker and wait for the marker.

    ``command_template`` must contain ``{0}`` where the marker goes.
    Returns the marker that was used.
    """
    marker = id_generator(11)
    writeCfgInput(command_template.format(marker))
    checkConsoleOutput(marker)
    return marker


def getconsoleOutputForStatus():
    """Issue ``status`` and return ``(server_str, player_count)`` from the log.

    ``server_str`` is taken from the last line containing "udp/ip :" and
    ``player_count`` from the last line containing "players :"; the defaults
    are ``""`` and ``0`` when no such line is present.
    """
    print('writeCfgInput')
    _send_and_wait("status; wait 5; {0}; wait 5;")
    server_str = ""
    player_count = 0
    with open(consolelogPath, 'r', errors='replace') as f:
        for line in f:
            if "udp/ip :" in line:
                server_str = line[9:].strip()
            if "players :" in line:
                try:
                    player_count = int(line[9:].split()[0])
                except (IndexError, ValueError):
                    # Malformed line: keep the previously parsed count
                    # rather than killing the calling timer thread.
                    pass
    return (server_str, player_count)


def get_output_if_spec():
    """Issue ``zspawn`` and report whether the log says the bot has no team.

    Returns True when any log line contains the "requires that you are on a
    team" message, otherwise False.
    """
    _send_and_wait("zspawn; wait 50; {0}; wait 5;")
    #is sm_team a command to show team?
    #maybe instead say !teaminfo, add sourcemod command to autism_bot_info.sp to print team to chat instead
    targetstr = "This feature requires that you are on a team."
    with open(consolelogPath, 'r', errors='replace') as f:
        return any(targetstr in line for line in f)


def resetCfgInputShortWait():
    """Replace the cfg content with just a marker and wait until it is logged."""
    marker = id_generator(11)
    print('randomlygeneratedString: ', marker)
    writeCfgInput("{0}; wait 5;".format(marker))
    checkConsoleOutput(marker)


def exit_handler():
    """Leave a harmless ``wait`` command in the cfg when the program exits."""
    print('reached exithandler')
    writeCfgInput("wait 5;")


def joinTeam():
    """Join team 2, issue ``zspawn`` and wait until the commands were logged."""
    _send_and_wait("jointeam 2; wait 2; zspawn; wait 1; {0}; wait 5;")


def checkbotteam():
    """Join a team when the log indicates the bot is not on one."""
    if get_output_if_spec():
        joinTeam()


def checkIfConnected():
    """Reconnect when needed, fix the team, then re-run itself in 5 minutes.

    A connect command is written when the status text does not contain
    "Zombie Escape" (port 27015) or when more than 58 players are reported
    (port 27016).  The marker is awaited only when a connect command was
    actually written; otherwise nothing would ever log it.
    """
    marker = id_generator(11)
    server_str, player_count = getconsoleOutputForStatus()
    connect_cmd = None
    if "Zombie Escape" not in server_str:
        connect_cmd = "connect 151.80.230.149:27015; wait 5; {0}; wait 50; ".format(marker)
    elif player_count > 58:
        connect_cmd = "connect 151.80.230.149:27016; wait 5; {0}; wait 50;".format(marker)
    if connect_cmd is not None:
        writeCfgInput(connect_cmd)
        checkConsoleOutput(marker)
    resetCfgInputShortWait()
    print('reached checkbotteam:')
    checkbotteam()
    resetCfgInputShortWait()
    _schedule(60.0 * 5, checkIfConnected)


def getBotOrigin():
    """Issue ``getpos`` and return the coordinate tokens logged before the marker.

    Returns the whitespace-split text between "setpos " and ";setang" on the
    line preceding a marker line, or None when no such line is found.
    """
    marker = _send_and_wait("getpos; wait 1; {0}; wait 5;")
    previous_line = ""
    with open(consolelogPath, 'r', errors='replace') as f:
        for line in f:
            if marker in line:
                match = re.search(r'setpos (.*);setang', previous_line)
                if match is not None:
                    coords = match.group(1).strip().split()
                    print('coords: ', coords)
                    return coords
            previous_line = line
    return None


def get_player_info():
    """Wait for movement and eye-angle lines and return them.

    Returns ``(movement_input, ct_angles)``: every "movement_input_:" payload
    found in the log, and the whitespace-split tokens of the last
    "ct_eye_angles_:" line.  Both lists are rebuilt on every pass over the
    log so that re-reading the file never duplicates movement commands.
    """
    while True:
        movement_input = []
        ct_angles = []
        with open(consolelogPath, 'r', errors='replace') as f:
            for line in f:
                if "movement_input_:" in line:
                    movement_input.append(line[16:].strip())
                if "ct_eye_angles_:" in line:
                    ct_angles = line[15:].split()
        if movement_input and ct_angles:
            return (movement_input, ct_angles)
        time.sleep(_POLL_INTERVAL)


def getBotOrgin(botOrigin):
    """Convert the first three coordinate tokens of ``botOrigin`` to floats.

    Surrounding whitespace and ";" characters are removed from each token
    before conversion.
    """
    #x -> z -> y coordAxis order
    cleaned = [part.strip().replace(";", "") for part in botOrigin]
    #bot coordinates might be usefull?
    return [float(cleaned[0]), float(cleaned[1]), float(cleaned[2])]


def followPlayer():
    """Replay the logged movement and view angles, then re-run in 2.5 seconds."""
    movement_input, ct_angles = get_player_info()
    # An incomplete angle line would raise IndexError and silently end the
    # follow loop; skip this round instead and try again on the next timer.
    if len(ct_angles) >= 3:
        marker = id_generator(11)
        parts = ["+attack; wait 5; cl_minmodels 1; wait 5; {0}; wait 5; setang {1} {2} {3}; wait 5;".format(
            marker, ct_angles[0], ct_angles[1], ct_angles[2])]
        parts.extend(movement + "; wait 5;" for movement in movement_input)
        strInput = "".join(parts)
        print('strInput: ', strInput)
        writeCfgInput(strInput)
        checkConsoleOutput(marker)
    _schedule(2.5, followPlayer)


def deadlock():
    """Keep the main thread alive until Ctrl+C so the daemon timers keep running."""
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    atexit.register(exit_handler)
    clearconsolelog()
    resetCfgInputShortWait()
    checkIfConnected()
    followPlayer()
    print('reached deadlock')
    deadlock()

#autoexec.cfg:
#alias loop "exec looptest.cfg; wait 5; loop;"; wait 5; loop;