352 lines
12 KiB
Python
352 lines
12 KiB
Python
|
#!/usr/bin/python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
import logging
|
||
|
import sys
|
||
|
import io
|
||
|
import math
|
||
|
from .FFmpegAudioPlayer import FFmpegAudioPlayerFactory
|
||
|
|
||
|
class AudioPlayerFactory():
|
||
|
AUDIOPLAYER_FFMPEG = 1
|
||
|
|
||
|
def __init__(self, master):
|
||
|
self.Logger = logging.getLogger(__class__.__name__)
|
||
|
self.Master = master
|
||
|
self.Torchlight = self.Master.Torchlight
|
||
|
|
||
|
self.FFmpegAudioPlayerFactory = FFmpegAudioPlayerFactory(self)
|
||
|
|
||
|
def __del__(self):
|
||
|
self.Logger.info("~AudioPlayerFactory()")
|
||
|
|
||
|
def NewPlayer(self, _type):
|
||
|
if _type == self.AUDIOPLAYER_FFMPEG:
|
||
|
return self.FFmpegAudioPlayerFactory.NewPlayer()
|
||
|
|
||
|
|
||
|
class AntiSpam():
|
||
|
def __init__(self, master):
|
||
|
self.Logger = logging.getLogger(__class__.__name__)
|
||
|
self.Master = master
|
||
|
self.Torchlight = self.Master.Torchlight
|
||
|
|
||
|
self.LastClips = dict()
|
||
|
self.DisabledTime = None
|
||
|
self.SaidHint = False
|
||
|
|
||
|
def CheckAntiSpam(self, player):
|
||
|
if self.DisabledTime and self.DisabledTime > self.Torchlight().Master.Loop.time() and \
|
||
|
not (player.Access and player.Access["level"] >= self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]):
|
||
|
|
||
|
self.Torchlight().SayPrivate(player, "Torchlight is currently on cooldown! ({0} seconds left)".format(
|
||
|
math.ceil(self.DisabledTime - self.Torchlight().Master.Loop.time())))
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
def SpamCheck(self, Delta):
|
||
|
Now = self.Torchlight().Master.Loop.time()
|
||
|
Duration = 0.0
|
||
|
|
||
|
for Key, Clip in list(self.LastClips.items()):
|
||
|
if not Clip["timestamp"]:
|
||
|
continue
|
||
|
|
||
|
if Clip["timestamp"] + Clip["duration"] + self.Torchlight().Config["AntiSpam"]["MaxUsageSpan"] < Now:
|
||
|
if not Clip["active"]:
|
||
|
del self.LastClips[Key]
|
||
|
continue
|
||
|
|
||
|
Duration += Clip["duration"]
|
||
|
|
||
|
if Duration > self.Torchlight().Config["AntiSpam"]["MaxUsageTime"]:
|
||
|
self.DisabledTime = self.Torchlight().Master.Loop.time() + self.Torchlight().Config["AntiSpam"]["PunishDelay"]
|
||
|
self.Torchlight().SayChat("Blocked voice commands for the next {0} seconds. Used {1} seconds within {2} seconds.".format(
|
||
|
self.Torchlight().Config["AntiSpam"]["PunishDelay"], self.Torchlight().Config["AntiSpam"]["MaxUsageTime"], self.Torchlight().Config["AntiSpam"]["MaxUsageSpan"]))
|
||
|
|
||
|
# Make a copy of the list since AudioClip.Stop() will change the list
|
||
|
for AudioClip in self.Master.AudioClips[:]:
|
||
|
if AudioClip.Level < self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]:
|
||
|
AudioClip.Stop()
|
||
|
|
||
|
self.LastClips.clear()
|
||
|
|
||
|
def OnPlay(self, clip):
|
||
|
Now = self.Torchlight().Master.Loop.time()
|
||
|
self.LastClips[hash(clip)] = dict({"timestamp": Now, "duration": 0.0, "dominant": False, "active": True})
|
||
|
|
||
|
HasDominant = False
|
||
|
for Key, Clip in self.LastClips.items():
|
||
|
if Clip["dominant"]:
|
||
|
HasDominant = True
|
||
|
break
|
||
|
|
||
|
self.LastClips[hash(clip)]["dominant"] = not HasDominant
|
||
|
|
||
|
def OnStop(self, clip):
|
||
|
if hash(clip) not in self.LastClips:
|
||
|
return
|
||
|
|
||
|
self.LastClips[hash(clip)]["active"] = False
|
||
|
|
||
|
if self.LastClips[hash(clip)]["dominant"]:
|
||
|
for Key, Clip in self.LastClips.items():
|
||
|
if Clip["active"]:
|
||
|
Clip["dominant"] = True
|
||
|
break
|
||
|
|
||
|
self.LastClips[hash(clip)]["dominant"] = False
|
||
|
|
||
|
def OnUpdate(self, clip, old_position, new_position):
|
||
|
Delta = new_position - old_position
|
||
|
Clip = self.LastClips[hash(clip)]
|
||
|
|
||
|
if not Clip["dominant"]:
|
||
|
return
|
||
|
|
||
|
Clip["duration"] += Delta
|
||
|
self.SpamCheck(Delta)
|
||
|
|
||
|
|
||
|
class Advertiser():
|
||
|
def __init__(self, master):
|
||
|
self.Logger = logging.getLogger(__class__.__name__)
|
||
|
self.Master = master
|
||
|
self.Torchlight = self.Master.Torchlight
|
||
|
|
||
|
self.LastClips = dict()
|
||
|
self.AdStop = 0
|
||
|
self.NextAdStop = 0
|
||
|
|
||
|
def Think(self, Delta):
|
||
|
Now = self.Torchlight().Master.Loop.time()
|
||
|
Duration = 0.0
|
||
|
|
||
|
for Key, Clip in list(self.LastClips.items()):
|
||
|
if not Clip["timestamp"]:
|
||
|
continue
|
||
|
|
||
|
if Clip["timestamp"] + Clip["duration"] + self.Torchlight().Config["Advertiser"]["MaxSpan"] < Now:
|
||
|
if not Clip["active"]:
|
||
|
del self.LastClips[Key]
|
||
|
continue
|
||
|
|
||
|
Duration += Clip["duration"]
|
||
|
|
||
|
self.NextAdStop -= Delta
|
||
|
CeilDur = math.ceil(Duration)
|
||
|
if CeilDur > self.AdStop and self.NextAdStop <= 0 and CeilDur % self.Torchlight().Config["Advertiser"]["AdStop"] == 0:
|
||
|
self.Torchlight().SayChat("Hint: Type \x07FF0000!stop(ze) !pls(mg)\x01 to stop all currently playing sounds.")
|
||
|
self.AdStop = CeilDur
|
||
|
self.NextAdStop = 0
|
||
|
elif CeilDur < self.AdStop:
|
||
|
self.AdStop = 0
|
||
|
self.NextAdStop = self.Torchlight().Config["Advertiser"]["AdStop"] / 2
|
||
|
|
||
|
def OnPlay(self, clip):
|
||
|
Now = self.Torchlight().Master.Loop.time()
|
||
|
self.LastClips[hash(clip)] = dict({"timestamp": Now, "duration": 0.0, "dominant": False, "active": True})
|
||
|
|
||
|
HasDominant = False
|
||
|
for Key, Clip in self.LastClips.items():
|
||
|
if Clip["dominant"]:
|
||
|
HasDominant = True
|
||
|
break
|
||
|
|
||
|
self.LastClips[hash(clip)]["dominant"] = not HasDominant
|
||
|
|
||
|
def OnStop(self, clip):
|
||
|
if hash(clip) not in self.LastClips:
|
||
|
return
|
||
|
|
||
|
self.LastClips[hash(clip)]["active"] = False
|
||
|
|
||
|
if self.LastClips[hash(clip)]["dominant"]:
|
||
|
for Key, Clip in self.LastClips.items():
|
||
|
if Clip["active"]:
|
||
|
Clip["dominant"] = True
|
||
|
break
|
||
|
|
||
|
self.LastClips[hash(clip)]["dominant"] = False
|
||
|
|
||
|
def OnUpdate(self, clip, old_position, new_position):
|
||
|
Delta = new_position - old_position
|
||
|
Clip = self.LastClips[hash(clip)]
|
||
|
|
||
|
if not Clip["dominant"]:
|
||
|
return
|
||
|
|
||
|
Clip["duration"] += Delta
|
||
|
self.Think(Delta)
|
||
|
|
||
|
|
||
|
class AudioManager():
|
||
|
def __init__(self, torchlight):
|
||
|
self.Logger = logging.getLogger(__class__.__name__)
|
||
|
self.Torchlight = torchlight
|
||
|
self.AntiSpam = AntiSpam(self)
|
||
|
self.Advertiser = Advertiser(self)
|
||
|
self.AudioPlayerFactory = AudioPlayerFactory(self)
|
||
|
self.AudioClips = []
|
||
|
|
||
|
def __del__(self):
|
||
|
self.Logger.info("~AudioManager()")
|
||
|
|
||
|
def CheckLimits(self, player):
|
||
|
Level = 0
|
||
|
if player.Access:
|
||
|
Level = player.Access["level"]
|
||
|
|
||
|
if str(Level) in self.Torchlight().Config["AudioLimits"]:
|
||
|
if self.Torchlight().Config["AudioLimits"][str(Level)]["Uses"] >= 0 and \
|
||
|
player.Storage["Audio"]["Uses"] >= self.Torchlight().Config["AudioLimits"][str(Level)]["Uses"]:
|
||
|
|
||
|
self.Torchlight().SayPrivate(player, "You have used up all of your free uses! ({0} uses)".format(
|
||
|
self.Torchlight().Config["AudioLimits"][str(Level)]["Uses"]))
|
||
|
return False
|
||
|
|
||
|
if player.Storage["Audio"]["TimeUsed"] >= self.Torchlight().Config["AudioLimits"][str(Level)]["TotalTime"]:
|
||
|
self.Torchlight().SayPrivate(player, "You have used up all of your free time! ({0} seconds)".format(
|
||
|
self.Torchlight().Config["AudioLimits"][str(Level)]["TotalTime"]))
|
||
|
return False
|
||
|
|
||
|
TimeElapsed = self.Torchlight().Master.Loop.time() - player.Storage["Audio"]["LastUse"]
|
||
|
UseDelay = player.Storage["Audio"]["LastUseLength"] * self.Torchlight().Config["AudioLimits"][str(Level)]["DelayFactor"]
|
||
|
|
||
|
if TimeElapsed < UseDelay:
|
||
|
self.Torchlight().SayPrivate(player, "You are currently on cooldown! ({0} seconds left)".format(
|
||
|
round(UseDelay - TimeElapsed)))
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
def Stop(self, player, extra):
|
||
|
Level = 0
|
||
|
if player.Access:
|
||
|
Level = player.Access["level"]
|
||
|
|
||
|
for AudioClip in self.AudioClips[:]:
|
||
|
if extra and not extra.lower() in AudioClip.Player.Name.lower():
|
||
|
continue
|
||
|
|
||
|
if not Level or (Level < AudioClip.Level and Level < self.Torchlight().Config["AntiSpam"]["StopLevel"]):
|
||
|
AudioClip.Stops.add(player.UserID)
|
||
|
|
||
|
if len(AudioClip.Stops) >= 3:
|
||
|
AudioClip.Stop()
|
||
|
self.Torchlight().SayPrivate(AudioClip.Player, "Your audio clip was stopped.")
|
||
|
if player != AudioClip.Player:
|
||
|
self.Torchlight().SayPrivate(player, "Stopped \"{0}\"({1}) audio clip.".format(AudioClip.Player.Name, AudioClip.Player.UserID))
|
||
|
else:
|
||
|
self.Torchlight().SayPrivate(player, "This audio clip needs {0} more !stop's.".format(3 - len(AudioClip.Stops)))
|
||
|
else:
|
||
|
AudioClip.Stop()
|
||
|
self.Torchlight().SayPrivate(AudioClip.Player, "Your audio clip was stopped.")
|
||
|
if player != AudioClip.Player:
|
||
|
self.Torchlight().SayPrivate(player, "Stopped \"{0}\"({1}) audio clip.".format(AudioClip.Player.Name, AudioClip.Player.UserID))
|
||
|
|
||
|
def AudioClip(self, player, uri, _type = AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
|
||
|
Level = 0
|
||
|
if player.Access:
|
||
|
Level = player.Access["level"]
|
||
|
|
||
|
if self.Torchlight().Disabled and self.Torchlight().Disabled > Level:
|
||
|
self.Torchlight().SayPrivate(player, "Torchlight is currently disabled!")
|
||
|
return None
|
||
|
|
||
|
if not self.AntiSpam.CheckAntiSpam(player):
|
||
|
return None
|
||
|
|
||
|
if not self.CheckLimits(player):
|
||
|
return None
|
||
|
|
||
|
Clip = AudioClip(self, player, uri, _type)
|
||
|
self.AudioClips.append(Clip)
|
||
|
|
||
|
if not player.Access or player.Access["level"] < self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]:
|
||
|
Clip.AudioPlayer.AddCallback("Play", lambda *args: self.AntiSpam.OnPlay(Clip, *args))
|
||
|
Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.AntiSpam.OnStop(Clip, *args))
|
||
|
Clip.AudioPlayer.AddCallback("Update", lambda *args: self.AntiSpam.OnUpdate(Clip, *args))
|
||
|
|
||
|
Clip.AudioPlayer.AddCallback("Play", lambda *args: self.Advertiser.OnPlay(Clip, *args))
|
||
|
Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.Advertiser.OnStop(Clip, *args))
|
||
|
Clip.AudioPlayer.AddCallback("Update", lambda *args: self.Advertiser.OnUpdate(Clip, *args))
|
||
|
|
||
|
return Clip
|
||
|
|
||
|
def OnDisconnect(self, player):
|
||
|
for AudioClip in self.AudioClips[:]:
|
||
|
if AudioClip.Player == player:
|
||
|
AudioClip.Stop()
|
||
|
|
||
|
|
||
|
class AudioClip():
|
||
|
def __init__(self, master, player, uri, _type):
|
||
|
self.Logger = logging.getLogger(__class__.__name__)
|
||
|
self.Master = master
|
||
|
self.Torchlight = self.Master.Torchlight
|
||
|
self.Player = player
|
||
|
self.Type = _type
|
||
|
self.URI = uri
|
||
|
self.LastPosition = None
|
||
|
self.Stops = set()
|
||
|
|
||
|
self.Level = 0
|
||
|
if self.Player.Access:
|
||
|
self.Level = self.Player.Access["level"]
|
||
|
|
||
|
self.AudioPlayer = self.Master.AudioPlayerFactory.NewPlayer(self.Type)
|
||
|
self.AudioPlayer.AddCallback("Play", self.OnPlay)
|
||
|
self.AudioPlayer.AddCallback("Stop", self.OnStop)
|
||
|
self.AudioPlayer.AddCallback("Update", self.OnUpdate)
|
||
|
|
||
|
def __del__(self):
|
||
|
self.Logger.info("~AudioClip()")
|
||
|
|
||
|
def Play(self, seconds = None, *args):
|
||
|
return self.AudioPlayer.PlayURI(self.URI, seconds, *args)
|
||
|
|
||
|
def Stop(self):
|
||
|
return self.AudioPlayer.Stop()
|
||
|
|
||
|
def OnPlay(self):
|
||
|
self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)
|
||
|
|
||
|
self.Player.Storage["Audio"]["Uses"] += 1
|
||
|
self.Player.Storage["Audio"]["LastUse"] = self.Torchlight().Master.Loop.time()
|
||
|
self.Player.Storage["Audio"]["LastUseLength"] = 0.0
|
||
|
|
||
|
def OnStop(self):
|
||
|
self.Logger.debug(sys._getframe().f_code.co_name + ' ' + self.URI)
|
||
|
self.Master.AudioClips.remove(self)
|
||
|
|
||
|
if self.AudioPlayer.Playing:
|
||
|
Delta = self.AudioPlayer.Position - self.LastPosition
|
||
|
self.Player.Storage["Audio"]["TimeUsed"] += Delta
|
||
|
self.Player.Storage["Audio"]["LastUseLength"] += Delta
|
||
|
|
||
|
if str(self.Level) in self.Torchlight().Config["AudioLimits"]:
|
||
|
if self.Player.Storage:
|
||
|
if self.Player.Storage["Audio"]["TimeUsed"] >= self.Torchlight().Config["AudioLimits"][str(self.Level)]["TotalTime"]:
|
||
|
self.Torchlight().SayPrivate(self.Player, "You have used up all of your free time! ({0} seconds)".format(
|
||
|
self.Torchlight().Config["AudioLimits"][str(self.Level)]["TotalTime"]))
|
||
|
elif self.Player.Storage["Audio"]["LastUseLength"] >= self.Torchlight().Config["AudioLimits"][str(self.Level)]["MaxLength"]:
|
||
|
self.Torchlight().SayPrivate(self.Player, "Your audio clip exceeded the maximum length! ({0} seconds)".format(
|
||
|
self.Torchlight().Config["AudioLimits"][str(self.Level)]["MaxLength"]))
|
||
|
|
||
|
del self.AudioPlayer
|
||
|
|
||
|
def OnUpdate(self, old_position, new_position):
|
||
|
Delta = new_position - old_position
|
||
|
self.LastPosition = new_position
|
||
|
|
||
|
self.Player.Storage["Audio"]["TimeUsed"] += Delta
|
||
|
self.Player.Storage["Audio"]["LastUseLength"] += Delta
|
||
|
|
||
|
if not str(self.Level) in self.Torchlight().Config["AudioLimits"]:
|
||
|
return
|
||
|
|
||
|
if (self.Player.Storage["Audio"]["TimeUsed"] >= self.Torchlight().Config["AudioLimits"][str(self.Level)]["TotalTime"] or
|
||
|
self.Player.Storage["Audio"]["LastUseLength"] >= self.Torchlight().Config["AudioLimits"][str(self.Level)]["MaxLength"]):
|
||
|
self.Stop()
|