projects-jenz/torchlight_changes_unloze/torchlight3/Torchlight/AudioManager.py

454 lines
15 KiB
Python
Executable File

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import sys
import math
import time
from .FFmpegAudioPlayer import FFmpegAudioPlayerFactory
def _progress_delta(old_position, new_position, elapsed = None, jitter = 0.25):
    """Return how far playback moved between two position reports.

    The result is the absolute difference between ``new_position`` and
    ``old_position`` as a float. When ``elapsed`` is supplied, the result is
    additionally capped at ``elapsed + jitter``, i.e. the reported progress
    never exceeds the given elapsed time plus a small tolerance.

    Returns 0.0 when:
      * either position cannot be converted to ``float``,
      * the difference is not a finite number,
      * ``elapsed`` cannot be converted to ``float``, is negative, or is
        not finite.
    """
    try:
        moved = abs(float(new_position) - float(old_position))
    except (TypeError, ValueError):
        return 0.0

    if not math.isfinite(moved):
        return 0.0

    # Without a time reference there is nothing to cap against.
    if elapsed is None:
        return moved

    try:
        wall_clock = float(elapsed)
    except (TypeError, ValueError):
        return 0.0

    if wall_clock < 0.0 or not math.isfinite(wall_clock):
        return 0.0

    return min(moved, wall_clock + jitter)
class AudioPlayerFactory():
    """Hands out audio player instances for a requested backend type."""

    # Identifier of the FFmpeg backend, the only type NewPlayer() accepts.
    AUDIOPLAYER_FFMPEG = 1

    def __init__(self, master):
        """Remember the owning manager and create the FFmpeg sub-factory."""
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = master.Torchlight
        self.FFmpegAudioPlayerFactory = FFmpegAudioPlayerFactory(self)

    def __del__(self):
        """Log destruction of the factory."""
        self.Logger.info("~AudioPlayerFactory()")

    def NewPlayer(self, _type):
        """Return a new player for ``_type``.

        Raises:
            ValueError: if ``_type`` is not ``AUDIOPLAYER_FFMPEG``.
        """
        if _type != self.AUDIOPLAYER_FFMPEG:
            raise ValueError("Unsupported audio player type: {0}".format(_type))
        return self.FFmpegAudioPlayerFactory.NewPlayer()
class AntiSpam():
    """Tracks recently played clips and enforces a global cooldown.

    ``LastClips`` maps a clip to a bookkeeping dict with the keys
    ``timestamp``, ``duration``, ``dominant``, ``active`` and
    ``last_update_realtime``. Only the entry flagged ``dominant`` accumulates
    duration in OnUpdate().
    """

    # Clips whose level is at or above this value are not tracked at all.
    GLOBAL_PLAY_TIME_IMMUNITY_LEVEL = 10

    def __init__(self, master):
        """Start with no tracked clips and no active cooldown."""
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = master.Torchlight
        self.LastClips = dict()
        self.DisabledTime = None

    def _counts_toward_global_play_time(self, level):
        """Return True when a clip of ``level`` is subject to tracking."""
        return level < self.GLOBAL_PLAY_TIME_IMMUNITY_LEVEL

    def CheckAntiSpam(self, player):
        """Return False (and tell ``player``) while the cooldown applies to them.

        Players whose access level reaches the configured ``ImmunityLevel``
        are never blocked.
        """
        if not self.DisabledTime:
            return True

        torchlight = self.Torchlight()
        loop = torchlight.Master.Loop
        if not self.DisabledTime > loop.time():
            return True

        access = player.Access
        if access and access["level"] >= torchlight.Config["AntiSpam"]["ImmunityLevel"]:
            return True

        seconds_left = math.ceil(self.DisabledTime - loop.time())
        torchlight.SayPrivate(
            player,
            "Torchlight is currently on cooldown! ({0} seconds left)".format(seconds_left))
        return False

    def SpamCheck(self):
        """Sum up tracked durations and start the cooldown when over budget.

        Entries older than ``MaxUsageSpan`` are ignored, and dropped once they
        are no longer active. When the summed duration exceeds
        ``MaxUsageTime`` the cooldown is armed, a chat notice is sent, every
        registered clip below ``ImmunityLevel`` is stopped and the tracking
        table is cleared.
        """
        torchlight = self.Torchlight()
        now = torchlight.Master.Loop.time()
        settings = torchlight.Config["AntiSpam"]

        used = 0.0
        # Iterate over a snapshot because expired entries are deleted on the fly.
        for key, entry in list(self.LastClips.items()):
            if not entry["timestamp"]:
                continue
            expired = entry["timestamp"] + entry["duration"] + settings["MaxUsageSpan"] < now
            if expired:
                if not entry["active"]:
                    del self.LastClips[key]
            else:
                used += entry["duration"]

        if not used > settings["MaxUsageTime"]:
            return

        self.DisabledTime = torchlight.Master.Loop.time() + settings["PunishDelay"]
        torchlight.SayChat("Blocked voice commands for the next {0} seconds. Used {1} seconds within {2} seconds.".format(
            settings["PunishDelay"], settings["MaxUsageTime"], settings["MaxUsageSpan"]))

        # Stopping a clip may modify the manager's list, so walk a copy of it.
        for playing in list(self.Master.AudioClips):
            if playing.Level < settings["ImmunityLevel"]:
                playing.Stop()

        self.LastClips.clear()

    def OnPlay(self, clip):
        """Start tracking ``clip``; it becomes dominant if no other entry is."""
        if not self._counts_toward_global_play_time(clip.Level):
            return

        started = self.Torchlight().Master.Loop.time()
        entry = {
            "timestamp": started,
            "duration": 0.0,
            "dominant": False,
            "active": True,
            "last_update_realtime": time.time(),
        }
        self.LastClips[clip] = entry
        # The new entry is still non-dominant here, so this only sees others.
        entry["dominant"] = not any(other["dominant"] for other in self.LastClips.values())

    def OnStop(self, clip):
        """Mark ``clip`` inactive and pass dominance to the first active entry."""
        if not self._counts_toward_global_play_time(clip.Level):
            return

        entry = self.LastClips.get(clip)
        if entry is None:
            return

        entry["active"] = False
        if not entry["dominant"]:
            return

        successor = next((other for other in self.LastClips.values() if other["active"]), None)
        if successor is not None:
            successor["dominant"] = True
        entry["dominant"] = False

    def OnUpdate(self, clip, old_position, new_position):
        """Add playback progress to the dominant entry and re-run SpamCheck()."""
        if not self._counts_toward_global_play_time(clip.Level):
            return

        entry = self.LastClips.get(clip)
        if entry is None:
            if self.Logger.isEnabledFor(logging.DEBUG):
                self.Logger.debug(
                    "OnUpdate called for unknown clip key %r; %d known keys",
                    clip,
                    len(self.LastClips),
                )
            return

        now = time.time()
        previous = entry.get("last_update_realtime")
        elapsed = None if previous is None else now - previous
        entry["last_update_realtime"] = now

        progress = _progress_delta(old_position, new_position, elapsed = elapsed)
        if progress <= 0.0 or not entry["dominant"]:
            return

        entry["duration"] += progress
        self.SpamCheck()
class Advertiser():
    """Sends a one-time chat hint about stopping sounds for long clips.

    ``LastClips`` maps a clip to a bookkeeping dict with the keys
    ``timestamp``, ``duration``, ``dominant``, ``active``, ``hinted`` and
    ``last_update_realtime``. Only the entry flagged ``dominant`` accumulates
    duration in OnUpdate().
    """

    def __init__(self, master):
        """Start with an empty tracking table."""
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = master.Torchlight
        self.LastClips = dict()

    def Think(self, clip):
        """Prune stale entries, then send the stop hint for ``clip`` if due.

        The hint is sent once the clip's tracked duration reaches the
        configured ``AdStop`` and neither the clip nor its entry has been
        flagged as hinted yet.
        """
        torchlight = self.Torchlight()
        now = torchlight.Master.Loop.time()
        settings = torchlight.Config["Advertiser"]

        # Collect first, delete afterwards, so the dict is not mutated mid-iteration.
        stale = [
            key for key, entry in self.LastClips.items()
            if entry["timestamp"]
            and entry["timestamp"] + entry["duration"] + settings["MaxSpan"] < now
            and not entry["active"]
        ]
        for key in stale:
            del self.LastClips[key]

        entry = self.LastClips.get(clip)
        if not entry:
            return

        if clip.StopHinted or entry.get("hinted") or not entry["duration"] >= settings["AdStop"]:
            return

        torchlight.SayChat("Hint: Type \x07FF0000!stop(ze) !pls(mg)\x01 to stop all currently playing sounds.")
        entry["hinted"] = True
        clip.StopHinted = True

    def OnPlay(self, clip):
        """Start tracking ``clip``; it becomes dominant if no other entry is."""
        started = self.Torchlight().Master.Loop.time()
        entry = {
            "timestamp": started,
            "duration": 0.0,
            "dominant": False,
            "active": True,
            "hinted": False,
            "last_update_realtime": time.time(),
        }
        self.LastClips[clip] = entry
        # The new entry is still non-dominant here, so this only sees others.
        entry["dominant"] = not any(other["dominant"] for other in self.LastClips.values())

    def OnStop(self, clip):
        """Mark ``clip`` inactive and pass dominance to the first active entry."""
        entry = self.LastClips.get(clip)
        if entry is None:
            return

        entry["active"] = False
        if not entry["dominant"]:
            return

        successor = next((other for other in self.LastClips.values() if other["active"]), None)
        if successor is not None:
            successor["dominant"] = True
        entry["dominant"] = False

    def OnUpdate(self, clip, old_position, new_position):
        """Add playback progress to the dominant entry and call Think()."""
        entry = self.LastClips.get(clip)
        if entry is None:
            return

        now = time.time()
        previous = entry.get("last_update_realtime")
        elapsed = None if previous is None else now - previous
        entry["last_update_realtime"] = now

        progress = _progress_delta(old_position, new_position, elapsed = elapsed)
        if progress <= 0.0 or not entry["dominant"]:
            return

        entry["duration"] += progress
        self.Think(clip)
class AudioManager():
    """Creates, tracks and stops audio clips while enforcing usage limits.

    Holds the list of currently registered clips (``AudioClips``) together
    with the ``AntiSpam`` and ``Advertiser`` helpers that observe them.
    """

    # Number of distinct stop requests a clip must collect before it is
    # stopped on behalf of players who may not stop it directly.
    STOP_VOTES_REQUIRED = 3

    def __init__(self, torchlight):
        """Build the helper objects; ``torchlight`` is called to get the main object."""
        self.Logger = logging.getLogger(__class__.__name__)
        self.Torchlight = torchlight
        self.AntiSpam = AntiSpam(self)
        self.Advertiser = Advertiser(self)
        self.AudioPlayerFactory = AudioPlayerFactory(self)
        self.AudioClips = []

    def __del__(self):
        """Log destruction of the manager."""
        self.Logger.info("~AudioManager()")

    def CheckLimits(self, player):
        """Return True when ``player`` may start another clip.

        Looks up the ``AudioLimits`` entry for the player's access level
        (0 without access). Levels without an entry are unrestricted.
        Otherwise the player is rejected, with a private message, when the
        use count, the total play time, or the per-use cooldown
        (``LastUseLength * DelayFactor``) is exhausted. A negative ``Uses``
        setting disables the use-count check.
        """
        Level = 0
        if player.Access:
            Level = player.Access["level"]

        limits = self.Torchlight().Config["AudioLimits"]
        if str(Level) not in limits:
            return True

        level_config = limits[str(Level)]
        if level_config["Uses"] >= 0 and \
                player.Storage["Audio"]["Uses"] >= level_config["Uses"]:
            self.Torchlight().SayPrivate(player, "You have used up all of your free uses! ({0} uses)".format(
                level_config["Uses"]))
            return False

        if player.Storage["Audio"]["TimeUsed"] >= level_config["TotalTime"]:
            self.Torchlight().SayPrivate(player, "You have used up all of your free time! ({0} seconds)".format(
                level_config["TotalTime"]))
            return False

        TimeElapsed = self.Torchlight().Master.Loop.time() - player.Storage["Audio"]["LastUse"]
        UseDelay = player.Storage["Audio"]["LastUseLength"] * level_config["DelayFactor"]
        if TimeElapsed < UseDelay:
            # Round up so a player who is still blocked is never told
            # "0 seconds left"; this matches the AntiSpam cooldown message.
            self.Torchlight().SayPrivate(player, "You are currently on cooldown! ({0} seconds left)".format(
                math.ceil(UseDelay - TimeElapsed)))
            return False

        return True

    def _stop_and_notify(self, player, clip):
        """Stop ``clip`` and tell its owner (and ``player`` if different)."""
        clip.Stop()
        self.Torchlight().SayPrivate(clip.Player, "Your audio clip was stopped.")
        if player != clip.Player:
            self.Torchlight().SayPrivate(player, "Stopped \"{0}\"({1}) audio clip.".format(clip.Player.Name, clip.Player.UserID))

    def Stop(self, player, extra):
        """Handle a stop request from ``player``.

        ``extra`` is an optional, case-insensitive substring filter on the
        clip owner's name; a non-string or blank value matches every clip.

        A clip is stopped immediately when the requester owns it, or has a
        non-zero level that is at least the clip's level or at least the
        configured ``StopLevel``. Otherwise the request counts as one vote
        and the clip stops once ``STOP_VOTES_REQUIRED`` distinct users voted.

        Returns:
            dict with the counters ``matched``, ``stopped`` and ``pending``.
        """
        Level = 0
        if player.Access:
            Level = player.Access["level"]

        StopLevel = self.Torchlight().Config["AntiSpam"]["StopLevel"]
        ExtraFilter = extra.strip().lower() if isinstance(extra, str) else ""
        Result = {"matched": 0, "stopped": 0, "pending": 0}

        # Iterate over a copy: stopping a clip may remove it from the list.
        for clip in self.AudioClips[:]:
            if ExtraFilter and ExtraFilter not in clip.Player.Name.lower():
                continue

            Result["matched"] += 1

            # Always allow users to stop their own clip immediately.
            requires_votes = player != clip.Player and \
                (not Level or (Level < clip.Level and Level < StopLevel))

            if requires_votes:
                clip.Stops.add(player.UserID)
                if len(clip.Stops) < self.STOP_VOTES_REQUIRED:
                    Result["pending"] += 1
                    self.Torchlight().SayPrivate(player, "This audio clip needs {0} more !stop's.".format(
                        self.STOP_VOTES_REQUIRED - len(clip.Stops)))
                    continue

            self._stop_and_notify(player, clip)
            Result["stopped"] += 1

        return Result

    def CreateAudioClip(self, player, uri, _type = AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
        """Create and register a clip for ``player``, or return None if refused.

        The request is refused (with a private message) when Torchlight is
        disabled above the player's level, when the anti-spam cooldown or the
        player's limits apply, or when ``_type`` is not a supported player
        type. On success the AntiSpam and Advertiser callbacks are attached
        to the clip's audio player.
        """
        Level = 0
        if player.Access:
            Level = player.Access["level"]

        if self.Torchlight().Disabled and self.Torchlight().Disabled > Level:
            self.Torchlight().SayPrivate(player, "Torchlight is currently disabled!")
            return None

        if not self.AntiSpam.CheckAntiSpam(player):
            return None

        if not self.CheckLimits(player):
            return None

        try:
            # AudioClip resolves to the module-level class here, not to the
            # method of the same name on this manager.
            Clip = AudioClip(self, player, uri, _type)
        except ValueError as ex:
            self.Logger.warning("Rejected unsupported audio player type %r: %s", _type, ex)
            self.Torchlight().SayPrivate(player, "Unsupported audio player type.")
            return None

        self.AudioClips.append(Clip)

        # Anti-spam tracking only applies below both the configured immunity
        # level and AntiSpam's built-in play-time immunity level.
        anti_spam_track_ceiling = min(
            self.Torchlight().Config["AntiSpam"]["ImmunityLevel"],
            self.AntiSpam.GLOBAL_PLAY_TIME_IMMUNITY_LEVEL,
        )
        if not player.Access or player.Access["level"] < anti_spam_track_ceiling:
            Clip.AudioPlayer.AddCallback("Play", lambda *args: self.AntiSpam.OnPlay(Clip, *args))
            Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.AntiSpam.OnStop(Clip, *args))
            Clip.AudioPlayer.AddCallback("Update", lambda *args: self.AntiSpam.OnUpdate(Clip, *args))

        Clip.AudioPlayer.AddCallback("Play", lambda *args: self.Advertiser.OnPlay(Clip, *args))
        Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.Advertiser.OnStop(Clip, *args))
        Clip.AudioPlayer.AddCallback("Update", lambda *args: self.Advertiser.OnUpdate(Clip, *args))

        return Clip

    def AudioClip(self, player, uri, _type = AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
        """Alias for CreateAudioClip() with the same arguments and result."""
        return self.CreateAudioClip(player, uri, _type)

    def OnDisconnect(self, player):
        """Stop every registered clip owned by ``player``."""
        # Iterate over a copy: stopping a clip may remove it from the list.
        for clip in self.AudioClips[:]:
            if clip.Player == player:
                clip.Stop()
class AudioClip():
    """One audio clip requested by a player, bound to its own audio player.

    The clip registers itself for the player's "Play", "Stop" and "Update"
    callbacks and keeps the owner's ``Storage["Audio"]`` usage counters
    (``Uses``, ``LastUse``, ``LastUseLength``, ``TimeUsed``) up to date.
    """

    def __init__(self, master, player, uri, _type):
        """Create the clip and its audio player.

        ``master`` is the owning manager, ``player`` the requesting player,
        ``uri`` what to play and ``_type`` the audio player type passed to the
        manager's factory (which raises ValueError for unsupported types).
        """
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = self.Master.Torchlight
        self.Player = player
        self.Type = _type
        self.URI = uri
        self.LastPosition = None        # last position reported to OnUpdate()
        self.LastUpdateRealtime = None  # time.time() of the last play/update event
        self.StartedAt = None           # loop time recorded in OnPlay()
        self.StopHinted = False
        self.Stops = set()              # UserIDs that requested this clip be stopped

        self.Level = 0
        if self.Player.Access:
            self.Level = self.Player.Access["level"]

        self.AudioPlayer = self.Master.AudioPlayerFactory.NewPlayer(self.Type)
        self.AudioPlayer.AddCallback("Play", self.OnPlay)
        self.AudioPlayer.AddCallback("Stop", self.OnStop)
        self.AudioPlayer.AddCallback("Update", self.OnUpdate)

    def __del__(self):
        """Log destruction of the clip."""
        self.Logger.info("~AudioClip()")

    def Play(self, seconds = None, rubberband = None, dec_params = None, bitrate = None, backwards = None, *args):
        """Start playback of the clip's URI and return the player's result.

        ``seconds`` is forwarded as the ``position`` keyword; the remaining
        named options are forwarded under their own names and any extra
        positional arguments are passed through after the URI.
        """
        return self.AudioPlayer.PlayURI(
            self.URI, *args,
            position = seconds, rubberband = rubberband, dec_params = dec_params,
            bitrate = bitrate, backwards = backwards)

    def Stop(self):
        """Ask the audio player to stop and return its result."""
        return self.AudioPlayer.Stop()

    def OnPlay(self):
        """Record the start of playback in the owner's usage counters."""
        self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)

        self.Player.Storage["Audio"]["Uses"] += 1
        self.Player.Storage["Audio"]["LastUse"] = self.Torchlight().Master.Loop.time()
        self.Player.Storage["Audio"]["LastUseLength"] = 0.0
        self.StartedAt = self.Player.Storage["Audio"]["LastUse"]
        self.LastUpdateRealtime = time.time()
        self.StopHinted = False

    def OnStop(self):
        """Finish accounting for the clip and release its audio player.

        Removes the clip from the manager's list, tops the owner's counters
        up to the wall-clock play time (capped at the player's ``Seconds``
        when set), tells the owner when a limit was reached, and finally
        drops the ``AudioPlayer`` attribute.
        """
        self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)

        if self in self.Master.AudioClips:
            self.Master.AudioClips.remove(self)

        played = None
        if self.AudioPlayer and self.AudioPlayer.StartedPlaying is not None:
            elapsed = time.time() - self.AudioPlayer.StartedPlaying
            if self.AudioPlayer.Seconds:
                elapsed = min(elapsed, self.AudioPlayer.Seconds)
            played = max(0.0, elapsed)

        # The owner's storage can be unavailable here. Guard the accounting as
        # well as the messages, so a missing storage cannot raise before the
        # audio player is released below.
        if self.Player.Storage:
            usage = self.Player.Storage["Audio"]

            if played is not None and played > usage["LastUseLength"]:
                # Only bill the part OnUpdate() has not counted yet.
                delta = played - usage["LastUseLength"]
                usage["TimeUsed"] += delta
                usage["LastUseLength"] += delta

            if str(self.Level) in self.Torchlight().Config["AudioLimits"]:
                level_config = self.Torchlight().Config["AudioLimits"][str(self.Level)]
                if usage["TimeUsed"] >= level_config["TotalTime"]:
                    self.Torchlight().SayPrivate(self.Player, "You have used up all of your free time! ({0} seconds)".format(
                        level_config["TotalTime"]))
                elif usage["LastUseLength"] >= level_config["MaxLength"]:
                    self.Torchlight().SayPrivate(self.Player, "Your audio clip exceeded the maximum length! ({0} seconds)".format(
                        level_config["MaxLength"]))

        del self.AudioPlayer

    def OnUpdate(self, old_position, new_position):
        """Bill playback progress to the owner and stop the clip at a limit.

        Progress is the position change, capped by the wall-clock time since
        the previous event (see ``_progress_delta``). The clip is stopped when
        the owner's total time or this use's length reaches the configured
        limit for the clip's level.
        """
        now = time.time()
        elapsed = None
        if self.LastUpdateRealtime is not None:
            elapsed = now - self.LastUpdateRealtime
        self.LastUpdateRealtime = now

        Delta = _progress_delta(old_position, new_position, elapsed = elapsed)
        self.LastPosition = new_position
        if Delta <= 0.0:
            return

        usage = self.Player.Storage["Audio"]
        usage["TimeUsed"] += Delta
        usage["LastUseLength"] += Delta

        if str(self.Level) not in self.Torchlight().Config["AudioLimits"]:
            return

        level_config = self.Torchlight().Config["AudioLimits"][str(self.Level)]
        if (usage["TimeUsed"] >= level_config["TotalTime"] or
                usage["LastUseLength"] >= level_config["MaxLength"]):
            self.Stop()