projects-jenz/AutismBotIngame/python/ingamefollowct.py

215 lines
8.4 KiB
Python

import os
import sys
import subprocess
import atexit
from threading import Timer
import string
import random
import signal
import socket
import codecs
import datetime
import time
who_am_i = subprocess.getoutput(["whoami"])
looptestPath = f"""/home/{who_am_i}/.steam/debian-installation/steamapps/common/Counter-Strike Source/cstrike/cfg/looptest.cfg"""
chatmsg = ""
def colored(r, g, b, text):
return "\033[38;2;{};{};{}m{} \033[38;2;255;255;255m".format(r, g, b, text)
def writeCfgInput(Input_user):
with open(looptestPath, 'w') as f:
colored_text = colored(255, 0, 0, ('Input: ' + Input_user))
#print(colored_text)
f.write(Input_user)
def exit_handler():
print('reached exithandler')
writeCfgInput('')
def joinTeam():
str = "jointeam 2; joinclass 3; zspawn;"
writeCfgInput(str)
print('jointeam func: ')
time.sleep(4.5)
writeCfgInput('')
def bot_process_surf(input_line):
bot_surf_plane = input_line[input_line.index("surfing:") + len("surfing:"):]
bot_surf_plane = bot_surf_plane.strip()
bot_surf_plane = [float(i.replace('\U00002013', '-')) for i in bot_surf_plane.split(' ')]
#X axis positive = hold A
#X axis negative = hold D
#if Y axis = 0.0 use X instead
#Y axis negative = hold A
#Y axis positive = hold D
strInput = "-forward; wait 3; "
if bot_surf_plane[1] < 0.0:
strInput += "+moveleft; wait 3; "
elif bot_surf_plane[1] > 0.0:
strInput += "+moveright; wait 3;"
elif bot_surf_plane[0] < 0.0:
strInput += "+moveleft; wait 3; "
elif bot_surf_plane[0] > 0.0:
strInput += "+moveright; wait 3; "
print('date: ', datetime.datetime.now().time(),'bot surfing bot_surf_plane: ', bot_surf_plane)
writeCfgInput(strInput)
time.sleep(0.1)
writeCfgInput("")
def bot_process_movement(input_line):
dist_target = input_line[input_line.index("dist_target:") + len("dist_target:"):input_line.index("targethuman:")]
targethuman = input_line[input_line.index("targethuman:") + len("targethuman:"):input_line.index("enemy_distance:")]
enemy_distance = input_line[input_line.index("enemy_distance:") + len("enemy_distance:"):input_line.index("targeteam:")]
targeteam = input_line[input_line.index("targeteam:") + len("targeteam:"):input_line.index("target_enemy:")]
target_enemy = input_line[input_line.index("target_enemy:") + len("target_enemy:"):]
dist_target = float(dist_target)
enemy_distance = float(enemy_distance)
targeteam = int(targeteam)
min_distance_target_human = 50.0
strInput = "-attack; wait 2; -use; wait 5; +attack; wait 5; cl_minmodels 1; wait 2; +use; +forward; wait 2; "
if dist_target > min_distance_target_human and targeteam == 3:
#print('dist_target: ', dist_target)
strInput += "use weapon_elite; wait 3; "
elif targeteam == 3:
strInput += "use weapon_p90; wait 3; "
elif targeteam == 2:
strInput += "use weapon_knife; wait 5; "
print('date: ', datetime.datetime.now().time(), ' target_enemy: ', target_enemy, ' enemy distance: ', enemy_distance, ' target human: ', targethuman,
' dist_target: ', dist_target)
strInput = strinput_append(strInput, 2)
#print('strInput final:', strInput)
writeCfgInput(strInput)
time.sleep(0.1)
writeCfgInput("")
def strinput_append(strInput, nth):
for _ in range(10 * nth):
boolean_val = random.choice([True, False])
if boolean_val:
strInput += "+moveleft; wait 15; -moveleft; "
else:
strInput += "+moveright; wait 15; -moveright; "
return strInput
def kill_css_process():
css_pid = subprocess.getoutput(["pidof hl2_linux"])
if css_pid:
for pid in css_pid.split(" "):
username = subprocess.getoutput([f"""ps -o user= -p {pid}"""])
if username == who_am_i:
print('pid: ', pid, 'shutting the bots game down....')
pid = int(pid.strip())
os.kill(pid, signal.SIGTERM)
time.sleep(15)
print('preparing to launch game....')
os.chdir(f"""/home/{who_am_i}/.steam/debian-installation/""")
subprocess.check_call("./steam.sh %s" % ("-applaunch 240 -textmode -textmessagedebug -novid -nosound -noipx -nojoy -noshaderapi"), shell=True)
print('finished starting game')
def bot_connect_ze():
#use whatever ip you want here to connect with
strdev = "connect 144.76.218.19:27019/test132;"
str1 = "connect 144.76.218.19:27015;"
writeCfgInput(str1)
time.sleep(0.2)
writeCfgInput('')
print('not yet connected')
def pairwise(it):
it = iter(it)
while True:
try:
yield next(it), next(it)
except StopIteration:
# no more elements in the iterator
return
if __name__ == '__main__':
atexit.register(exit_handler)
local_ip = "127.0.0.1"
local_port = 48477
udp_external_ip = "164.132.201.173"
buffer_size = 4096 #potentially not large enough?
connection_issue_counter = 0;
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_external = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", local_port))
messager_name = ""
print('reached deadlock')
try:
while True:
data, addr = sock.recvfrom(buffer_size)
data = codecs.decode(data, "utf-8", "ignore")
ip = addr[0]
port = addr[1]
#print('port: ', port, " ip: ", ip)
#print(data)
if not data:
continue
if ip not in [local_ip, udp_external_ip]:
continue
if ip == udp_external_ip:
if messager_name in data:
messager_name = ""
response_msg = f"""say {messager_name} {data}"""
print("remote UDP packet response_msg: ", response_msg)
writeCfgInput(response_msg)
#needs mini delay to have time to write input to console
time.sleep(0.1)
writeCfgInput("")
if data == "autismo connected":
print('Bot connected!')
connection_issue_counter = 0
joinTeam()
elif data == "bot kicked server full":
print('bot kicked server full: ', datetime.datetime.now().time())
elif data == "connect to ze":
if connection_issue_counter == 5 or connection_issue_counter > 50:
kill_css_process()
connection_issue_counter += 1
print('connection_issue_counter: ', connection_issue_counter)
bot_connect_ze()
elif "clientmessage:" in data:
messager_name = data.split("clientmessage:", 1)[1].split(" 72DqZ84")[0]
databyte_send_message = messager_name + data.split("72DqZ84")[1]
sock_external.sendto(databyte_send_message.encode(), (udp_external_ip, local_port))
print('databyte_send_message: ', databyte_send_message)
elif data.startswith("dist_target:"):
bot_process_movement(data)
elif data.startswith("surfing:"):
bot_process_surf(data)
elif data.startswith("hull info:"):
hull_info = data[data.index("hull info:") + len("hull info:"):]
colored_text = colored(255, 0, 0, ('hull_info: ' + hull_info))
strInput = ""
if hull_info == "jump":
strInput += "+jump; wait 5; -jump; +duck; wait 50; -duck; wait 5; "
elif hull_info == "crouch":
strInput += "+duck; wait 50; -duck; wait 5; "
writeCfgInput(strInput)
#print(colored_text)
time.sleep(0.1)
writeCfgInput("")
except KeyboardInterrupt:
pass
finally:
sock.close()
sock_external.close()
#/home/gameservers/.steam/debian-installation/steamapps/common/Counter-Strike Source/cstrike/cfg/autoexec.cfg:
#alias loop "exec looptest.cfg; wait 5; loop;"; wait 5; loop;
#-condebug
#cd /home/gameservers/.steam/debian-installation/
#./steam.sh -applaunch 240 -textmode -textmessagedebug -novid -nosound -noipx -nojoy -noshaderapi
#cd /home/gameservers/ze_runner_files
#python3 ingamefollowct.py
#screen -d -m -S ze_runner python3 ingamefollowct.py
#before steam login: export SDL_VIDEO_X11_VISUALID=0x074
#to find correct SDL_VIDEO_X11_VISUALID use glxinfo in X2GO