#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Audio clip management: per-player limits, global anti-spam and stop hints."""
import logging
import sys
import math
import time
from .FFmpegAudioPlayer import FFmpegAudioPlayerFactory


def _progress_delta(old_position, new_position, elapsed=None, jitter=0.25):
    """Return the absolute playback progress between two positions.

    Args:
        old_position: Previous position; anything ``float()`` accepts.
        new_position: Current position; anything ``float()`` accepts.
        elapsed: Optional wall-clock seconds since the previous update. When
            given, the result is capped at ``elapsed + jitter``.
        jitter: Slack added to ``elapsed`` before capping.

    Returns:
        The non-negative progress as a float, or ``0.0`` when a position or
        ``elapsed`` is not numeric, not finite, or ``elapsed`` is negative.
    """
    try:
        delta = abs(float(new_position) - float(old_position))
    except (TypeError, ValueError):
        return 0.0
    if not math.isfinite(delta):
        return 0.0

    if elapsed is not None:
        try:
            elapsed = float(elapsed)
        except (TypeError, ValueError):
            return 0.0
        if not math.isfinite(elapsed) or elapsed < 0.0:
            return 0.0
        # Never credit more progress than wall-clock time (plus slack) allows.
        delta = min(delta, elapsed + jitter)

    return delta


def _track_clip(records, clip, timestamp, **extra):
    """Start (or restart) tracking ``clip``; it is dominant only if no other record is."""
    record = {
        "timestamp": timestamp,
        "duration": 0.0,
        "dominant": False,
        "active": True,
        "last_update_realtime": time.time(),
    }
    record.update(extra)
    records[clip] = record
    # The new record is still non-dominant here, so this only sees the others.
    record["dominant"] = not any(entry["dominant"] for entry in records.values())


def _untrack_clip(records, clip):
    """Mark ``clip`` inactive and pass dominance to the first still-active record."""
    record = records[clip]
    record["active"] = False
    if record["dominant"]:
        for entry in records.values():
            if entry["active"]:
                entry["dominant"] = True
                break
    record["dominant"] = False


def _advance_record(record, old_position, new_position):
    """Refresh the record's update time and credit progress; return True if duration grew."""
    now = time.time()
    elapsed = None
    if record.get("last_update_realtime") is not None:
        elapsed = now - record["last_update_realtime"]
    record["last_update_realtime"] = now

    delta = _progress_delta(old_position, new_position, elapsed=elapsed)
    # Only the dominant record accumulates play time.
    if delta <= 0.0 or not record["dominant"]:
        return False

    record["duration"] += delta
    return True


def _prune_and_total(records, now, span):
    """Drop expired inactive records and return the summed duration of unexpired ones."""
    total = 0.0
    # Iterate over a snapshot because expired entries are deleted on the way.
    for key, record in list(records.items()):
        if not record["timestamp"]:
            continue
        if record["timestamp"] + record["duration"] + span < now:
            if not record["active"]:
                del records[key]
            continue
        total += record["duration"]
    return total


class AudioPlayerFactory():
    """Creates audio players for a requested player type."""

    AUDIOPLAYER_FFMPEG = 1

    def __init__(self, master):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = self.Master.Torchlight
        self.FFmpegAudioPlayerFactory = FFmpegAudioPlayerFactory(self)

    def __del__(self):
        self.Logger.info("~AudioPlayerFactory()")

    def NewPlayer(self, _type):
        """Return a new player for ``_type``.

        Raises:
            ValueError: If ``_type`` is not a supported player type.
        """
        if _type == self.AUDIOPLAYER_FFMPEG:
            return self.FFmpegAudioPlayerFactory.NewPlayer()
        raise ValueError("Unsupported audio player type: {0}".format(_type))


class AntiSpam():
    """Tracks global play time and blocks voice commands when it is exceeded."""

    # Clips at or above this level are never counted toward global play time.
    GLOBAL_PLAY_TIME_IMMUNITY_LEVEL = 10

    def __init__(self, master):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = self.Master.Torchlight
        # clip -> bookkeeping record (see _track_clip)
        self.LastClips = dict()
        # Loop time until which voice commands are blocked, or None.
        self.DisabledTime = None

    def _counts_toward_global_play_time(self, level):
        """Return True if a clip of ``level`` is subject to global tracking."""
        return level < self.GLOBAL_PLAY_TIME_IMMUNITY_LEVEL

    def CheckAntiSpam(self, player):
        """Return False (and tell ``player``) while the global cooldown applies to them."""
        if not self.DisabledTime:
            return True

        torchlight = self.Torchlight()
        if self.DisabledTime <= torchlight.Master.Loop.time():
            return True

        access = player.Access
        if access and access["level"] >= torchlight.Config["AntiSpam"]["ImmunityLevel"]:
            return True

        torchlight.SayPrivate(player, "Torchlight is currently on cooldown! ({0} seconds left)".format(
            math.ceil(self.DisabledTime - torchlight.Master.Loop.time())))
        return False

    def SpamCheck(self):
        """Prune expired records and start a cooldown if recent usage is too high.

        When the summed duration of unexpired records exceeds
        ``MaxUsageTime``, sets ``DisabledTime``, announces the block, stops
        every clip below ``ImmunityLevel`` and clears all records.
        """
        torchlight = self.Torchlight()
        config = torchlight.Config["AntiSpam"]
        now = torchlight.Master.Loop.time()

        used = _prune_and_total(self.LastClips, now, config["MaxUsageSpan"])
        if used > config["MaxUsageTime"]:
            self.DisabledTime = torchlight.Master.Loop.time() + config["PunishDelay"]
            torchlight.SayChat("Blocked voice commands for the next {0} seconds. Used {1} seconds within {2} seconds.".format(
                config["PunishDelay"], config["MaxUsageTime"], config["MaxUsageSpan"]))

            # Make a copy of the list since AudioClip.Stop() will change the list
            for playing in self.Master.AudioClips[:]:
                if playing.Level < config["ImmunityLevel"]:
                    playing.Stop()

            self.LastClips.clear()

    def OnPlay(self, clip):
        """Begin tracking ``clip`` unless its level is globally immune."""
        if not self._counts_toward_global_play_time(clip.Level):
            return
        _track_clip(self.LastClips, clip, self.Torchlight().Master.Loop.time())

    def OnStop(self, clip):
        """Mark ``clip`` as stopped and hand dominance to another active clip."""
        if not self._counts_toward_global_play_time(clip.Level):
            return
        if clip not in self.LastClips:
            return
        _untrack_clip(self.LastClips, clip)

    def OnUpdate(self, clip, old_position, new_position):
        """Credit playback progress to ``clip`` and re-run the spam check."""
        if not self._counts_toward_global_play_time(clip.Level):
            return

        if clip not in self.LastClips:
            if self.Logger.isEnabledFor(logging.DEBUG):
                self.Logger.debug(
                    "OnUpdate called for unknown clip key %r; %d known keys",
                    clip,
                    len(self.LastClips),
                )
            return

        if _advance_record(self.LastClips[clip], old_position, new_position):
            self.SpamCheck()


class Advertiser():
    """Posts a one-time chat hint about the stop commands for long-running clips."""

    def __init__(self, master):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = self.Master.Torchlight
        # clip -> bookkeeping record (see _track_clip), plus a "hinted" flag
        self.LastClips = dict()

    def Think(self, clip):
        """Prune expired records and hint once when ``clip`` has played ``AdStop`` seconds."""
        torchlight = self.Torchlight()
        now = torchlight.Master.Loop.time()
        config = torchlight.Config["Advertiser"]

        _prune_and_total(self.LastClips, now, config["MaxSpan"])

        record = self.LastClips.get(clip)
        if not record:
            return

        already_hinted = clip.StopHinted or record.get("hinted")
        if not already_hinted and record["duration"] >= config["AdStop"]:
            torchlight.SayChat("Hint: Type \x07FF0000!stop(ze) !pls(mg)\x01 to stop all currently playing sounds.")
            record["hinted"] = True
            clip.StopHinted = True

    def OnPlay(self, clip):
        """Begin tracking ``clip``."""
        _track_clip(self.LastClips, clip, self.Torchlight().Master.Loop.time(), hinted=False)

    def OnStop(self, clip):
        """Mark ``clip`` as stopped and hand dominance to another active clip."""
        if clip not in self.LastClips:
            return
        _untrack_clip(self.LastClips, clip)

    def OnUpdate(self, clip, old_position, new_position):
        """Credit playback progress to ``clip`` and consider posting the hint."""
        if clip not in self.LastClips:
            return
        if _advance_record(self.LastClips[clip], old_position, new_position):
            self.Think(clip)


class AudioManager():
    """Creates audio clips and enforces per-player and global usage rules."""

    def __init__(self, torchlight):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Torchlight = torchlight
        self.AntiSpam = AntiSpam(self)
        self.Advertiser = Advertiser(self)
        self.AudioPlayerFactory = AudioPlayerFactory(self)
        # Clips appended by CreateAudioClip; AudioClip.OnStop removes them.
        self.AudioClips = []

    def __del__(self):
        self.Logger.info("~AudioManager()")

    def CheckLimits(self, player):
        """Return False (and tell ``player`` why) if their level's limits forbid a new clip.

        Checks, in order, the number of uses, the total time used and the
        cooldown derived from the previous clip's length. Levels without an
        ``AudioLimits`` entry are unrestricted.
        """
        level = player.Access["level"] if player.Access else 0
        torchlight = self.Torchlight()
        limits = torchlight.Config["AudioLimits"]
        if str(level) not in limits:
            return True

        level_config = limits[str(level)]
        usage = player.Storage["Audio"]

        # A negative "Uses" limit disables the use-count check.
        if level_config["Uses"] >= 0 and usage["Uses"] >= level_config["Uses"]:
            torchlight.SayPrivate(player, "You have used up all of your free uses! ({0} uses)".format(
                level_config["Uses"]))
            return False

        if usage["TimeUsed"] >= level_config["TotalTime"]:
            torchlight.SayPrivate(player, "You have used up all of your free time! ({0} seconds)".format(
                level_config["TotalTime"]))
            return False

        time_elapsed = torchlight.Master.Loop.time() - usage["LastUse"]
        use_delay = usage["LastUseLength"] * level_config["DelayFactor"]
        if time_elapsed < use_delay:
            # Round up so a still-active cooldown is never shown as
            # "0 seconds left" (same rounding as AntiSpam.CheckAntiSpam).
            torchlight.SayPrivate(player, "You are currently on cooldown! ({0} seconds left)".format(
                math.ceil(use_delay - time_elapsed)))
            return False

        return True

    def Stop(self, player, extra):
        """Stop every clip whose owner's name contains ``extra`` (all clips if empty).

        Args:
            player: The player requesting the stop.
            extra: Optional name filter; matched case-insensitively after
                stripping. Non-string values mean "no filter".

        Returns:
            A dict with "matched", "stopped" and "pending" counts; "pending"
            is always 0 here.
        """
        name_filter = extra.strip().lower() if isinstance(extra, str) else ""
        result = {"matched": 0, "stopped": 0, "pending": 0}

        # Iterate over a copy because stopping a clip removes it from the list.
        for clip in self.AudioClips[:]:
            if name_filter and name_filter not in clip.Player.Name.lower():
                continue

            result["matched"] += 1
            clip.Stop()
            result["stopped"] += 1

            self.Torchlight().SayPrivate(clip.Player, "Your audio clip was stopped.")
            if player != clip.Player:
                self.Torchlight().SayPrivate(player, "Stopped \"{0}\"({1}) audio clip.".format(clip.Player.Name, clip.Player.UserID))

        return result

    def CreateAudioClip(self, player, uri, _type=AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
        """Create, register and wire up a clip for ``player``.

        Returns:
            The new ``AudioClip``, or ``None`` when Torchlight is disabled for
            the player's level, a cooldown or limit applies, or ``_type`` is
            unsupported.
        """
        level = player.Access["level"] if player.Access else 0
        torchlight = self.Torchlight()

        if torchlight.Disabled and torchlight.Disabled > level:
            torchlight.SayPrivate(player, "Torchlight is currently disabled!")
            return None

        if not self.AntiSpam.CheckAntiSpam(player):
            return None

        if not self.CheckLimits(player):
            return None

        try:
            clip = AudioClip(self, player, uri, _type)
        except ValueError as ex:
            self.Logger.warning("Rejected unsupported audio player type %r: %s", _type, ex)
            torchlight.SayPrivate(player, "Unsupported audio player type.")
            return None

        self.AudioClips.append(clip)

        # Track for anti-spam only below both the configured and the global
        # immunity level.
        anti_spam_track_ceiling = min(
            torchlight.Config["AntiSpam"]["ImmunityLevel"],
            self.AntiSpam.GLOBAL_PLAY_TIME_IMMUNITY_LEVEL,
        )
        if not player.Access or player.Access["level"] < anti_spam_track_ceiling:
            clip.AudioPlayer.AddCallback("Play", lambda *args: self.AntiSpam.OnPlay(clip, *args))
            clip.AudioPlayer.AddCallback("Stop", lambda *args: self.AntiSpam.OnStop(clip, *args))
            clip.AudioPlayer.AddCallback("Update", lambda *args: self.AntiSpam.OnUpdate(clip, *args))

        clip.AudioPlayer.AddCallback("Play", lambda *args: self.Advertiser.OnPlay(clip, *args))
        clip.AudioPlayer.AddCallback("Stop", lambda *args: self.Advertiser.OnStop(clip, *args))
        clip.AudioPlayer.AddCallback("Update", lambda *args: self.Advertiser.OnUpdate(clip, *args))

        return clip

    def AudioClip(self, player, uri, _type=AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
        """Alias for :meth:`CreateAudioClip`."""
        return self.CreateAudioClip(player, uri, _type)

    def OnDisconnect(self, player):
        """Stop every clip owned by ``player``."""
        # Iterate over a copy because stopping a clip removes it from the list.
        for clip in self.AudioClips[:]:
            if clip.Player == player:
                clip.Stop()


class AudioClip():
    """One playable clip owned by a player, with usage accounting on playback events."""

    def __init__(self, master, player, uri, _type):
        self.Logger = logging.getLogger(__class__.__name__)
        self.Master = master
        self.Torchlight = self.Master.Torchlight
        self.Player = player
        self.Type = _type
        self.URI = uri
        self.LastPosition = None
        self.LastUpdateRealtime = None
        self.StartedAt = None
        self.StopHinted = False
        self.Stops = set()

        self.Level = 0
        if self.Player.Access:
            self.Level = self.Player.Access["level"]

        # NewPlayer raises ValueError for an unsupported type.
        self.AudioPlayer = self.Master.AudioPlayerFactory.NewPlayer(self.Type)
        self.AudioPlayer.AddCallback("Play", self.OnPlay)
        self.AudioPlayer.AddCallback("Stop", self.OnStop)
        self.AudioPlayer.AddCallback("Update", self.OnUpdate)

    def __del__(self):
        self.Logger.info("~AudioClip()")

    def Play(self, seconds=None, rubberband=None, dec_params=None, bitrate=None, backwards=None, *args):
        """Start playing the clip's URI; returns whatever ``PlayURI`` returns."""
        return self.AudioPlayer.PlayURI(
            self.URI,
            *args,
            position=seconds,
            rubberband=rubberband,
            dec_params=dec_params,
            bitrate=bitrate,
            backwards=backwards,
        )

    def Stop(self):
        """Stop the underlying player; returns whatever its ``Stop`` returns."""
        return self.AudioPlayer.Stop()

    def OnPlay(self):
        """Record a new use for the owning player and reset per-play state."""
        self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)

        usage = self.Player.Storage["Audio"]
        usage["Uses"] += 1
        usage["LastUse"] = self.Torchlight().Master.Loop.time()
        usage["LastUseLength"] = 0.0

        self.StartedAt = usage["LastUse"]
        self.LastUpdateRealtime = time.time()
        self.StopHinted = False

    def OnStop(self):
        """Unregister the clip, settle its usage accounting and release the player.

        Safe to call more than once: after the first call has released the
        player, later calls only repeat the (guarded) unregistration.
        """
        self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)

        if self in self.Master.AudioClips:
            self.Master.AudioClips.remove(self)

        if not hasattr(self, "AudioPlayer"):
            # An earlier OnStop already deleted the player; nothing left to settle.
            return

        played = None
        if self.AudioPlayer and self.AudioPlayer.StartedPlaying is not None:
            elapsed = time.time() - self.AudioPlayer.StartedPlaying
            if self.AudioPlayer.Seconds:
                elapsed = min(elapsed, self.AudioPlayer.Seconds)
            played = max(0.0, elapsed)

        # Storage may be falsy; everything that reads it sits behind this check.
        storage = self.Player.Storage
        if storage:
            usage = storage["Audio"]

            if played is not None and played > usage["LastUseLength"]:
                # Top up whatever the Update callbacks have not yet credited.
                shortfall = played - usage["LastUseLength"]
                usage["TimeUsed"] += shortfall
                usage["LastUseLength"] += shortfall

            limits = self.Torchlight().Config["AudioLimits"]
            if str(self.Level) in limits:
                level_limits = limits[str(self.Level)]
                if usage["TimeUsed"] >= level_limits["TotalTime"]:
                    self.Torchlight().SayPrivate(self.Player, "You have used up all of your free time! ({0} seconds)".format(
                        level_limits["TotalTime"]))
                elif usage["LastUseLength"] >= level_limits["MaxLength"]:
                    self.Torchlight().SayPrivate(self.Player, "Your audio clip exceeded the maximum length! ({0} seconds)".format(
                        level_limits["MaxLength"]))

        del self.AudioPlayer

    def OnUpdate(self, old_position, new_position):
        """Credit playback progress to the owner and stop the clip once a limit is hit."""
        now = time.time()
        elapsed = None
        if self.LastUpdateRealtime is not None:
            elapsed = now - self.LastUpdateRealtime
        self.LastUpdateRealtime = now

        delta = _progress_delta(old_position, new_position, elapsed=elapsed)
        self.LastPosition = new_position
        if delta <= 0.0:
            return

        usage = self.Player.Storage["Audio"]
        usage["TimeUsed"] += delta
        usage["LastUseLength"] += delta

        limits = self.Torchlight().Config["AudioLimits"]
        if str(self.Level) not in limits:
            return

        level_limits = limits[str(self.Level)]
        if (usage["TimeUsed"] >= level_limits["TotalTime"] or
                usage["LastUseLength"] >= level_limits["MaxLength"]):
            self.Stop()