353 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			353 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
import logging
 | 
						|
import sys
 | 
						|
import io
 | 
						|
import math
 | 
						|
from .FFmpegAudioPlayer import FFmpegAudioPlayerFactory
 | 
						|
 | 
						|
class AudioPlayerFactory():
	"""Creates audio players by delegating to a backend-specific factory.

	The only backend wired up here is FFmpeg, selected with
	``AUDIOPLAYER_FFMPEG``.
	"""

	# Player type identifier understood by NewPlayer().
	AUDIOPLAYER_FFMPEG = 1

	def __init__(self, master):
		"""Store *master*, copy its ``Torchlight`` attribute and build the FFmpeg factory."""
		self.Logger = logging.getLogger(__class__.__name__)
		self.Master = master
		self.Torchlight = self.Master.Torchlight

		self.FFmpegAudioPlayerFactory = FFmpegAudioPlayerFactory(self)

	def __del__(self):
		self.Logger.info("~AudioPlayerFactory()")

	def NewPlayer(self, _type):
		"""Return a new player for *_type*, or ``None`` when the type is unknown."""
		if _type == self.AUDIOPLAYER_FFMPEG:
			return self.FFmpegAudioPlayerFactory.NewPlayer()

		# Previously this fell off the end without a trace; keep returning None
		# for compatibility but leave a hint in the log for whoever debugs the
		# resulting failure at the call site.
		self.Logger.warning("Unknown audio player type: %r", _type)
		return None
 | 
						|
 | 
						|
 | 
						|
class AntiSpam():
	"""Tracks recent clip playtime and imposes a global cooldown when it is exceeded.

	Each tracked clip is stored in ``LastClips`` under ``hash(clip)`` as a dict
	with the keys ``timestamp`` (loop time at play), ``duration`` (accumulated
	playtime), ``dominant`` and ``active``. Only the dominant clip accumulates
	duration in OnUpdate(), so overlapping clips are not counted twice.
	"""

	def __init__(self, master):
		"""Store *master* and copy its ``Torchlight`` attribute; start with no tracked clips."""
		self.Logger = logging.getLogger(__class__.__name__)
		self.Master = master
		self.Torchlight = self.Master.Torchlight

		self.LastClips = dict()
		# Loop time until which voice commands are blocked; None until the first punishment.
		self.DisabledTime = None
		self.SaidHint = False

	def CheckAntiSpam(self, player):
		"""Return False (and tell *player* privately) while the cooldown applies to them.

		Players whose access level is at least Config["AntiSpam"]["ImmunityLevel"]
		are never blocked.
		"""
		if self.DisabledTime and self.DisabledTime > self.Torchlight().Master.Loop.time() and \
			not (player.Access and player.Access["level"] >= self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]):

			self.Torchlight().SayPrivate(player, "Torchlight is currently on cooldown! ({0} seconds left)".format(
				math.ceil(self.DisabledTime - self.Torchlight().Master.Loop.time())))
			return False

		return True

	def SpamCheck(self, Delta):
		"""Sum the tracked durations and start the cooldown when the budget is exceeded.

		*Delta* is accepted for signature compatibility and is not used.
		"""
		Now = self.Torchlight().Master.Loop.time()
		Config = self.Torchlight().Config["AntiSpam"]
		Duration = 0.0

		# Iterate over a snapshot because expired entries are deleted on the way.
		for Key, Clip in list(self.LastClips.items()):
			if not Clip["timestamp"]:
				continue

			if Clip["timestamp"] + Clip["duration"] + Config["MaxUsageSpan"] < Now:
				# Outside the accounting window: forget it once it no longer plays.
				if not Clip["active"]:
					del self.LastClips[Key]
				continue

			Duration += Clip["duration"]

		if Duration <= Config["MaxUsageTime"]:
			return

		self.DisabledTime = Now + Config["PunishDelay"]
		self.Torchlight().SayChat("Blocked voice commands for the next {0} seconds. Used {1} seconds within {2} seconds.".format(
			Config["PunishDelay"], Config["MaxUsageTime"], Config["MaxUsageSpan"]))

		# Make a copy of the list since AudioClip.Stop() will change the list
		for AudioClip in self.Master.AudioClips[:]:
			if AudioClip.Level < Config["ImmunityLevel"]:
				AudioClip.Stop()

		self.LastClips.clear()

	def OnPlay(self, clip):
		"""Start tracking *clip*; it becomes dominant if no tracked clip is dominant."""
		Entry = {"timestamp": self.Torchlight().Master.Loop.time(), "duration": 0.0, "dominant": False, "active": True}
		self.LastClips[hash(clip)] = Entry
		Entry["dominant"] = not any(Clip["dominant"] for Clip in self.LastClips.values())

	def OnStop(self, clip):
		"""Mark *clip* inactive and hand dominance to the first clip that is still active."""
		Entry = self.LastClips.get(hash(clip))
		if Entry is None:
			return

		Entry["active"] = False

		if Entry["dominant"]:
			for Clip in self.LastClips.values():
				if Clip["active"]:
					Clip["dominant"] = True
					break

		Entry["dominant"] = False

	def OnUpdate(self, clip, old_position, new_position):
		"""Add the played span to *clip* if it is dominant, then re-run the spam check."""
		Delta = new_position - old_position

		# SpamCheck() clears or deletes entries, so an update can arrive for a
		# clip that is no longer tracked. Ignore it, like OnStop() does,
		# instead of raising KeyError inside the player callback.
		Clip = self.LastClips.get(hash(clip))
		if Clip is None or not Clip["dominant"]:
			return

		Clip["duration"] += Delta
		self.SpamCheck(Delta)
 | 
						|
 | 
						|
 | 
						|
class Advertiser():
	"""Posts a chat hint about the stop commands while clips keep playing.

	Clips are tracked in ``LastClips`` under ``hash(clip)`` as dicts with the
	keys ``timestamp``, ``duration``, ``dominant`` and ``active``. Only the
	dominant clip accumulates duration in OnUpdate().
	"""

	def __init__(self, master):
		"""Store *master* and copy its ``Torchlight`` attribute; start with no tracked clips."""
		self.Logger = logging.getLogger(__class__.__name__)
		self.Master = master
		self.Torchlight = self.Master.Torchlight

		self.LastClips = dict()
		# Rounded-up duration at which the hint was last posted.
		self.AdStop = 0
		# Hold-off counter, decreased by the played delta on every Think().
		self.NextAdStop = 0

	def Think(self, Delta):
		"""Sum the tracked durations and post the hint when a multiple of the interval is reached."""
		Now = self.Torchlight().Master.Loop.time()
		Config = self.Torchlight().Config["Advertiser"]
		Duration = 0.0

		# Iterate over a snapshot because expired entries are deleted on the way.
		for Key, Clip in list(self.LastClips.items()):
			if not Clip["timestamp"]:
				continue

			if Clip["timestamp"] + Clip["duration"] + Config["MaxSpan"] < Now:
				# Outside the accounting window: forget it once it no longer plays.
				if not Clip["active"]:
					del self.LastClips[Key]
				continue

			Duration += Clip["duration"]

		self.NextAdStop -= Delta
		CeilDur = math.ceil(Duration)
		if CeilDur > self.AdStop and self.NextAdStop <= 0 and CeilDur % Config["AdStop"] == 0:
			self.Torchlight().SayChat("Hint: Type \x07FF0000!stop(ze) !pls(mg)\x01 to stop all currently playing sounds.")
			self.AdStop = CeilDur
			self.NextAdStop = 0
		elif CeilDur < self.AdStop:
			# The tracked total dropped below the last mark: reset the mark and
			# hold off for half an interval.
			self.AdStop = 0
			self.NextAdStop = Config["AdStop"] / 2

	def OnPlay(self, clip):
		"""Start tracking *clip*; it becomes dominant if no tracked clip is dominant."""
		Entry = {"timestamp": self.Torchlight().Master.Loop.time(), "duration": 0.0, "dominant": False, "active": True}
		self.LastClips[hash(clip)] = Entry
		Entry["dominant"] = not any(Clip["dominant"] for Clip in self.LastClips.values())

	def OnStop(self, clip):
		"""Mark *clip* inactive and hand dominance to the first clip that is still active."""
		Entry = self.LastClips.get(hash(clip))
		if Entry is None:
			return

		Entry["active"] = False

		if Entry["dominant"]:
			for Clip in self.LastClips.values():
				if Clip["active"]:
					Clip["dominant"] = True
					break

		Entry["dominant"] = False

	def OnUpdate(self, clip, old_position, new_position):
		"""Add the played span to *clip* if it is dominant, then run Think()."""
		Delta = new_position - old_position

		# Think() deletes expired entries, so an update can arrive for a clip
		# that is not tracked. Ignore it, like OnStop() does, instead of
		# raising KeyError inside the player callback.
		Clip = self.LastClips.get(hash(clip))
		if Clip is None or not Clip["dominant"]:
			return

		Clip["duration"] += Delta
		self.Think(Delta)
 | 
						|
 | 
						|
 | 
						|
class AudioManager():
	"""Owns the list of playing clips and gates who may start or stop them.

	Holds one AntiSpam, one Advertiser and one AudioPlayerFactory, each built
	with this manager as their master.
	"""

	def __init__(self, torchlight):
		"""Keep *torchlight* (called as ``self.Torchlight()`` wherever it is used) and build the helpers."""
		self.Logger = logging.getLogger(__class__.__name__)
		self.Torchlight = torchlight
		self.AntiSpam = AntiSpam(self)
		self.Advertiser = Advertiser(self)
		self.AudioPlayerFactory = AudioPlayerFactory(self)
		self.AudioClips = []

	def __del__(self):
		self.Logger.info("~AudioManager()")

	@staticmethod
	def _AccessLevel(player):
		"""Return the player's access level, or 0 when they have no access entry."""
		return player.Access["level"] if player.Access else 0

	def _StopAndNotify(self, Clip, player):
		"""Stop *Clip*, tell its owner, and tell *player* too when they are someone else."""
		Clip.Stop()
		self.Torchlight().SayPrivate(Clip.Player, "Your audio clip was stopped.")
		if player != Clip.Player:
			self.Torchlight().SayPrivate(player, "Stopped \"{0}\"({1}) audio clip.".format(Clip.Player.Name, Clip.Player.UserID))

	def CheckLimits(self, player):
		"""Return True when *player* may start a clip under Config["AudioLimits"].

		Levels without an entry in the limits table are unrestricted. On a
		violation the player is told privately and False is returned.
		"""
		LevelKey = str(self._AccessLevel(player))

		if LevelKey not in self.Torchlight().Config["AudioLimits"]:
			return True

		Limits = self.Torchlight().Config["AudioLimits"][LevelKey]

		# A negative "Uses" limit disables the use counter check.
		if Limits["Uses"] >= 0 and player.Storage["Audio"]["Uses"] >= Limits["Uses"]:
			self.Torchlight().SayPrivate(player, "You have used up all of your free uses! ({0} uses)".format(
				Limits["Uses"]))
			return False

		if player.Storage["Audio"]["TimeUsed"] >= Limits["TotalTime"]:
			self.Torchlight().SayPrivate(player, "You have used up all of your free time! ({0} seconds)".format(
				Limits["TotalTime"]))
			return False

		# The per-player cooldown scales with the length of the previous clip.
		TimeElapsed = self.Torchlight().Master.Loop.time() - player.Storage["Audio"]["LastUse"]
		UseDelay = player.Storage["Audio"]["LastUseLength"] * Limits["DelayFactor"]

		if TimeElapsed < UseDelay:
			self.Torchlight().SayPrivate(player, "You are currently on cooldown! ({0} seconds left)".format(
				round(UseDelay - TimeElapsed)))
			return False

		return True

	def Stop(self, player, extra):
		"""Stop, or vote to stop, every clip whose owner name contains *extra*.

		A falsy *extra* matches all clips. Players without a level, or whose
		level is below both the clip's level and Config["AntiSpam"]["StopLevel"],
		only cast a vote; the clip stops once three distinct user ids voted.
		"""
		Level = self._AccessLevel(player)

		# Work on a copy: stopping a clip removes it from self.AudioClips.
		for Clip in list(self.AudioClips):
			if extra and extra.lower() not in Clip.Player.Name.lower():
				continue

			VoteOnly = not Level or (Level < Clip.Level and Level < self.Torchlight().Config["AntiSpam"]["StopLevel"])
			if VoteOnly:
				Clip.Stops.add(player.UserID)
				if len(Clip.Stops) < 3:
					self.Torchlight().SayPrivate(player, "This audio clip needs {0} more !stop's.".format(3 - len(Clip.Stops)))
					continue

			self._StopAndNotify(Clip, player)

	def AudioClip(self, player, uri, _type = AudioPlayerFactory.AUDIOPLAYER_FFMPEG):
		"""Create and register a clip for *player*, or return None when they are not allowed to."""
		Level = self._AccessLevel(player)

		if self.Torchlight().Disabled and self.Torchlight().Disabled > Level:
			self.Torchlight().SayPrivate(player, "Torchlight is currently disabled!")
			return None

		if not self.AntiSpam.CheckAntiSpam(player) or not self.CheckLimits(player):
			return None

		Clip = AudioClip(self, player, uri, _type)
		self.AudioClips.append(Clip)

		# Clips of players at or above the immunity level are not counted by AntiSpam.
		if not player.Access or Level < self.Torchlight().Config["AntiSpam"]["ImmunityLevel"]:
			Clip.AudioPlayer.AddCallback("Play", lambda *args: self.AntiSpam.OnPlay(Clip, *args))
			Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.AntiSpam.OnStop(Clip, *args))
			Clip.AudioPlayer.AddCallback("Update", lambda *args: self.AntiSpam.OnUpdate(Clip, *args))

		Clip.AudioPlayer.AddCallback("Play", lambda *args: self.Advertiser.OnPlay(Clip, *args))
		Clip.AudioPlayer.AddCallback("Stop", lambda *args: self.Advertiser.OnStop(Clip, *args))
		Clip.AudioPlayer.AddCallback("Update", lambda *args: self.Advertiser.OnUpdate(Clip, *args))

		return Clip

	def OnDisconnect(self, player):
		"""Stop every clip owned by *player*."""
		# Work on a copy: stopping a clip removes it from self.AudioClips.
		for Clip in list(self.AudioClips):
			if Clip.Player == player:
				Clip.Stop()
 | 
						|
 | 
						|
 | 
						|
class AudioClip():
	"""One playback request: a URI played for a player through an audio player.

	Keeps the player's ``Storage["Audio"]`` counters (``Uses``, ``LastUse``,
	``LastUseLength``, ``TimeUsed``) up to date from the audio player's
	"Play", "Stop" and "Update" callbacks and stops itself when a limit from
	Config["AudioLimits"] is reached.
	"""

	def __init__(self, master, player, uri, _type):
		"""Create the audio player for *_type* via the master's factory and hook its callbacks."""
		self.Logger = logging.getLogger(__class__.__name__)
		self.Master = master
		self.Torchlight = self.Master.Torchlight
		self.Player = player
		self.Type = _type
		self.URI = uri
		# Position reported by the most recent Update callback; None until the first one.
		self.LastPosition = None
		# User ids that voted to stop this clip.
		self.Stops = set()

		self.Level = 0
		if self.Player.Access:
			self.Level = self.Player.Access["level"]

		self.AudioPlayer = self.Master.AudioPlayerFactory.NewPlayer(self.Type)
		self.AudioPlayer.AddCallback("Play", self.OnPlay)
		self.AudioPlayer.AddCallback("Stop", self.OnStop)
		self.AudioPlayer.AddCallback("Update", self.OnUpdate)

	def __del__(self):
		self.Logger.info("~AudioClip()")

	def Play(self, seconds = None, rubberband = None, dec_params = None, bitrate = None, backwards = None, *args):
		"""Start playing the URI; the arguments are forwarded to the audio player's PlayURI()."""
		return self.AudioPlayer.PlayURI(self.URI, position = seconds, rubberband = rubberband, dec_params = dec_params,
			bitrate = bitrate, backwards = backwards, *args)

	def Stop(self):
		"""Ask the audio player to stop and return its result."""
		return self.AudioPlayer.Stop()

	def OnPlay(self):
		"""Play callback: count the use and reset the per-clip length counter."""
		self.Logger.debug("OnPlay %s", self.URI)

		self.Player.Storage["Audio"]["Uses"] += 1
		self.Player.Storage["Audio"]["LastUse"] = self.Torchlight().Master.Loop.time()
		self.Player.Storage["Audio"]["LastUseLength"] = 0.0

	def OnStop(self):
		"""Stop callback: unregister the clip, account the remaining playtime and release the player."""
		self.Logger.debug("OnStop %s", self.URI)
		self.Master.AudioClips.remove(self)

		Storage = self.Player.Storage

		# Account the span since the last update. This is skipped when the
		# player's storage is gone or no update was received yet (LastPosition
		# is still None); both used to raise here, which also skipped the
		# cleanup at the end of this method.
		if Storage and self.AudioPlayer.Playing and self.LastPosition is not None:
			Delta = self.AudioPlayer.Position - self.LastPosition
			Storage["Audio"]["TimeUsed"] += Delta
			Storage["Audio"]["LastUseLength"] += Delta

		LevelKey = str(self.Level)
		if Storage and LevelKey in self.Torchlight().Config["AudioLimits"]:
			Limits = self.Torchlight().Config["AudioLimits"][LevelKey]
			if Storage["Audio"]["TimeUsed"] >= Limits["TotalTime"]:
				self.Torchlight().SayPrivate(self.Player, "You have used up all of your free time! ({0} seconds)".format(
					Limits["TotalTime"]))
			elif Storage["Audio"]["LastUseLength"] >= Limits["MaxLength"]:
				self.Torchlight().SayPrivate(self.Player, "Your audio clip exceeded the maximum length! ({0} seconds)".format(
					Limits["MaxLength"]))

		del self.AudioPlayer

	def OnUpdate(self, old_position, new_position):
		"""Update callback: account the played span and stop once a limit is reached."""
		Delta = new_position - old_position
		self.LastPosition = new_position

		self.Player.Storage["Audio"]["TimeUsed"] += Delta
		self.Player.Storage["Audio"]["LastUseLength"] += Delta

		# Levels without an entry in the limits table are unrestricted.
		LevelKey = str(self.Level)
		if LevelKey not in self.Torchlight().Config["AudioLimits"]:
			return

		Limits = self.Torchlight().Config["AudioLimits"][LevelKey]
		if (self.Player.Storage["Audio"]["TimeUsed"] >= Limits["TotalTime"] or
			self.Player.Storage["Audio"]["LastUseLength"] >= Limits["MaxLength"]):
			self.Stop()
 |