#!/usr/local/bin/python3.5 import argparse import sys import os import math import time import threading import queue import bz2 import shutil import pyinotify import ftplib import traceback from io import BytesIO from urllib.parse import urlparse import string import random global args global parsed global commonprefix global commonprefix_ftp global jobs def random_string(length): return ''.join(random.choice(string.ascii_letters) for m in range(length)) USER = "" PASSWORD = "" # Terminal color codes c_null = "\x1b[00;00m" c_red = "\x1b[31;01m" c_orange= "\x1b[33;01m" c_green = "\x1b[32;01m" c_white = "\x1b[37;01m" # Valid file extensions to compress valid_extensions = tuple([ 'bsp', 'nav', # maps 'mdl', 'vtx', 'vvd', 'phy', # models 'vtf', 'vmt', 'png', # textures 'wav', 'mp3', # sounds 'pcf', # particles 'ttf', 'otf' # fonts ]) ignore_names = ["ar_baggage.bsp", "ar_monastery.bsp", "ar_shoots.bsp", "cs_assault.bsp", "cs_compound.bsp", "cs_havana.bsp", "cs_italy.bsp", "cs_militia.bsp", "cs_office.bsp", "de_aztec.bsp", "de_bank.bsp", "de_cache.bsp", "de_cbble.bsp", "de_chateau.bsp", "de_dust.bsp", "de_dust2.bsp", "de_inferno.bsp", "de_lake.bsp", "de_mirage.bsp", "de_nuke.bsp", "de_overpass.bsp", "de_piranesi.bsp", "de_port.bsp", "de_prodigy.bsp", "de_safehouse.bsp", "de_shortdust.bsp", "de_shorttrain.bsp", "de_stmarc.bsp", "de_sugarcane.bsp", "de_tides.bsp", "de_train.bsp", "de_vertigo.bsp", "training1.bsp", "test_hardware.bsp", "test_speakers.bsp", "ar_baggage.nav", "ar_monastery.nav", "ar_shoots.nav", "cs_assault.nav", "cs_compound.nav", "cs_havana.nav", "cs_italy.nav", "cs_militia.nav", "cs_office.nav", "de_aztec.nav", "de_bank.nav", "de_cache.nav", "de_cbble.nav", "de_chateau.nav", "de_dust.nav", "de_dust2.nav", "de_inferno.nav", "de_lake.nav", "de_mirage.nav", "de_nuke.nav", "de_overpass.nav", "de_piranesi.nav", "de_port.nav", "de_prodigy.nav", "de_safehouse.nav", "de_shortdust.nav", "de_shorttrain.nav", "de_stmarc.nav", "de_sugarcane.nav", "de_tides.nav", "de_train.nav", "de_vertigo.nav", "training1.nav", "test_hardware.nav", "test_speakers.nav"] ignore_folders = ["workshop"] # inotify mask NOTIFY_MASK = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE | pyinotify.IN_MOVED_TO | pyinotify.IN_MOVED_FROM def static_var(varname, value): def decorate(func): setattr(func, varname, value) return func return decorate def PrettyPrint(filename, status): if status == "Exists": color = c_white elif status == "Added": color = c_orange elif status == "Done" or status == "Moved": color = c_green else: color = c_red columns = int(os.popen("stty size", 'r').read().split()[1]) rows = math.ceil((len(filename) + len(status))/columns) text = filename + '.'*(columns*rows - (len(filename) + len(status))) + color + status + c_null text += chr(8)*(len(text) + 1) print(text + '\n'*rows) # This is called a lot during startup. @static_var("cache_path", 0) @static_var("cache_resp", 0) @static_var("cache_ftp", 0) def FTP_FileExists(ftp, path): Exists = False try: # Cache should only be valid for one ftp connection if FTP_FileExists.cache_ftp != ftp: FTP_FileExists.cache_ftp = ftp FTP_FileExists.cache_path = 0 FTP_FileExists.cache_resp = 0 if FTP_FileExists.cache_path != os.path.dirname(path): FTP_FileExists.cache_path = os.path.dirname(path) FTP_FileExists.cache_resp = [] ftp.dir(os.path.dirname(path), FTP_FileExists.cache_resp.append) for line in FTP_FileExists.cache_resp: if line[0] == '-': line = line.split(maxsplit=8)[8] if line == os.path.basename(path): Exists = True break except ftplib.all_errors: return Exists return Exists def FTP_DirExists(ftp, path): Exists = False try: resp = [] ftp.dir(os.path.abspath(os.path.join(path, "..")), resp.append) for line in resp: if line[0] == 'd': line = line.split(maxsplit=8)[8] if line == os.path.basename(path): Exists = True break except ftplib.all_errors: return Exists return Exists def Compress(ftp, item): sourcefile, destfile = item # Remove destination file if already exists if FTP_FileExists(ftp, destfile): ftp.delete(destfile) # Check whether directory tree exists at destination, create it if necessary directory = os.path.dirname(destfile) if not FTP_DirExists(ftp, directory): create_tree = os.path.relpath(directory, commonprefix_ftp).split('/') create_tree.reverse() # First one will always be /cstrike or whatever... create_dir = os.path.abspath(os.path.join(commonprefix_ftp, create_tree.pop())) while create_tree: create_dir = os.path.abspath(os.path.join(create_dir, create_tree.pop())) try: ftp.mkd(create_dir) except ftplib.error_perm as e: # ignore "directory already exists" if not e.args[0].startswith('550'): raise folder = "/tmp/fastDL_sync_" + random_string(10) os.mkdir(folder) tempfile = os.path.join(folder, os.path.basename(destfile)) with open(sourcefile, "rb") as infile: with bz2.BZ2File(tempfile, "wb", compresslevel=9) as outfile: shutil.copyfileobj(infile, outfile, 64*1024) with open(tempfile, "rb") as temp: ftp.storbinary("STOR {0}".format(destfile), temp) os.remove(tempfile) PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Done") os.rmdir(folder) def Delete(ftp, item): item = item[0] try: ftp.delete(item) PrettyPrint(os.path.relpath(item, commonprefix_ftp), "Deleted") except ftplib.error_perm: pass def Move(ftp, item): sourcepath, destpath = item # Check whether directory tree exists at destination, create it if necessary directory = os.path.dirname(destpath) if not FTP_DirExists(ftp, directory): ftp.mkd(directory) ftp.rename(sourcepath, destpath) PrettyPrint("{0} -> {1}".format(os.path.relpath(sourcepath, commonprefix_ftp), os.path.relpath(destpath, commonprefix_ftp)), "Moved") def Worker(): while True: job = jobs.get() try: if args.dry_run: print("Job: {0}({1})".format(job[0].__name__, job[1])) else: ftp = ftplib.FTP(parsed.netloc) #example for NA fastdl running with vsftpd on custom port #ftp = ftplib.FTP(parsed.netloc, 2121) ftp.login(USER, PASSWORD) job[0](ftp, job[1:]) ftp.quit() except Exception as e: print("worker error {0}".format(e)) print(traceback.format_exc()) finally: jobs.task_done() class EventHandler(pyinotify.ProcessEvent): def my_init(self, source, destination): self.SourceDirectory = os.path.abspath(source) self.DestinationDirectory = os.path.abspath(destination) def process_IN_CLOSE_WRITE(self, event): if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders): return destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, ".."))) jobs.put((Compress, event.pathname, destpath + ".bz2")) def process_IN_DELETE(self, event): destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, ".."))) if event.dir: if os.path.exists(destpath): jobs.put((Delete, destpath)) else: if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders): return jobs.put((Delete, destpath + ".bz2")) def process_IN_MOVED_TO(self, event): # Moved from untracked directory, handle as new file if not hasattr(event, "src_pathname"): if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders): return destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, ".."))) jobs.put((Compress, event.pathname, destpath + ".bz2")) return # Moved inside tracked directory, handle as rename sourcepath = os.path.join(self.DestinationDirectory, os.path.relpath(event.src_pathname, os.path.join(self.SourceDirectory, ".."))) destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, ".."))) if event.dir: jobs.put((Move, sourcepath, destpath)) else: if event.src_pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders): return if not event.src_pathname.endswith(valid_extensions) and event.pathname.endswith(valid_extensions): # Renamed invalid_ext file to valid one -> compress jobs.put((Compress, event.pathname, destpath + ".bz2")) return elif event.src_pathname.endswith(valid_extensions) and not event.pathname.endswith(valid_extensions): # Renamed valid_ext file to invalid one -> delete from destination jobs.put((Delete, sourcepath + ".bz2")) return jobs.put((Move, sourcepath + ".bz2", destpath + ".bz2")) class DirectoryHandler: def __init__(self, source, destination, watchmanager=None): self.SourceDirectory = os.path.abspath(source) self.DestinationDirectory = destination if watchmanager: self.WatchManager = watchmanager self.NotifyHandler = EventHandler(source=self.SourceDirectory, destination=self.DestinationDirectory) self.NotifyNotifier = pyinotify.Notifier(self.WatchManager, self.NotifyHandler, timeout=1000) self.NotifyWatch = self.WatchManager.add_watch(self.SourceDirectory, NOTIFY_MASK, rec=True, auto_add=True) def __enter__(self): return self def __exit__(self, type, value, traceback): self.WatchManager.rm_watch(self.NotifyWatch, rec=True) def Loop(self): self.NotifyNotifier.process_events() while self.NotifyNotifier.check_events(): self.NotifyNotifier.read_events() self.NotifyNotifier.process_events() def Do(self, ftp): # Normal mode for dirpath, dirnames, filenames in os.walk(self.SourceDirectory): if not any(folder in dirpath for folder in ignore_folders): filenames.sort() for filename in [f for f in filenames if f.endswith(valid_extensions) and f not in ignore_names]: self.Checkfile(ftp, dirpath, filename) def Checkfile(self, ftp, dirpath, filename): sourcefile = os.path.join(dirpath, filename) destfile = os.path.join(self.DestinationDirectory, os.path.relpath(dirpath, os.path.join(self.SourceDirectory, "..")), filename + ".bz2") if FTP_FileExists(ftp, destfile): PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Exists") else: PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Added") jobs.put((Compress, sourcefile, destfile)) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Automate FastDL BZip2 process") parser.add_argument("-t", "--threads", type=int, default=1, help="Worker thread count") parser.add_argument("--dry-run", action="store_true", help="Test mode (don't run any jobs, just print them)") parser.add_argument("source", nargs='+', help="Source Path") parser.add_argument("destination", help="Destination Path") args = parser.parse_args() parsed = urlparse(args.destination) if not parsed.scheme == "ftp": print("Destination is not an ftp address!") sys.exit(1) #NA example running vsftpd on custom port #ftp = ftplib.FTP() #ftp.connect(parsed.netloc, 2121) #ftp.login(USER, PASSWORD) ftp = ftplib.FTP(parsed.netloc) ftp.login(USER, PASSWORD) # make common prefix for better logging commonprefix = os.path.abspath(os.path.join(os.path.dirname(os.path.commonprefix(args.source)), "..")) commonprefix_ftp = os.path.dirname(parsed.path) jobs = queue.Queue() # Create initial jobs WatchManager = pyinotify.WatchManager() DirectoryHandlers = [] for source in args.source: handler = DirectoryHandler(source, parsed.path, WatchManager) DirectoryHandlers.append(handler) handler.Do(ftp) ftp.quit() # Start worker threads for i in range(args.threads): worker_thread = threading.Thread(target=Worker) worker_thread.daemon = True worker_thread.start() # inotify loop try: while True: for handler in DirectoryHandlers: handler.Loop() except KeyboardInterrupt: print("Waiting for remaining jobs to complete...") jobs.join() print("Exiting!")