projects-jenz/torchlight_changes_unloze/torchlight3/Torchlight/Commands.py

1936 lines
62 KiB
Python
Executable File

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Torchlight Commands Module
Provides command implementations for audio playback, voice modulation,
information lookups, and user access management.
"""
import asyncio
import os
import sys
import logging
import math
import json
import re
import io
import traceback
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Tuple, Union
from enum import Enum
from datetime import timedelta
from abc import ABC
# Third-party imports (conditionally loaded where used)
try:
import aiohttp
import magic
from bs4 import BeautifulSoup
from PIL import Image
import geoip2.database
import gtts
import tempfile
import urllib.parse
import xml.etree.ElementTree as etree
except ImportError as e:
pass # These will be loaded as needed
from .Utils import Utils, DataHolder
# ============================================================================
# Constants and Configuration
# ============================================================================
# Clamp range applied to the `bitrate=` modifier by AudioParamParser.parse.
# NOTE(review): the unit is not established in this file (presumably kbit/s) — confirm.
MIN_BITRATE = 0.01
MAX_BITRATE = 2000
# NOTE(review): not referenced in the visible part of this module — confirm where it is used.
DEFAULT_BITRATE = 20
# Clamp range applied to the `tempo=` modifier by AudioParamParser.parse.
MIN_TEMPO = 0.10
MAX_TEMPO = 20
# Clamp range applied to the `pitch=` modifier by AudioParamParser.parse.
MIN_PITCH = 0.1
MAX_PITCH = 5.0
# Timeout (seconds) handed to asyncio.wait_for around HTTP requests and body reads.
DEFAULT_HTTP_TIMEOUT = 5
# Maximum number of bytes read from a response body when inspecting a posted URL.
DEFAULT_CONTENT_READ_SIZE = 65536
# Database file opened with geoip2.database.Reader by the OpenWeather command.
GEOIP_DB_PATH = "/var/lib/GeoIP/GeoLite2-City.mmdb"
# NOTE(review): presumably the working directory for the DECTalk synthesizer — its use is
# outside the visible part of this module; confirm.
DECTALK_CWD = "dectalk"
# ============================================================================
# Data Classes
# ============================================================================
@dataclass
class AudioParams:
    """Audio modifiers requested for one playback.

    ``tempo``, ``pitch`` and ``bitrate`` default to ``None``, meaning the
    modifier was not requested and contributes no argument string.
    """

    tempo: Optional[float] = None
    pitch: Optional[float] = None
    backwards: bool = False
    bitrate: Optional[int] = None

    def to_rubberband_args(self) -> List[str]:
        """Build the ``rubberband=<name>=<value>`` strings, tempo before pitch.

        Returns:
            One string per modifier that is set; an empty list when neither is.
        """
        requested = (("tempo", self.tempo), ("pitch", self.pitch))
        return [
            f"rubberband={name}={value}"
            for name, value in requested
            if value is not None
        ]

    def to_bitrate_args(self) -> List[str]:
        """Build the ``bitrate=<value>`` string.

        Returns:
            A one-element list when a bitrate is set, otherwise an empty list.
        """
        if self.bitrate is None:
            return []
        return [f"bitrate={self.bitrate}"]
# ============================================================================
# Parameter Parsing
# ============================================================================
class AudioParamParser:
    """Unified parser for audio modification parameters.

    Recognised tokens are ``tempo=X``, ``pitch=X`` and ``bitrate=X`` (matched
    as typed) plus ``backward=`` / ``backwards=`` (matched case-insensitively).
    A whitespace-separated token counts as a parameter when it contains one of
    these keys anywhere.
    """

    # Keys that carry a numeric value; matched against the token as typed.
    _VALUE_KEYS = ("tempo=", "pitch=", "bitrate=")
    # Flag keys; matched against the lower-cased token.
    _FLAG_KEYS = ("backward=", "backwards=")

    @classmethod
    def _is_param_token(cls, token: str) -> bool:
        """Return True when parse() would treat ``token`` as a parameter."""
        lowered = token.lower()
        return (
            any(key in token for key in cls._VALUE_KEYS)
            or any(key in lowered for key in cls._FLAG_KEYS)
        )

    @staticmethod
    def parse(message: str) -> "AudioParams":
        """
        Parse audio parameters from a message string.

        Supports: tempo=X, pitch=X, bitrate=X, backward=, backwards=.
        Numeric values are clamped to the module-level MIN_*/MAX_* limits.

        Args:
            message: Command argument string to parse
        Returns:
            AudioParams object with parsed and validated values
        """
        params = AudioParams()
        if not message:
            return params
        for part in message.split():
            # Each token is parsed on its own so that one malformed value
            # (e.g. "tempo=abc") does not discard the tokens that follow it.
            try:
                if "tempo=" in part:
                    tempo = float(part.split("tempo=", 1)[1])
                    params.tempo = max(MIN_TEMPO, min(tempo, MAX_TEMPO))
                elif "pitch=" in part:
                    pitch = float(part.split("pitch=", 1)[1])
                    params.pitch = max(MIN_PITCH, min(pitch, MAX_PITCH))
                elif "backward=" in part.lower() or "backwards=" in part.lower():
                    params.backwards = True
                elif "bitrate=" in part:
                    bitrate = int(part.split("bitrate=", 1)[1])
                    params.bitrate = max(MIN_BITRATE, min(bitrate, MAX_BITRATE))
            except (ValueError, IndexError) as e:
                logging.getLogger(__name__).debug(f"Error parsing audio params: {e}")
        return params

    @staticmethod
    def extract_non_param_args(message: str) -> str:
        """
        Extract non-parameter arguments from message.

        Only the parameter tokens themselves are removed; ordinary words are
        kept wherever they appear, so "tempo=2 hello" yields "hello".
        Remaining words are joined with single spaces.

        Args:
            message: Message to extract from
        Returns:
            Message without param=value arguments
        """
        if not message:
            return ""
        kept = [
            token
            for token in message.split()
            if not AudioParamParser._is_param_token(token)
        ]
        return " ".join(kept)
# ============================================================================
# Base Command Class
# ============================================================================
class BaseCommand(ABC):
    """Common behaviour shared by every Torchlight command."""

    # Execution order
    Order: int = 0

    def __init__(self, torchlight) -> None:
        """
        Store the bot reference and set up per-command defaults.

        Args:
            torchlight: Torchlight bot instance, or a callable returning it
        """
        self.Logger = logging.getLogger(self.__class__.__name__)
        self.Torchlight = torchlight
        self.Triggers: List[Union[str, Tuple]] = []
        self.Level: int = 0

    def _get_torchlight(self):
        """Return the bot instance, calling ``self.Torchlight`` when it is callable."""
        bot = self.Torchlight
        return bot() if callable(bot) else bot

    def _get_player_level(self, player) -> int:
        """Return ``player.Access["level"]``, or 0 when it cannot be read."""
        try:
            if player.Access:
                return player.Access["level"]
        except (AttributeError, KeyError, TypeError):
            pass
        return 0

    def get_help(self) -> str:
        """
        Build the usage line for this command.

        Returns:
            "Usage: " followed by the first trigger, or by "command" when
            no trigger is registered
        """
        first_trigger = self.Triggers[0] if self.Triggers else 'command'
        return f"Usage: {first_trigger}"

    def check_chat_cooldown(self, player) -> bool:
        """
        Tell the player about a pending chat cooldown, if any.

        Args:
            player: Player object to check
        Returns:
            True if on cooldown, False otherwise
        """
        tl = self._get_torchlight()
        now = tl.Master.Loop.time()
        if not player.ChatCooldown > now:
            return False
        remaining = player.ChatCooldown - now
        tl.SayPrivate(
            player,
            f"You're on cooldown for the next {remaining:.1f} seconds."
        )
        return True

    def check_disabled(self, player) -> bool:
        """
        Tell the player when Torchlight is disabled for their access level.

        Args:
            player: Player object to check
        Returns:
            True if disabled for player, False otherwise
        """
        tl = self._get_torchlight()
        level = self._get_player_level(player)
        disabled_level = tl.Disabled
        try:
            immunity = tl.Config["AntiSpam"]["ImmunityLevel"]
        except (KeyError, TypeError):
            immunity = 0
        if not disabled_level:
            return False
        # Blocked when the disable level outranks the player, or ties with a
        # player who is below the anti-spam immunity level.
        outranked = disabled_level > level
        tied_without_immunity = disabled_level == level and level < immunity
        if not (outranked or tied_without_immunity):
            return False
        tl.SayPrivate(player, "Torchlight is currently disabled!")
        return True

    def _validate_command(self, player) -> bool:
        """
        Run the cooldown check, then the disabled check.

        Args:
            player: Player executing command
        Returns:
            True if validations pass, False otherwise
        """
        return not (self.check_chat_cooldown(player) or self.check_disabled(player))

    async def _func(self, message: List[str], player) -> int:
        """
        Default command body; subclasses override it.

        Args:
            message: Parsed command message [trigger, args]
            player: Player executing command
        Returns:
            Command result code (0=success, -1=cooldown/disabled, 1=error)
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        return 0
# ============================================================================
# URL Filter Command
# ============================================================================
class URLFilter(BaseCommand):
    """Filter and display information about URLs in chat.

    The single trigger is a URL-matching regex.  For every processed URL a
    short description is announced in chat and the URL is stored in the bot's
    ``LastUrl`` attribute.
    """

    Order = 1

    # Query/fragment markers that introduce a start timestamp in a URL.
    _TIME_MARKERS = ("&t=", "?t=", "#t=")

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = [re.compile(
            r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)'''
            r'''(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+'''
            r'''(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»""'']))''',
            re.IGNORECASE
        )]
        self.Level = -1
        self._youtube_regex = re.compile(
            r'.*?(?:youtube\.com\/\S*(?:(?:\/e(?:mbed))?\/|watch\?(?:\S*?&?v\=))|youtu\.be\/)([a-zA-Z0-9_-]{6,11}).*?'
        )

    def _is_youtube_url(self, url: str) -> bool:
        """Check if URL is YouTube."""
        return bool(self._youtube_regex.search(url))

    def _extract_youtube_time(self, url: str) -> Optional[int]:
        """
        Extract the start timestamp from a URL.

        Args:
            url: URL that may contain a ``&t=``, ``?t=`` or ``#t=`` marker
        Returns:
            Result of Utils.ParseTime for the first marker with a non-empty
            value, or None when no marker carries a value
        """
        for marker in self._TIME_MARKERS:
            pos = url.find(marker)
            if pos == -1:
                continue
            # The value ends at the next query/fragment delimiter.
            time_str = url[pos + len(marker):].split('&')[0].split('?')[0].split('#')[0]
            if time_str:
                return Utils.ParseTime(time_str)
        return None

    async def _get_youtube_info(self, url: str) -> Tuple[str, Optional[Dict]]:
        """
        Fetch video information using yt-dlp.

        Args:
            url: YouTube URL
        Returns:
            Tuple of (video_url, info_dict); (url, None) when yt-dlp fails or
            prints fewer than two non-empty lines
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                "yt-dlp", "--dump-json", "-g", url,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await proc.communicate()
            parts = [p for p in stdout.split(b'\n') if p]  # Remove empty lines
            if len(parts) < 2:
                return url, None
            # The last line is the JSON document, the one before it a media URL.
            info = json.loads(parts[-1])
            video_url = parts[-2].strip().decode("ascii")
            return video_url, info
        except Exception as e:
            self.Logger.error(f"YouTube info fetch failed: {e}")
            return url, None

    async def _get_url_info(self, url: str, force_youtube: bool = False) -> Tuple[str, Optional[str]]:
        """
        Fetch information about a URL and announce it in chat.

        Args:
            url: URL to fetch
            force_youtube: Force yt-dlp handling even if the URL does not
                look like YouTube
        Returns:
            Tuple of (url, text_content or None).  ``url`` is the media URL
            reported by yt-dlp when one was obtained (with ``#t=`` appended if
            the posted URL carried a timestamp); ``text_content`` is the
            decoded body of a ``text/plain`` resource.
        """
        tl = self._get_torchlight()
        text = None
        if self._is_youtube_url(url) or force_youtube:
            # Read the timestamp from the URL as posted: once it is replaced
            # by the direct media URL the marker is no longer present.
            time_offset = self._extract_youtube_time(url)
            video_url, info = await self._get_youtube_info(url)
            if info:
                url = video_url
                try:
                    if info["extractor_key"] == "Youtube":
                        # yt-dlp may report null for these (e.g. live streams).
                        duration = timedelta(seconds=info.get("duration") or 0)
                        views = info.get("view_count") or 0
                        title = info.get("title", "Unknown")
                        tl.SayChat(
                            f"\x07E52D27[YouTube]\x01 {title} | {duration} | {views:,}"
                        )
                except (KeyError, TypeError, ValueError) as e:
                    self.Logger.debug(f"Could not announce video info: {e}")
            if time_offset:
                url += f"#t={time_offset}"
        else:
            text = await self._fetch_url_content(url)
        tl.LastUrl = url
        return url, text

    async def _fetch_url_content(self, url: str) -> Optional[str]:
        """
        Fetch the beginning of a URL's body and announce what it is.

        Args:
            url: URL to fetch
        Returns:
            Decoded body for ``text/plain`` responses, otherwise None
        """
        tl = self._get_torchlight()
        try:
            async with aiohttp.ClientSession() as session:
                response = await asyncio.wait_for(
                    session.get(url),
                    DEFAULT_HTTP_TIMEOUT
                )
                if not response:
                    return None
                try:
                    content_type = response.headers.get("Content-Type", "")
                    try:
                        content_length = int(response.headers.get("Content-Length", "-1"))
                    except (ValueError, TypeError):
                        content_length = -1
                    content = await asyncio.wait_for(
                        response.content.read(DEFAULT_CONTENT_READ_SIZE),
                        DEFAULT_HTTP_TIMEOUT
                    )
                    return self._display_url_content(
                        content_type,
                        content,
                        content_length
                    )
                finally:
                    # Close even when the body read times out or fails.
                    response.close()
        except asyncio.TimeoutError:
            tl.SayChat("Error: Request timeout")
        except Exception as e:
            tl.SayChat(f"Error: {str(e)}")
            self.Logger.error(traceback.format_exc())
        return None

    def _display_url_content(
        self,
        content_type: str,
        content: bytes,
        content_length: int
    ) -> Optional[str]:
        """
        Announce a one-line description of fetched content.

        Args:
            content_type: Value of the Content-Type header
            content: Content bytes (at most the configured read size)
            content_length: Size in bytes from Content-Length, or -1 if unknown
        Returns:
            Decoded text for ``text/plain`` content, otherwise None
        """
        tl = self._get_torchlight()
        size_str = Utils.HumanSize(content_length) if content_length > 0 else "unknown"
        if "text" in content_type:
            decoded = content.decode("utf-8", errors="ignore")
            if "text/plain" in content_type:
                tl.SayChat(f"[TEXT] {decoded[:200]}")
                return decoded
            soup = BeautifulSoup(decoded, "lxml")
            # title.string is None for an empty or nested <title>.
            if soup.title and soup.title.string:
                tl.SayChat(f"[URL] {soup.title.string}")
        elif "image" in content_type:
            try:
                with io.BytesIO(content) as fp:
                    im = Image.open(fp)
                    tl.SayChat(
                        f"[IMAGE] {im.format} | {im.size[0]}x{im.size[1]} | {size_str}"
                    )
            except Exception as e:
                self.Logger.error(f"Image parsing failed: {e}")
        else:
            try:
                filetype = magic.from_buffer(bytes(content))
                tl.SayChat(f"[FILE] {filetype} | {size_str}")
            except Exception as e:
                self.Logger.error(f"File type detection failed: {e}")
        return None

    async def _rfunc(self, line: str, match, player) -> Union[str, int]:
        """
        Process URL from regex match.

        Args:
            line: Original chat line
            match: Regex match object
            player: Player who sent message
        Returns:
            Rewritten ``!yt``/``!dec`` line, or -1 when the line is not rewritten
        """
        url = match.groups()[0]
        if not url.startswith("http") and not url.startswith("ftp"):
            url = "http://" + url
        if line.startswith("!yt "):
            processed_url, _ = await self._get_url_info(url, force_youtube=True)
            return "!yt " + processed_url
        if line.startswith("!dec "):
            _, text = await self._get_url_info(url, force_youtube=False)
            if text:
                return "!dec " + text
            # The lookup above already announced the URL; a second background
            # fetch would only repeat the same chat output.
            return -1
        asyncio.ensure_future(self._get_url_info(url))
        return -1
# ============================================================================
# User Access Commands
# ============================================================================
def format_player_access(torchlight, player) -> str:
    """
    Describe a player's identity, access level and audio usage limits.

    Args:
        torchlight: Torchlight instance, or a callable returning it
        player: Player object
    Returns:
        Formatted access string
    """
    tl = torchlight() if callable(torchlight) else torchlight
    pieces = [f'#{player.UserID} "{player.Name}"({player.UniqueID}) is ']

    # Key used to look up limits in Config["AudioLimits"]; "0" for guests.
    level_key = "0"
    status = "not authenticated."
    if player.Access:
        try:
            level_key = str(player.Access["level"])
            status = f"level {level_key} as {player.Access['name']}."
        except (KeyError, TypeError):
            pass
    pieces.append(status)

    # Usage limits are optional; a negative limit means "do not display".
    try:
        if level_key in tl.Config["AudioLimits"]:
            limits = tl.Config["AudioLimits"][level_key]
            max_uses = limits["Uses"]
            max_time = limits["TotalTime"]
            if max_uses >= 0:
                uses_so_far = player.Storage["Audio"]["Uses"]
                pieces.append(f" Uses: {uses_so_far}/{max_uses}")
            if max_time >= 0:
                time_so_far = player.Storage["Audio"]["TimeUsed"]
                pieces.append(f" Time: {round(time_so_far, 2)}/{round(max_time, 2)}")
    except (KeyError, TypeError):
        pass
    return "".join(pieces)
class Access(BaseCommand):
    """Show the calling player's own access information (``!access``)."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!access"]
        self.Level = 0

    async def _func(self, message: List[str], player) -> int:
        """
        Announce the caller's access summary unless they are on chat cooldown.

        Args:
            message: Parsed command message [trigger, args]
            player: Player executing command
        Returns:
            -1 when on cooldown, otherwise 0
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        on_cooldown = self.check_chat_cooldown(player)
        if on_cooldown:
            return -1
        bot = self._get_torchlight()
        summary = format_player_access(bot, player)
        bot.SayChat(summary, player)
        return 0
class Who(BaseCommand):
    """Look up players by name (``!who``: connected players, ``!whois``: access list)."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!who", "!whois"]
        self.Level = 1

    async def _func(self, message: List[str], player) -> int:
        """
        Announce up to three entries whose name contains the search term.

        Args:
            message: Parsed command message [trigger, search term]
            player: Player executing command
        Returns:
            Always 0
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        tl = self._get_torchlight()
        needle = message[1].lower() if len(message) > 1 else ""
        shown = 0
        limit = 3
        command = message[0]

        if command == "!who":
            # Substring match against the names of the bot's player list.
            for candidate in tl.Players:
                if needle not in candidate.Name.lower():
                    continue
                tl.SayChat(format_player_access(tl, candidate))
                shown += 1
                if shown >= limit:
                    break
        elif command == "!whois":
            try:
                for unique_id, entry in tl.Access:
                    try:
                        if needle not in entry["name"].lower():
                            continue
                        online = tl.Players.FindUniqueID(unique_id)
                        if online:
                            tl.SayChat(format_player_access(tl, online))
                        else:
                            name = entry["name"]
                            level = entry["level"]
                            tl.SayChat(
                                f'#{unique_id} "{name}"({unique_id}) is level {level} '
                                f'and currently offline.'
                            )
                        shown += 1
                        if shown >= limit:
                            break
                    except (KeyError, TypeError) as e:
                        # A malformed entry is skipped, not fatal.
                        self.Logger.debug(f"Error accessing access_info: {e}")
                        continue
            except Exception as e:
                self.Logger.error(f"Error iterating Access list in !whois: {e}")
                tl.SayPrivate(player, f"[WHOIS] Error: {str(e)}")
        return 0
# ============================================================================
# Information Commands
# ============================================================================
class WolframAlpha(BaseCommand):
    """Calculation and question answering via Wolfram Alpha API."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!cc"]
        self.Level = 10

    @staticmethod
    def _clean_text(text: str) -> str:
        """Flatten a plaintext pod: ' | ' becomes ': ', newlines become ' | ',
        '~~' is dropped and runs of spaces are collapsed."""
        return re.sub(
            r"[ ]{2,}",
            " ",
            text.replace(' | ', ': ')
            .replace('\n', ' | ')
            .replace('~~', '')
        ).strip()

    async def _calculate(self, params: Dict[str, str], player) -> int:
        """
        Query Wolfram Alpha and display results.

        Args:
            params: Query parameters (input, appid)
            player: Player making query
        Returns:
            0 on success (answer or suggestion shown), 1 on request failure,
            2 on an empty response body, 3 when nothing could be answered
        """
        tl = self._get_torchlight()
        try:
            async with aiohttp.ClientSession() as session:
                response = await asyncio.wait_for(
                    session.get(
                        "http://api.wolframalpha.com/v2/query",
                        params=params
                    ),
                    timeout=10
                )
                if not response:
                    return 1
                try:
                    data = await asyncio.wait_for(response.text(), timeout=5)
                finally:
                    # Close the response whether or not the body was read.
                    response.close()
            if not data:
                return 2
            root = etree.fromstring(data)
            # Non-empty plaintext answers, in document order.
            pods = [
                node.text.strip()
                for node in root.findall('.//subpod/plaintext')
                if node.text is not None and node.text.strip()
            ]
            if not pods:
                didyoumeans = root.find("didyoumeans")
                options = [
                    f'"{dm.text}"'
                    for dm in (didyoumeans if didyoumeans is not None else [])
                    if dm.text
                ]
                if not options:
                    # Covers both a missing and an empty suggestion list, so
                    # the player always gets a reply.
                    tl.SayChat("Sorry, couldn't understand the question.", player)
                    return 3
                suggestion = " or ".join(options)
                tl.SayChat(f"Did you mean {suggestion}?", player)
                return 0
            # Format response
            if len(pods) == 1:
                tl.SayChat(self._clean_text(pods[0]), player)
            else:
                # First pod is shown as the question, second as its answer.
                question = self._clean_text(pods[0].replace(' | ', ' ').replace('\n', ' '))
                answer = self._clean_text(pods[1])
                tl.SayChat(f"{question} = {answer}", player)
            return 0
        except Exception as e:
            self.Logger.error(f"Wolfram Alpha query failed: {e}")
            tl.SayPrivate(player, f"[CC] Error: {str(e)}")
            return 1

    async def _func(self, message: List[str], player) -> int:
        """
        Execute calculation command.

        Args:
            message: Parsed command message [trigger, question]
            player: Player executing command
        Returns:
            -1 on cooldown/disabled, 1 when no question was given, otherwise
            the result code of the query
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if not self._validate_command(player):
            return -1
        tl = self._get_torchlight()
        question = message[1] if len(message) > 1 else ""
        if not question or not question.strip():
            # Do not spend an API call on an empty query.
            tl.SayPrivate(player, "[CC] Please provide a question")
            return 1
        params = {
            "input": question,
            "appid": tl.Config["WolframAPIKey"]
        }
        return await self._calculate(params, player)
class UrbanDictionary(BaseCommand):
    """Word definitions via Urban Dictionary API."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!define", "!ud"]
        self.Level = 10

    async def _func(self, message: List[str], player) -> int:
        """
        Look up a term and announce its first definition.

        Args:
            message: Parsed command message [trigger, term]
            player: Player executing command
        Returns:
            0 on success, -1 on cooldown/disabled, 1 on missing term or
            request failure, 4 when no definition exists
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if not self._validate_command(player):
            return -1
        term = message[1] if len(message) > 1 else ""
        if not term:
            return 1
        tl = self._get_torchlight()
        try:
            async with aiohttp.ClientSession() as session:
                response = await asyncio.wait_for(
                    session.get(
                        "https://api.urbandictionary.com/v0/define",
                        # Passed as a parameter so spaces, '&' and '#' in the
                        # term are percent-encoded instead of altering the URL.
                        params={"term": term}
                    ),
                    timeout=DEFAULT_HTTP_TIMEOUT
                )
                if not response:
                    return 1
                try:
                    data = await asyncio.wait_for(
                        response.json(),
                        timeout=DEFAULT_HTTP_TIMEOUT
                    )
                finally:
                    response.close()
            if not data or not data.get("list"):
                tl.SayChat(f"[UD] No definition found for: {term}", player)
                return 4
            definition = data["list"][0]
            word = definition.get("word", term)
            thumbs_up = definition.get("thumbs_up", 0)
            thumbs_down = definition.get("thumbs_down", 0)
            def_text = definition.get("definition", "No definition available")
            example = definition.get("example", "")
            msg = f"[UD] {word} ({thumbs_up}/{thumbs_down}): {def_text}"
            if example:
                msg += f"\n{example}"
            tl.SayChat(msg, player)
            return 0
        except asyncio.TimeoutError:
            tl.SayChat("[UD] Error: Request timeout", player)
            return 1
        except Exception as e:
            self.Logger.error(f"UD lookup failed: {e}")
            tl.SayChat(f"[UD] Error: {str(e)}", player)
            return 1
class OpenWeather(BaseCommand):
    """Weather information via OpenWeather API."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        try:
            self.GeoIP = geoip2.database.Reader(GEOIP_DB_PATH)
        except Exception as e:
            self.Logger.warning(f"GeoIP database not available: {e}")
            self.GeoIP = None
        self.Triggers = ["!w", "!vv"]
        self.Level = 10

    @staticmethod
    def _degree_to_cardinal(degree: float) -> str:
        """Convert wind degree to one of eight cardinal directions (45° sectors centred on N)."""
        directions = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
        return directions[int(((degree + 22.5) / 45.0) % 8)]

    def _locate_player(self, player) -> Dict[str, str]:
        """Return location query parameters from the player's IP via GeoIP,
        falling back to ``q=auto`` when the lookup is unavailable or fails."""
        if self.GeoIP:
            try:
                player_ip = player.Address.split(":")[0]
                location = self.GeoIP.city(player_ip).location
                if location.latitude is not None and location.longitude is not None:
                    return {
                        "lat": str(location.latitude),
                        "lon": str(location.longitude),
                    }
            except Exception as e:
                self.Logger.warning(f"GeoIP lookup failed: {e}")
        return {"q": "auto"}

    async def _func(self, message: List[str], player) -> int:
        """
        Announce the current weather for a place or for the caller's location.

        Args:
            message: Parsed command message [trigger, optional place name]
            player: Player executing command
        Returns:
            0 on success, -1 on cooldown/disabled, 1 on request failure,
            2 when no response was obtained, 5 when the API reports an error
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if not self._validate_command(player):
            return -1
        tl = self._get_torchlight()
        place = message[1] if len(message) > 1 else ""
        location = {"q": place} if place else self._locate_player(player)
        try:
            # Parameters are passed separately so user input is percent-encoded
            # and cannot add or override query fields.
            params = {"APPID": tl.Config["OpenWeatherAPIKey"], "units": "metric"}
            params.update(location)
            async with aiohttp.ClientSession() as session:
                response = await asyncio.wait_for(
                    session.get(
                        "https://api.openweathermap.org/data/2.5/weather",
                        params=params
                    ),
                    timeout=DEFAULT_HTTP_TIMEOUT
                )
                if not response:
                    return 2
                try:
                    data = await asyncio.wait_for(
                        response.json(),
                        timeout=DEFAULT_HTTP_TIMEOUT
                    )
                finally:
                    response.close()
            # Compared as text so an int 200 and a string "200" both count as success.
            if str(data.get("cod")) != "200":
                msg = data.get("message", "Unknown error")
                tl.SayPrivate(player, f"[OW] {msg}")
                return 5
            # Format wind direction
            wind_dir = "?"
            if "deg" in data["wind"]:
                wind_dir = self._degree_to_cardinal(data["wind"]["deg"])
            # Format timezone: offset in seconds -> "+H" or "+H:M"
            tz_seconds = data.get("timezone", 0)
            tz_hours = int(tz_seconds / 3600)
            tz_mins = (tz_seconds % 3600) / 60
            tz_str = f"{'+' if tz_hours >= 0 else ''}{tz_hours}"
            if tz_mins:
                tz_str += f":{int(tz_mins)}"
            # Build weather message
            city = data["name"]
            country = data["sys"]["country"]
            temp = data["main"]["temp"]
            temp_min = data["main"]["temp_min"]
            temp_max = data["main"]["temp_max"]
            weather = data["weather"][0]["main"]
            description = data["weather"][0]["description"]
            # With units=metric the API reports metres per second; convert so
            # the value matches the "kph" label.
            wind_speed = round(data["wind"]["speed"] * 3.6, 1)
            clouds = data["clouds"]["all"]
            humidity = data["main"]["humidity"]
            msg = (
                f"[{city}, {country}](UTC{tz_str}) {temp}°C ({temp_min}/{temp_max}) "
                f"{weather}: {description} | Wind {wind_dir} {wind_speed}kph | "
                f"Clouds: {clouds}% | Humidity: {humidity}%"
            )
            tl.SayChat(msg, player)
            return 0
        except asyncio.TimeoutError:
            tl.SayChat("[WEATHER] Error: Request timeout", player)
            return 1
        except Exception as e:
            self.Logger.error(f"Weather lookup failed: {e}")
            tl.SayChat(f"[WEATHER] Error: {str(e)}", player)
            return 1
# ============================================================================
# Voice Commands
# ============================================================================
class VoiceCommands(BaseCommand):
    """Play audio clips with optional modifiers.

    Triggers and their sound files come from ``triggers.json``: a list of
    entries, each with a ``names`` list and a ``sound`` value (a file name or
    a list of file names) and an optional ``restricted`` flag.
    """

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!random", "!search"]
        self.Level = 0
        self.VoiceTriggers: Dict[str, Dict] = {}

    def _load_triggers(self) -> None:
        """Load voice triggers from triggers.json unless some are already loaded.

        Any failure is logged and leaves ``VoiceTriggers`` as it was; this
        method never raises for I/O or JSON errors.
        """
        # Skip if already loaded
        if self.VoiceTriggers:
            self.Logger.debug("Using cached voice triggers")
            return
        try:
            with open("triggers.json", "r", encoding="utf-8") as fp:
                triggers = json.load(fp)
            loaded: Dict[str, Dict] = {}
            for entry in triggers:
                for name in entry.get("names", []):
                    loaded[name] = entry
            self.VoiceTriggers = loaded
            self.Logger.info(f"Loaded {len(self.VoiceTriggers)} voice triggers")
        except FileNotFoundError:
            self.Logger.error("triggers.json file not found")
        except json.JSONDecodeError as e:
            self.Logger.error(f"Invalid JSON in triggers.json: {e}")
        except Exception as e:
            self.Logger.error(f"Failed to load triggers: {e}")

    def _setup(self) -> None:
        """Register every loaded voice trigger name as a command trigger."""
        self.Logger.debug(sys._getframe().f_code.co_name)
        self._load_triggers()
        if not self.VoiceTriggers:
            self.Logger.warning("No voice triggers loaded (triggers.json missing or empty)")
        # Skip names that are already registered so a repeated setup does not
        # duplicate triggers.
        self.Triggers.extend(
            name for name in self.VoiceTriggers if name not in self.Triggers
        )

    async def _func(self, message: List[str], player) -> int:
        """
        Execute voice command.

        Args:
            message: Parsed command message [trigger, args]
            player: Player executing command
        Returns:
            Result of the audio clip's Play call on playback; 0 after a
            ``!search`` or when a low-level player uses a "!" trigger;
            1 on error or unknown sound; -1 when disabled
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if self.check_disabled(player):
            return -1
        tl = self._get_torchlight()
        player_level = self._get_player_level(player)
        trigger = message[0].lower()
        args = (message[1] or "").lower() if len(message) > 1 else ""
        # Ensure triggers are loaded before they are searched or played.
        # _load_triggers logs its own failures and does not raise.
        if not self.VoiceTriggers:
            self._load_triggers()
        # Handle search command
        if trigger == "!search":
            matches = [key for key in self.VoiceTriggers if args in key.lower()]
            tl.SayPrivate(
                player,
                f"{len(matches)} results: {', '.join(matches)}"
            )
            return 0
        # Playback requires level 2+: non-"!" triggers report an error,
        # "!" triggers are ignored silently.
        if player_level < 2:
            return 0 if trigger.startswith('!') else 1
        # Parse audio modifiers
        audio_params = AudioParamParser.parse(args)
        if trigger == "!random":
            import random
            values = list(self.VoiceTriggers.values())
            if not values:
                return 1
            trigger_data = random.choice(values)
            sound = self._select_sound(trigger_data)
        else:
            if trigger not in self.VoiceTriggers:
                return 1
            trigger_data = self.VoiceTriggers[trigger]
            sound = self._select_sound_by_args(trigger_data, args, player, tl)
        if not sound:
            return 1
        # Apply restrictions
        if trigger_data.get("restricted", False):
            tl.SayPrivate(
                player,
                "Modifiers are disabled for this sound. "
                "Spam less, spam better."
            )
            audio_params = AudioParams()
        # Play audio
        return self._play_sound(sound, player, audio_params)

    @staticmethod
    def _select_sound(trigger_data: Dict[str, Any]) -> Optional[str]:
        """Return the entry's sound, picking at random when it is a list.

        Returns None for an empty list instead of raising.
        """
        sounds = trigger_data.get("sound", [])
        if not isinstance(sounds, list):
            return sounds
        if not sounds:
            return None
        import random
        return random.choice(sounds)

    def _select_sound_by_args(
        self,
        trigger_data: Dict[str, Any],
        args: str,
        player,
        tl
    ) -> Optional[str]:
        """
        Select a sound based on user arguments.

        The first non-modifier word is interpreted as a 1-based index, then as
        a name substring; a leading ``?`` only lists matches.

        Args:
            trigger_data: Entry from triggers.json
            args: Lower-cased argument string
            player: Player to message about matches and errors
            tl: Torchlight instance used for private messages
        Returns:
            Sound file name, or None when nothing should be played
        """
        sounds = trigger_data.get("sound", [])
        if not isinstance(sounds, list):
            return sounds
        if not sounds:
            # Nothing to choose from; avoids IndexError from random.choice.
            return None
        import random
        # No args or only modifiers: pick random
        if not args or not args.strip():
            return random.choice(sounds)
        cleaned_args = AudioParamParser.extract_non_param_args(args)
        if not cleaned_args:
            return random.choice(sounds)
        first_word = cleaned_args.split()[0]
        # Try numeric index
        try:
            idx = int(first_word)
        except ValueError:
            idx = None
        if idx is not None:
            if 0 < idx <= len(sounds):
                return sounds[idx - 1]
            if idx:
                tl.SayPrivate(
                    player,
                    f"Number {idx} is out of bounds, max {len(sounds)}."
                )
                return None
            # idx == 0 falls through to the name search below.
        # Don't search if args look like parameters
        if args.startswith(("tempo=", "pitch=", "backward=", "backwards=")):
            return random.choice(sounds)
        # Search by name
        search_term = first_word
        searching_mode = search_term.startswith('?')
        if searching_mode:
            search_term = search_term[1:]
        names = [os.path.splitext(os.path.basename(sound))[0] for sound in sounds]
        matches = [
            (name, sound)
            for name, sound in zip(names, sounds)
            if search_term in name.lower()
        ]
        if matches:
            # Shortest name first, so the closest match is the one played.
            matches.sort(key=lambda t: len(t[0]))
            match_names = [t[0] for t in matches]
            if searching_mode:
                tl.SayPrivate(
                    player,
                    f"{len(match_names)} results: {', '.join(match_names)}"
                )
                return None
            if len(matches) > 1:
                tl.SayPrivate(
                    player,
                    f"Multiple matches: {', '.join(match_names)}"
                )
            return matches[0][1]
        # No matches
        if not searching_mode:
            tl.SayPrivate(
                player,
                f"Couldn't find {search_term} in list of sounds."
            )
        tl.SayPrivate(player, ", ".join(names))
        return None

    def _play_sound(
        self,
        sound: str,
        player,
        audio_params: AudioParams
    ) -> int:
        """
        Play audio file.

        Args:
            sound: Sound file path relative to the "sounds" directory
            player: Player to play for
            audio_params: Audio parameters
        Returns:
            1 when no audio clip could be created, otherwise the result of
            the clip's Play call
        """
        tl = self._get_torchlight()
        path = os.path.abspath(os.path.join("sounds", sound))
        audio_clip = tl.AudioManager.AudioClip(player, f"file://{path}")
        if not audio_clip:
            tl.SayPrivate(player, "[VOICE] Failed to create audio clip")
            self.Logger.error(f"Failed to create audio clip for: {path}")
            return 1
        return audio_clip.Play(
            rubberband=audio_params.to_rubberband_args(),
            bitrate=audio_params.to_bitrate_args(),
            backwards=audio_params.backwards
        )
class YouTube(BaseCommand):
    """Play the audio of a URL given to ``!yt``."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!yt"]
        self.Level = 6

    async def _func(
        self,
        message: List[str],
        player,
        line: Optional[str] = None
    ) -> int:
        """
        Create an audio clip for the URL and start playback.

        Args:
            message: Parsed command message [trigger, url]
            player: Player executing command
            line: Full chat line; everything after its first space is scanned
                for audio modifiers
        Returns:
            -1 when disabled, 1 on a missing URL or clip failure, otherwise
            the result of the clip's Play call
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if self.check_disabled(player):
            return -1
        tl = self._get_torchlight()
        target = message[1] if len(message) > 1 else ""
        if not (target and target.strip()):
            tl.SayPrivate(player, "[YT] Please provide a YouTube URL")
            return 1
        # "!last" replays the most recently remembered URL, if there is one.
        if target == "!last" and tl.LastUrl:
            target = tl.LastUrl
        # A timestamp in the URL becomes the playback offset and is then
        # stripped from the URL.
        start_at = self._extract_time_offset(target)
        if start_at:
            target = self._remove_time_from_url(target)
        clip = tl.AudioManager.AudioClip(player, target)
        if not clip:
            tl.SayPrivate(player, "[YT] Failed to create audio clip")
            return 1
        modifier_text = ""
        if line and " " in line:
            modifier_text = line.split(" ", 1)[1]
        modifiers = AudioParamParser().parse(modifier_text)
        return clip.Play(
            start_at,
            rubberband=modifiers.to_rubberband_args(),
            bitrate=modifiers.to_bitrate_args(),
            backwards=modifiers.backwards
        )

    @staticmethod
    def _extract_time_offset(url: str) -> Optional[int]:
        """Return Utils.ParseTime of the first ``&t=``/``?t=``/``#t=`` value
        that is non-empty, or None when there is none."""
        for marker in ("&t=", "?t=", "#t="):
            found_at = url.find(marker)
            if found_at == -1:
                continue
            tail = url[found_at + 3:]
            # The value runs up to the next '&', '?' or '#'.
            value = re.split(r"[&?#]", tail, maxsplit=1)[0]
            if value:
                return Utils.ParseTime(value)
        return None

    @staticmethod
    def _remove_time_from_url(url: str) -> str:
        """Cut the URL at the first marker found, trying ``&t=``, ``?t=``,
        ``#t=`` in that order; return it unchanged when none is present."""
        for marker in ("&t=", "?t=", "#t="):
            found_at = url.find(marker)
            if found_at != -1:
                return url[:found_at]
        return url
class YouTubeSearch(BaseCommand):
    """Search YouTube and play first result."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!yts"]
        self.Level = 6

    async def _func(
        self,
        message: List[str],
        player,
        line: Optional[str] = None
    ) -> int:
        """
        Search with yt-dlp, announce the first result and play it.

        Args:
            message: Parsed command message [trigger, query]
            player: Player executing command
            line: Full chat line; everything after its first space is scanned
                for audio modifiers
        Returns:
            -1 when disabled, 1 on a missing query, no result or failure,
            otherwise the result of the clip's Play call
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if self.check_disabled(player):
            return -1
        tl = self._get_torchlight()
        query = message[1] if len(message) > 1 else ""
        # Remove modifiers from search query
        parser = AudioParamParser()
        search_term = parser.extract_non_param_args(query) if query else ""
        if not search_term:
            # Also covers a query made only of modifiers, which would
            # otherwise run an empty search.
            tl.SayPrivate(player, "[YTS] Please provide a search query")
            return 1
        try:
            # Search YouTube
            proc = await asyncio.create_subprocess_exec(
                "yt-dlp", "--dump-json", "-xg",
                f"ytsearch:{search_term}",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await proc.communicate()
            lines = [entry for entry in stdout.split(b'\n') if entry]
            if len(lines) < 2:
                tl.SayPrivate(player, "[YTS] No results found")
                return 1
            # First line is the media URL, second the JSON description.
            url = lines[0].strip().decode("ascii")
            info = json.loads(lines[1])
            if info.get("extractor_key") == "Youtube":
                # yt-dlp may report null for these (e.g. live streams).
                duration = timedelta(seconds=info.get("duration") or 0)
                views = info.get("view_count") or 0
                title = info.get("title", "Unknown")
                tl.SayChat(
                    f"\x07E52D27[YouTube]\x01 {title} | {duration} | {views:,}"
                )
            # Create audio clip
            audio_clip = tl.AudioManager.AudioClip(player, url)
            if not audio_clip:
                return 1
            tl.LastUrl = url
            # Parse modifiers
            modifiers_str = line.split(" ", 1)[1] if line and " " in line else ""
            audio_params = parser.parse(modifiers_str)
            return audio_clip.Play(
                None,
                rubberband=audio_params.to_rubberband_args(),
                bitrate=audio_params.to_bitrate_args(),
                backwards=audio_params.backwards
            )
        except Exception as e:
            self.Logger.error(f"YouTube search failed: {e}")
            tl.SayPrivate(player, f"[YTS] Error: {str(e)}")
            return 1
# ============================================================================
# Text-to-Speech Commands
# ============================================================================
class Say(BaseCommand):
    """Text-to-speech via Google Translate (gtts package)."""

    # Language codes accepted after the "!say" prefix.  Replaced on the
    # instance in __init__; stays empty when the list cannot be loaded.
    VALID_LANGUAGES = []

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        # NOTE(review): the tuple's second element is presumably the prefix
        # length used for matching "!say<lang>" — confirm against the dispatcher.
        self.Triggers = [("!say", 4)]
        self.Level = 2
        # Load valid languages on init
        try:
            self.VALID_LANGUAGES = list(gtts.lang.tts_langs().keys())
        except Exception as e:
            self.Logger.warning(f"Could not load gTTS language list: {e}")

    async def _func(self, message: List[str], player) -> int:
        """
        Execute text-to-speech command.

        Args:
            message: Parsed command message [trigger with optional language
                suffix, text with optional modifiers]
            player: Player executing command
        Returns:
            0 when synthesis was scheduled, -1 when disabled, 1 on missing
            text or an unknown language
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if self.check_disabled(player):
            return -1
        text = message[1] if len(message) > 1 else ""
        if not text:
            return 1
        # Extract language from trigger
        language = "en"
        if len(message[0]) > 4:
            language = message[0][4:]
        # Only validate when the list was loaded; otherwise every language,
        # including the default, would be rejected.
        if self.VALID_LANGUAGES and language not in self.VALID_LANGUAGES:
            return 1
        parser = AudioParamParser()
        audio_params = parser.parse(text)
        clean_text = parser.extract_non_param_args(text)
        if not clean_text:
            # Only modifiers were given; there is nothing to synthesize.
            self._get_torchlight().SayPrivate(player, "[SAY] Please provide text to speak")
            return 1
        asyncio.ensure_future(self._speak(player, language, clean_text, audio_params))
        return 0

    async def _speak(
        self,
        player,
        language: str,
        text: str,
        audio_params: AudioParams
    ) -> None:
        """
        Generate speech into a temporary file and play it.

        The file is removed when playback stops, or immediately when
        playback could not be started.

        Args:
            player: Player the clip is created for
            language: gTTS language code
            text: Text to speak
            audio_params: Audio parameters
        """
        tl = self._get_torchlight()
        temp_file_path = None
        try:
            tts = gtts.gTTS(text=text, lang=language)
            # delete=False: the file must outlive this block so the audio
            # player can read it by path.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_file_path = temp_file.name
                tts.write_to_fp(temp_file)
            audio_clip = tl.AudioManager.AudioClip(
                player,
                f"file://{temp_file_path}"
            )
            if not audio_clip:
                tl.SayPrivate(player, "[SAY] Failed to create audio clip")
                os.unlink(temp_file_path)
                return
            result = audio_clip.Play(
                rubberband=audio_params.to_rubberband_args(),
                bitrate=audio_params.to_bitrate_args(),
                backwards=audio_params.backwards
            )
            if result:
                audio_clip.AudioPlayer.AddCallback(
                    "Stop",
                    lambda: self._cleanup_temp_file(temp_file_path)
                )
            else:
                os.unlink(temp_file_path)
        except Exception as e:
            self.Logger.error(f"Text-to-speech failed: {e}")
            tl.SayPrivate(player, f"[SAY] Error: {str(e)}")
            self._cleanup_temp_file(temp_file_path)

    @staticmethod
    def _cleanup_temp_file(path: str) -> None:
        """Remove ``path`` if it exists; failures are logged, never raised."""
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception as e:
            logging.getLogger(__name__).error(f"Failed to cleanup temp file: {e}")
class DECTalk(BaseCommand):
    """DECTalk text-to-speech, rendered by running ``say.exe`` under Wine."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!dec"]
        self.Level = 5

    async def _func(self, message: List[str], player) -> int:
        """Validate the request and schedule DECTalk synthesis.

        Args:
            message: ``[trigger, text]``; the text may be missing.
            player: The player who issued the command.

        Returns:
            -1 if ``check_disabled`` reports the command is disabled, 1 if no
            text was given, 0 once synthesis has been scheduled.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        if self.check_disabled(player):
            return -1
        text = message[1] if len(message) > 1 else ""
        if not text or not text.strip():
            tl = self._get_torchlight()
            tl.SayPrivate(player, "[DEC] Please provide text to synthesize")
            return 1
        # Fire-and-forget: synthesis and playback continue in the background.
        asyncio.ensure_future(self._synthesize(player, text))
        return 0

    async def _synthesize(self, player, text: str) -> None:
        """Synthesize *text* with DECTalk and play the result for *player*.

        The text is piped to ``wine say.exe -w <tempfile>`` (run from
        ``DECTALK_CWD``). The temporary file is removed when playback stops,
        or immediately if synthesis, clip creation or playback fails. Errors
        are logged and reported privately to the player.
        """
        tl = self._get_torchlight()
        temp_file_path = None
        try:
            # Format text for DECTalk
            formatted_text = f"[:phoneme on]{text}"
            # Only the path is needed; say.exe writes the audio itself.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_file_path = temp_file.name
            # Run DECTalk via Wine
            proc = await asyncio.create_subprocess_exec(
                "wine", "say.exe", "-w", temp_file_path,
                cwd=DECTALK_CWD,
                stdin=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _, stderr_data = await proc.communicate(
                formatted_text.encode('utf-8', errors='ignore')
            )
            if proc.returncode != 0:
                # Don't abort on the exit code alone (it is only logged), but
                # keep the diagnostics that were previously thrown away.
                details = (stderr_data or b"").decode('utf-8', errors='replace').strip()
                self.Logger.warning(
                    f"DECTalk exited with code {proc.returncode}: {details}"
                )
            if os.path.getsize(temp_file_path) == 0:
                # Nothing was written, so there is nothing to play.
                tl.SayPrivate(player, "[DEC] Synthesis produced no audio")
                self._cleanup_temp_file(temp_file_path)
                return
            audio_clip = tl.AudioManager.AudioClip(
                player,
                f"file://{temp_file_path}"
            )
            if not audio_clip:
                tl.SayPrivate(player, "[DEC] Failed to create audio clip")
                self._cleanup_temp_file(temp_file_path)
                return
            result = audio_clip.Play(dec_params=["-af", "volume=10dB"])
            if result:
                # Playback is in progress: defer deletion until it stops.
                audio_clip.AudioPlayer.AddCallback(
                    "Stop",
                    lambda: self._cleanup_temp_file(temp_file_path)
                )
            else:
                self._cleanup_temp_file(temp_file_path)
        except Exception as e:
            self.Logger.error(f"DECTalk synthesis failed: {e}")
            tl.SayPrivate(player, f"[DEC] Error: {str(e)}")
            self._cleanup_temp_file(temp_file_path)

    @staticmethod
    def _cleanup_temp_file(path: str) -> None:
        """Remove *path* if it exists; log (never raise) on failure."""
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception as e:
            logging.getLogger(__name__).error(f"Failed to cleanup temp file: {e}")
# ============================================================================
# Control Commands
# ============================================================================
class Stop(BaseCommand):
    """Command that halts audio playback on request."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!stop"]
        self.Level = 0

    async def _func(self, message: List[str], player) -> int:
        """Ask the audio manager to stop playback and confirm to the player.

        The optional second element of *message* is forwarded to
        ``AudioManager.Stop`` unchanged (an empty string when absent).
        Failures are logged and reported privately to the player.

        Returns:
            Always ``True``.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        torchlight = self._get_torchlight()
        try:
            if len(message) > 1:
                extra = message[1]
            else:
                extra = ""
            torchlight.AudioManager.Stop(player, extra)
            torchlight.SayPrivate(player, "[STOP] Audio stopped")
        except Exception as err:
            self.Logger.error(f"Stop command failed: {err}")
            torchlight.SayPrivate(player, f"[STOP] Error: {str(err)}")
        return True
class VoteDisable(BaseCommand):
    """Lets players vote to switch Torchlight off."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!votedisable", "!disablevote"]
        self.Level = 0

    async def _func(self, message: List[str], player) -> int:
        """Record the caller's vote and disable Torchlight once enough exist.

        The required number of votes is one fifth of the player count,
        rounded down. Votes are stored by ``UniqueID`` in a set-like
        ``DisableVotes`` collection.

        Returns:
            1 if Torchlight is already disabled, otherwise 0.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        torchlight = self._get_torchlight()

        if torchlight.Disabled:
            torchlight.SayPrivate(
                player,
                "Torchlight is already disabled for the duration of this map."
            )
            return 1

        torchlight.DisableVotes.add(player.UniqueID)
        required = len(torchlight.Players) // 5
        collected = len(torchlight.DisableVotes)
        missing = required - collected

        if missing > 0:
            torchlight.SayPrivate(
                player,
                f"Torchlight needs {missing} more disable votes "
                f"to be disabled."
            )
            return 0

        torchlight.SayChat(
            "Torchlight has been disabled for the duration of this map."
        )
        # NOTE(review): 6 equals EnableDisable.Level in this file, presumably
        # so that only admins of that level can re-enable it - confirm.
        torchlight.Disabled = 6
        return 0
class EnableDisable(BaseCommand):
    """Admin switch that turns Torchlight on or off."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!enable", "!disable"]
        self.Level = 6

    async def _func(self, message: List[str], player) -> int:
        """Enable or disable Torchlight, depending on the trigger used.

        ``Disabled`` holds the level of whoever disabled Torchlight (falsy
        when enabled). A caller whose level is below that value is refused
        for both ``!enable`` and ``!disable``.

        Returns:
            1 if the caller is refused, otherwise 0.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        torchlight = self._get_torchlight()
        caller_level = self._get_player_level(player)
        locked_level = torchlight.Disabled or 0

        command = message[0]
        if command not in ("!enable", "!disable"):
            return 0
        enabling = command == "!enable"

        # Shared guard: a higher-level user's decision cannot be overridden.
        if locked_level and locked_level > caller_level:
            if enabling:
                refusal = (
                    "You don't have access to enable torchlight, since it was "
                    "disabled by a higher level user."
                )
            else:
                refusal = (
                    "You don't have access to disable torchlight, since it was "
                    "already disabled by a higher level user."
                )
            torchlight.SayPrivate(player, refusal)
            return 1

        if enabling:
            torchlight.SayChat(
                "Torchlight has been enabled for the duration of this map - "
                "Type !disable to disable it again."
            )
            torchlight.Disabled = False
        else:
            torchlight.SayChat(
                "Torchlight has been disabled for the duration of this map - "
                "Type !enable to enable it again."
            )
            torchlight.Disabled = caller_level
        return 0
# ============================================================================
# Admin Commands
# ============================================================================
class AdminAccess(BaseCommand):
    """Manage user access levels.

    Supported arguments after the ``!access`` trigger:

    * ``reload`` - reload the access list and refresh every player's entry.
    * ``save`` - save the access list.
    * ``<user> <level>`` - set a numeric level for a player.
    * ``<user> as <name> <level>`` - same, also setting the registered name.
    * ``<user> revoke`` - remove the player's access entry.

    ``<user>`` is either ``#<userid>`` or a case-insensitive substring of a
    player's name (first match wins).
    """

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!access"]
        self.Level = 6

    def _reload_access(self) -> None:
        """Reload access list from storage and re-attach it to all players."""
        tl = self._get_torchlight()
        tl.Access.Load()
        for player in tl.Players:
            try:
                player.Access = tl.Access[player.UniqueID]
            except (KeyError, TypeError):
                player.Access = None

    async def _func(self, message: List[str], player) -> int:
        """Dispatch ``reload`` / ``save`` or modify one user's access.

        Args:
            message: ``[trigger, argument]``; the argument may be missing.
            player: The player who issued the command.

        Returns:
            0 on success, 1 when the argument is malformed or refused.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        tl = self._get_torchlight()
        # Read the argument once, guarded, so a bare "!access" cannot raise
        # IndexError when falling through to _modify_access.
        arg = message[1] if len(message) > 1 else ""
        cmd = arg.lower()
        if cmd == "reload":
            self._reload_access()
            tl.SayChat(
                f"Loaded access list with {len(tl.Access)} users"
            )
            return 0
        if cmd == "save":
            tl.Access.Save()
            tl.SayChat(
                f"Saved access list with {len(tl.Access)} users"
            )
            return 0
        # Modify access for a specific user
        return await self._modify_access(arg, player, tl)

    async def _modify_access(
        self,
        arg: str,
        executor,
        tl
    ) -> int:
        """Set, rename or revoke a user's access level.

        Args:
            arg: ``<user> <level>``, ``<user> as <name> <level>`` or
                ``<user> revoke``.
            executor: The player issuing the change; unless their level is
                10 or above they can neither assign a level at or above
                their own nor touch a user at or above their own level.
            tl: The Torchlight instance.

        Returns:
            0 on success (including revoking a user without access),
            1 on malformed input, unknown user or refused permission.
        """
        executor_level = self._get_player_level(executor)
        # Find " as " to separate regname from level
        as_idx = arg.find(" as ")
        if as_idx != -1:
            user_part = arg[:as_idx]
            rest = arg[as_idx + 4:].strip()
            try:
                regname, level_str = rest.rsplit(' ', 1)
            except ValueError:
                tl.SayChat("Invalid format: use '!access <user> as <name> <level>'")
                return 1
            regname = regname.strip()
        else:
            try:
                user_part, level_str = arg.rsplit(' ', 1)
            except ValueError:
                tl.SayChat("Invalid format: use '!access <user> <level>'")
                return 1
            regname = None
        user_part = user_part.strip()
        level_str = level_str.strip()
        if not user_part:
            # An empty identifier would substring-match every player name.
            tl.SayChat("Invalid format: use '!access <user> <level>'")
            return 1

        # Find user
        target_player = self._find_user(user_part, tl)
        if not target_player:
            tl.SayChat(f"Couldn't find user: {user_part}")
            return 1

        # Callers at level 10 or above bypass the level comparisons.
        restricted = executor_level < 10

        # Handle numeric level change. Only a plain ASCII integer with at
        # most one leading minus is accepted, so int() below cannot fail
        # (inputs such as "--5" are rejected as invalid instead of raising).
        if re.fullmatch(r"-?[0-9]+", level_str):
            new_level = int(level_str)
            # Check permissions
            if restricted and new_level >= executor_level:
                tl.SayChat(
                    f"Can't assign level {new_level} - higher than or equal "
                    f"to your level ({executor_level})"
                )
                return 1
            if target_player.Access:
                target_level = target_player.Access["level"]
                if restricted and target_level >= executor_level:
                    tl.SayChat(
                        f"Can't modify level {target_level} - "
                        f"higher than or equal to your level ({executor_level})"
                    )
                    return 1
                # Update existing access
                if regname:
                    target_player.Access["name"] = regname
                    tl.SayChat(
                        f'Changed "{target_player.Name}"({target_player.UniqueID}) '
                        f'from level {target_level} to {new_level} '
                        f'as {regname}'
                    )
                else:
                    tl.SayChat(
                        f'Changed "{target_player.Name}"({target_player.UniqueID}) '
                        f'from level {target_level} to {new_level}'
                    )
                target_player.Access["level"] = new_level
            else:
                # Add new access
                if not regname:
                    regname = target_player.Name
                from collections import OrderedDict
                target_player.Access = OrderedDict([
                    ("name", regname),
                    ("level", new_level)
                ])
                tl.SayChat(
                    f'Added "{target_player.Name}"({target_player.UniqueID}) '
                    f'as {regname} with level {new_level}'
                )
            # Single store for both the update and the add path.
            tl.Access[target_player.UniqueID] = target_player.Access
        elif level_str == "revoke":
            if target_player.Access:
                target_level = target_player.Access["level"]
                if restricted and target_level >= executor_level:
                    tl.SayChat(
                        f"Can't revoke level {target_level} - "
                        f"higher than or equal to your level ({executor_level})"
                    )
                    return 1
                tl.SayChat(
                    f'Removed "{target_player.Name}"({target_player.UniqueID}) '
                    f'(was {target_player.Access["name"]} with level '
                    f'{target_level})'
                )
                del tl.Access[target_player.UniqueID]
                target_player.Access = None
        else:
            tl.SayChat(f"Invalid level: {level_str}")
            return 1
        return 0

    @staticmethod
    def _find_user(identifier: str, tl):
        """Find a connected player by ``#<userid>`` or by name substring.

        Args:
            identifier: ``#`` followed by ASCII digits, or part of a name.
            tl: The Torchlight instance whose ``Players`` are searched.

        Returns:
            The matching player, or None when *identifier* is empty or
            nothing matches. Name matching is case-insensitive and returns
            the first player whose name contains *identifier*.
        """
        if not identifier:
            # "" is a substring of every name; never treat it as a match.
            return None
        # Try user ID format (#123)
        if re.fullmatch(r"#[0-9]+", identifier):
            return tl.Players.FindUserID(int(identifier[1:]))
        # Search by name
        needle = identifier.lower()
        for player in tl.Players:
            if needle in player.Name.lower():
                return player
        return None
class TorchHelp(BaseCommand):
    """List available commands and their help text."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!torchhelp"]
        self.Level = 0

    @staticmethod
    def _describe(cmd, fallback_name: str) -> Tuple[str, str]:
        """Return ``(trigger, help_text)`` for a command instance.

        Uses the command's first trigger, or *fallback_name* when it has
        none. Some commands in this file register a trigger as a tuple such
        as ``("!say", 4)``; its first element is used as the trigger text.
        A missing or ``None`` help text becomes an empty string.
        """
        triggers = getattr(cmd, "Triggers", None)
        trigger = triggers[0] if triggers else fallback_name
        if isinstance(trigger, (tuple, list)) and trigger:
            trigger = trigger[0]
        help_text = cmd.get_help() if hasattr(cmd, "get_help") else ""
        return str(trigger), "" if help_text is None else str(help_text)

    async def _func(self, message: List[str], player) -> int:
        """Send the command list, optionally filtered, to the requester.

        Args:
            message: ``[trigger, term]``; when *term* is given only entries
                whose trigger or help text contains it (case-insensitive)
                are shown.
            player: The player who receives the private replies.

        Returns:
            Always 0.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        tl = self._get_torchlight()
        term = message[1].lower() if len(message) > 1 else ""
        described = []

        # Prefer authoritative listing from tl.Commands if available
        try:
            cmds = getattr(tl, "Commands", None)
            if isinstance(cmds, dict):
                cmds = cmds.values()
            if cmds is not None:
                for cmd in cmds:
                    try:
                        described.append(self._describe(cmd, type(cmd).__name__))
                    except Exception as e:
                        self.Logger.debug(f"Skipping help for {cmd!r}: {e}")
        except Exception as e:
            self.Logger.debug(f"Could not list registered commands: {e}")

        # Fallback: inspect defined command classes. Only used when nothing
        # at all could be listed above - not when a search term merely
        # filtered everything out - because it instantiates every command.
        if not described:
            # Snapshot the values so the iteration cannot be invalidated if
            # module globals change while commands are being constructed.
            for obj in list(globals().values()):
                if not (isinstance(obj, type) and issubclass(obj, BaseCommand)):
                    continue
                if obj is BaseCommand:
                    continue
                try:
                    described.append(self._describe(obj(tl), obj.__name__))
                except Exception as e:
                    self.Logger.debug(f"Skipping help for {obj.__name__}: {e}")

        results = [
            f"{trigger}: {help_text}"
            for trigger, help_text in described
            if not term or term in trigger.lower() or term in help_text.lower()
        ]

        # Deliver results privately to requester
        if not results:
            tl.SayPrivate(player, "No help entries found.")
            return 0
        max_send = 20
        for line in results[:max_send]:
            tl.SayPrivate(player, line)
        if len(results) > max_send:
            tl.SayPrivate(player, f"... and {len(results) - max_send} more. Use '!torchhelp <term>' to filter.")
        return 0
class Reload(BaseCommand):
    """Command that triggers a Torchlight reload."""

    def __init__(self, torchlight) -> None:
        super().__init__(torchlight)
        self.Triggers = ["!reload"]
        self.Level = 7

    async def _func(self, message: List[str], player) -> int:
        """Call ``Reload()`` on the Torchlight instance.

        Neither *message* nor *player* is consulted.

        Returns:
            Always 0.
        """
        self.Logger.debug(sys._getframe().f_code.co_name)
        self._get_torchlight().Reload()
        return 0