import os import subprocess import atexit from threading import Timer import string import random import time looptestPath = '/home/nonroot/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/cfg/looptest.cfg' consolelogPath = '/home/nonroot/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/console.log' clearbool = True def id_generator(size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for _ in range(size)) def switchbool(): global clearbool clearbool = True def clearconsolelog(): global clearbool if clearbool: clearbool = False open(consolelogPath, 'w').close() #print('cleaned console') connectionTimer = Timer(3.0, switchbool) #daemon true kills thread instead of deadlock waiting in case program exits connectionTimer.daemon = True connectionTimer.start() def writeCfgInput(Input): with open(looptestPath, 'w') as f: f.write(Input) def getconsoleOutputForStatus(): i = 0 while i < 8: time.sleep(0.25) randomlygeneratedString = id_generator(11) str = "status; wait 5; {0}; wait 5;".format(randomlygeneratedString) writeCfgInput(str) checkConsoleOutput(randomlygeneratedString) server_str = "" player_count = 0 with open(consolelogPath, 'r') as f: for line in f: try: if "udp/ip" in line: server_str = line[9:].strip() if "players " in line: player_count = int(line[9:].split()[0].strip()) except Exception: pass if server_str: print('server_str: ', server_str, '\nplayer_count: ', player_count) return (server_str, player_count) i += 1 return ("None", "None") def get_output_if_spec(): randomlygeneratedString = id_generator(11) str = "zspawn; wait 500; {0}; wait 5;".format(randomlygeneratedString) writeCfgInput(str) #is sm_team a command to show team? #maybe instead say !teaminfo, add sourcemod command to autism_bot_info.sp to print team to chat instead targetstr = "This feature requires that you are on a team." failurestr = "Unknown command \"zspawn\"" alivestr = "This feature requires that you are dead." checkConsoleOutput(randomlygeneratedString) with open(consolelogPath, 'r') as f: for line in f: if alivestr in line: return 2 if targetstr in line: return 1 if failurestr in line: return 0 return 0 def checkConsoleOutput(input): iterations_count = 0 #print('entered checkconsole output') while iterations_count < 6: time.sleep(0.5) with open(consolelogPath, 'r') as f: for line in f: if input in line: iterations_count += 6 break iterations_count += 1 def resetCfgInputShortWait(): #getpos randomlygeneratedString = id_generator(11) #print('randomlygeneratedString: ', randomlygeneratedString) str = "{0}; wait 5;".format(randomlygeneratedString) with open(looptestPath, 'w') as f: f.write(str) checkConsoleOutput(randomlygeneratedString) def exit_handler(): print('reached exithandler') str = "wait 5;" with open(looptestPath, 'w') as f: f.write(str) def joinTeam(): randomlygeneratedString = id_generator(11) str = "jointeam 2; wait 2; zspawn; wait 1; {0}; wait 5;".format(randomlygeneratedString) writeCfgInput(str) checkConsoleOutput(randomlygeneratedString) def checkbotteam(): while True: clearconsolelog() state = get_output_if_spec() if state == 1: joinTeam() break if state == 2: break resetCfgInputShortWait() connectionTimer = Timer(20.0, checkbotteam) connectionTimer.daemon = True connectionTimer.start() def checkIfConnected(): randomlygeneratedString = id_generator(11) server_str, player_count = getconsoleOutputForStatus() connectionTimer = None if "27015" not in server_str: str1 = "connect 151.80.230.149:27015; {0}; wait 500;".format(randomlygeneratedString) writeCfgInput(str1) checkConsoleOutput(randomlygeneratedString) elif player_count > 63: #TODO check team differently because zspawn unknown command on zr server writeCfgInput("connect 151.80.230.149:27016; {0}; wait 5;".format(randomlygeneratedString)) checkConsoleOutput(randomlygeneratedString) connectionTimer = Timer(60.0 * 15, checkIfConnected) resetCfgInputShortWait() if not connectionTimer: connectionTimer = Timer(60.0 * 2, checkIfConnected) #daemon true kills thread instead of deadlock waiting in case program exits connectionTimer.daemon = True connectionTimer.start() #print('finished checkIfConnected') def get_player_info(): movement_input = [] ct_angles = [] ct_origin_diff = [] ct_origin_position = [] ct_distance = 0 while True: with open(consolelogPath, 'r') as f: try: for line in f: print('line: ', line) if "movement_input_specific:" in line and len(line) > 26: movement_input = line.split("movement_input_specific:", 1)[1].split("+") movement_input = [element.replace("\n", "") for element in movement_input] #print('movement: ', movement) if "ct_eye_angles_:" in line: ct_angles = line.split("ct_eye_angles_:", 1)[1].split("+") ct_angles = [element.strip() for element in ct_angles] if "player_origin_diff:" in line: ct_origin_diff = line.split("player_origin_diff:", 1)[1].split("+") ct_origin_diff = [element.strip() for element in ct_origin_diff] ct_origin_diff = [float(element) for element in ct_origin_diff] if "player_origin_position:" in line: ct_origin_position = line.split("player_origin_position:", 1)[1].split("+") ct_origin_position = [element.strip() for element in ct_origin_position] if "player_lowest_distance:" in line: ct_distance = line.split("player_lowest_distance:", 1)[1].strip() ct_distance = float(ct_distance) if movement_input and ct_angles and ct_origin_diff and ct_origin_position and ct_distance: return (movement_input, ct_angles, ct_origin_diff, ct_origin_position, ct_distance) except (UnicodeDecodeError, IndexError): pass def getBotOrgin(botOrigin): #x -> z -> y coordAxis order botOrigin = [x.strip() for x in botOrigin] botOrigin = [x.replace(";", "") for x in botOrigin] #botOrigin = [x.replace(";", "") for x in botOrigin] botIntorigin0 = float(botOrigin[0]) botIntorigin1 = float(botOrigin[1]) botIntorigin2 = float(botOrigin[2]) #bot coordinates might be usefull? return [botIntorigin0, botIntorigin1, botIntorigin2] def getBotOrigin(): while True: str = "getpos; wait 1; wait 5;" writeCfgInput(str) previousStr = "" with open(consolelogPath, 'r') as f: for line in f: if "setpos" in line: try: print('found setpos line: ', line) coords = line.split("setpos")[1].split(";setang")[0].split() #print('coords: ', coords) return getBotOrgin(coords) except IndexError as err: pass previousStr = line def followPlayer(remaining_instructions = [], indexCounter = 0, previousmoment = []): #setang 0 180 0; #default_input = "+attack; wait 50; cl_minmodels; wait 50; +right; wait 50; +jump; wait 50; -jump; wait 50; +forward; wait 50; {0}; wait 5;".format(randomlygeneratedString) #writeCfgInput(default_input) botOrigin = getBotOrigin() movement_input, ct_angles, ct_origin_diff, ct_origin_position, ct_distance = get_player_info() print('movement_input: ', movement_input, '\nct_angles: ', ct_angles, "\nct_origin_diff: ", ct_origin_diff, "\nct_origin_position: ", ct_origin_position, "\nct_distance: ", ct_distance, "\nbotOrigin: ", botOrigin) randomlygeneratedString = id_generator(11) remaining_instructions.append([movement_input, ct_angles]) strInput = "-jump; wait 5; +attack; wait 5; cl_minmodels 1; wait 5; setang 0 180 0; wait 5;".format(ct_angles[0], ct_angles[1], ct_angles[2]) #strInput += "-forward; wait 5; -back; wait 5; -moveleft; wait 5; -moveright; wait 5; " if ct_distance > 250: remaining_instructions = [] indexCounter = 0 #0 180 0: W = X axis greater minus, S = X axis greater plus, A = Y axis greater minus, D = Y axis greater plus if botOrigin[0] + 50 > ct_origin_diff[0]: strInput += "wait 5; +forward;" elif botOrigin[0] - 50 < ct_origin_diff[0]: strInput += "wait 5; +back;" if botOrigin[1] + 50 > ct_origin_diff[1]: strInput += "wait 5; +moveleft;" elif botOrigin[1] - 50 < ct_origin_diff[1]: strInput += "wait 5; +moveright;" print('ct_distance > 250:') else: (movement, angles) = remaining_instructions[indexCounter] if previousmoment != movement: strInput += "-forward; wait 5; -back; wait 5; -moveleft; wait 5; -moveright; wait 5; " previousmoment = movement strInput = strInput.replace("setang 0 180 0;", "setang {0} {1} {2};".format(angles[0], angles[1], angles[2])) for move in movement: strInput += "+" strInput += move strInput += "; " indexCounter += 1 strInput += " wait 5; {0}; ".format(randomlygeneratedString) print('strInput: ', strInput) writeCfgInput(strInput) checkConsoleOutput(randomlygeneratedString) clearconsolelog() connectionTimer = Timer(0.2, followPlayer, args=(remaining_instructions, indexCounter)) connectionTimer.daemon = True connectionTimer.start() def deadlock(): try: while True: 42 == 42 except KeyboardInterrupt: pass if __name__ == '__main__': atexit.register(exit_handler) clearconsolelog() resetCfgInputShortWait() checkIfConnected() checkbotteam() print('reached followPlayer') followPlayer() print('reached deadlock') deadlock() #autoexec.cfg: #alias loop "exec looptest.cfg; wait 5; loop;"; wait 5; loop;