ff48940485
Added timestamp logging and reworked most logging messages. Added some extra information about crouching/jumping over obstacles that still needs to be checked further (dot_product). Added cooldowns and, hopefully, finally fixed the bot getting stuck in spectate. Also changed the cooldown timers to span fewer UDP packets.
228 lines
9.4 KiB
Python
228 lines
9.4 KiB
Python
import os
|
|
import sys
|
|
import subprocess
|
|
import atexit
|
|
from threading import Timer
|
|
import string
|
|
import random
|
|
import signal
|
|
import time
|
|
import socket
|
|
import codecs
|
|
import datetime
|
|
|
|
# Absolute path of the cfg file that writeCfgInput() and
# resetCfgInputShortWait() overwrite with console commands.
looptestPath = '/home/gameservers/.steam/steam/steamapps/common/Counter-Strike Source/cstrike/cfg/looptest.cfg'
|
|
# NOTE(review): not referenced anywhere else in the visible file.
chatmsg = ""
|
|
# Module-level counter read and updated by bot_process_movement() via `global`.
ladder_counter = 0
|
|
|
|
def writeCfgInput(Input):
    """Replace the whole content of the file at ``looptestPath`` with ``Input``.

    Input: console command text, written verbatim.
    """
    with open(looptestPath, mode='w') as cfg_file:
        cfg_file.write(Input)
|
|
|
|
def resetCfgInputShortWait():
    """Overwrite the cfg file with a bare "wait 5; " and pause for 0.2 seconds."""
    idle_command = "wait 5; "
    writeCfgInput(idle_command)
    time.sleep(0.2)
|
|
|
|
def exit_handler():
    """Exit hook (registered with atexit by the script entry point).

    Logs that it ran, then resets the cfg file to the idle "wait" command.
    """
    print('reached exithandler')
    resetCfgInputShortWait()
|
|
|
|
def joinTeam():
    """Write the team/class/spawn commands to the cfg file and wait 4.5 seconds."""
    join_commands = "jointeam 2; joinclass 3; zspawn;"
    writeCfgInput(join_commands)
    time.sleep(4.5)
    print('jointeam func: ')
|
|
|
|
def bot_process_surf(input_line):
    """Pick a strafe direction from a "surfing:" packet and write it to the cfg.

    input_line: text containing "surfing:" followed by whitespace-separated
        numbers; index 0 is used as the X component and index 1 as the Y
        component.

    The command always starts by releasing +forward. The strafe key is chosen
    from Y first, and from X only when Y is exactly 0.0. After writing the
    command the function sleeps 0.3 s and resets the cfg to "wait 5;".

    Raises:
        ValueError: "surfing:" is missing or a component is not a number.
        IndexError: fewer than two components were supplied.
    """
    payload = input_line[input_line.index("surfing:") + len("surfing:"):]
    # split() with no argument tolerates repeated/leading/trailing whitespace;
    # split(' ') produced empty tokens there, which made float() raise.
    # An en dash (U+2013) in the payload is treated as a minus sign.
    bot_surf_plane = [
        float(token.replace('\U00002013', '-')) for token in payload.split()
    ]
    x_axis, y_axis = bot_surf_plane[0], bot_surf_plane[1]

    # Original author's mapping notes:
    #   X axis positive = hold A
    #   X axis negative = hold D
    #   if Y axis = 0.0 use X instead
    #   Y axis negative = hold A
    #   Y axis positive = hold D
    # NOTE(review): the X-axis branches below do the opposite of the two X
    # notes above (negative X -> +moveleft, positive X -> +moveright). The code
    # behaviour is kept as it was; confirm which of the two is intended.
    strInput = "-forward; wait 3; "
    if y_axis < 0.0:
        strInput += "+moveleft; wait 3; "
    elif y_axis > 0.0:
        strInput += "+moveright; wait 3;"
    elif x_axis < 0.0:
        strInput += "+moveleft; wait 3; "
    elif x_axis > 0.0:
        strInput += "+moveright; wait 3; "

    print('bot surfing bot_surf_plane: ', bot_surf_plane)
    writeCfgInput(strInput)
    time.sleep(0.3)
    writeCfgInput("wait 5;")
|
|
|
|
def bot_process_keyinput(input_line):
    """Replay the key input carried by a "keyinput:" packet.

    input_line: text of the form "keyinput:<commands>dist_target:<float>".
    The commands are written to the cfg followed by releases of jump and duck;
    after 0.3 s the cfg is reset to "wait 5;".
    """
    key_tag = "keyinput:"
    dist_tag = "dist_target:"

    commands_start = input_line.index(key_tag) + len(key_tag)
    commands_end = input_line.index(dist_tag)
    movement_input = input_line[commands_start:commands_end]

    dist_target = float(input_line[input_line.index(dist_tag) + len(dist_tag):])

    strInput = movement_input + "; wait 5; -jump; wait 5; -duck; wait 5; "
    writeCfgInput(strInput)
    print('MIMIC datetime: ', datetime.datetime.now().time(), ' dist_target: ', dist_target, ' movement strinput: ', strInput)
    time.sleep(0.3)
    writeCfgInput("wait 5;")
|
|
|
|
def _movement_field(input_line, key, next_key=None):
    """Return the raw text after ``key``, up to ``next_key`` (or end of line)."""
    start = input_line.index(key) + len(key)
    end = len(input_line) if next_key is None else input_line.index(next_key)
    return input_line[start:end]


def bot_process_movement(input_line):
    """Build and write the movement command for a "dist_target:" packet.

    input_line: text holding these labelled fields in this order:
        dist_target (float), targethuman (text), bot_on_type (int),
        enemy_distance (float), dot_product (float), targeteam (int),
        target_enemy (text), z_axis (float).

    Side effects: prints diagnostics, updates the module-level
    ``ladder_counter``, writes the command to the cfg, sleeps 0.4 s and resets
    the cfg to "wait 5;".

    Raises:
        ValueError: a label is missing or a numeric field does not parse.

    NOTE(review): the source this was recovered from had lost its indentation.
    The nesting below is a reconstruction, in particular that the
    crouch/jump block and the final strinput_append() call run
    unconditionally. Confirm against the author's copy.
    """
    global ladder_counter

    labels = (
        "dist_target:", "targethuman:", "bot_on_type:", "enemy_distance:",
        "dot_product:", "targeteam:", "target_enemy:", "z_axis:",
    )
    # Each field runs from its own label to the next label; the last one runs
    # to the end of the line.
    raw = {
        label: _movement_field(input_line, label, following)
        for label, following in zip(labels, labels[1:] + (None,))
    }
    targethuman = raw["targethuman:"]
    target_enemy = raw["target_enemy:"]
    dist_target = float(raw["dist_target:"])
    z_axis = float(raw["z_axis:"])
    enemy_distance = float(raw["enemy_distance:"])
    bot_on_type = int(raw["bot_on_type:"])
    dot_product = float(raw["dot_product:"])
    targeteam = int(raw["targeteam:"])

    min_distance_target_human = 800.0
    crouch_cap = 5000.0
    jump_cap = 100000.0

    # Baseline: release everything, then attack/use and walk forward.
    strInput = "-attack; wait 2; -use; wait 5; -jump; wait 5; -duck; wait 5; +attack; wait 5; cl_minmodels 1; wait 2; +use; +forward; wait 2; "

    if dot_product > 0.0:
        print('dot_product: ', dot_product)
    if z_axis > 0.0:
        print('z_axis: ', z_axis)

    # Obstacle handling keyed on dot_product: a small positive value crouches,
    # a larger one (up to jump_cap) jumps.
    if 0.0 < dot_product <= crouch_cap:
        print('crouching', datetime.datetime.now().time())
        strInput += "+duck; wait 1500; +jump; -duck; wait 250; -jump; wait 50;"
    elif 0.0 < dot_product <= jump_cap:
        print('jumping', datetime.datetime.now().time())
        strInput += "+jump; wait 350; +duck; wait 250; -jump; -duck; wait 50;"

    # Weapon choice: distance to the target wins over the target's team.
    if dist_target > min_distance_target_human:
        strInput += "use weapon_elite; wait 3; "
    elif targeteam == 3:
        strInput += "use weapon_p90; wait 3; "
    elif targeteam == 2:
        strInput += "use weapon_knife; wait 5; "

    # bot_on_type 0 is handled as "on a ladder": look straight up and release
    # the other movement keys, for at most 100 consecutive packets.
    if bot_on_type == 0 and ladder_counter < 100:
        print('bot_on_type ladder, ladder_counter: ', ladder_counter)
        strInput += "setang -90 0 0; wait 5; -back; wait 3; -moveleft; wait 3; -moveright; wait 5; -jump; wait 3; -duck; wait 3; "
        ladder_counter += 1
    else:
        ladder_counter = 0

    if bot_on_type == 3:
        print('3 = downhill: ', bot_on_type)
        strInput += "+jump; wait 5;" * 5

    if enemy_distance > 0:
        print('date: ', datetime.datetime.now().time(), ' target_enemy: ', target_enemy, ' enemy distance: ', enemy_distance)
    else:
        print('date: ', datetime.datetime.now().time(), ' target human: ', targethuman, ' dist_target: ', dist_target,)

    # Finish with 20 random left/right strafe taps.
    strInput = strinput_append(strInput, 2)
    writeCfgInput(strInput)
    time.sleep(0.4)
    writeCfgInput("wait 5;")
|
|
|
|
def strinput_append(strInput, nth):
    """Return ``strInput`` followed by ``10 * nth`` random strafe taps.

    Each tap is, with equal probability, a press-and-release of +moveleft or
    of +moveright held for "wait 15".
    """
    tap_left = "+moveleft; wait 15; -moveleft; "
    tap_right = "+moveright; wait 15; -moveright; "
    taps = [random.choice((tap_left, tap_right)) for _ in range(10 * nth)]
    return strInput + "".join(taps)
|
|
|
|
def kill_css_process():
    """Terminate every running ``hl2_linux`` process and start the game again.

    Looks up the PIDs with ``pidof``, sends SIGTERM to each, waits 15 s when
    anything was signalled, then runs ``steam.sh`` from the Steam installation
    directory. The process working directory is changed to that directory.

    Raises:
        subprocess.CalledProcessError: ``steam.sh`` exited non-zero.
        PermissionError: a found process may not be signalled by this user.

    NOTE(review): the recovered source had lost its indentation. The game
    launch is taken to be unconditional, i.e. it also runs when no hl2_linux
    process was found. Confirm against the author's copy.
    """
    # Only stdout is read, so shell/pidof error text can never be mistaken for
    # a PID, and the argument list needs no shell.
    try:
        lookup = subprocess.run(
            ["pidof", "hl2_linux"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
        )
        pid_tokens = lookup.stdout.split()
    except FileNotFoundError:
        print('pidof is not available, skipping the kill step')
        pid_tokens = []

    # pidof prints all matching PIDs on one line separated by spaces, so a
    # single int() over the whole output fails when more than one matches.
    css_pids = [int(token) for token in pid_tokens if token.isdigit()]
    if css_pids:
        print('css_pid: ', ' '.join(str(pid) for pid in css_pids), 'shutting the bots game down....')
        for pid in css_pids:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The process exited between the lookup and the signal.
                pass
        time.sleep(15)

    print('preparing to launch game....')
    os.chdir('/home/gameservers/.steam/debian-installation/')
    subprocess.check_call([
        "./steam.sh",
        "-applaunch", "240",
        "-textmode",
        "-textmessagedebug",
        "-novid",
        "-nosound",
        "-noipx",
        "-nojoy",
        "-noshaderapi",
    ])
    print('finished starting game')
|
|
|
|
def bot_connect_ze():
    """Write a ``connect`` command to the cfg file, then reset it after 0.2 s."""
    # use whatever ip you want here to connect with
    dev_server_command = "connect 151.80.230.149:27019/test132;"  # alternative target, not used below
    server_command = "connect 151.80.230.149:27015;"

    writeCfgInput(server_command)
    time.sleep(0.2)
    writeCfgInput("wait 5;")
    print('not yet connected')
|
|
|
|
def _sanitize_chat_text(text):
    """Remove characters that would end the ``say`` command inside a cfg file.

    ';' and line breaks separate console commands, so text received over the
    network must not be able to smuggle extra commands in after ``say``.
    """
    return text.replace(";", "").replace("\r", " ").replace("\n", " ")


def main():
    """Receive UDP packets and translate them into cfg commands until Ctrl+C.

    Packets are accepted only from ``local_ip`` and ``udp_external_ip``.
    Text from the external address is relayed into game chat with ``say``;
    every accepted packet is then matched against the known message types.

    NOTE(review): the recovered source had lost its indentation; the nesting
    here is a reconstruction. In particular, a packet from the external
    address still falls through to the message dispatch after being relayed.
    """
    atexit.register(exit_handler)
    resetCfgInputShortWait()

    local_ip = "127.0.0.1"
    local_port = 48477
    udp_external_ip = "62.210.110.245"
    buffer_size = 4096  # potentially not large enough?
    connection_issue_counter = 0

    # Prefix -> parser for the packets that carry numeric payloads.
    payload_handlers = (
        ("dist_target:", bot_process_movement),
        ("surfing:", bot_process_surf),
        ("keyinput:", bot_process_keyinput),
    )

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_external:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", local_port))
        print('reached deadlock')
        try:
            while True:
                databyte, addr = sock.recvfrom(buffer_size)
                data = codecs.decode(databyte, "utf-8", "ignore")
                ip = addr[0]
                if not data:
                    continue
                if ip not in [local_ip, udp_external_ip]:
                    continue

                if ip == udp_external_ip:
                    print("enabled remote UDP packet")
                    # The text comes from the network: strip command
                    # separators before it is written into an executed cfg.
                    response_msg = f"""say {_sanitize_chat_text(data)}"""
                    writeCfgInput(response_msg)
                    time.sleep(0.5)
                    resetCfgInputShortWait()

                if data == "autismo connected":
                    print('Bot connected!')
                    connection_issue_counter = 0
                    time.sleep(2)
                    joinTeam()
                elif data == "bot kicked server full":
                    print('bot kicked server full: ', datetime.datetime.now().time())
                elif data == "connect to ze":
                    # NOTE(review): the game is restarted only on the exact
                    # 5th consecutive request; the counter keeps growing
                    # afterwards and is reset only by "autismo connected".
                    # Confirm that a single restart is intended.
                    if connection_issue_counter == 5:
                        kill_css_process()
                    connection_issue_counter += 1
                    print('connection_issue_counter: ', connection_issue_counter)
                    bot_connect_ze()
                elif "clientmessage:" in data:
                    sock_external.sendto(databyte, (udp_external_ip, local_port))
                    print('sent databyte: ', databyte)
                else:
                    for prefix, handler in payload_handlers:
                        if not data.startswith(prefix):
                            continue
                        try:
                            handler(data)
                        except (ValueError, IndexError) as err:
                            # A truncated or malformed packet must not take
                            # the whole control loop down.
                            print('ignoring malformed packet: ', repr(data), ' error: ', err)
                        break
        except KeyboardInterrupt:
            pass


if __name__ == '__main__':
    main()
|
|
|
|
#/home/gameservers/.steam/debian-installation/steamapps/common/Counter-Strike Source/cstrike/cfg/autoexec.cfg:
|
|
#alias loop "exec looptest.cfg; wait 5; loop;"; wait 5; loop;
|
|
|
|
#-condebug
|
|
#cd /home/gameservers/.steam/debian-installation/
|
|
#./steam.sh -applaunch 240 -textmode -textmessagedebug -novid -nosound -noipx -nojoy -noshaderapi
|
|
|
|
#cd /home/gameservers/ze_runner_files
|
|
#./play.sh
|
|
#screen -A -d -m -S ze_runner ./play.sh
|
|
|
|
#before steam login: export SDL_VIDEO_X11_VISUALID=0x074
|
|
#to find the correct SDL_VIDEO_X11_VISUALID value, run glxinfo inside the X2Go session
|