projects-jenz/fastdl_sync/fastdl_ftp.py

384 lines
12 KiB
Python
Executable File

#!/usr/local/bin/python3.5
import argparse
import sys
import os
import math
import time
import threading
import queue
import bz2
import shutil
import pyinotify
import ftplib
import traceback
from io import BytesIO
from urllib.parse import urlparse
import string
import random
global args
global parsed
global commonprefix
global commonprefix_ftp
global jobs
def random_string(length):
return ''.join(random.choice(string.ascii_letters) for m in range(length))
USER = ""
PASSWORD = ""
# Terminal color codes
c_null = "\x1b[00;00m"
c_red = "\x1b[31;01m"
c_orange= "\x1b[33;01m"
c_green = "\x1b[32;01m"
c_white = "\x1b[37;01m"
# Valid file extensions to compress
valid_extensions = tuple([
'bsp', 'nav', # maps
'mdl', 'vtx', 'vvd', 'phy', # models
'vtf', 'vmt', 'png', # textures
'wav', 'mp3', # sounds
'pcf', # particles
'ttf', 'otf' # fonts
])
ignore_names = ["ar_baggage.bsp", "ar_monastery.bsp", "ar_shoots.bsp",
"cs_assault.bsp", "cs_compound.bsp", "cs_havana.bsp",
"cs_italy.bsp", "cs_militia.bsp", "cs_office.bsp",
"de_aztec.bsp", "de_bank.bsp", "de_cache.bsp",
"de_cbble.bsp", "de_chateau.bsp", "de_dust.bsp",
"de_dust2.bsp", "de_inferno.bsp", "de_lake.bsp",
"de_mirage.bsp", "de_nuke.bsp", "de_overpass.bsp",
"de_piranesi.bsp", "de_port.bsp", "de_prodigy.bsp",
"de_safehouse.bsp", "de_shortdust.bsp", "de_shorttrain.bsp",
"de_stmarc.bsp", "de_sugarcane.bsp", "de_tides.bsp",
"de_train.bsp", "de_vertigo.bsp", "training1.bsp",
"test_hardware.bsp", "test_speakers.bsp",
"ar_baggage.nav", "ar_monastery.nav", "ar_shoots.nav",
"cs_assault.nav", "cs_compound.nav", "cs_havana.nav",
"cs_italy.nav", "cs_militia.nav", "cs_office.nav",
"de_aztec.nav", "de_bank.nav", "de_cache.nav",
"de_cbble.nav", "de_chateau.nav", "de_dust.nav",
"de_dust2.nav", "de_inferno.nav", "de_lake.nav",
"de_mirage.nav", "de_nuke.nav", "de_overpass.nav",
"de_piranesi.nav", "de_port.nav", "de_prodigy.nav",
"de_safehouse.nav", "de_shortdust.nav", "de_shorttrain.nav",
"de_stmarc.nav", "de_sugarcane.nav", "de_tides.nav",
"de_train.nav", "de_vertigo.nav", "training1.nav",
"test_hardware.nav", "test_speakers.nav"]
ignore_folders = ["workshop"]
# inotify mask
NOTIFY_MASK = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE | pyinotify.IN_MOVED_TO | pyinotify.IN_MOVED_FROM
def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
def PrettyPrint(filename, status):
if status == "Exists":
color = c_white
elif status == "Added":
color = c_orange
elif status == "Done" or status == "Moved":
color = c_green
else:
color = c_red
columns = int(os.popen("stty size", 'r').read().split()[1])
rows = math.ceil((len(filename) + len(status))/columns)
text = filename + '.'*(columns*rows - (len(filename) + len(status))) + color + status + c_null
text += chr(8)*(len(text) + 1)
print(text + '\n'*rows)
# This is called a lot during startup.
@static_var("cache_path", 0)
@static_var("cache_resp", 0)
@static_var("cache_ftp", 0)
def FTP_FileExists(ftp, path):
Exists = False
try:
# Cache should only be valid for one ftp connection
if FTP_FileExists.cache_ftp != ftp:
FTP_FileExists.cache_ftp = ftp
FTP_FileExists.cache_path = 0
FTP_FileExists.cache_resp = 0
if FTP_FileExists.cache_path != os.path.dirname(path):
FTP_FileExists.cache_path = os.path.dirname(path)
FTP_FileExists.cache_resp = []
ftp.dir(os.path.dirname(path), FTP_FileExists.cache_resp.append)
for line in FTP_FileExists.cache_resp:
if line[0] == '-':
line = line.split(maxsplit=8)[8]
if line == os.path.basename(path):
Exists = True
break
except ftplib.all_errors:
return Exists
return Exists
def FTP_DirExists(ftp, path):
Exists = False
try:
resp = []
ftp.dir(os.path.abspath(os.path.join(path, "..")), resp.append)
for line in resp:
if line[0] == 'd':
line = line.split(maxsplit=8)[8]
if line == os.path.basename(path):
Exists = True
break
except ftplib.all_errors:
return Exists
return Exists
def Compress(ftp, item):
sourcefile, destfile = item
# Remove destination file if already exists
if FTP_FileExists(ftp, destfile):
ftp.delete(destfile)
# Check whether directory tree exists at destination, create it if necessary
directory = os.path.dirname(destfile)
if not FTP_DirExists(ftp, directory):
create_tree = os.path.relpath(directory, commonprefix_ftp).split('/')
create_tree.reverse()
# First one will always be /cstrike or whatever...
create_dir = os.path.abspath(os.path.join(commonprefix_ftp, create_tree.pop()))
while create_tree:
create_dir = os.path.abspath(os.path.join(create_dir, create_tree.pop()))
try:
ftp.mkd(create_dir)
except ftplib.error_perm as e:
# ignore "directory already exists"
if not e.args[0].startswith('550'):
raise
folder = "/tmp/fastDL_sync_" + random_string(10)
os.mkdir(folder)
tempfile = os.path.join(folder, os.path.basename(destfile))
with open(sourcefile, "rb") as infile:
with bz2.BZ2File(tempfile, "wb", compresslevel=9) as outfile:
shutil.copyfileobj(infile, outfile, 64*1024)
with open(tempfile, "rb") as temp:
ftp.storbinary("STOR {0}".format(destfile), temp)
os.remove(tempfile)
PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Done")
os.rmdir(folder)
def Delete(ftp, item):
item = item[0]
try:
ftp.delete(item)
PrettyPrint(os.path.relpath(item, commonprefix_ftp), "Deleted")
except ftplib.error_perm:
pass
def Move(ftp, item):
sourcepath, destpath = item
# Check whether directory tree exists at destination, create it if necessary
directory = os.path.dirname(destpath)
if not FTP_DirExists(ftp, directory):
ftp.mkd(directory)
ftp.rename(sourcepath, destpath)
PrettyPrint("{0} -> {1}".format(os.path.relpath(sourcepath, commonprefix_ftp), os.path.relpath(destpath, commonprefix_ftp)), "Moved")
def Worker():
while True:
job = jobs.get()
try:
if args.dry_run:
print("Job: {0}({1})".format(job[0].__name__, job[1]))
else:
ftp = ftplib.FTP(parsed.netloc)
#example for NA fastdl running with vsftpd on custom port
#ftp = ftplib.FTP(parsed.netloc, 2121)
ftp.login(USER, PASSWORD)
job[0](ftp, job[1:])
ftp.quit()
except Exception as e:
print("worker error {0}".format(e))
print(traceback.format_exc())
finally:
jobs.task_done()
class EventHandler(pyinotify.ProcessEvent):
def my_init(self, source, destination):
self.SourceDirectory = os.path.abspath(source)
self.DestinationDirectory = os.path.abspath(destination)
def process_IN_CLOSE_WRITE(self, event):
if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders):
return
destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, "..")))
jobs.put((Compress, event.pathname, destpath + ".bz2"))
def process_IN_DELETE(self, event):
destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, "..")))
if event.dir:
if os.path.exists(destpath):
jobs.put((Delete, destpath))
else:
if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders):
return
jobs.put((Delete, destpath + ".bz2"))
def process_IN_MOVED_TO(self, event):
# Moved from untracked directory, handle as new file
if not hasattr(event, "src_pathname"):
if not event.pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders):
return
destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, "..")))
jobs.put((Compress, event.pathname, destpath + ".bz2"))
return
# Moved inside tracked directory, handle as rename
sourcepath = os.path.join(self.DestinationDirectory, os.path.relpath(event.src_pathname, os.path.join(self.SourceDirectory, "..")))
destpath = os.path.join(self.DestinationDirectory, os.path.relpath(event.pathname, os.path.join(self.SourceDirectory, "..")))
if event.dir:
jobs.put((Move, sourcepath, destpath))
else:
if event.src_pathname.endswith(valid_extensions) or os.path.basename(event.pathname) in ignore_names or any(folder in event.pathname for folder in ignore_folders):
return
if not event.src_pathname.endswith(valid_extensions) and event.pathname.endswith(valid_extensions):
# Renamed invalid_ext file to valid one -> compress
jobs.put((Compress, event.pathname, destpath + ".bz2"))
return
elif event.src_pathname.endswith(valid_extensions) and not event.pathname.endswith(valid_extensions):
# Renamed valid_ext file to invalid one -> delete from destination
jobs.put((Delete, sourcepath + ".bz2"))
return
jobs.put((Move, sourcepath + ".bz2", destpath + ".bz2"))
class DirectoryHandler:
def __init__(self, source, destination, watchmanager=None):
self.SourceDirectory = os.path.abspath(source)
self.DestinationDirectory = destination
if watchmanager:
self.WatchManager = watchmanager
self.NotifyHandler = EventHandler(source=self.SourceDirectory, destination=self.DestinationDirectory)
self.NotifyNotifier = pyinotify.Notifier(self.WatchManager, self.NotifyHandler, timeout=1000)
self.NotifyWatch = self.WatchManager.add_watch(self.SourceDirectory, NOTIFY_MASK, rec=True, auto_add=True)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.WatchManager.rm_watch(self.NotifyWatch, rec=True)
def Loop(self):
self.NotifyNotifier.process_events()
while self.NotifyNotifier.check_events():
self.NotifyNotifier.read_events()
self.NotifyNotifier.process_events()
def Do(self, ftp): # Normal mode
for dirpath, dirnames, filenames in os.walk(self.SourceDirectory):
if not any(folder in dirpath for folder in ignore_folders):
filenames.sort()
for filename in [f for f in filenames if f.endswith(valid_extensions) and f not in ignore_names]:
self.Checkfile(ftp, dirpath, filename)
def Checkfile(self, ftp, dirpath, filename):
sourcefile = os.path.join(dirpath, filename)
destfile = os.path.join(self.DestinationDirectory, os.path.relpath(dirpath, os.path.join(self.SourceDirectory, "..")), filename + ".bz2")
if FTP_FileExists(ftp, destfile):
PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Exists")
else:
PrettyPrint(os.path.relpath(sourcefile, commonprefix), "Added")
jobs.put((Compress, sourcefile, destfile))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Automate FastDL BZip2 process")
parser.add_argument("-t", "--threads", type=int, default=1, help="Worker thread count")
parser.add_argument("--dry-run", action="store_true", help="Test mode (don't run any jobs, just print them)")
parser.add_argument("source", nargs='+', help="Source Path")
parser.add_argument("destination", help="Destination Path")
args = parser.parse_args()
parsed = urlparse(args.destination)
if not parsed.scheme == "ftp":
print("Destination is not an ftp address!")
sys.exit(1)
#NA example running vsftpd on custom port
#ftp = ftplib.FTP()
#ftp.connect(parsed.netloc, 2121)
#ftp.login(USER, PASSWORD)
ftp = ftplib.FTP(parsed.netloc)
ftp.login(USER, PASSWORD)
# make common prefix for better logging
commonprefix = os.path.abspath(os.path.join(os.path.dirname(os.path.commonprefix(args.source)), ".."))
commonprefix_ftp = os.path.dirname(parsed.path)
jobs = queue.Queue()
# Create initial jobs
WatchManager = pyinotify.WatchManager()
DirectoryHandlers = []
for source in args.source:
handler = DirectoryHandler(source, parsed.path, WatchManager)
DirectoryHandlers.append(handler)
handler.Do(ftp)
ftp.quit()
# Start worker threads
for i in range(args.threads):
worker_thread = threading.Thread(target=Worker)
worker_thread.daemon = True
worker_thread.start()
# inotify loop
try:
while True:
for handler in DirectoryHandlers:
handler.Loop()
except KeyboardInterrupt:
print("Waiting for remaining jobs to complete...")
jobs.join()
print("Exiting!")