From 3394c4d85bb2a455d0a65b12fcc8e73840429bf8 Mon Sep 17 00:00:00 2001 From: Metroid_Skittles Date: Thu, 5 Feb 2026 16:51:58 +0100 Subject: [PATCH] Update torchlight_changes_unloze/torchlight3/Torchlight/Commands.py Rewrite of command.py --- .../torchlight3/Torchlight/Commands.py | 2925 +++++++++++------ 1 file changed, 1935 insertions(+), 990 deletions(-) diff --git a/torchlight_changes_unloze/torchlight3/Torchlight/Commands.py b/torchlight_changes_unloze/torchlight3/Torchlight/Commands.py index d47d14c..afa0f5c 100755 --- a/torchlight_changes_unloze/torchlight3/Torchlight/Commands.py +++ b/torchlight_changes_unloze/torchlight3/Torchlight/Commands.py @@ -1,990 +1,1935 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- -import asyncio -import os -import sys -import logging -import math -from .Utils import Utils, DataHolder -import traceback - -def get_birtate(message): - bitrate = [] - try: - for msg in message[1].split(" "): #checking if - if "bitrate=" in msg: - bitrate = int(msg.split("bitrate=",1)[1]) - if bitrate < 0.0: - bitrate = 0.01 - if bitrate > 2000: - bitrate = 20 - bitrate.append(f"bitrate=tempo={bitrate}") - except Exception: - pass - return bitrate - -def get_backwards(message): - backwards = None - try: - for msg in message[1].split(" "): #checking if pitch= or tempo= is specified - if "backward=" in msg.lower(): - backwards = True - elif "backwards=" in msg.lower(): - backwards = True - except Exception: - pass - return backwards - -def get_rubberBand(message): - rubberband = [] - try: - for msg in message[1].split(" "): #checking if pitch= or tempo= is specified - if "tempo=" in msg: - tempo = float(msg.split("tempo=",1)[1]) - if tempo < 0.0: - tempo = 0.01 - if tempo > 20: - tempo = 20 - rubberband.append(f"rubberband=tempo={tempo}") - elif "pitch=" in msg: - pitch = float(msg.split("pitch=",1)[1]) - if pitch < 0.0: - pitch = 0.1 - if pitch > 5.0: - pitch = 5.0 - rubberband.append(f"rubberband=pitch={pitch}") - except Exception: - pass - return rubberband - -class BaseCommand(): - Order = 0 - def __init__(self, torchlight): - self.Logger = logging.getLogger(__class__.__name__) - self.Torchlight = torchlight - self.Triggers = [] - self.Level = 0 - - def check_chat_cooldown(self, player): - if player.ChatCooldown > self.Torchlight().Master.Loop.time(): - cooldown = player.ChatCooldown - self.Torchlight().Master.Loop.time() - self.Torchlight().SayPrivate(player, "You're on cooldown for the next {0:.1f} seconds.".format(cooldown)) - return True - - def check_disabled(self, player): - Level = 0 - if player.Access: - Level = player.Access["level"] - - Disabled = self.Torchlight().Disabled - if Disabled and (Disabled > Level or Disabled == Level and Level < self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]): - self.Torchlight().SayPrivate(player, "Torchlight is currently disabled!") - return True - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name) - - -class URLFilter(BaseCommand): - Order = 1 - import re - import aiohttp - import magic - import datetime - import json - import io - from bs4 import BeautifulSoup - from PIL import Image - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = [self.re.compile(r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))''', self.re.IGNORECASE)] - self.Level = -1 - self.re_youtube = self.re.compile(r'.*?(?:youtube\.com\/\S*(?:(?:\/e(?:mbed))?\/|watch\?(?:\S*?&?v\=))|youtu\.be\/)([a-zA-Z0-9_-]{6,11}).*?') - - async def URLInfo(self, url, yt = False): - Text = None - Info = None - match = self.re_youtube.search(url) - if match or yt: - Temp = DataHolder() - Time = None - - if Temp(url.find("&t=")) != -1 or Temp(url.find("?t=")) != -1 or Temp(url.find("#t=")) != -1: - TimeStr = url[Temp.value + 3:].split('&')[0].split('?')[0].split('#')[0] - if TimeStr: - Time = Utils.ParseTime(TimeStr) - - Proc = await asyncio.create_subprocess_exec("yt-dlp", "--dump-json", "-g", url, - stdout = asyncio.subprocess.PIPE) - Out, _ = await Proc.communicate() - - parts = Out.split(b'\n') - parts.pop() # trailing new line - - Info = parts.pop() - url = parts.pop() - - url = url.strip().decode("ascii") - Info = self.json.loads(Info) - - if Info["extractor_key"] == "Youtube": - self.Torchlight().SayChat("\x07E52D27[YouTube]\x01 {0} | {1} | {2:,}".format( - Info["title"], str(self.datetime.timedelta(seconds = Info["duration"])), int(Info["view_count"]))) - else: - match = None - - if Time: - url += "#t={0}".format(Time) - - else: - try: - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get(url), 5) - if Response: - ContentType = Response.headers.get("Content-Type") - ContentLength = Response.headers.get("Content-Length") - Content = await asyncio.wait_for(Response.content.read(65536), 5) - - if not ContentLength: - ContentLength = -1 - - if ContentType.startswith("text"): - if ContentType.startswith("text/plain"): - Text = Content.decode("utf-8", errors = "ignore") - else: - Soup = self.BeautifulSoup(Content.decode("utf-8", errors = "ignore"), "lxml") - if Soup.title: - self.Torchlight().SayChat("[URL] {0}".format(Soup.title.string)) - elif ContentType.startswith("image"): - fp = self.io.BytesIO(Content) - im = self.Image.open(fp) - self.Torchlight().SayChat("[IMAGE] {0} | Width: {1} | Height: {2} | Size: {3}".format(im.format, im.size[0], im.size[1], Utils.HumanSize(ContentLength))) - fp.close() - else: - Filetype = self.magic.from_buffer(bytes(Content)) - self.Torchlight().SayChat("[FILE] {0} | Size: {1}".format(Filetype, Utils.HumanSize(ContentLength))) - - Response.close() - except Exception as e: - self.Torchlight().SayChat("Error: {0}".format(str(e))) - self.Logger.error(traceback.format_exc()) - - self.Torchlight().LastUrl = url - return url, Text - - async def _rfunc(self, line, match, player): - Url = match.groups()[0] - if not Url.startswith("http") and not Url.startswith("ftp"): - Url = "http://" + Url - - if line.startswith("!yt "): - URL, _ = await self.URLInfo(Url, True) - return "!yt " + URL - - if line.startswith("!dec "): - _, text = await self.URLInfo(Url, False) - if text: - return "!dec " + text - - asyncio.ensure_future(self.URLInfo(Url)) - return -1 - - -def FormatAccess(Torchlight, player): - Answer = "#{0} \"{1}\"({2}) is ".format(player.UserID, player.Name, player.UniqueID) - Level = str(0) - if player.Access: - Level = str(player.Access["level"]) - Answer += "level {0!s} as {1}.".format(Level, player.Access["name"]) - else: - Answer += "not authenticated." - - if Level in Torchlight().Config["AudioLimits"]: - Uses = Torchlight().Config["AudioLimits"][Level]["Uses"] - TotalTime = Torchlight().Config["AudioLimits"][Level]["TotalTime"] - - if Uses >= 0: - Answer += " Uses: {0}/{1}".format(player.Storage["Audio"]["Uses"], Uses) - if TotalTime >= 0: - Answer += " Time: {0}/{1}".format(round(player.Storage["Audio"]["TimeUsed"], 2), round(TotalTime, 2)) - - return Answer - -class Access(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!access"] - self.Level = 0 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_chat_cooldown(player): - return -1 - - Count = 0 - if message[0] == "!access": - if message[1]: - return -1 - - self.Torchlight().SayChat(FormatAccess(self.Torchlight, player), player) - - return 0 - -class Who(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!who", "!whois"] - self.Level = 1 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - Count = 0 - if message[0] == "!who": - for Player in self.Torchlight().Players: - if Player.Name.lower().find(message[1].lower()) != -1: - self.Torchlight().SayChat(FormatAccess(self.Torchlight, Player)) - - Count += 1 - if Count >= 3: - break - - elif message[0] == "!whois": - for UniqueID, Access in self.Torchlight().Access: - if Access["name"].lower().find(message[1].lower()) != -1: - Player = self.Torchlight().Players.FindUniqueID(UniqueID) - if Player: - self.Torchlight().SayChat(FormatAccess(self.Torchlight, Player)) - else: - self.Torchlight().SayChat("#? \"{0}\"({1}) is level {2!s} is currently offline.".format(Access["name"], UniqueID, Access["level"])) - - Count += 1 - if Count >= 3: - break - return 0 - - -class WolframAlpha(BaseCommand): - import urllib.parse - import aiohttp - import xml.etree.ElementTree as etree - import re - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!cc"] - self.Level = 10 - - def Clean(self, Text): - return self.re.sub("[ ]{2,}", " ", Text.replace(' | ', ': ').replace('\n', ' | ').replace('~~', ' ≈ ')).strip() - - async def Calculate(self, Params, player): - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get("http://api.wolframalpha.com/v2/query", params=Params), 10) - if not Response: - return 1 - - Data = await asyncio.wait_for(Response.text(), 5) - if not Data: - return 2 - - Root = self.etree.fromstring(Data) - - - # Find all pods with plaintext answers - # Filter out None -answers, strip strings and filter out the empty ones - Pods = list(filter(None, [p.text.strip() for p in Root.findall('.//subpod/plaintext') if p is not None and p.text is not None])) - - # no answer pods found, check if there are didyoumeans-elements - if not Pods: - Didyoumeans = Root.find("didyoumeans") - # no support for future stuff yet, TODO? - if not Didyoumeans: - # If there's no pods, the question clearly wasn't understood - self.Torchlight().SayChat("Sorry, couldn't understand the question.", player) - return 3 - - Options = [] - for Didyoumean in Didyoumeans: - Options.append("\"{0}\"".format(Didyoumean.text)) - Line = " or ".join(Options) - Line = "Did you mean {0}?".format(Line) - self.Torchlight().SayChat(Line, player) - return 0 - - # If there's only one pod with text, it's probably the answer - # example: "integral x²" - if len(Pods) == 1: - Answer = self.Clean(Pods[0]) - self.Torchlight().SayChat(Answer, player) - return 0 - - # If there's multiple pods, first is the question interpretation - Question = self.Clean(Pods[0].replace(' | ', ' ').replace('\n', ' ')) - # and second is the best answer - Answer = self.Clean(Pods[1]) - self.Torchlight().SayChat("{0} = {1}".format(Question, Answer), player) - return 0 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_chat_cooldown(player): - return -1 - - if self.check_disabled(player): - return -1 - - Params = dict({"input": message[1], "appid": self.Torchlight().Config["WolframAPIKey"]}) - Ret = await self.Calculate(Params, player) - return Ret - - -class UrbanDictionary(BaseCommand): - import aiohttp - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!define", "!ud"] - self.Level = 10 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_chat_cooldown(player): - return -1 - - if self.check_disabled(player): - return -1 - - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get("https://api.urbandictionary.com/v0/define?term={0}".format(message[1])), 5) - if not Response: - return 1 - - Data = await asyncio.wait_for(Response.json(), 5) - if not Data: - return 3 - - if not 'list' in Data or not Data["list"]: - self.Torchlight().SayChat("[UB] No definition found for: {}".format(message[1]), player) - return 4 - - def print_item(item): - self.Torchlight().SayChat("[UD] {word} ({thumbs_up}/{thumbs_down}): {definition}\n{example}".format(**item), player) - - print_item(Data["list"][0]) - - -class OpenWeather(BaseCommand): - import aiohttp - import geoip2.database - def __init__(self, torchlight): - super().__init__(torchlight) - self.GeoIP = self.geoip2.database.Reader("/var/lib/GeoIP/GeoLite2-City.mmdb") - self.Triggers = ["!w", "!vv"] - self.Level = 10 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_chat_cooldown(player): - return -1 - - if self.check_disabled(player): - return -1 - - if not message[1]: - # Use GeoIP location - info = self.GeoIP.city(player.Address.split(":")[0]) - Search = "lat={}&lon={}".format(info.location.latitude, info.location.longitude) - else: - Search = "q={}".format(message[1]) - - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get("https://api.openweathermap.org/data/2.5/weather?APPID={0}&units=metric&{1}".format( - self.Torchlight().Config["OpenWeatherAPIKey"], Search)), 5) - if not Response: - return 2 - - Data = await asyncio.wait_for(Response.json(), 5) - if not Data: - return 3 - - if Data["cod"] != 200: - self.Torchlight().SayPrivate(player, "[OW] {0}".format(Data["message"])) - return 5 - - degToCardinal = lambda d: ["N", "NE", "E", "SE", "S", "SW", "W", "NW"][int(((d + 22.5)/45.0) % 8)] - if "deg" in Data["wind"]: - windDir = degToCardinal(Data["wind"]["deg"]) - else: - windDir = "?" - - timezone = "{}{}".format('+' if Data["timezone"] > 0 else '', int(Data["timezone"] / 3600)) - if Data["timezone"] % 3600 != 0: - timezone += ":{}".format((Data["timezone"] % 3600) / 60) - - self.Torchlight().SayChat("[{}, {}](UTC{}) {}°C ({}/{}) {}: {} | Wind {} {}kph | Clouds: {}%% | Humidity: {}%%".format(Data["name"], Data["sys"]["country"], timezone, - Data["main"]["temp"], Data["main"]["temp_min"], Data["main"]["temp_max"], Data["weather"][0]["main"], Data["weather"][0]["description"], - windDir, Data["wind"]["speed"], Data["clouds"]["all"], Data["main"]["humidity"]), player) - - return 0 - -''' -class WUnderground(BaseCommand): - import aiohttp - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!w"] - self.Level = 0 - - async def _func(self, message, player): - if not message[1]: - # Use IP address - Search = "autoip" - Additional = "?geo_ip={0}".format(player.Address.split(":")[0]) - else: - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get("http://autocomplete.wunderground.com/aq?format=JSON&query={0}".format(message[1])), 5) - if not Response: - return 2 - - Data = await asyncio.wait_for(Response.json(), 5) - if not Data: - return 3 - - if not Data["RESULTS"]: - self.Torchlight().SayPrivate(player, "[WU] No cities match your search query.") - return 4 - - Search = Data["RESULTS"][0]["name"] - Additional = "" - - async with self.aiohttp.ClientSession() as session: - Response = await asyncio.wait_for(session.get("http://api.wunderground.com/api/{0}/conditions/q/{1}.json{2}".format( - self.Torchlight().Config["WundergroundAPIKey"], Search, Additional)), 5) - if not Response: - return 2 - - Data = await asyncio.wait_for(Response.json(), 5) - if not Data: - return 3 - - if "error" in Data["response"]: - self.Torchlight().SayPrivate(player, "[WU] {0}.".format(Data["response"]["error"]["description"])) - return 5 - - if not "current_observation" in Data: - Choices = str() - NumResults = len(Data["response"]["results"]) - for i, Result in enumerate(Data["response"]["results"]): - Choices += "{0}, {1}".format(Result["city"], - Result["state"] if Result["state"] else Result ["country_iso3166"]) - - if i < NumResults - 1: - Choices += " | " - - self.Torchlight().SayPrivate(player, "[WU] Did you mean: {0}".format(Choices)) - return 6 - - Observation = Data["current_observation"] - - self.Torchlight().SayChat("[{0}, {1}] {2}°C ({3}F) {4} | Wind {5} {6}kph ({7}mph) | Humidity: {8}".format(Observation["display_location"]["city"], - Observation["display_location"]["state"] if Observation["display_location"]["state"] else Observation["display_location"]["country_iso3166"], - Observation["temp_c"], Observation["temp_f"], Observation["weather"], - Observation["wind_dir"], Observation["wind_kph"], Observation["wind_mph"], - Observation["relative_humidity"])) - - return 0 -''' - -class VoteDisable(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!votedisable", "!disablevote"] - self.Level = 0 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.Torchlight().Disabled: - self.Torchlight().SayPrivate(player, "Torchlight is already disabled for the duration of this map.") - return - - self.Torchlight().DisableVotes.add(player.UniqueID) - - have = len(self.Torchlight().DisableVotes) - needed = len(self.Torchlight().Players) // 5 - if have >= needed: - self.Torchlight().SayChat("Torchlight has been disabled for the duration of this map.") - self.Torchlight().Disabled = 6 - else: - self.Torchlight().SayPrivate(player, "Torchlight needs {0} more disable votes to be disabled.".format(needed - have)) - - -class VoiceCommands(BaseCommand): - import json - import random - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!random", "!search"] - self.Level = 0 - - def LoadTriggers(self): - try: - with open("triggers.json", "r") as fp: - Triggers = self.json.load(fp) - except ValueError as e: - self.Logger.error(sys._getframe().f_code.co_name + ' ' + str(e)) - self.Torchlight().SayChat(str(e)) - - self.VoiceTriggers = dict() - for Line in Triggers: - for Trigger in Line["names"]: - self.VoiceTriggers[Trigger] = Line["sound"] - - def _setup(self): - self.Logger.debug(sys._getframe().f_code.co_name) - self.LoadTriggers() - for Trigger in self.VoiceTriggers.keys(): - self.Triggers.append(Trigger) - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_disabled(player): - return -1 - - Level = 0 - if player.Access: - Level = player.Access["level"] - - message[0] = message[0].lower() - message[1] = message[1].lower() - if message[0][0] != '!' and Level < 2: - return 1 - - if message[0] == "!search": - res = [] - for key in self.VoiceTriggers.keys(): - if message[1] in key.lower(): - res.append(key) - self.Torchlight().SayPrivate(player, "{} results: {}".format(len(res), ", ".join(res))) - return 0 - elif Level < 2: - return 0 - - rubberband = get_rubberBand(message) - backwards = get_backwards(message) - bitrate = get_birtate(message) - - if message[0] == "!random": - Trigger = self.random.choice(list(self.VoiceTriggers.values())) - if isinstance(Trigger, list): - Sound = self.random.choice(Trigger) - else: - Sound = Trigger - else: - Sounds = self.VoiceTriggers[message[0]] - - try: - Num = int(message[1].split(" ")[0]) - except ValueError: - Num = None - - if isinstance(Sounds, list): - if Num and Num > 0 and Num <= len(Sounds): - Sound = Sounds[Num - 1] - - elif message[1] and not message[1].startswith("tempo=") and not message[1].startswith("pitch=") and not message[1].startswith('backward=') and not message[1].startswith('backwards='): #it does not start with pitch or with tempo, so must be a number or alias. - searching = message[1].startswith('?') - search = message[1][1:] if searching else message[1].split(" ")[0] - Sound = None - names = [] - matches = [] - for sound in Sounds: - name = os.path.splitext(os.path.basename(sound))[0] - names.append(name) - - if search and search in name.lower(): - matches.append((name, sound)) - - if matches: - matches.sort(key=lambda t: len(t[0])) - mlist = [t[0] for t in matches] - if searching: - self.Torchlight().SayPrivate(player, "{} results: {}".format(len(mlist), ", ".join(mlist))) - return 0 - - Sound = matches[0][1] - if len(matches) > 1: - self.Torchlight().SayPrivate(player, "Multiple matches: {}".format(", ".join(mlist))) - - if not Sound and not Num: - if not searching: - self.Torchlight().SayPrivate(player, "Couldn't find {} in list of sounds.".format(message[1])) - self.Torchlight().SayPrivate(player, ", ".join(names)) - return 1 - - elif Num: - self.Torchlight().SayPrivate(player, "Number {} is out of bounds, max {}.".format(Num, len(Sounds))) - return 1 - - else: - Sound = self.random.choice(Sounds) - else: - Sound = Sounds - - if not Sound: - return 1 - - Path = os.path.abspath(os.path.join("sounds", Sound)) - AudioClip = self.Torchlight().AudioManager.AudioClip(player, "file://" + Path) - if not AudioClip: - return 1 - - return AudioClip.Play(rubberband = rubberband, bitrate = bitrate, backwards = backwards) - - -class YouTube(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!yt"] - self.Level = 6 - - async def _func(self, message, player, line = None): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_disabled(player): - return -1 - - if self.Torchlight().LastUrl: - message[1] = message[1].replace("!last", self.Torchlight().LastUrl) - - Temp = DataHolder() - Time = None - - if Temp(message[1].find("&t=")) != -1 or Temp(message[1].find("?t=")) != -1 or Temp(message[1].find("#t=")) != -1: - TimeStr = message[1][Temp.value + 3:].split('&')[0].split('?')[0].split('#')[0] - if TimeStr: - Time = Utils.ParseTime(TimeStr) - - AudioClip = self.Torchlight().AudioManager.AudioClip(player, message[1]) - if not AudioClip: - return 1 - - #turning the string into a list where get_rubberband just picks the second element to search in - dline = ['', line.split(" ", 1)[1]] - rubberband = get_rubberBand(dline) - backwards = get_backwards(dline) - bitrate = get_birtate(dline) - return AudioClip.Play(Time, rubberband = rubberband, bitrate = bitrate, backwards = backwards) - -class YouTubeSearch(BaseCommand): - import json - import datetime - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!yts"] - self.Level = 6 #adjusting to new levels - - async def _func(self, message, player, line = None): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_disabled(player): - return -1 - - Temp = DataHolder() - Time = None - - if Temp(message[1].find("&t=")) != -1 or Temp(message[1].find("?t=")) != -1 or Temp(message[1].find("#t=")) != -1: - TimeStr = message[1][Temp.value + 3:].split('&')[0].split('?')[0].split('#')[0] - if TimeStr: - Time = Utils.ParseTime(TimeStr) - message[1] = message[1][:Temp.value] - - search_term = message[1].split("pitch=")[0].split("tempo=")[0].split('backward=')[0].split('backwards=')[0] - Proc = await asyncio.create_subprocess_exec("yt-dlp", "--dump-json", "-xg", "ytsearch:" + search_term, - stdout = asyncio.subprocess.PIPE) - Out, _ = await Proc.communicate() - - print('out value: ', Out) - url, Info = Out.split(b'\n', maxsplit = 1) - url = url.strip().decode("ascii") - Info = self.json.loads(Info) - - if Info["extractor_key"] == "Youtube": - self.Torchlight().SayChat("\x07E52D27[YouTube]\x01 {0} | {1} | {2:,}".format( - Info["title"], str(self.datetime.timedelta(seconds = Info["duration"])), int(Info["view_count"]))) - AudioClip = self.Torchlight().AudioManager.AudioClip(player, url) - if not AudioClip: - return 1 - self.Torchlight().LastUrl = url - - #turning the string into a list where get_rubberband just picks the second element to search in - dline = ['', line.split(" ", 1)[1]] - rubberband = get_rubberBand(dline) - backwards = get_backwards(dline) - bitrate = get_birtate(dline) - return AudioClip.Play(Time, rubberband = rubberband, bitrate = bitrate, backwards = backwards) - - -class Say(BaseCommand): - import gtts - import tempfile - VALID_LANGUAGES = [lang for lang in gtts.lang.tts_langs().keys()] - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = [("!say", 4)] - self.Level = 2 - - async def Say(self, player, language, message): - actual_message = message.split("pitch=")[0].split("tempo=")[0].split('backward=')[0].split('backwards=')[0] - GTTS = self.gtts.gTTS(text = actual_message, lang = language) - - TempFile = self.tempfile.NamedTemporaryFile(delete = False) - GTTS.write_to_fp(TempFile) - TempFile.close() - - AudioClip = self.Torchlight().AudioManager.AudioClip(player, "file://" + TempFile.name) - if not AudioClip: - os.unlink(TempFile.name) - return 1 - #turning the string into a list where get_rubberband just picks the second element to search in - try: - dline = ['', message.split(" ", 1)[1]] - rubberband = get_rubberBand(dline) - backwards = get_backwards(dline) - except Exception: - rubberband = None - backwards = None - - try: - dline = ['', message.split(" ", 1)[1]] - bitrate = get_birtate(dline) - except Exception: - bitrate = None - if AudioClip.Play(rubberband = rubberband, bitrate = bitrate, backwards = backwards): - AudioClip.AudioPlayer.AddCallback("Stop", lambda: os.unlink(TempFile.name)) - return 0 - else: - os.unlink(TempFile.name) - return 1 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_disabled(player): - return -1 - - if not message[1]: - return 1 - - Language = "en" - if len(message[0]) > 4: - Language = message[0][4:] - - if not Language in self.VALID_LANGUAGES: - return 1 - - asyncio.ensure_future(self.Say(player, Language, message[1])) - return 0 - - -class DECTalk(BaseCommand): - import tempfile - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!dec"] - self.Level = 5 - - async def Say(self, player, message): - message = "[:phoneme on]" + message - TempFile = self.tempfile.NamedTemporaryFile(delete = False) - TempFile.close() - - Proc = await asyncio.create_subprocess_exec("wine", "say.exe", "-w", TempFile.name, - cwd = "dectalk", stdin = asyncio.subprocess.PIPE) - await Proc.communicate(message.encode('utf-8', errors='ignore')) - - AudioClip = self.Torchlight().AudioManager.AudioClip(player, "file://" + TempFile.name) - if not AudioClip: - os.unlink(TempFile.name) - return 1 - - if AudioClip.Play(dec_params = ["-af", "volume=10dB"]): - AudioClip.AudioPlayer.AddCallback("Stop", lambda: os.unlink(TempFile.name)) - return 0 - else: - os.unlink(TempFile.name) - return 1 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if self.check_disabled(player): - return -1 - - if not message[1]: - return 1 - - asyncio.ensure_future(self.Say(player, message[1])) - return 0 - - -class Stop(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!stop"] - self.Level = 0 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - self.Torchlight().AudioManager.Stop(player, message[1]) - return True - - -class EnableDisable(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!enable", "!disable"] - self.Level = 6 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - - if message[0] == "!enable": - if self.Torchlight().Disabled: - if self.Torchlight().Disabled > player.Access["level"]: - self.Torchlight().SayPrivate(player, "You don't have access to enable torchlight, since it was disabled by a higher level user.") - return 1 - self.Torchlight().SayChat("Torchlight has been enabled for the duration of this map - Type !disable to disable it again.") - - self.Torchlight().Disabled = False - - elif message[0] == "!disable": - if self.Torchlight().Disabled > player.Access["level"]: - self.Torchlight().SayPrivate(player, "You don't have access to disable torchlight, since it was already disabled by a higher level user.") - return 1 - self.Torchlight().SayChat("Torchlight has been disabled for the duration of this map - Type !enable to enable it again.") - self.Torchlight().Disabled = player.Access["level"] - - -class AdminAccess(BaseCommand): - from collections import OrderedDict - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!access"] - self.Level = 7 - - def ReloadValidUsers(self): - self.Torchlight().Access.Load() - for Player in self.Torchlight().Players: - Access = self.Torchlight().Access[Player.UniqueID] - Player.Access = Access - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - if not message[1]: - return -1 - - if message[1].lower() == "reload": - self.ReloadValidUsers() - self.Torchlight().SayChat("Loaded access list with {0} users".format(len(self.Torchlight().Access))) - - elif message[1].lower() == "save": - self.Torchlight().Access.Save() - self.Torchlight().SayChat("Saved access list with {0} users".format(len(self.Torchlight().Access))) - - # Modify access - else: - Player = None - Buf = message[1] - Temp = Buf.find(" as ") - if Temp != -1: - try: - Regname, Level = Buf[Temp + 4:].rsplit(' ', 1) - except ValueError as e: - self.Torchlight().SayChat(str(e)) - return 1 - - Regname = Regname.strip() - Level = Level.strip() - Buf = Buf[:Temp].strip() - else: - try: - Buf, Level = Buf.rsplit(' ', 1) - except ValueError as e: - self.Torchlight().SayChat(str(e)) - return 2 - - Buf = Buf.strip() - Level = Level.strip() - - # Find user by User ID - if Buf[0] == '#' and Buf[1:].isnumeric(): - Player = self.Torchlight().Players.FindUserID(int(Buf[1:])) - # Search user by name - else: - for Player_ in self.Torchlight().Players: - if Player_.Name.lower().find(Buf.lower()) != -1: - Player = Player_ - break - - if not Player: - self.Torchlight().SayChat("Couldn't find user: {0}".format(Buf)) - return 3 - - if Level.isnumeric() or (Level.startswith('-') and Level[1:].isdigit()): - Level = int(Level) - - if Level >= player.Access["level"] and player.Access["level"] < 10: - self.Torchlight().SayChat("Trying to assign level {0}, which is higher or equal than your level ({1})".format(Level, player.Access["level"])) - return 4 - - if Player.Access: - if Player.Access["level"] >= player.Access["level"] and player.Access["level"] < 10: - self.Torchlight().SayChat("Trying to modify level {0}, which is higher or equal than your level ({1})".format(Player.Access["level"], player.Access["level"])) - return 5 - - if "Regname" in locals(): - self.Torchlight().SayChat("Changed \"{0}\"({1}) as {2} level/name from {3} to {4} as {5}".format( - Player.Name, Player.UniqueID, Player.Access["name"], Player.Access["level"], Level, Regname)) - Player.Access["name"] = Regname - else: - self.Torchlight().SayChat("Changed \"{0}\"({1}) as {2} level from {3} to {4}".format( - Player.Name, Player.UniqueID, Player.Access["name"], Player.Access["level"], Level)) - - Player.Access["level"] = Level - self.Torchlight().Access[Player.UniqueID] = Player.Access - else: - if not "Regname" in locals(): - Regname = Player.Name - - self.Torchlight().Access[Player.UniqueID] = self.OrderedDict([("name", Regname), ("level", Level)]) - Player.Access = self.Torchlight().Access[Player.UniqueID] - self.Torchlight().SayChat("Added \"{0}\"({1}) to access list as {2} with level {3}".format(Player.Name, Player.UniqueID, Regname, Level)) - else: - if Level == "revoke" and Player.Access: - if Player.Access["level"] >= player.Access["level"] and player.Access["level"] < 10: - self.Torchlight().SayChat("Trying to revoke level {0}, which is higher or equal than your level ({1})".format(Player.Access["level"], player.Access["level"])) - return 6 - - self.Torchlight().SayChat("Removed \"{0}\"({1}) from access list (was {2} with level {3})".format( - Player.Name, Player.UniqueID, Player.Access["name"], Player.Access["level"])) - del self.Torchlight().Access[Player.UniqueID] - Player.Access = None - return 0 - -class Reload(BaseCommand): - def __init__(self, torchlight): - super().__init__(torchlight) - self.Triggers = ["!reload"] - self.Level = 7 - - async def _func(self, message, player): - self.Logger.debug(sys._getframe().f_code.co_name + ' ' + str(message)) - self.Torchlight().Reload() - return 0 - +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +""" +Torchlight Commands Module + +Provides command implementations for audio playback, voice modulation, +information lookups, and user access management. +""" + +import asyncio +import os +import sys +import logging +import math +import json +import re +import io +import traceback +from dataclasses import dataclass, field +from typing import Optional, List, Dict, Any, Tuple, Union +from enum import Enum +from datetime import timedelta +from abc import ABC + +# Third-party imports (conditionally loaded where used) +try: + import aiohttp + import magic + from bs4 import BeautifulSoup + from PIL import Image + import geoip2.database + import gtts + import tempfile + import urllib.parse + import xml.etree.ElementTree as etree +except ImportError as e: + pass # These will be loaded as needed + +from .Utils import Utils, DataHolder + + +# ============================================================================ +# Constants and Configuration +# ============================================================================ + +MIN_BITRATE = 0.01 +MAX_BITRATE = 2000 +DEFAULT_BITRATE = 20 + +MIN_TEMPO = 0.10 +MAX_TEMPO = 20 + +MIN_PITCH = 0.1 +MAX_PITCH = 5.0 + +DEFAULT_HTTP_TIMEOUT = 5 +DEFAULT_CONTENT_READ_SIZE = 65536 +GEOIP_DB_PATH = "/var/lib/GeoIP/GeoLite2-City.mmdb" +DECTALK_CWD = "dectalk" + + +# ============================================================================ +# Data Classes +# ============================================================================ + +@dataclass +class AudioParams: + """Container for audio playback parameters.""" + tempo: Optional[float] = None + pitch: Optional[float] = None + backwards: bool = False + bitrate: Optional[int] = None + + def to_rubberband_args(self) -> List[str]: + """Convert to ffmpeg rubberband filter arguments.""" + args = [] + if self.tempo is not None: + args.append(f"rubberband=tempo={self.tempo}") + if self.pitch is not None: + args.append(f"rubberband=pitch={self.pitch}") + return args + + def to_bitrate_args(self) -> List[str]: + """Convert to ffmpeg bitrate arguments.""" + if self.bitrate is not None: + return [f"bitrate={self.bitrate}"] + return [] + + +# ============================================================================ +# Parameter Parsing +# ============================================================================ + +class AudioParamParser: + """Unified parser for audio modification parameters.""" + + @staticmethod + def parse(message: str) -> AudioParams: + """ + Parse audio parameters from a message string. + + Supports: tempo=X, pitch=X, backward=, backwards= + + Args: + message: Command argument string to parse + + Returns: + AudioParams object with parsed and validated values + """ + params = AudioParams() + + if not message: + return params + + try: + parts = message.split() + + for part in parts: + if "tempo=" in part: + tempo_str = part.split("tempo=", 1)[1] + tempo = float(tempo_str) + params.tempo = max(MIN_TEMPO, min(tempo, MAX_TEMPO)) + + elif "pitch=" in part: + pitch_str = part.split("pitch=", 1)[1] + pitch = float(pitch_str) + params.pitch = max(MIN_PITCH, min(pitch, MAX_PITCH)) + + elif "backward=" in part.lower() or "backwards=" in part.lower(): + params.backwards = True + + elif "bitrate=" in part: + bitrate_str = part.split("bitrate=", 1)[1] + bitrate = int(bitrate_str) + params.bitrate = max(MIN_BITRATE, min(bitrate, MAX_BITRATE)) + + except (ValueError, IndexError) as e: + logging.getLogger(__name__).debug(f"Error parsing audio params: {e}") + + return params + + @staticmethod + def extract_non_param_args(message: str) -> str: + """ + Extract non-parameter arguments from message. + + Args: + message: Message to extract from + + Returns: + Message without param=value arguments + """ + if not message: + return "" + + params_to_remove = ["pitch=", "tempo=", "bitrate=", "backward=", "backwards="] + result = message + + for param in params_to_remove: + parts = result.split(param) + if len(parts) > 1: + # Keep only the part before this parameter + result = parts[0] + + return result.strip() + + +# ============================================================================ +# Base Command Class +# ============================================================================ + +class BaseCommand(ABC): + """Base class for all Torchlight commands.""" + + # Execution order + Order: int = 0 + + def __init__(self, torchlight) -> None: + """ + Initialize command. + + Args: + torchlight: Torchlight bot instance + """ + self.Logger = logging.getLogger(self.__class__.__name__) + self.Torchlight = torchlight + self.Triggers: List[Union[str, Tuple]] = [] + self.Level: int = 0 + + def _get_torchlight(self): + """Get torchlight instance (callable or direct).""" + if callable(self.Torchlight): + return self.Torchlight() + return self.Torchlight + + def _get_player_level(self, player) -> int: + """Safely get player access level.""" + try: + if player.Access: + return player.Access["level"] + except (AttributeError, KeyError, TypeError): + pass + return 0 + + def get_help(self) -> str: + """ + Get help text for this command. + + Returns: + Help text string + """ + return f"Usage: {self.Triggers[0] if self.Triggers else 'command'}" + + def check_chat_cooldown(self, player) -> bool: + """ + Check if player is on chat cooldown. + + Args: + player: Player object to check + + Returns: + True if on cooldown, False otherwise + """ + tl = self._get_torchlight() + current_time = tl.Master.Loop.time() + + if player.ChatCooldown > current_time: + cooldown = player.ChatCooldown - current_time + tl.SayPrivate( + player, + f"You're on cooldown for the next {cooldown:.1f} seconds." + ) + return True + return False + + def check_disabled(self, player) -> bool: + """ + Check if Torchlight is disabled for this player. + + Args: + player: Player object to check + + Returns: + True if disabled for player, False otherwise + """ + tl = self._get_torchlight() + + player_level = self._get_player_level(player) + is_disabled = tl.Disabled + + try: + immunity_level = tl.Config["AntiSpam"]["ImmunityLevel"] + except (KeyError, TypeError): + immunity_level = 0 + + if is_disabled and ( + is_disabled > player_level or + (is_disabled == player_level and player_level < immunity_level) + ): + tl.SayPrivate(player, "Torchlight is currently disabled!") + return True + + return False + + def _validate_command(self, player) -> bool: + """ + Run standard command validations. + + Args: + player: Player executing command + + Returns: + True if validations pass, False otherwise + """ + if self.check_chat_cooldown(player): + return False + if self.check_disabled(player): + return False + return True + + async def _func(self, message: List[str], player) -> int: + """ + Execute command. Override in subclasses. + + Args: + message: Parsed command message [trigger, args] + player: Player executing command + + Returns: + Command result code (0=success, -1=cooldown/disabled, 1=error) + """ + self.Logger.debug(sys._getframe().f_code.co_name) + return 0 + + +# ============================================================================ +# URL Filter Command +# ============================================================================ + +class URLFilter(BaseCommand): + """Filter and display information about URLs in chat.""" + + Order = 1 + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = [re.compile( + r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)''' + r'''(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+''' + r'''(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»""'']))''', + re.IGNORECASE + )] + self.Level = -1 + self._youtube_regex = re.compile( + r'.*?(?:youtube\.com\/\S*(?:(?:\/e(?:mbed))?\/|watch\?(?:\S*?&?v\=))|youtu\.be\/)([a-zA-Z0-9_-]{6,11}).*?' + ) + + def _is_youtube_url(self, url: str) -> bool: + """Check if URL is YouTube.""" + return bool(self._youtube_regex.search(url)) + + def _extract_youtube_time(self, url: str) -> Optional[int]: + """Extract timestamp from YouTube URL.""" + temp = DataHolder() + + for marker in ["&t=", "?t=", "#t="]: + pos = temp(url.find(marker)) + if pos != -1: + time_str = url[temp.value + 3:].split('&')[0].split('?')[0].split('#')[0] + if time_str: + return Utils.ParseTime(time_str) + return None + + async def _get_youtube_info(self, url: str) -> Tuple[str, Optional[Dict]]: + """ + Fetch YouTube video information using yt-dlp. + + Args: + url: YouTube URL + + Returns: + Tuple of (video_url, info_dict) + """ + try: + proc = await asyncio.create_subprocess_exec( + "yt-dlp", "--dump-json", "-g", url, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, _ = await proc.communicate() + + parts = stdout.split(b'\n') + parts = [p for p in parts if p] # Remove empty lines + + if len(parts) < 2: + return url, None + + info = json.loads(parts[-1]) + video_url = parts[-2].strip().decode("ascii") + + return video_url, info + + except Exception as e: + self.Logger.error(f"YouTube info fetch failed: {e}") + return url, None + + async def _get_url_info(self, url: str, force_youtube: bool = False) -> Tuple[str, Optional[str]]: + """ + Fetch information about a URL. + + Args: + url: URL to fetch + force_youtube: Force YouTube parsing + + Returns: + Tuple of (url, text_content or None) + """ + tl = self._get_torchlight() + text = None + + if self._is_youtube_url(url) or force_youtube: + video_url, info = await self._get_youtube_info(url) + + if info: + try: + if info["extractor_key"] == "Youtube": + duration = timedelta(seconds=info.get("duration", 0)) + views = info.get("view_count", 0) + title = info.get("title", "Unknown") + + tl.SayChat( + f"\x07E52D27[YouTube]\x01 {title} | {duration} | {views:,}" + ) + url = video_url + except (KeyError, TypeError): + pass + + time_offset = self._extract_youtube_time(url) + if time_offset: + url += f"#t={time_offset}" + + else: + await self._fetch_url_content(url) + + tl.LastUrl = url + return url, text + + async def _fetch_url_content(self, url: str) -> None: + """ + Fetch and display URL content information. + + Args: + url: URL to fetch + """ + tl = self._get_torchlight() + + try: + async with aiohttp.ClientSession() as session: + response = await asyncio.wait_for( + session.get(url), + DEFAULT_HTTP_TIMEOUT + ) + + if not response: + return + + content_type = response.headers.get("Content-Type", "") + content_length = response.headers.get("Content-Length", "-1") + + try: + content_length = int(content_length) + except (ValueError, TypeError): + content_length = -1 + + content = await asyncio.wait_for( + response.content.read(DEFAULT_CONTENT_READ_SIZE), + DEFAULT_HTTP_TIMEOUT + ) + + self._display_url_content( + content_type, + content, + content_length + ) + + response.close() + + except asyncio.TimeoutError: + tl.SayChat("Error: Request timeout") + except Exception as e: + tl.SayChat(f"Error: {str(e)}") + self.Logger.error(traceback.format_exc()) + + def _display_url_content( + self, + content_type: str, + content: bytes, + content_length: int + ) -> None: + """ + Display formatted content information. + + Args: + content_type: MIME type + content: Content bytes + content_length: Size in bytes + """ + tl = self._get_torchlight() + + if "text" in content_type: + if "text/plain" in content_type: + text = content.decode("utf-8", errors="ignore") + tl.SayChat(f"[TEXT] {text[:200]}") + else: + soup = BeautifulSoup( + content.decode("utf-8", errors="ignore"), + "lxml" + ) + if soup.title: + tl.SayChat(f"[URL] {soup.title.string}") + + elif "image" in content_type: + try: + fp = io.BytesIO(content) + im = Image.open(fp) + size_str = Utils.HumanSize(content_length) if content_length > 0 else "unknown" + tl.SayChat( + f"[IMAGE] {im.format} | {im.size[0]}x{im.size[1]} | {size_str}" + ) + fp.close() + except Exception as e: + self.Logger.error(f"Image parsing failed: {e}") + + else: + try: + filetype = magic.from_buffer(bytes(content)) + size_str = Utils.HumanSize(content_length) if content_length > 0 else "unknown" + tl.SayChat(f"[FILE] {filetype} | {size_str}") + except Exception as e: + self.Logger.error(f"File type detection failed: {e}") + + async def _rfunc(self, line: str, match, player) -> Union[str, int]: + """ + Process URL from regex match. + + Args: + line: Original chat line + match: Regex match object + player: Player who sent message + + Returns: + Modified line or -1 to skip + """ + url = match.groups()[0] + + if not url.startswith("http") and not url.startswith("ftp"): + url = "http://" + url + + if line.startswith("!yt "): + processed_url, _ = await self._get_url_info(url, force_youtube=True) + return "!yt " + processed_url + + if line.startswith("!dec "): + _, text = await self._get_url_info(url, force_youtube=False) + if text: + return "!dec " + text + + asyncio.ensure_future(self._get_url_info(url)) + return -1 + + +# ============================================================================ +# User Access Commands +# ============================================================================ + +def format_player_access(torchlight, player) -> str: + """ + Format player access information for display. + + Args: + torchlight: Torchlight instance + player: Player object + + Returns: + Formatted access string + """ + tl = torchlight() if callable(torchlight) else torchlight + + answer = f'#{player.UserID} "{player.Name}"({player.UniqueID}) is ' + level_str = "0" + + if player.Access: + try: + level_str = str(player.Access["level"]) + answer += f"level {level_str} as {player.Access['name']}." + except (KeyError, TypeError): + answer += "not authenticated." + else: + answer += "not authenticated." + + # Add usage limits if configured + try: + if level_str in tl.Config["AudioLimits"]: + limits = tl.Config["AudioLimits"][level_str] + uses = limits["Uses"] + total_time = limits["TotalTime"] + + if uses >= 0: + used = player.Storage["Audio"]["Uses"] + answer += f" Uses: {used}/{uses}" + if total_time >= 0: + used_time = player.Storage["Audio"]["TimeUsed"] + answer += f" Time: {round(used_time, 2)}/{round(total_time, 2)}" + except (KeyError, TypeError): + pass + + return answer + + +class Access(BaseCommand): + """Display player access information.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!access"] + self.Level = 0 + + async def _func(self, message: List[str], player) -> int: + """Execute access command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_chat_cooldown(player): + return -1 + + tl = self._get_torchlight() + tl.SayChat(format_player_access(tl, player), player) + return 0 + + +class Who(BaseCommand): + """Search for players by name.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!who", "!whois"] + self.Level = 1 + + async def _func(self, message: List[str], player) -> int: + """Execute who/whois command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + search_term = message[1].lower() if len(message) > 1 else "" + count = 0 + max_results = 3 + + if message[0] == "!who": + for p in tl.Players: + if search_term in p.Name.lower(): + tl.SayChat(format_player_access(tl, p)) + count += 1 + if count >= max_results: + break + + elif message[0] == "!whois": + try: + for unique_id, access_info in tl.Access: + try: + if search_term in access_info["name"].lower(): + p = tl.Players.FindUniqueID(unique_id) + if p: + tl.SayChat(format_player_access(tl, p)) + else: + name = access_info["name"] + level = access_info["level"] + tl.SayChat( + f'#{unique_id} "{name}"({unique_id}) is level {level} ' + f'and currently offline.' + ) + count += 1 + if count >= max_results: + break + except (KeyError, TypeError) as e: + self.Logger.debug(f"Error accessing access_info: {e}") + continue + except Exception as e: + self.Logger.error(f"Error iterating Access list in !whois: {e}") + tl.SayPrivate(player, f"[WHOIS] Error: {str(e)}") + + return 0 + + +# ============================================================================ +# Information Commands +# ============================================================================ + +class WolframAlpha(BaseCommand): + """Calculation and question answering via Wolfram Alpha API.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!cc"] + self.Level = 10 + + @staticmethod + def _clean_text(text: str) -> str: + """Clean Wolfram Alpha response text.""" + return re.sub( + r"[ ]{2,}", + " ", + text.replace(' | ', ': ') + .replace('\n', ' | ') + .replace('~~', ' ≈ ') + ).strip() + + async def _calculate(self, params: Dict[str, str], player) -> int: + """ + Query Wolfram Alpha and display results. + + Args: + params: Query parameters (input, appid) + player: Player making query + + Returns: + Result code + """ + tl = self._get_torchlight() + + try: + async with aiohttp.ClientSession() as session: + response = await asyncio.wait_for( + session.get( + "http://api.wolframalpha.com/v2/query", + params=params + ), + timeout=10 + ) + + if not response: + return 1 + + data = await asyncio.wait_for(response.text(), timeout=5) + if not data: + return 2 + + root = etree.fromstring(data) + + # Extract plaintext answers from pods + pods = [ + p.text.strip() + for p in root.findall('.//subpod/plaintext') + if p is not None and p.text is not None + ] + pods = [p for p in pods if p] # Filter empty + + if not pods: + didyoumeans = root.find("didyoumeans") + if didyoumeans is None: + tl.SayChat("Sorry, couldn't understand the question.", player) + return 3 + + options = [ + f'"{dm.text}"' + for dm in didyoumeans + if dm.text + ] + + if options: + suggestion = " or ".join(options) + tl.SayChat(f"Did you mean {suggestion}?", player) + return 0 + + # Format response + if len(pods) == 1: + answer = self._clean_text(pods[0]) + tl.SayChat(answer, player) + else: + question = self._clean_text(pods[0].replace(' | ', ' ').replace('\n', ' ')) + answer = self._clean_text(pods[1]) + tl.SayChat(f"{question} = {answer}", player) + + return 0 + + except Exception as e: + self.Logger.error(f"Wolfram Alpha query failed: {e}") + tl.SayPrivate(player, f"[CC] Error: {str(e)}") + return 1 + + async def _func(self, message: List[str], player) -> int: + """Execute calculation command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if not self._validate_command(player): + return -1 + + tl = self._get_torchlight() + params = { + "input": message[1] if len(message) > 1 else "", + "appid": tl.Config["WolframAPIKey"] + } + + return await self._calculate(params, player) + + +class UrbanDictionary(BaseCommand): + """Word definitions via Urban Dictionary API.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!define", "!ud"] + self.Level = 10 + + async def _func(self, message: List[str], player) -> int: + """Execute definition lookup command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if not self._validate_command(player): + return -1 + + term = message[1] if len(message) > 1 else "" + if not term: + return 1 + + tl = self._get_torchlight() + + try: + async with aiohttp.ClientSession() as session: + response = await asyncio.wait_for( + session.get( + f"https://api.urbandictionary.com/v0/define?term={term}" + ), + timeout=DEFAULT_HTTP_TIMEOUT + ) + + if not response: + return 1 + + data = await asyncio.wait_for( + response.json(), + timeout=DEFAULT_HTTP_TIMEOUT + ) + + if not data or not data.get("list"): + tl.SayChat(f"[UD] No definition found for: {term}", player) + return 4 + + definition = data["list"][0] + word = definition.get("word", term) + thumbs_up = definition.get("thumbs_up", 0) + thumbs_down = definition.get("thumbs_down", 0) + def_text = definition.get("definition", "No definition available") + example = definition.get("example", "") + + msg = f"[UD] {word} ({thumbs_up}/{thumbs_down}): {def_text}" + if example: + msg += f"\n{example}" + + tl.SayChat(msg, player) + return 0 + + except asyncio.TimeoutError: + tl.SayChat("[UD] Error: Request timeout", player) + return 1 + except Exception as e: + self.Logger.error(f"UD lookup failed: {e}") + tl.SayChat(f"[UD] Error: {str(e)}", player) + return 1 + + +class OpenWeather(BaseCommand): + """Weather information via OpenWeather API.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + try: + self.GeoIP = geoip2.database.Reader(GEOIP_DB_PATH) + except Exception as e: + self.Logger.warning(f"GeoIP database not available: {e}") + self.GeoIP = None + + self.Triggers = ["!w", "!vv"] + self.Level = 10 + + @staticmethod + def _degree_to_cardinal(degree: float) -> str: + """Convert wind degree to cardinal direction.""" + directions = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] + return directions[int(((degree + 22.5) / 45.0) % 8)] + + async def _func(self, message: List[str], player) -> int: + """Execute weather command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if not self._validate_command(player): + return -1 + + tl = self._get_torchlight() + search = "" + + if not message[1]: + # Use GeoIP for location + if self.GeoIP: + try: + player_ip = player.Address.split(":")[0] + info = self.GeoIP.city(player_ip) + search = f"lat={info.location.latitude}&lon={info.location.longitude}" + except Exception as e: + self.Logger.warning(f"GeoIP lookup failed: {e}") + search = "q=auto" + else: + search = "q=auto" + else: + search = f"q={message[1]}" + + try: + api_key = tl.Config["OpenWeatherAPIKey"] + url = ( + f"https://api.openweathermap.org/data/2.5/weather" + f"?APPID={api_key}&units=metric&{search}" + ) + + async with aiohttp.ClientSession() as session: + response = await asyncio.wait_for( + session.get(url), + timeout=DEFAULT_HTTP_TIMEOUT + ) + + if not response: + return 2 + + data = await asyncio.wait_for( + response.json(), + timeout=DEFAULT_HTTP_TIMEOUT + ) + + if data["cod"] != 200: + msg = data.get("message", "Unknown error") + tl.SayPrivate(player, f"[OW] {msg}") + return 5 + + # Format wind direction + wind_dir = "?" + if "deg" in data["wind"]: + wind_dir = self._degree_to_cardinal(data["wind"]["deg"]) + + # Format timezone + tz_seconds = data.get("timezone", 0) + tz_hours = int(tz_seconds / 3600) + tz_mins = (tz_seconds % 3600) / 60 + tz_str = f"{'+' if tz_hours >= 0 else ''}{tz_hours}" + if tz_mins: + tz_str += f":{int(tz_mins)}" + + # Build weather message + city = data["name"] + country = data["sys"]["country"] + temp = data["main"]["temp"] + temp_min = data["main"]["temp_min"] + temp_max = data["main"]["temp_max"] + weather = data["weather"][0]["main"] + description = data["weather"][0]["description"] + wind_speed = data["wind"]["speed"] + clouds = data["clouds"]["all"] + humidity = data["main"]["humidity"] + + msg = ( + f"[{city}, {country}](UTC{tz_str}) {temp}°C ({temp_min}/{temp_max}) " + f"{weather}: {description} | Wind {wind_dir} {wind_speed}kph | " + f"Clouds: {clouds}% | Humidity: {humidity}%" + ) + + tl.SayChat(msg, player) + return 0 + + except asyncio.TimeoutError: + tl.SayChat("[WEATHER] Error: Request timeout", player) + return 1 + except Exception as e: + self.Logger.error(f"Weather lookup failed: {e}") + tl.SayChat(f"[WEATHER] Error: {str(e)}", player) + return 1 + + +# ============================================================================ +# Voice Commands +# ============================================================================ + +class VoiceCommands(BaseCommand): + """Play audio clips with optional modifiers.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!random", "!search"] + self.Level = 0 + self.VoiceTriggers: Dict[str, Dict] = {} + + def _load_triggers(self) -> None: + """Load voice triggers from JSON file. Uses caching to avoid reloading.""" + # Skip if already loaded + if self.VoiceTriggers: + self.Logger.debug("Using cached voice triggers") + return + + try: + with open("triggers.json", "r") as fp: + triggers = json.load(fp) + + self.VoiceTriggers = {} + for line in triggers: + for trigger in line.get("names", []): + self.VoiceTriggers[trigger] = line + + self.Logger.info(f"Loaded {len(self.VoiceTriggers)} voice triggers") + + except FileNotFoundError: + self.Logger.error("triggers.json file not found") + except json.JSONDecodeError as e: + self.Logger.error(f"Invalid JSON in triggers.json: {e}") + except Exception as e: + self.Logger.error(f"Failed to load triggers: {e}") + + def _setup(self) -> None: + """Setup command triggers.""" + self.Logger.debug(sys._getframe().f_code.co_name) + self._load_triggers() + if not self.VoiceTriggers: + self.Logger.warning("No voice triggers loaded (triggers.json missing or empty)") + self.Triggers.extend(self.VoiceTriggers.keys()) + + async def _func(self, message: List[str], player) -> int: + """Execute voice command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_disabled(player): + return -1 + + tl = self._get_torchlight() + player_level = self._get_player_level(player) + + trigger = message[0].lower() + args = (message[1] or "").lower() if len(message) > 1 else "" + + # Handle search command + if trigger == "!search": + matches = [ + key for key in self.VoiceTriggers.keys() + if args in key.lower() + ] + tl.SayPrivate( + player, + f"{len(matches)} results: {', '.join(matches)}" + ) + return 0 + + # Non-prefix commands require level 2+ + if not trigger.startswith('!') and player_level < 2: + return 1 + + if player_level < 2: + return 0 + + # Parse audio modifiers + parser = AudioParamParser() + audio_params = parser.parse(args) + + # Check if sound is restricted + disable_modifiers = False + + # Ensure triggers are loaded + if not self.VoiceTriggers: + try: + self._load_triggers() + except Exception: + pass + + if trigger == "!random": + import random + + values = list(self.VoiceTriggers.values()) + if not values: + return 1 + + trigger_data = random.choice(values) + disable_modifiers = trigger_data.get("restricted", False) + sound = self._select_sound(trigger_data) + else: + if trigger not in self.VoiceTriggers: + return 1 + + trigger_data = self.VoiceTriggers[trigger] + disable_modifiers = trigger_data.get("restricted", False) + sound = self._select_sound_by_args(trigger_data, args, player, tl) + + if not sound: + return 1 + + # Apply restrictions + if disable_modifiers: + tl.SayPrivate( + player, + "Modifiers are disabled for this sound. " + "Spam less, spam better." + ) + audio_params = AudioParams() + + # Play audio + return self._play_sound(sound, player, audio_params) + + @staticmethod + def _select_sound(trigger_data: Dict[str, Any]) -> Optional[str]: + """Select a random sound from trigger data.""" + import random + sounds = trigger_data.get("sound", []) + if isinstance(sounds, list): + return random.choice(sounds) + return sounds + + def _select_sound_by_args( + self, + trigger_data: Dict[str, Any], + args: str, + player, + tl + ) -> Optional[str]: + """Select sound based on user arguments.""" + sounds = trigger_data.get("sound", []) + + if not isinstance(sounds, list): + return sounds + + # No args or only modifiers: pick random + if not args or not args.strip(): + import random + return random.choice(sounds) + + cleaned_args = AudioParamParser.extract_non_param_args(args) + if not cleaned_args: + import random + return random.choice(sounds) + + # Try numeric index + try: + idx = int(cleaned_args.split()[0]) + if 0 < idx <= len(sounds): + return sounds[idx - 1] + if idx: + tl.SayPrivate( + player, + f"Number {idx} is out of bounds, max {len(sounds)}." + ) + return None + except (ValueError, IndexError): + pass + + # Don't search if args look like parameters + if args.startswith("tempo=") or args.startswith("pitch=") or \ + args.startswith("backward=") or args.startswith("backwards="): + import random + return random.choice(sounds) + + # Search by name + search_term = cleaned_args.split()[0] + searching_mode = search_term.startswith('?') + if searching_mode: + search_term = search_term[1:] + + matches = [] + names = [] + + for sound in sounds: + name = os.path.splitext(os.path.basename(sound))[0] + names.append(name) + + if search_term in name.lower(): + matches.append((name, sound)) + + if matches: + matches.sort(key=lambda t: len(t[0])) + match_names = [t[0] for t in matches] + + if searching_mode: + tl.SayPrivate( + player, + f"{len(match_names)} results: {', '.join(match_names)}" + ) + return None + + if len(matches) > 1: + tl.SayPrivate( + player, + f"Multiple matches: {', '.join(match_names)}" + ) + + return matches[0][1] + + # No matches + if not searching_mode: + tl.SayPrivate( + player, + f"Couldn't find {search_term} in list of sounds." + ) + tl.SayPrivate(player, ", ".join(names)) + return None + + def _play_sound( + self, + sound: str, + player, + audio_params: AudioParams + ) -> int: + """ + Play audio file. + + Args: + sound: Sound file path + player: Player to play for + audio_params: Audio parameters + + Returns: + Result code + """ + tl = self._get_torchlight() + path = os.path.abspath(os.path.join("sounds", sound)) + + audio_clip = tl.AudioManager.AudioClip(player, f"file://{path}") + if not audio_clip: + tl.SayPrivate(player, "[VOICE] Failed to create audio clip") + self.Logger.error(f"Failed to create audio clip for: {path}") + return 1 + + return audio_clip.Play( + rubberband=audio_params.to_rubberband_args(), + bitrate=audio_params.to_bitrate_args(), + backwards=audio_params.backwards + ) + + +class YouTube(BaseCommand): + """Play audio from YouTube videos.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!yt"] + self.Level = 6 + + async def _func( + self, + message: List[str], + player, + line: Optional[str] = None + ) -> int: + """Execute YouTube playback command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_disabled(player): + return -1 + + tl = self._get_torchlight() + url = message[1] if len(message) > 1 else "" + + if not url or not url.strip(): + tl.SayPrivate(player, "[YT] Please provide a YouTube URL") + return 1 + + # Replace !last with actual last URL + if url == "!last" and tl.LastUrl: + url = tl.LastUrl + + # Extract timestamp + time_offset = self._extract_time_offset(url) + if time_offset: + url = self._remove_time_from_url(url) + + # Create audio clip + audio_clip = tl.AudioManager.AudioClip(player, url) + if not audio_clip: + tl.SayPrivate(player, "[YT] Failed to create audio clip") + return 1 + + # Parse modifiers + parser = AudioParamParser() + modifiers_str = line.split(" ", 1)[1] if line and " " in line else "" + audio_params = parser.parse(modifiers_str) + + return audio_clip.Play( + time_offset, + rubberband=audio_params.to_rubberband_args(), + bitrate=audio_params.to_bitrate_args(), + backwards=audio_params.backwards + ) + + @staticmethod + def _extract_time_offset(url: str) -> Optional[int]: + """Extract timestamp from URL.""" + for marker in ["&t=", "?t=", "#t="]: + idx = url.find(marker) + if idx != -1: + time_str = url[idx + 3:].split('&')[0].split('?')[0].split('#')[0] + if time_str: + return Utils.ParseTime(time_str) + return None + + @staticmethod + def _remove_time_from_url(url: str) -> str: + """Remove timestamp from URL.""" + for marker in ["&t=", "?t=", "#t="]: + idx = url.find(marker) + if idx != -1: + return url[:idx] + return url + + +class YouTubeSearch(BaseCommand): + """Search YouTube and play first result.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!yts"] + self.Level = 6 + + async def _func( + self, + message: List[str], + player, + line: Optional[str] = None + ) -> int: + """Execute YouTube search command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_disabled(player): + return -1 + + tl = self._get_torchlight() + query = message[1] if len(message) > 1 else "" + + if not query or not query.strip(): + tl.SayPrivate(player, "[YTS] Please provide a search query") + return 1 + + # Remove modifiers from search query + parser = AudioParamParser() + search_term = parser.extract_non_param_args(query) + + try: + # Search YouTube + proc = await asyncio.create_subprocess_exec( + "yt-dlp", "--dump-json", "-xg", + f"ytsearch:{search_term}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout, _ = await proc.communicate() + + lines = stdout.split(b'\n') + lines = [l for l in lines if l] + + if len(lines) < 2: + tl.SayPrivate(player, "[YTS] No results found") + return 1 + + url = lines[0].strip().decode("ascii") + info = json.loads(lines[1]) + + if info.get("extractor_key") == "Youtube": + duration = timedelta(seconds=info.get("duration", 0)) + views = info.get("view_count", 0) + title = info.get("title", "Unknown") + + tl.SayChat( + f"\x07E52D27[YouTube]\x01 {title} | {duration} | {views:,}" + ) + + # Create audio clip + audio_clip = tl.AudioManager.AudioClip(player, url) + if not audio_clip: + return 1 + + tl.LastUrl = url + + # Parse modifiers + modifiers_str = line.split(" ", 1)[1] if line and " " in line else "" + audio_params = parser.parse(modifiers_str) + + return audio_clip.Play( + None, + rubberband=audio_params.to_rubberband_args(), + bitrate=audio_params.to_bitrate_args(), + backwards=audio_params.backwards + ) + + except Exception as e: + self.Logger.error(f"YouTube search failed: {e}") + tl.SayPrivate(player, f"[YTS] Error: {str(e)}") + return 1 + + +# ============================================================================ +# Text-to-Speech Commands +# ============================================================================ + +class Say(BaseCommand): + """Text-to-speech via Google Translate.""" + + VALID_LANGUAGES = [] + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = [("!say", 4)] + self.Level = 2 + + # Load valid languages on init + try: + self.VALID_LANGUAGES = list(gtts.lang.tts_langs().keys()) + except Exception: + pass + + async def _func(self, message: List[str], player) -> int: + """Execute text-to-speech command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_disabled(player): + return -1 + + text = message[1] if len(message) > 1 else "" + if not text: + return 1 + + # Extract language from trigger + language = "en" + if len(message[0]) > 4: + language = message[0][4:] + + if language not in self.VALID_LANGUAGES: + return 1 + + parser = AudioParamParser() + audio_params = parser.parse(text) + clean_text = parser.extract_non_param_args(text) + + asyncio.ensure_future(self._speak(player, language, clean_text, audio_params)) + return 0 + + async def _speak( + self, + player, + language: str, + text: str, + audio_params: AudioParams + ) -> None: + """Generate and play speech.""" + tl = self._get_torchlight() + temp_file_path = None + + try: + tts = gtts.gTTS(text=text, lang=language) + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file_path = temp_file.name + tts.write_to_fp(temp_file) + temp_file.close() + + audio_clip = tl.AudioManager.AudioClip( + player, + f"file://{temp_file_path}" + ) + + if not audio_clip: + tl.SayPrivate(player, "[SAY] Failed to create audio clip") + os.unlink(temp_file_path) + return + + result = audio_clip.Play( + rubberband=audio_params.to_rubberband_args(), + bitrate=audio_params.to_bitrate_args(), + backwards=audio_params.backwards + ) + + if result: + audio_clip.AudioPlayer.AddCallback( + "Stop", + lambda: self._cleanup_temp_file(temp_file_path) + ) + else: + os.unlink(temp_file_path) + + except Exception as e: + self.Logger.error(f"Text-to-speech failed: {e}") + tl.SayPrivate(player, f"[SAY] Error: {str(e)}") + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception: + pass + + @staticmethod + def _cleanup_temp_file(path: str) -> None: + """Safely cleanup temporary file.""" + try: + if path and os.path.exists(path): + os.unlink(path) + except Exception as e: + logging.getLogger(__name__).error(f"Failed to cleanup temp file: {e}") + + +class DECTalk(BaseCommand): + """DECTalk text-to-speech.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!dec"] + self.Level = 5 + + async def _func(self, message: List[str], player) -> int: + """Execute DECTalk command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + if self.check_disabled(player): + return -1 + + text = message[1] if len(message) > 1 else "" + if not text or not text.strip(): + tl = self._get_torchlight() + tl.SayPrivate(player, "[DEC] Please provide text to synthesize") + return 1 + + asyncio.ensure_future(self._synthesize(player, text)) + return 0 + + async def _synthesize(self, player, text: str) -> None: + """Synthesize and play DECTalk audio.""" + tl = self._get_torchlight() + temp_file_path = None + + try: + # Format text for DECTalk + formatted_text = f"[:phoneme on]{text}" + + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file_path = temp_file.name + temp_file.close() + + # Run DECTalk via Wine + proc = await asyncio.create_subprocess_exec( + "wine", "say.exe", "-w", temp_file_path, + cwd=DECTALK_CWD, + stdin=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + await proc.communicate( + formatted_text.encode('utf-8', errors='ignore') + ) + + audio_clip = tl.AudioManager.AudioClip( + player, + f"file://{temp_file_path}" + ) + + if not audio_clip: + tl.SayPrivate(player, "[DEC] Failed to create audio clip") + os.unlink(temp_file_path) + return + + result = audio_clip.Play(dec_params=["-af", "volume=10dB"]) + + if result: + audio_clip.AudioPlayer.AddCallback( + "Stop", + lambda: self._cleanup_temp_file(temp_file_path) + ) + else: + os.unlink(temp_file_path) + + except Exception as e: + self.Logger.error(f"DECTalk synthesis failed: {e}") + tl.SayPrivate(player, f"[DEC] Error: {str(e)}") + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception: + pass + + @staticmethod + def _cleanup_temp_file(path: str) -> None: + """Safely cleanup temporary file.""" + try: + if path and os.path.exists(path): + os.unlink(path) + except Exception as e: + logging.getLogger(__name__).error(f"Failed to cleanup temp file: {e}") + + +# ============================================================================ +# Control Commands +# ============================================================================ + +class Stop(BaseCommand): + """Stop audio playback.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!stop"] + self.Level = 0 + + async def _func(self, message: List[str], player) -> int: + """Execute stop command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + try: + tl.AudioManager.Stop(player, message[1] if len(message) > 1 else "") + tl.SayPrivate(player, "[STOP] Audio stopped") + except Exception as e: + self.Logger.error(f"Stop command failed: {e}") + tl.SayPrivate(player, f"[STOP] Error: {str(e)}") + return True + + +class VoteDisable(BaseCommand): + """Vote to disable Torchlight.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!votedisable", "!disablevote"] + self.Level = 0 + + async def _func(self, message: List[str], player) -> int: + """Execute vote disable command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + + if tl.Disabled: + tl.SayPrivate( + player, + "Torchlight is already disabled for the duration of this map." + ) + return 1 + + tl.DisableVotes.add(player.UniqueID) + + votes_needed = len(tl.Players) // 5 + votes_have = len(tl.DisableVotes) + + if votes_have >= votes_needed: + tl.SayChat( + "Torchlight has been disabled for the duration of this map." + ) + tl.Disabled = 6 + else: + votes_remaining = votes_needed - votes_have + tl.SayPrivate( + player, + f"Torchlight needs {votes_remaining} more disable votes " + f"to be disabled." + ) + + return 0 + + +class EnableDisable(BaseCommand): + """Admin commands to enable/disable Torchlight.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!enable", "!disable"] + self.Level = 6 + + async def _func(self, message: List[str], player) -> int: + """Execute enable/disable command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + player_level = self._get_player_level(player) + disabled_level = tl.Disabled if tl.Disabled else 0 + + if message[0] == "!enable": + if disabled_level and disabled_level > player_level: + tl.SayPrivate( + player, + "You don't have access to enable torchlight, since it was " + "disabled by a higher level user." + ) + return 1 + + tl.SayChat( + "Torchlight has been enabled for the duration of this map - " + "Type !disable to disable it again." + ) + tl.Disabled = False + + elif message[0] == "!disable": + if disabled_level and disabled_level > player_level: + tl.SayPrivate( + player, + "You don't have access to disable torchlight, since it was " + "already disabled by a higher level user." + ) + return 1 + + tl.SayChat( + "Torchlight has been disabled for the duration of this map - " + "Type !enable to enable it again." + ) + tl.Disabled = player_level + + return 0 + + +# ============================================================================ +# Admin Commands +# ============================================================================ + +class AdminAccess(BaseCommand): + """Manage user access levels.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!access"] + self.Level = 6 + + def _reload_access(self) -> None: + """Reload access list from storage.""" + tl = self._get_torchlight() + tl.Access.Load() + + for player in tl.Players: + try: + player.Access = tl.Access[player.UniqueID] + except (KeyError, TypeError): + player.Access = None + + async def _func(self, message: List[str], player) -> int: + """Execute admin access command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + cmd = message[1].lower() if len(message) > 1 else "" + + if cmd == "reload": + self._reload_access() + tl.SayChat( + f"Loaded access list with {len(tl.Access)} users" + ) + return 0 + + elif cmd == "save": + tl.Access.Save() + tl.SayChat( + f"Saved access list with {len(tl.Access)} users" + ) + return 0 + + # Modify access for a specific user + else: + return await self._modify_access(message[1], player, tl) + + async def _modify_access( + self, + arg: str, + executor, + tl + ) -> int: + """Modify a user's access level.""" + # Parse the command + executor_level = self._get_player_level(executor) + + # Find " as " to separate regname from level + as_idx = arg.find(" as ") + + if as_idx != -1: + user_part = arg[:as_idx].strip() + rest = arg[as_idx + 4:].strip() + try: + regname, level_str = rest.rsplit(' ', 1) + except ValueError: + tl.SayChat("Invalid format: use '!access as '") + return 1 + else: + try: + user_part, level_str = arg.rsplit(' ', 1) + except ValueError: + tl.SayChat("Invalid format: use '!access '") + return 1 + regname = None + + user_part = user_part.strip() + level_str = level_str.strip() + + # Find user + target_player = self._find_user(user_part, tl) + if not target_player: + tl.SayChat(f"Couldn't find user: {user_part}") + return 1 + + # Handle numeric level change + if level_str.lstrip('-').isdigit(): + new_level = int(level_str) + + # Check permissions + if new_level >= executor_level and executor_level < 10: + tl.SayChat( + f"Can't assign level {new_level} - higher than your level " + f"({executor_level})" + ) + return 1 + + if target_player.Access: + target_level = target_player.Access["level"] + if target_level >= executor_level and executor_level < 10: + tl.SayChat( + f"Can't modify level {target_level} - " + f"higher than your level ({executor_level})" + ) + return 1 + + # Update existing access + if regname: + target_player.Access["name"] = regname + tl.SayChat( + f'Changed "{target_player.Name}"({target_player.UniqueID}) ' + f'from level {target_level} to {new_level} ' + f'as {regname}' + ) + else: + tl.SayChat( + f'Changed "{target_player.Name}"({target_player.UniqueID}) ' + f'from level {target_level} to {new_level}' + ) + + target_player.Access["level"] = new_level + else: + # Add new access + if not regname: + regname = target_player.Name + + from collections import OrderedDict + target_player.Access = OrderedDict([ + ("name", regname), + ("level", new_level) + ]) + tl.Access[target_player.UniqueID] = target_player.Access + + tl.SayChat( + f'Added "{target_player.Name}"({target_player.UniqueID}) ' + f'as {regname} with level {new_level}' + ) + + tl.Access[target_player.UniqueID] = target_player.Access + + elif level_str == "revoke": + if target_player.Access: + target_level = target_player.Access["level"] + if target_level >= executor_level and executor_level < 10: + tl.SayChat( + f"Can't revoke level {target_level} - " + f"higher than your level ({executor_level})" + ) + return 1 + + tl.SayChat( + f'Removed "{target_player.Name}"({target_player.UniqueID}) ' + f'(was {target_player.Access["name"]} with level ' + f'{target_level})' + ) + del tl.Access[target_player.UniqueID] + target_player.Access = None + else: + tl.SayChat(f"Invalid level: {level_str}") + return 1 + + return 0 + + @staticmethod + def _find_user(identifier: str, tl): + """Find user by ID or name.""" + # Try user ID format (#123) + if identifier.startswith('#') and identifier[1:].isdigit(): + return tl.Players.FindUserID(int(identifier[1:])) + + # Search by name + for player in tl.Players: + if identifier.lower() in player.Name.lower(): + return player + + return None + + +class TorchHelp(BaseCommand): + """List available commands and their help text.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!torchhelp"] + self.Level = 0 + + async def _func(self, message: List[str], player) -> int: + """Show help for all commands or filter by term.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + term = message[1].lower() if len(message) > 1 else "" + + results = [] + + # Prefer authoritative listing from tl.Commands if available + try: + cmds_iter = None + if hasattr(tl, "Commands"): + cmds = tl.Commands + if isinstance(cmds, dict): + cmds_iter = cmds.values() + else: + cmds_iter = cmds + + if cmds_iter is not None: + for cmd in cmds_iter: + try: + trig = cmd.Triggers[0] if getattr(cmd, "Triggers", None) else type(cmd).__name__ + help_text = cmd.get_help() if hasattr(cmd, "get_help") else "" + if not term or term in trig.lower() or term in help_text.lower(): + results.append(f"{trig}: {help_text}") + except Exception: + continue + except Exception: + pass + + # Fallback: inspect defined command classes + if not results: + try: + import inspect + + for obj in globals().values(): + if inspect.isclass(obj) and issubclass(obj, BaseCommand) and obj is not BaseCommand: + try: + inst = obj(tl) + trig = inst.Triggers[0] if getattr(inst, "Triggers", None) else obj.__name__ + help_text = inst.get_help() if hasattr(inst, "get_help") else "" + if not term or term in trig.lower() or term in help_text.lower(): + results.append(f"{trig}: {help_text}") + except Exception: + continue + except Exception: + pass + + # Deliver results privately to requester + if not results: + tl.SayPrivate(player, "No help entries found.") + return 0 + + max_send = 20 + for line in results[:max_send]: + tl.SayPrivate(player, line) + + if len(results) > max_send: + tl.SayPrivate(player, f"... and {len(results) - max_send} more. Use '!torchhelp ' to filter.") + + return 0 + + +class Reload(BaseCommand): + """Reload all commands and configuration.""" + + def __init__(self, torchlight) -> None: + super().__init__(torchlight) + self.Triggers = ["!reload"] + self.Level = 7 + + async def _func(self, message: List[str], player) -> int: + """Execute reload command.""" + self.Logger.debug(sys._getframe().f_code.co_name) + + tl = self._get_torchlight() + tl.Reload() + return 0