210 lines
8.2 KiB
Python
210 lines
8.2 KiB
Python
import os
|
|
import sys
|
|
import subprocess
|
|
import atexit
|
|
from threading import Timer
|
|
import string
|
|
import random
|
|
import signal
|
|
import socket
|
|
import codecs
|
|
import datetime
|
|
import time
|
|
|
|
# Absolute path of the cfg file that every console command is written into.
# NOTE(review): the notes at the end of this file describe an autoexec alias
# that repeatedly runs "exec looptest.cfg" - presumably this same file; confirm.
looptestPath = "/home/gameservers/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/cfg/looptest.cfg"

# Not referenced by any code in the visible part of this file.
chatmsg = ''
|
|
|
|
def colored(r, g, b, text):
    """Wrap ``text`` in a 24-bit ANSI foreground colour escape.

    The returned string sets the foreground to ``(r, g, b)``, emits ``text``
    followed by one space, then switches the foreground to white
    (255, 255, 255).

    Args:
        r: Red component placed in the escape sequence.
        g: Green component placed in the escape sequence.
        b: Blue component placed in the escape sequence.
        text: The text to colour.

    Returns:
        The escape-wrapped string.
    """
    template = "\033[38;2;{};{};{}m{} \033[38;2;255;255;255m"
    return template.format(r, g, b, text)
|
|
|
|
def writeCfgInput(Input_user):
    """Overwrite the cfg file at ``looptestPath`` with ``Input_user``.

    The file is truncated and rewritten on every call, so passing an empty
    string clears whatever command was written before.

    Args:
        Input_user: Text to store in the cfg file (must be a ``str``).
    """
    # An explicit encoding keeps non-ASCII text (for example text decoded from
    # incoming packets) writable regardless of the locale the script runs under.
    with open(looptestPath, 'w', encoding='utf-8') as f:
        f.write(Input_user)
|
|
|
|
def exit_handler():
    """Log that the exit hook ran and blank the cfg file.

    Writing an empty string leaves no command behind in the cfg file once the
    script stops.
    """
    print('reached exithandler')
    writeCfgInput("")
|
|
|
|
def joinTeam():
    """Write the team-join commands to the cfg file, wait, then clear the file.

    The command string selects team 2, class 3 and issues ``zspawn``. The call
    blocks for 4.5 seconds before the cfg file is emptied again.
    """
    # Named so it no longer shadows the built-in ``str``.
    join_commands = "jointeam 2; joinclass 3; zspawn;"
    writeCfgInput(join_commands)
    print('jointeam func: ')
    # Leave the commands in the file for a while before clearing them -
    # presumably so the game has time to execute the cfg; confirm.
    time.sleep(4.5)
    writeCfgInput('')
|
|
|
|
def bot_process_surf(input_line):
    """Turn a "surfing:" packet into strafe commands and push them to the cfg file.

    Everything after the ``surfing:`` marker is parsed as whitespace-separated
    floats (an en dash is accepted as a minus sign). The second component
    selects the strafe direction; when it is exactly 0.0 the first component
    is used instead. The command is written to the cfg file, left there for
    0.1 seconds, and then the file is cleared.

    Args:
        input_line: Decoded packet text containing ``surfing:`` followed by at
            least two numbers.

    Raises:
        ValueError: If the marker is missing or a component is not a number.
        IndexError: If fewer than two components are present.
    """
    marker = "surfing:"
    raw_plane = input_line[input_line.index(marker) + len(marker):]
    # split() without an argument tolerates repeated, leading and trailing
    # whitespace; splitting on a single ' ' produced empty strings (and a
    # float('') ValueError) whenever two spaces appeared in a row.
    bot_surf_plane = [float(part.replace('\U00002013', '-')) for part in raw_plane.split()]

    # Original direction notes:
    #   X axis positive = hold A
    #   X axis negative = hold D
    #   if Y axis = 0.0 use X instead
    #   Y axis negative = hold A
    #   Y axis positive = hold D
    # NOTE(review): assuming component 0 is X and +moveleft is the A key, the
    # two X-axis lines above say the opposite of what the code below does
    # (negative -> +moveleft, positive -> +moveright). Confirm which is intended.
    strInput = "-forward; wait 3; "
    if bot_surf_plane[1] < 0.0:
        strInput += "+moveleft; wait 3; "
    elif bot_surf_plane[1] > 0.0:
        strInput += "+moveright; wait 3;"
    elif bot_surf_plane[0] < 0.0:
        strInput += "+moveleft; wait 3; "
    elif bot_surf_plane[0] > 0.0:
        strInput += "+moveright; wait 3; "

    print('date: ', datetime.datetime.now().time(),'bot surfing bot_surf_plane: ', bot_surf_plane)
    writeCfgInput(strInput)
    # Short pause between writing the command and clearing the file.
    time.sleep(0.1)
    writeCfgInput("")
|
|
|
|
def bot_process_movement(input_line):
    """Build and send the movement/attack command sequence for a "dist_target:" packet.

    The packet is expected to carry the labels ``dist_target:``,
    ``targethuman:``, ``enemy_distance:``, ``targeteam:`` and
    ``target_enemy:``; each value is the text between its label and the next
    one (the last value runs to the end of the line). A weapon is selected
    from ``targeteam`` and ``dist_target``, random strafing is appended via
    ``strinput_append``, and the result is written to the cfg file, left there
    for 0.1 seconds, then cleared.

    Args:
        input_line: Decoded packet text.

    Raises:
        ValueError: If a label is missing or a numeric field cannot be parsed.
    """
    def between(label, following=None):
        # Text after `label`, up to `following` (or to the end of the line).
        begin = input_line.index(label) + len(label)
        if following is None:
            return input_line[begin:]
        return input_line[begin:input_line.index(following)]

    # Slice out every raw field first, then convert the numeric ones.
    raw_dist_target = between("dist_target:", "targethuman:")
    targethuman = between("targethuman:", "enemy_distance:")
    raw_enemy_distance = between("enemy_distance:", "targeteam:")
    raw_targeteam = between("targeteam:", "target_enemy:")
    target_enemy = between("target_enemy:")

    dist_target = float(raw_dist_target)
    enemy_distance = float(raw_enemy_distance)
    targeteam = int(raw_targeteam)
    min_distance_target_human = 50.0

    command_parts = [
        "-attack; wait 2; -use; wait 5; +attack; wait 5; cl_minmodels 1; wait 2; +use; +forward; wait 2; "
    ]
    if targeteam == 3:
        # For team 3 the weapon depends on how far away the target is.
        if dist_target > min_distance_target_human:
            command_parts.append("use weapon_elite; wait 3; ")
        else:
            command_parts.append("use weapon_p90; wait 3; ")
    elif targeteam == 2:
        command_parts.append("use weapon_knife; wait 5; ")

    print('date: ', datetime.datetime.now().time(), ' target_enemy: ', target_enemy, ' enemy distance: ', enemy_distance, ' target human: ', targethuman,
          ' dist_target: ', dist_target)

    strInput = strinput_append("".join(command_parts), 2)
    writeCfgInput(strInput)
    # Short pause between writing the command and clearing the file.
    time.sleep(0.1)
    writeCfgInput("")
|
|
|
|
def strinput_append(strInput, nth):
    """Append ``10 * nth`` randomly chosen strafe bursts to ``strInput``.

    Each burst is either a left or a right strafe held for ``wait 15`` and
    then released; the side is picked independently for every burst.

    Args:
        strInput: Command string to extend.
        nth: Multiplier for the number of bursts (ten bursts per unit).

    Returns:
        ``strInput`` followed by the generated bursts.
    """
    # Index 0 (left) corresponds to the original's True choice, index 1
    # (right) to False, so one random pick per burst is made as before.
    bursts = [
        "+moveleft; wait 15; -moveleft; ",
        "+moveright; wait 15; -moveright; ",
    ]
    return strInput + "".join(random.choice(bursts) for _ in range(10 * nth))
|
|
|
|
def kill_css_process():
    """Terminate running ``hl2_linux`` processes and launch the game via steam.sh.

    Every PID reported by ``pidof hl2_linux`` is sent SIGTERM, followed by a
    15 second pause. The working directory is then changed to the Steam
    installation and ``steam.sh`` is run with the launch options for app 240;
    the call blocks until that command returns.

    Raises:
        FileNotFoundError: If ``pidof`` or ``./steam.sh`` cannot be executed.
        subprocess.CalledProcessError: If ``steam.sh`` exits with a non-zero
            status.
    """
    lookup = subprocess.run(
        ["pidof", "hl2_linux"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    # pidof can report several PIDs on one line; converting the whole output
    # with a single int() call failed in that case.
    css_pids = [int(token) for token in lookup.stdout.split() if token.isdigit()]
    if css_pids:
        print('css_pid: ', ' '.join(str(pid) for pid in css_pids), 'shutting the bots game down....')
        for css_pid in css_pids:
            try:
                os.kill(css_pid, signal.SIGTERM)
            except ProcessLookupError:
                # The process went away between the lookup and the signal.
                pass
        # Pause after signalling before relaunching.
        time.sleep(15)

    print('preparing to launch game....')
    # steam.sh is started relative to the installation directory; this also
    # changes the working directory of this script, as the original did.
    os.chdir('/home/gameservers/.steam/debian-installation/')
    subprocess.check_call([
        "./steam.sh",
        "-applaunch", "240",
        "-textmode",
        "-textmessagedebug",
        "-novid",
        "-nosound",
        "-noipx",
        "-nojoy",
        "-noshaderapi",
    ])
    print('finished starting game')
|
|
|
|
def bot_connect_ze():
    """Write a ``connect`` command to the cfg file, then clear it after 0.2 s.

    The server address is hard-coded; use whatever ip you want here to
    connect with.
    """
    # Alternative address kept from the original for reference (it was an
    # unused local): "connect 151.80.230.149:27019/test132;"
    connect_command = "connect 151.80.230.149:27015;"
    writeCfgInput(connect_command)
    # Short pause between writing the command and clearing the file.
    time.sleep(0.2)
    writeCfgInput('')
    print('not yet connected')
|
|
|
|
def pairwise(it):
    """Yield consecutive, non-overlapping pairs from ``it``.

    ``[a, b, c, d]`` produces ``(a, b), (c, d)``. When the input has an odd
    number of items the last one is consumed and dropped. Note that this
    differs from ``itertools.pairwise``, which yields overlapping pairs.

    Args:
        it: Any iterable.

    Yields:
        2-tuples of adjacent items.
    """
    source = iter(it)
    # Zipping one iterator with itself pulls two items per tuple and stops as
    # soon as either pull runs out.
    yield from zip(source, source)
|
|
|
|
if __name__ == '__main__':
    # Entry point: receive UDP datagrams on local_port and translate each
    # recognised packet into console commands written to the cfg file.
    # Only packets whose source address is local_ip or udp_external_ip are
    # processed; everything else is dropped.

    # Blank the cfg file when the interpreter exits.
    atexit.register(exit_handler)

    local_ip = "127.0.0.1"
    local_port = 48477
    udp_external_ip = "62.210.116.176"
    buffer_size = 4096  # potentially not large enough?
    connection_issue_counter = 0
    # Name taken from the most recent "clientmessage:" packet; it is prefixed
    # to replies that arrive from udp_external_ip.
    messager_name = ""

    # Context managers close both sockets on every exit path, including a
    # failing bind().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_external:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", local_port))
        print('reached deadlock')
        try:
            while True:
                data, addr = sock.recvfrom(buffer_size)
                # Undecodable bytes are dropped rather than raising.
                data = data.decode("utf-8", "ignore")
                ip = addr[0]
                if not data:
                    continue
                if ip not in (local_ip, udp_external_ip):
                    continue

                if ip == udp_external_ip:
                    # Drop the name prefix when the reply already contains it.
                    if messager_name in data:
                        messager_name = ""
                    # NOTE(review): the packet text is placed into a console
                    # command unescaped; this file separates commands with
                    # ';', so a ';' inside data would start another command.
                    # Consider sanitising before writing.
                    response_msg = f"say {messager_name} {data}"
                    print("remote UDP packet response_msg: ", response_msg)
                    writeCfgInput(response_msg)
                    # needs mini delay to have time to write input to console
                    time.sleep(0.1)
                    writeCfgInput("")

                # Packets from the external address also pass through the
                # dispatch below, exactly as in the original.
                if data == "autismo connected":
                    print('Bot connected!')
                    connection_issue_counter = 0
                    joinTeam()
                elif data == "bot kicked server full":
                    print('bot kicked server full: ', datetime.datetime.now().time())
                elif data == "connect to ze":
                    # NOTE(review): the restart fires only when the counter is
                    # exactly 5; the counter keeps growing afterwards and is
                    # reset only by "autismo connected", so later failures
                    # never trigger another restart. Confirm this is intended.
                    if connection_issue_counter == 5:
                        kill_css_process()
                    connection_issue_counter += 1
                    print('connection_issue_counter: ', connection_issue_counter)
                    bot_connect_ze()
                elif "clientmessage:" in data:
                    pieces = data.split("72DqZ84")
                    if len(pieces) < 2:
                        # Without the delimiter there is no message part;
                        # indexing it used to raise IndexError and end the loop.
                        print('ignoring clientmessage packet without delimiter: ', data)
                        continue
                    messager_name = data.split("clientmessage:", 1)[1].split(" 72DqZ84")[0]
                    databyte_send_message = messager_name + pieces[1]
                    sock_external.sendto(databyte_send_message.encode(), (udp_external_ip, local_port))
                    print('databyte_send_message: ', databyte_send_message)
                elif data.startswith("dist_target:"):
                    try:
                        bot_process_movement(data)
                    except (ValueError, IndexError) as err:
                        # A malformed packet must not stop the receive loop.
                        print('ignoring malformed dist_target packet: ', err)
                elif data.startswith("surfing:"):
                    try:
                        bot_process_surf(data)
                    except (ValueError, IndexError) as err:
                        # A malformed packet must not stop the receive loop.
                        print('ignoring malformed surfing packet: ', err)
                elif data.startswith("hull info:"):
                    # The prefix is known to be at index 0 here.
                    hull_info = data[len("hull info:"):]
                    strInput = ""
                    if hull_info == "jump":
                        strInput += "+jump; wait 5; -jump; +duck; wait 50; -duck; wait 5; "
                    elif hull_info == "crouch":
                        strInput += "+duck; wait 50; -duck; wait 5; "
                    # Any other value still writes an empty command, waits and
                    # clears the file, as the original did.
                    writeCfgInput(strInput)
                    time.sleep(0.1)
                    writeCfgInput("")
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the script; exit quietly.
            pass
|
|
|
|
#/home/gameservers/.steam/debian-installation/steamapps/common/Counter-Strike Source/cstrike/cfg/autoexec.cfg:
|
|
#alias loop "exec looptest.cfg; wait 5; loop;"; wait 5; loop;
|
|
|
|
#-condebug
|
|
#cd /home/gameservers/.steam/debian-installation/
|
|
#./steam.sh -applaunch 240 -textmode -textmessagedebug -novid -nosound -noipx -nojoy -noshaderapi
|
|
|
|
#cd /home/gameservers/ze_runner_files
|
|
#python3 ingamefollowct.py
|
|
#screen -d -m -S ze_runner python3 ingamefollowct.py
|
|
|
|
#before steam login: export SDL_VIDEO_X11_VISUALID=0x074
|
|
#to find correct SDL_VIDEO_X11_VISUALID use glxinfo in X2GO
|