"
# Do the replacements in order from first to last.
line = old_line
for match_replace_pair in self.user_settings.SUBSTRING_REPLACEMENT_LIST:
# Search for the pattern.
[pat, match] = pattern_match(match_replace_pair[0], line)
# print( f"\nline = |{line}|\n")
# print( f"\nmatch_replace_pair[0] = |{match_replace_pair[0]}|\n")
# print( f"\nmatch_replace_pair[1] = |{match_replace_pair[1]}|\n")
# print( f"\npat = {pat} match = {match}\n" )
# Replace with the new pattern. Since we use raw strings, we need
# to strip off leading and trailing whitespace.
if match:
new_substring = match_replace_pair[1].strip().lstrip()
sub = pat.sub(new_substring, line)
# print(f"\ntransform old line = \n{line:s}\ninto new line =\n{sub:s}\n\
# using new substring =\n{new_substring:s}\n")
line = sub
# print( f" (after current substitution, line = |{line:s}| ) " )
# At this point, we have done complete list of serial substitutions online.
rewritten_line = line
self.assertEqual(
new_line,
rewritten_line,
f"\n new_line = |{new_line:s}|\nrewritten_line = |{rewritten_line:s}|\n")
# Test file time and date.
def test_file_time_and_date(self):
    """Verify the modification date/time of a known master file reads back correctly in UTC."""
    file_name = "/Electronics/Images/PowerSupply1Schematic.psd"
    full_file_name = self.user_settings.master_root_dir + file_name
    # Modification time in seconds since the epoch, reduced to a
    # (year, month, day, hour, minute, second) tuple in UTC.
    mod_time_utc = time.gmtime(os.path.getmtime(full_file_name))[0: 6]
    # Unpack the six fields straight into a datetime.
    d = datetime.datetime(*mod_time_utc)
    computed = f"file {file_name:s} datetime {d.ctime():s}"
    actual = "file /Electronics/Images/PowerSupply1Schematic.psd datetime Tue Jan 3 05:16:49 2023"
    self.assertEqual(computed, actual)
# Test pattern matching directories we want to skip over.
def test_pattern_match_dir_to_skip(self):
    """Check that the directory-skip regex matches a known temporary build directory name."""
    dir_skip = "Primpoly-cswhfrwgwdikgzfdpiorbeaiennz"
    pat = re.compile(self.user_settings.DIR_TO_SKIP)
    # Assert directly on the search result instead of the opaque
    # assertTrue(True)/assertTrue(False) branch, which produced a
    # useless "False is not true" message on failure.
    self.assertIsNotNone(
        pat.search(dir_skip),
        f"DIR_TO_SKIP pattern did not match |{dir_skip:s}|")
# ----------------------------------------------------------------------------
# Main function
# ----------------------------------------------------------------------------
def main(raw_args=None):
    """Main program. Clean up and update my website.

    Scans the master (local disk) web site, cleans it up, then (unless
    --cleanonly was given) scans the remote (ftp) web site and synchronizes
    the two.

    Args:
        raw_args: Optional list of command line argument strings (used by
            unit tests); None means parse sys.argv.
    """
    # Print the obligatory legal notice.
    print("""
updateweb Version 6.3 - A Python utility program which maintains my web site.
Copyright (C) 2007-2024 by Sean Erik O'Connor. All Rights Reserved.
It deletes temporary files, rewrites old copyright lines and email address
lines in source files, then synchronizes all changes to my web sites.
updateweb comes with ABSOLUTELY NO WARRANTY; for details see the
GNU General Public License. This is free software, and you are welcome
to redistribute it under certain conditions; see the GNU General Public
License for details.
""")
    # ---------------------------------------------------------------------
    # Load default settings and start logging.
    # ---------------------------------------------------------------------
    # Default user settings.
    user_settings = UserSettings()
    print(
        f"Running main( {raw_args} ) Python version {sys.version_info[0]:d}.\
{sys.version_info[1]:d}.{sys.version_info[2]:d} local web directory\
{user_settings.master_root_dir}\n")
    # Get command line options such as --verbose. Pass them back as flags in
    # user_settings.
    CommandLineSettings(user_settings, raw_args)
    # Load all unit test functions named test_* from UnitTest class, run the
    # tests and exit.
    if user_settings.UNITTEST:
        suite = unittest.TestLoader().loadTestsFromTestCase(UnitTest)
        unittest.TextTestRunner(verbosity=2).run(suite)
        sys.exit()
    # Start logging to file. Verbose turns on logging for
    # DEBUG, INFO, WARNING, ERROR, and CRITICAL levels,
    # otherwise we log only WARNING, ERROR, and CRITICAL levels.
    if user_settings.VERBOSE:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.WARNING
    # Pick the log file name on the host.
    if user_settings.CLEANONLY:
        user_settings.LOGFILENAME = "/private/logMaster.txt"
    else:
        user_settings.LOGFILENAME = "/private/logRemote.txt"
    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s %(levelname)-8s %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S',
        filename=user_settings.master_root_dir + user_settings.LOGFILENAME,
        filemode='w')
    logging.debug("*** Begin logging ******************************")
    # ---------------------------------------------------------------------
    # Scan the master website, finding out all files and directories.
    # ---------------------------------------------------------------------
    try:
        logging.debug("Scanning master (local on disk) web site")
        master = MasterWebSite(user_settings)
        print(f"Local web site directory = {user_settings.master_root_dir}")
        # Suppress newline and flush output buffer, so we can see the message
        # right away.
        print("Scanning and cleaning local web site...", end='', flush=True)
        master.scan()
        # Clean up the directory by rewriting source code and hypertext and
        # removing temporary files.
        logging.debug("Cleaning up master (local on disk) web site")
        changed = master.clean()
        # Rescan if any changes happened.
        if changed:
            # (Fixed duplicated word in the original log message.)
            logging.debug("Detected changes due to cleanup.")
            master.finish()
            logging.debug("Disposing of the old scan.")
            del master
            master = MasterWebSite(user_settings)
            logging.debug("*** Rescanning ****************************")
            master.scan()
        else:
            logging.debug("No changes detected. Keeping the original scan.")
        print("...done!", flush=True)
        # Master website directories.
        master_directory_list = master.directories
        # Master website filenames only.
        master_files_list = [file_info[user_settings.FILE_NAME]
                             for file_info in master.files]
        logging.debug("*** Master Directories **********************")
        for d in master_directory_list:
            logging.debug(f"\t {d:s} (directory)")
        logging.debug("*** Master Files **********************")
        for f in master_files_list:
            logging.debug(f"\t {f:s} (file)")
        master.finish()
        # Clean up master website only. Don't update remote websites.
        if user_settings.CLEANONLY:
            logging.debug("Cleanup finished. Exiting...")
            sys.exit()
        # ---------------------------------------------------------------------
        # Scan the remote hosted web site.
        # ---------------------------------------------------------------------
        logging.debug("Reading private settings.")
        private_settings = user_settings.private_settings
        print("Scanning remote web site...", end='', flush=True)
        # Pick which website to update.
        logging.debug("Connecting to primary remote site.")
        remote = RemoteWebSite(user_settings,
                               private_settings[user_settings.SERVER],
                               private_settings[user_settings.USER],
                               private_settings[user_settings.PASSWORD],
                               private_settings[user_settings.FTP_ROOT])
        logging.debug("Scanning remote web site")
        remote.scan()
        remote.finish()
        print("...done!", flush=True)
        remote_directory_list = remote.directories
        remote_files_list = [file_info[user_settings.FILE_NAME]
                             for file_info in remote.files]
        logging.debug("*** Remote Directories **********************")
        for d in remote_directory_list:
            logging.debug(f"\t remote dir: {d:s}")
        logging.debug("*** Remote Files **********************")
        for f in remote_files_list:
            logging.debug(f"\t remote file: {f:s}")
        # ---------------------------------------------------------------------
        # Synchronize the master and remote web sites.
        # ---------------------------------------------------------------------
        print("Synchronizing remote and local web sites...", end='', flush=True)
        # Primary website.
        logging.debug("Connecting to primary remote site for synchronization.")
        u = UpdateWeb(user_settings,
                      private_settings[user_settings.SERVER],
                      private_settings[user_settings.USER],
                      private_settings[user_settings.PASSWORD],
                      private_settings[user_settings.FTP_ROOT],
                      private_settings[user_settings.FILE_SIZE_LIMIT],
                      master.directories,
                      master.files,
                      remote.directories,
                      remote.files)
        logging.debug("Synchronizing remote web site")
        u.update()
        u.finish()
        print("...done!", flush=True)
        del u
        del remote
        del master
    except RecursionError as detail:
        logging.error(
            f"Walking the directory tree got too deep for Python's recursion {str(detail):s}. Aborting...")
        sys.exit()
    return
# ----------------------------------------------------------------------------
# Command line option class
# ----------------------------------------------------------------------------
class CommandLineSettings(object):
    """Parse the command line and copy the option flags onto user_settings."""

    def __init__(self, user_settings, raw_args=None):
        """Parse options from raw_args (sys.argv when None) and set flags.

        Only flags that were actually given on the command line are set to
        True; flags that were omitted leave user_settings untouched.
        """
        parser = argparse.ArgumentParser(description="updateweb options")
        # Log all changes, not just warnings and errors.
        parser.add_argument(
            "-v", "--verbose",
            help="Turn on verbose mode to log everything",
            action="store_true")
        # Clean up the master website only.
        parser.add_argument(
            "-c", "--cleanonly",
            help="Do a cleanup on the master web site only.",
            action="store_true")
        # Run unit tests only.
        parser.add_argument(
            "-t", "--test",
            help="Run unit tests.",
            action="store_true")
        options = parser.parse_args(raw_args)
        # Copy each flag that was requested onto the shared settings object.
        for was_given, attribute in ((options.verbose, "VERBOSE"),
                                     (options.cleanonly, "CLEANONLY"),
                                     (options.test, "UNITTEST")):
            if was_given:
                setattr(user_settings, attribute, True)
# ----------------------------------------------------------------------------
# Base class for web site processing.
# ----------------------------------------------------------------------------
class WebSite(object):
    """
    Abstract class used for analyzing both master (local to disk) and remote (ftp server) websites.
    Contains the common web-walking functions which traverse the directory structures and files.
    Subclasses fill in the lower level functions which actually access the directories and files.
    Subclasses may also define additional functions unique to local websites.
    """

    def __init__(self, settings):
        """Set up root directories and walk the site from the root."""
        # Import the user settings.
        self.user_settings = settings
        # Queue keeps track of directories not yet processed.
        self.queue = []
        # List of all directories traversed.
        self.directories = []
        # List of files traversed, with file information.
        self.files = []
        # Find out the root directory and go there.
        self.root_dir = self.get_root_dir()
        self.go_to_root_dir(self.root_dir)

    @staticmethod
    def get_current_year():
        """Get the current year (UTC)."""
        return int(time.gmtime()[0])

    @staticmethod
    def get_current_two_digit_year():
        """Get the last two digits of the current year."""
        return WebSite.get_current_year() % 100

    @staticmethod
    def is_file_info_type(file_info):
        """Check if we have a file information structure (a list) or merely a simple file name (a str)."""
        try:
            if isinstance(file_info, list):
                return True
            elif isinstance(file_info, str):
                return False
            else:
                logging.error(
                    "is_file_info_type found a bad type. Aborting...")
                sys.exit()
        except TypeError as detail:
            logging.error(
                f"is_file_info_type found a bad type {str(detail):s}. Aborting...")
            sys.exit()

    def get_root_dir(self):
        """Subclass: Put code here to get the root directory"""
        return ""

    def go_to_root_dir(self, root_dir):
        """Subclass: Put code here to go to the root directory"""
        pass  # Python's do-nothing statement.

    def one_level_down(self, d):
        """Subclass: Fill in with a method which returns a list of the
        directories and files immediately beneath dir"""
        return [], []

    def walk(self, d, type_of_tree_search=TreeWalk.BREADTH_FIRST_SEARCH):
        """Walk a directory in either depth first or breadth first order. BFS is the default."""
        # Get all subfiles and subdirectories off this node.
        subdirectories, subfiles = self.one_level_down(d)
        # Add all the subfiles in order.
        for f in subfiles:
            name = self.strip_root(f)
            logging.debug(
                f"Webwalking: Adding file {name[self.user_settings.FILE_NAME]:s} to list.")
            # Some files are private so skip them from consideration.
            pat = re.compile(self.user_settings.FILE_TO_SKIP)
            if pat.search(name[self.user_settings.FILE_NAME]):
                logging.warning(
                    f"Webwalking: Skipping private file {name[self.user_settings.FILE_NAME]:s}")
            # Don't upload the log file due to file locking problems.
            elif name[self.user_settings.FILE_NAME].find(self.user_settings.LOGFILENAME) >= 0:
                logging.debug(
                    f"Webwalking: Skipping log file {name[self.user_settings.FILE_NAME]:s}")
            # File size limit on some servers.
            else:
                self.files.append(name)
        # Queue up the subdirectories.
        for d in subdirectories:
            # Some directories are private such as .git or just temporary file
            # caches so skip them from consideration.
            pat = re.compile(self.user_settings.DIR_TO_SKIP)
            if pat.search(d):
                logging.warning(f"Webwalking: Skipping private dir {d:s}")
            else:
                logging.debug(f"Webwalking: Pushing dir {d:s} on the queue.")
                self.queue.append(d)
        # Search through the directories.
        while len(self.queue) > 0:
            # For breadth first search, remove from beginning of queue.
            if type_of_tree_search == TreeWalk.BREADTH_FIRST_SEARCH:
                d = self.queue.pop(0)
            # For depth first search, remove from end of queue.
            elif type_of_tree_search == TreeWalk.DEPTH_FIRST_SEARCH:
                d = self.queue.pop()
            else:
                d = self.queue.pop(0)
            name = self.strip_root(d)
            logging.debug(
                f"Webwalking: Adding relative directory {name:s} to list, full path = {d:s}.")
            self.directories.append(name)
            self.walk(d)

    def strip_root(self, file_info):
        """Return a path, but strip off the root directory"""
        root = self.root_dir
        # Extract the file name.
        if self.is_file_info_type(file_info):
            name = file_info[self.user_settings.FILE_NAME]
        else:
            name = file_info
        # e.g. root = / and name = /Art/foo.txt yields stripped_path = Art/foo.txt
        # but root = /Sean and name = /Sean/Art/foo.txt yields stripped_path =
        # Art/foo.txt
        lenroot = len(root)
        if root == self.user_settings.DEFAULT_ROOT_DIR:
            pass
        else:
            # Also skip over the path separator following the root.
            lenroot = lenroot + 1
        stripped_path = name[lenroot:]
        if self.is_file_info_type(file_info):
            # Update the file name only; keep type, date/time and size.
            return [stripped_path,
                    file_info[self.user_settings.FILE_TYPE],
                    file_info[self.user_settings.FILE_DATE_TIME],
                    file_info[self.user_settings.FILE_SIZE]]
        else:
            return stripped_path

    def append_root_dir(self, root_dir, name):
        """Append the root directory to a path"""
        # e.g. root = /, and name = Art/foo.txt yields /Art/foo.txt
        # but root = /Sean, and name = Art/foo.txt yields /Sean/Art/foo.txt
        if root_dir == self.user_settings.DEFAULT_ROOT_DIR:
            return root_dir + name
        else:
            return root_dir + "/" + name

    def scan(self):
        """Scan the directory tree recursively from the root"""
        logging.debug(
            f"Webwalking: Beginning recursive directory scan from root directory {self.root_dir:s}")
        self.walk(self.root_dir)

    def modtime(self, f):
        """Subclass: Get file modification time"""
        pass

    def finish(self):
        """Quit web site walking"""
        logging.debug("Finished webwalking the master.")
        pass

    def remove_dir(self, dir_name):
        """Subclass: Remove a directory"""
        pass

    def remove_file(self, file_name):
        """Subclass: Remove a file"""
        pass

    def clean(self):
        """Scan through all directories and files in the master on disk website and clean them up.

        Returns True if at least one directory or file was removed or rewritten.
        """
        num_changes = 0
        logging.debug("Cleaning up the master web page.")
        if self.directories is None or self.files is None:
            logging.error("Web site has no directories or files. Aborting...")
            sys.exit()
        for d in self.directories:
            if self.is_temp_dir(d):
                # Add the full path prefix from the root.
                name = self.append_root_dir(self.get_root_dir(), d)
                try:
                    # Log the directory actually being removed (the original
                    # logged self.root_dir by mistake).
                    logging.debug(
                        f"Removing temp dir {name:s} recursively")
                    shutil.rmtree(name)
                    num_changes += 1
                except OSError as detail:
                    logging.error(
                        f"Cannot remove temp dir {name:s}: {str(detail):s}")
        for f in self.files:
            # Add the full path prefix from the root.
            name = self.append_root_dir(
                self.get_root_dir(), f[self.user_settings.FILE_NAME])
            # Remove all temporary files.
            if self.is_temp_file(f):
                try:
                    logging.debug(f"Removing temp file {name:s}")
                    os.remove(name)
                    num_changes += 1
                except OSError as detail:
                    # Message corrected: this path removes a file, not a dir.
                    logging.error(
                        f"Cannot remove temp file {name:s}: {str(detail):s}")
            # Update hypertext files.
            if self.is_source_file(f):
                changed = self.rewrite_source_file(name)
                if changed:
                    num_changes += 1
                    # Log the file actually rewritten (the original logged
                    # self.root_dir by mistake).
                    logging.debug(f"Rewrote hypertext file {name:s}")
        # Flag that at least one file was changed.
        if num_changes > 0:
            return True
        return False

    def is_temp_file(self, file_info):
        """Identify a file name as a temporary file"""
        file_name = file_info[self.user_settings.FILE_NAME]
        # Suffixes and names for temporary files to be deleted.
        # Pattern is assigned to _ and thrown away to suppress unused variable
        # warnings.
        [_, match] = pattern_match(
            self.user_settings.TEMP_FILE_SUFFIXES, file_name)
        # Remove any files containing twiddles anywhere in the name.
        if match or file_name.find(self.user_settings.VIM_TEMP_FILE_EXT) >= 0:
            return True
        return False

    def is_temp_dir(self, dir_name):
        """Identify a name as a temporary directory."""
        p = re.compile(self.user_settings.TEMP_DIR_SUFFIX, re.VERBOSE)
        return p.search(dir_name)

    def is_source_file(self, file_info):
        """ Check if the file name is a hypertext file."""
        file_name = file_info[self.user_settings.FILE_NAME]
        p = re.compile(self.user_settings.SOURCE_FILE_SUFFIX, re.VERBOSE)
        return p.search(file_name)

    def copy_to_text_file(self, file_name):
        """Make a copy of a file with a .txt extension"""
        pass

    def clean_up_temp_file(self, temp_file_name, file_name, changed):
        """Remove the original file, rename the temporary file name to the original name.
        If there are no changes, just remove the temporary file.
        """
        pass

    def process_lines_of_file(
            self,
            in_file_name,
            out_file_name,
            process_line_function_list=None):
        """Process each line of a file with a list of functions. Create a new temporary file.
        The default list is None which means make an exact copy.
        """
        pass

    def rewrite_substring(self, line):
        """Rewrite a line containing a pattern of your choice"""
        # Do the replacements in order from first to last.
        for match_replace_pair in self.user_settings.SUBSTRING_REPLACEMENT_LIST:
            # Search for the pattern.
            [pat, match] = pattern_match(match_replace_pair[0], line)
            # Replace with the new pattern.
            if match:
                # Since we use raw strings, we need to strip off leading and
                # trailing whitespace. strip() removes both ends; the
                # original .strip().lstrip() chain was redundant.
                new_substring = match_replace_pair[1].strip()
                sub = pat.sub(new_substring, line)
                logging.debug(
                    f"\ntransform old line = \n{line:s}\ninto new line =\n\
{sub:s}\nusing new substring =\n{new_substring:s}\n")
                line = sub
        return line

    def rewrite_email_address_line(self, line):
        """Rewrite lines containing old email addresses."""
        # Search for the old email address.
        [pat, match] = pattern_match(
            self.user_settings.OLD_EMAIL_ADDRESS, line)
        # Replace the old address with my new email address.
        if match:
            new_address = self.user_settings.NEW_EMAIL_ADDRESS
            sub = pat.sub(new_address, line)
            line = sub
        return line

    def rewrite_version_line(self, line):
        """Rewrite lines containing the current version of software."""
        # Search for the current version.
        [pat, match] = pattern_match(
            self.user_settings.CURRENT_SOFTWARE_VERSION, line)
        # Replace with the new version.
        if match:
            # Since we are using raw strings, strip leading and trailing
            # whitespace. strip() removes both ends; the original
            # .lstrip().strip() chain was redundant.
            new_version = self.user_settings.NEW_SOFTWARE_VERSION.strip()
            sub = pat.sub(new_version, line)
            line = sub
        return line

    def rewrite_copyright_line(self, line):
        """Rewrite copyright lines if they are out of date."""
        # Match the lines,
        #     Copyright (C) nnnn-mmmm by Sean Erik O'Connor.
        #     Copyright © nnnn-mmmm by Sean Erik O'Connor.
        # and pull out the old year and save it.
        [pat, match] = pattern_match(self.user_settings.COPYRIGHT_LINE, line)
        # Found a match.
        if match:
            old_year = int(match.group('old_year'))
            # We matched and extracted the old copyright symbol into the named
            # group 'symbol' using the pattern syntax (?P<symbol> \(C\) | ©)
            # and the old starting year into the named group 'old_year'.
            # We now insert them back with the replacement syntax \g<symbol>
            # and \g<old_year>.
            if old_year < WebSite.get_current_year():
                # NOTE(review): the replacement string previously read
                # r"Copyright \g \g-", which is not a valid re.sub group
                # reference (a bare \g raises re.error); restored the named
                # group references described in the comment above.
                new_copyright = r"Copyright \g<symbol> \g<old_year>-" + \
                    str(WebSite.get_current_year())
                sub = pat.sub(new_copyright, line)
                line = sub
        return line

    def rewrite_last_update_line(self, line):
        """Rewrite the Last Updated line if the year is out of date."""
        # Match the last updated line and pull out the year.
        #     last updated 01 Jan 24
        p = re.compile(
            self.user_settings.LAST_UPDATED_LINE,
            re.VERBOSE | re.IGNORECASE)
        m = p.search(line)
        if m:
            last_update_year = int(m.group('year'))
            # Convert two digit years to four digit years:
            # 91-99 => 19xx, otherwise => 20xx.
            if last_update_year > 90:
                last_update_year += 1900
            else:
                last_update_year += 2000
            # If the year is old, rewrite to "01 Jan <current two digit year>".
            if last_update_year < WebSite.get_current_year():
                two_digit_year = self.user_settings.TWO_DIGIT_YEAR_FORMAT % self.get_current_two_digit_year()
                sub = p.sub('last updated 01 Jan ' + two_digit_year, line)
                line = sub
        return line

    def rewrite_source_file(self, file_name):
        """Rewrite copyright lines, last updated lines, etc. Returns True if the file changed."""
        changed = False
        # Create a new temporary file name for the rewritten file.
        temp_file_name = file_name + self.user_settings.TEMP_FILE_EXT
        # Apply changes to all lines of the file. Apply change functions in
        # the sequence listed.
        if self.process_lines_of_file(file_name, temp_file_name,
                                      [self.rewrite_copyright_line,
                                       self.rewrite_last_update_line,
                                       self.rewrite_email_address_line,
                                       self.rewrite_substring,
                                       self.rewrite_version_line]):
            changed = True
        # Rename the temp file to the original file name. If no changes, just
        # delete the temp file.
        self.clean_up_temp_file(temp_file_name, file_name, changed)
        return changed
# ----------------------------------------------------------------------------
# Subclass for local web site processing.
# ----------------------------------------------------------------------------
class MasterWebSite(WebSite):
    """Walk the master web directory on local disk down from the root.
    Clean up temporary files and do other cleanup work."""

    def __init__(self, settings):
        """Go to web page root and list all files and directories."""
        # Initialize the parent class, which performs the full directory walk.
        WebSite.__init__(self, settings)
        self.root_dir = self.get_root_dir()
        logging.debug(
            f"MasterWebSite.__init__(): \tRoot directory: {self.root_dir:s}")

    def get_root_dir(self):
        """Get the name of the root directory"""
        return self.user_settings.master_root_dir

    def go_to_root_dir(self, root_dir):
        """Go to the root directory"""
        # Go to the root directory.
        logging.debug(
            f"MasterWebSite.go_to_root_dir(): \tchdir to root directory: {root_dir:s}")
        os.chdir(root_dir)
        # Read it back to get the absolute, canonical path.
        self.root_dir = os.getcwd()
        logging.debug(
            f"MasterWebSite.go_to_root_dir(): \tgetcwd root directory: {self.root_dir:s}")

    def one_level_down(self, d):
        """List all files and subdirectories in the current directory, dir. For files, collect file info
        such as time, date and size."""
        directories = []
        files = []
        # Change to current directory.
        os.chdir(d)
        # List all subdirectories and files.
        dir_list = os.listdir(d)
        if dir_list:
            for line in dir_list:
                logging.debug(
                    f"MasterWebSite.one_level_down(): \tlistdir( {d:s} ) = {line:s}")
                # Add the full path prefix from the root.
                name = self.append_root_dir(d, line)
                logging.debug(
                    f"MasterWebSite.one_level_down(): \tmaster dir/file (full path): {name:s}")
                # Is it a directory or a file?
                if os.path.isdir(name):
                    directories.append(name)
                elif os.path.isfile(name):
                    # First assemble the file information of name, time/date and size into a list.
                    # Can index it like an array.
                    # e.g. file_info = [ '/WebDesign/EquationImages/equation001.png', 1, \
                    #     datetime.datetime(2010, 2, 3, 17, 15), 4675]
                    # file_info[ 0 ] = '/WebDesign/EquationImages/equation001.png'
                    # file_info[ 3 ] = 4675
                    file_info = [name,
                                 FileType.FILE,
                                 self.get_file_date_time(name),
                                 self.get_file_size(name)]
                    files.append(file_info)
        # Sort the names into order.
        if directories:
            directories.sort()
        if files:
            files.sort()
        return directories, files

    @staticmethod
    def get_file_date_time(file_name):
        """Get a local file time and date in UTC."""
        file_epoch_time = os.path.getmtime(file_name)
        file_time_utc = time.gmtime(file_epoch_time)[0: 6]
        # year, month, day, hour, minute, seconds
        d = datetime.datetime(file_time_utc[0], file_time_utc[1],
                              file_time_utc[2], file_time_utc[3],
                              file_time_utc[4], file_time_utc[5])
        return d

    @staticmethod
    def get_file_size(file_name):
        """Get file size in bytes."""
        return os.path.getsize(file_name)

    def copy_to_text_file(self, file_name):
        """Make a copy of a file with a .txt extension"""
        # Remove the old copy with the text file extension.
        copy_file_name = file_name + self.user_settings.TEXT_FILE_EXT
        try:
            os.remove(copy_file_name)
        except OSError as detail:
            # Best-effort: the copy may simply not exist yet.
            logging.error(
                f"Cannot remove old text file copy {copy_file_name:s}: {str(detail):s}")
        # Create the new copy, which is an exact duplicate.
        self.process_lines_of_file(file_name, copy_file_name)
        # Make the new copy have the same modification and access time and date as the original
        # since it is just an exact copy.
        # That way we won't upload copies with newer times constantly, just because they look as
        # though they've been recently modified.
        file_stat = os.stat(file_name)
        os.utime(copy_file_name,
                 (file_stat[stat.ST_ATIME],
                  file_stat[stat.ST_MTIME]))
        logging.debug(
            f"Reset file time to original time for copy {copy_file_name:s}")

    def clean_up_temp_file(self, temp_file_name, file_name, changed):
        """Remove the original file, rename the temporary file name to the original name.
        If there are no changes, just remove the temporary file.
        """
        if changed:
            # Remove the old file now that we have the rewritten file.
            try:
                os.remove(file_name)
                logging.debug(
                    f"Changes were made. Remove original file {file_name:s}")
            except OSError as detail:
                logging.error(
                    f"Cannot remove old file {file_name:s}: {str(detail):s}. Need to remove it manually.")
            # Rename the new file to the old file name.
            try:
                os.rename(temp_file_name, file_name)
                logging.debug(
                    f"Rename temp file {temp_file_name:s} to original file {file_name:s}")
            except OSError as detail:
                logging.error(
                    f"Cannot rename temporary file {temp_file_name:s} to old file name {file_name:s}: {str(detail):s}."
                    f"Need to rename manually")
        else:
            # No changes? Remove the temporary file.
            try:
                os.remove(temp_file_name)
                logging.debug(
                    f"No changes were made. Remove temporary file {temp_file_name:s}")
            except OSError as detail:
                logging.error(
                    f"Cannot remove temporary file {temp_file_name:s}: {str(detail):s}. Need to remove it manually.")
        return

    def process_lines_of_file(
            self,
            in_file_name,
            out_file_name,
            process_line_function_list=None):
        """Process each line of a file with a list of functions. Create a new temporary file.
        The default list is None which means make an exact copy.

        Returns True if any line was changed.
        """
        # Assume no changes.
        changed = False
        try:
            fin = open(in_file_name, "r")
        except IOError as detail:
            logging.error(
                f"process_lines_of_file(): \tCannot open file {in_file_name:s} for reading: {str(detail):s}")
            # Abort explicitly: the original fell through and crashed with an
            # AttributeError when it called readline() on the None handle.
            sys.exit()
        try:
            fout = open(out_file_name, "w")
        except IOError as detail:
            logging.error(
                f"process_lines_of_file(): \tCannot open file {out_file_name:s} for writing: {str(detail):s}")
            fin.close()
            sys.exit()
        # Read each line of the file, aborting if there is a read error.
        try:
            line = fin.readline()
            while line:
                original_line = line
                # With no function list, just duplicate the line unchanged.
                if process_line_function_list is not None:
                    # Otherwise, apply the rewrite functions in succession.
                    for process_line_function in process_line_function_list:
                        line = process_line_function(line)
                if original_line != line:
                    logging.debug(
                        f"Rewrote the line >>>{original_line:s}<<< to >>>{line:s}<<<")
                    changed = True
                fout.write(line)
                line = fin.readline()
            fin.close()
            fout.close()
        except IOError as detail:
            logging.error(
                f"File I/O error during reading/writing file {in_file_name:s} in process_lines_of_file: {str(detail):s}"
                f" Aborting...")
            sys.exit()
        if changed:
            logging.debug(
                f"process_lines_of_file(): \tRewrote original file {in_file_name:s}."
                f"Changes are in temporary copy {out_file_name:s}")
        # Return True if any lines were changed.
        return changed
# ----------------------------------------------------------------------------
# Subclass for remote web site processing.
# ----------------------------------------------------------------------------
class RemoteWebSite(WebSite):
"""Walk the remote web directory on a web server down from the root."""
def __init__(self, settings, server, user, password, ftproot):
    """Connect to FTP server and list all files and directories."""
    # The FTP server's root directory is where the remote tree walk begins.
    self.root_dir = ftproot
    logging.debug(
        f"Requesting remote web site ftp root dir {self.root_dir:s}")
    # Connect to the FTP server and log in. Any built-in, non-system-exiting
    # exception (all derive from Exception) aborts the program.
    try:
        # self.ftp.set_debuglevel( 2 )
        self.ftp = ftplib.FTP(server)
        self.ftp.login(user, password)
    except Exception as detail:
        # str() extracts the message text from the exception object.
        logging.error(
            f"Remote web site cannot login to ftp server: {str(detail):s} Aborting...")
        sys.exit()
    else:
        logging.debug("Remote web site ftp login succeeded.")
        logging.debug(
            f"Remote web site ftp welcome message {self.ftp.getwelcome():s}")
    # Let the parent class walk and record the remote directory tree.
    super().__init__(settings)
def go_to_root_dir(self, root_dir):
    """Change to the root directory on the FTP server and read back its true name."""
    try:
        # Ask the server to change directory, then query where we landed,
        # since the server may canonicalize the path.
        self.ftp.cwd(root_dir)
        logging.debug(
            f"ftp root directory (requested) = {self.root_dir:s}")
        self.root_dir = self.ftp.pwd()
        logging.debug(
            f"ftp root directory (read back from server): {self.root_dir:s}")
    except Exception as detail:
        logging.error(
            f"go_to_root_dir(): \tCannot ftp cwd or pwd root dir {root_dir:s} {str(detail):s} Aborting...")
        sys.exit()
def get_root_dir(self):
    """Return the FTP root directory recorded for this remote site."""
    return self.root_dir
def finish(self):
    """Close the FTP session when web site walking is done."""
    logging.debug("RemoteWebSite::finish().")
    try:
        self.ftp.quit()
    except Exception as error:
        # A failed QUIT is logged but not fatal; we are done either way.
        logging.error(f"Cannot ftp quit: {str(error):s}")
def one_level_down(self, d):
    """List files and directories in a subdirectory using ftp"""
    directories = []
    files = []
    try:
        # ftp listing from current dir.
        logging.debug(f"RemoteWebSite.one_level_down(): \tftp cwd: {d:s}")
        self.ftp.cwd(d)
        dir_list = []
        # Collect the raw LIST output; each line describes one entry.
        self.ftp.retrlines('LIST', dir_list.append)
    except Exception as detail:
        logging.error(
            f"one_level_down(): \tCannot ftp cwd or ftp LIST dir {d:s}: {str(detail):s} Aborting...")
        sys.exit()
    for line in dir_list:
        logging.debug(
            f"RemoteWebSite.one_level_down(): \tftp LIST: {line:s}")
        # Line should at least have the minimum FTP information.
        if len(line) >= self.user_settings.MIN_FTP_LINE_LENGTH:
            # Parse the LIST line into [name, type, date/time, size].
            file_info = self.get_ftp_file_info(line)
            if file_info[self.user_settings.FILE_NAME] == "":
                logging.error(
                    "RemoteWebSite.one_level_down(): \tFTP LIST file name is NULL:")
            logging.debug(
                f"RemoteWebSite.one_level_down(): \tftp parsed file info:\
{file_info[self.user_settings.FILE_NAME]:s}")
            # Prefix the full path prefix from the root to the directory
            # name and add to the directory list.
            if file_info[self.user_settings.FILE_TYPE] == FileType.DIRECTORY:
                dirname = self.append_root_dir(
                    d, file_info[self.user_settings.FILE_NAME])
                logging.debug(
                    f"RemoteWebSite.one_level_down(): \tftp dir (full path): {dirname:s}")
                directories.append(dirname)
            # Add file information to the list of files.
            else:
                # Update the file name only: add the full path prefix from
                # the root.
                file_info[self.user_settings.FILE_NAME] = self.append_root_dir(
                    d, file_info[self.user_settings.FILE_NAME])
                logging.debug(
                    f"RemoteWebSite.one_level_down(): \tftp file (full path):\
{file_info[self.user_settings.FILE_NAME]:s}")
                files.append(file_info)
        else:
            logging.error(
                f"RemoteWebSite.one_level_down(): \tFTP LIST line is too short: {line:s}")
    # Return the entries sorted, matching the master site's ordering.
    directories.sort()
    files.sort()
    return directories, files
def modtime(self, f):
    """Get the modification time of a file via ftp. Return 0 if ftp cannot get it."""
    mod_time = 0
    try:
        reply = self.ftp.sendcmd('MDTM ' + f)
        # On success MDTM replies "213 YYYYMMDDhhmmss" where
        # MM is 01 to 12, DD is 01 to 31, hh is 00 to 23, mm is 00 to 59, ss is 0 to 59.
        # error-response is 550 for info not available, and 500 or 501 if command cannot
        # be parsed.
        if reply.startswith('213'):
            mod_time = reply[4:]
    except ftplib.error_perm:
        mod_time = 0
    return mod_time
def get_ftp_file_info(self, line):
    """Parse one line of an FTP LIST listing.

    Returns [filename, file type, datetime, file size in bytes].

    FTP uses UTC for its listings; the conversion to local time is done by the OS.
    We can have problems on New Year's Eve. For example, the master file date/time is
        Mon Jan 1 06:23:12 2018
    But the remote file date/time from FTP listing doesn't show a year even though we know it was written to the server in 2017.
        Mon Dec 31 03:02:00
    So we default the remote file year to current year 2018 and get
        Mon Dec 31 03:02:00 2018
    Now we think that the remote file is newer by 363.860278 days.
    """
    # Find out if we've a directory or a file.
    if line[0] == 'd':
        dir_or_file = FileType.DIRECTORY
    else:
        dir_or_file = FileType.FILE
    pattern = re.compile(self.user_settings.FTP_LISTING, re.VERBOSE)
    # Sensible defaults.
    filesize = 0
    filename = ""
    # Default the time to hour 0, minute 0, second 0 (i.e. midnight).
    hour = 0
    minute = 0
    seconds = 0
    # Default the date to Jan 1
    month = 1
    day = 1
    # BUG FIX: default the year here as well. Previously `year` was only
    # assigned inside the `if match:` branch, so a listing line that failed
    # to match the pattern raised UnboundLocalError at the datetime() call
    # below -- despite the comment there promising a current-year default.
    year = WebSite.get_current_year()
    # Extract time and date from the ftp listing.
    match = pattern.search(line)
    logging.debug(f"ftp file listing {line}")
    if match:
        filesize = int(match.group('bytes'))
        month = self.user_settings.monthToNumber[match.group('mon')]
        day = int(match.group('day'))
        # Remote file listing contains the year. The FTP listing will omit the hour and minute.
        if match.group('year'):
            year = int(match.group('year'))
            logging.debug(f"ftp has year = {year} but is probably missing hour and minute")
        else:
            # Remote file listing omits the year. Keep the default current UTC
            # year. That may be incorrect (see comments above).
            logging.debug(f"ftp is missing the year; use the current year = {year}")
        # If the FTP listing has the hour and minute, it will omit the year.
        if match.group('hour') and match.group('min'):
            hour = int(match.group('hour'))
            minute = int(match.group('min'))
            logging.debug(f"ftp has hour = {hour} and minute = {minute} so is probably missing the year")
        filename = match.group('filename')
    # Package up the time and date nicely.
    # Note if we didn't get any matches, we'll default the remote date and
    # time to Jan 1 midnight of the current year.
    d = datetime.datetime(year, month, day, hour, minute, seconds)
    return [filename, dir_or_file, d, filesize]
class UpdateWeb(object):
    """Given previously scanned master and remote directories, update the remote website."""

    def __init__(
            self,
            settings,
            server,
            user,
            password,
            ftproot,
            file_size_limit,
            master_directory_list,
            master_file_info,
            remote_directory_list,
            remote_file_info):
        """Connect to remote site. Accept previously scanned master and remote files and directories.

        settings               User settings (roots, field indices, thresholds).
        server, user, password FTP host name and login credentials.
        ftproot                Root directory on the FTP server.
        file_size_limit        Upload size limit in KB.
        master_directory_list, master_file_info, remote_directory_list,
        remote_file_info       Results of the earlier directory scans.
        """
        self.user_settings = settings
        # File name lists and name -> date/size lookup tables; filled in by file_info().
        self.master_files_list = []
        self.remote_files_list = []
        self.master_file_to_size = {}
        self.master_file_to_date_time = {}
        self.remote_file_to_date_time = {}
        # Master/remote differences; filled in by changes().
        self.master_only_dirs = []
        self.master_only_files = []
        self.remote_only_dirs = []
        self.remote_only_files = []
        self.common_files = []
        # Connect to FTP server and log in.
        try:
            self.ftp = ftplib.FTP(server)
            self.ftp.login(user, password)
        except Exception as detail:
            logging.error(
                f"Cannot login to ftp server: {str(detail):s} Aborting...")
            sys.exit()
        else:
            logging.debug("ftp login succeeded.")
            logging.debug(
                f"ftp server welcome message: {self.ftp.getwelcome():s}")
        # Master root directory.
        self.master_root_dir = self.user_settings.master_root_dir
        logging.debug(
            f"Master (local to disk) root directory: {self.master_root_dir:s}")
        # Root directory of FTP server.
        self.ftp_root_dir = ftproot
        logging.debug(
            f"ftp root directory (requested) = {self.ftp_root_dir:s}")
        # Transform KB string to integer bytes. e.g. "200" => 204800
        self.file_size_limit = int(file_size_limit) * 1024
        try:
            # Go to the root directory.
            self.ftp.cwd(self.ftp_root_dir)
            # Read it back.
            self.ftp_root_dir = self.ftp.pwd()
            logging.debug(
                f"ftp root directory (read back from server): {self.ftp_root_dir:s}")
        except Exception as detail:
            # NOTE(review): the message says "Aborting..." but, unlike the
            # login failure above, we do not call sys.exit() here and carry
            # on with the requested root. Confirm that is intentional.
            logging.error(
                f"UpdateWeb(): \tCannot ftp cwd or ftp LIST dir {self.ftp_root_dir:s} {str(detail):s} Aborting...")
        self.master_directory_list = master_directory_list
        self.remote_directory_list = remote_directory_list
        self.master_file_info = master_file_info
        self.remote_file_info = remote_file_info

    def append_root_dir(self, root_dir, name):
        """Append the root directory to a path"""
        # e.g. root = /, and name = Art/foo.txt yields /Art/foo.txt
        # but root = /Sean, and name = Art/foo.txt yields /Sean/Art/foo.txt
        if root_dir == self.user_settings.DEFAULT_ROOT_DIR:
            return root_dir + name
        else:
            return root_dir + "/" + name

    def file_info(self):
        """Create lists of file names from the file information. Also create dictionaries which map file names onto
        dates, times, and sizes."""
        # Extract file names.
        self.master_files_list = [
            file_info[self.user_settings.FILE_NAME] for file_info in self.master_file_info]
        self.remote_files_list = [
            file_info[self.user_settings.FILE_NAME] for file_info in self.remote_file_info]
        # Use a dictionary comprehension to create key/value pairs, (file name,
        # file date/time), which map file names onto date/time.
        self.master_file_to_date_time = {
            file_info[self.user_settings.FILE_NAME]: file_info[self.user_settings.FILE_DATE_TIME]
            for file_info in self.master_file_info}
        self.remote_file_to_date_time = {
            file_info[self.user_settings.FILE_NAME]: file_info[self.user_settings.FILE_DATE_TIME]
            for file_info in self.remote_file_info}
        # Dictionary comprehension creates a mapping of master file names onto
        # file sizes.
        self.master_file_to_size = {
            file_info[self.user_settings.FILE_NAME]: file_info[self.user_settings.FILE_SIZE]
            for file_info in self.master_file_info}

    def update(self):
        """Scan through the master website, cleaning it up.
        Go to remote website on my servers and synchronize all files."""
        self.file_info()
        # Which files and directories are different.
        self.changes()
        # Synchronize with the master.
        self.synchronize()

    def changes(self):
        """Find the set of different directories and files on master and remote."""
        # Add all directories which are only on master to the dictionary.
        dir_to_type = {
            d: FileType.ON_MASTER_ONLY for d in self.master_directory_list}
        # Scan through all remote directories, adding those only on remote or
        # on both.
        for d in self.remote_directory_list:
            if d in dir_to_type:
                dir_to_type[d] = FileType.ON_BOTH_MASTER_AND_REMOTE
            else:
                dir_to_type[d] = FileType.ON_REMOTE_ONLY
        # Add all files which are only on master to the dictionary.
        file_to_type = {
            f: FileType.ON_MASTER_ONLY for f in self.master_files_list}
        # Scan through all remote files, adding those only on remote or on
        # both.
        for f in self.remote_files_list:
            if f in file_to_type:
                file_to_type[f] = FileType.ON_BOTH_MASTER_AND_REMOTE
            else:
                file_to_type[f] = FileType.ON_REMOTE_ONLY
        logging.debug("Raw dictionary dump of directories")
        for k, v in dir_to_type.items():
            logging.debug(f"\t dir: {str(k):s} type: {str(v):s}")
        logging.debug("Raw dictionary dump of files")
        for k, v in file_to_type.items():
            logging.debug(f"\t file: {str(k):s} type: {str(v):s}")
        # List of directories only on master. Keep the ordering.
        self.master_only_dirs = [
            d for d in self.master_directory_list if dir_to_type[d] == FileType.ON_MASTER_ONLY]
        # List of directories only on remote. Keep the ordering.
        self.remote_only_dirs = [
            d for d in self.remote_directory_list if dir_to_type[d] == FileType.ON_REMOTE_ONLY]
        # We don't care about common directories, only their changed files, if
        # any.
        # List of files only on master. Keep the ordering.
        self.master_only_files = [
            f for f in self.master_files_list if file_to_type[f] == FileType.ON_MASTER_ONLY]
        # List of files only on remote. Keep the ordering.
        self.remote_only_files = [
            f for f in self.remote_files_list if file_to_type[f] == FileType.ON_REMOTE_ONLY]
        # List of common files on both master and remote. Keep the ordering.
        self.common_files = [
            f for f in self.master_files_list if file_to_type[f] == FileType.ON_BOTH_MASTER_AND_REMOTE]
        logging.debug(
            "*** Directories only on master ******************************")
        for d in self.master_only_dirs:
            logging.debug(f"\t {d:s}")
        logging.debug(
            "*** Directories only on remote ******************************")
        for d in self.remote_only_dirs:
            logging.debug(f"\t {d:s}")
        logging.debug(
            "*** Files only on master ******************************")
        for f in self.master_only_files:
            logging.debug(f"\t {f:s}")
        logging.debug(
            "*** Files only on remote ******************************")
        for f in self.remote_only_files:
            logging.debug(f"\t {f:s}")
        logging.debug("*** Common files ******************************")
        for f in self.common_files:
            logging.debug(
                f"\tname {f:s} master time {self.master_file_to_date_time[f].ctime():s} remote time {self.remote_file_to_date_time[f].ctime():s}")

    def synchronize(self):
        """Synchronize files and subdirectories in the remote directory with the master directory."""
        # If we have the same files in master and remote, compare their times
        # and dates.
        for f in self.common_files:
            master_file_time = self.master_file_to_date_time[f]
            remote_file_time = self.remote_file_to_date_time[f]
            # How many fractional days different are we?
            delta = remote_file_time - master_file_time
            days_different = abs(
                delta.days + delta.seconds / (60.0 * 60.0 * 24.0))
            # Assume no upload initially.
            upload_to_host = False
            logging.debug(f"Common file: {f:s}.")
            # Remote file time is newer.
            if remote_file_time > master_file_time:
                # Remote file time is MUCH newer: suspect time is out of joint
                # on the server, so upload local master file to be safe.
                if days_different >= self.user_settings.DAYS_NEWER_FOR_REMOTE_NEW_YEARS_GLITCH:
                    # BUG FIX: the old messages used backslash continuations
                    # inside the f-strings, which embedded long runs of
                    # indentation spaces into the logged text.
                    logging.error(
                        f"Remote file {f:s} is newer by {days_different:f} days."
                        f" Probably New Year's glitch. Upload file to be safe.")
                    logging.error(
                        f"\tmaster time {master_file_time.ctime():s}"
                        f" remote time {remote_file_time.ctime():s}")
                    # Set the master file to the current time.
                    full_file_name = self.append_root_dir(
                        self.master_root_dir, f)
                    if os.path.exists(full_file_name):
                        os.utime(full_file_name, None)
                        logging.error(
                            f"Touching master file {full_file_name:s} to make it the current time")
                    upload_to_host = True
                # Remote file time is newer; probably OK, just a little time
                # inaccuracy on the server.
                else:
                    # BUG FIX: adjacent literals previously concatenated to
                    # "days.Probably" -- add the missing space.
                    logging.debug(
                        f"Remote file {f:s} is newer by {days_different:f} days."
                        f" Probably time inaccuracy on the server. Wait -- don't upload yet.")
                    logging.debug(
                        f"\tmaster time {master_file_time.ctime():s} remote time {remote_file_time.ctime():s}")
                    upload_to_host = False
            # Master file time is newer.
            elif master_file_time > remote_file_time:
                # Master file time is newer (by several minutes), that it's
                # likely to be changed; upload.
                if days_different >= self.user_settings.DAYS_NEWER_FOR_MASTER_BEFORE_UPLOAD:
                    logging.warning(
                        f"Master file {f:s} is newer by {days_different:f} days. Preparing for upload.")
                    logging.warning(
                        f"\tmaster time {master_file_time.ctime():s} remote time {remote_file_time.ctime():s}")
                    upload_to_host = True
                else:
                    logging.debug(
                        f"Master file {f:s} is slightly newer by {days_different:f} days. Wait -- don't upload yet.")
                    logging.debug(
                        f"\tmaster time {master_file_time.ctime():s} remote time {remote_file_time.ctime():s}")
                    upload_to_host = False
            # Cancel the upload if the file is too big for the server.
            size = self.master_file_to_size[f]
            if size >= self.file_size_limit:
                logging.error(
                    f"upload(): Skipping upload of file {f:s} of size {size:d};"
                    f" too large for server, limit is {self.file_size_limit:d} bytes")
                upload_to_host = False
            # Finally do the file upload.
            if upload_to_host:
                print(f"Uploading changed file {f:s}...", end='', flush=True)
                self.upload(f)
        # Remote directory is not in master. Delete it.
        for d in self.remote_only_dirs:
            logging.debug(f"Remote only dir. Attempting to delete it: {d:s}")
            print(f"Deleting remote directory {d:s}...", end='', flush=True)
            self.rmdir(d)
        # Master directory missing on remote. Create it.
        # Due to breadth first order scan, we'll create parent directories
        # before child directories.
        for d in self.master_only_dirs:
            logging.debug(f"Master only dir. Creating dir {d:s} on remote.")
            print(
                f"Creating new remote directory {d:s}...",
                end='',
                flush=True)
            self.mkdir(d)
        # Master file missing on remote. Upload it.
        for f in self.master_only_files:
            logging.debug(f"Master only file. Uploading {f:s} to remote.")
            # But cancel the upload if the file is too big for the server.
            size = self.master_file_to_size[f]
            if size >= self.file_size_limit:
                logging.error(
                    f"upload(): Skipping upload of file {f:s} of size {size:d};"
                    f" too large for server, limit is {self.file_size_limit:d} bytes")
            else:
                print(f"Uploading new file {f:s}...", end='', flush=True)
                self.upload(f)
        # Remote contains a file not present on the master. Delete the file.
        for f in self.remote_only_files:
            logging.debug(f"Remote only file. Deleting remote file {f:s}.")
            print(f"Deleting remote file {f:s}...", end='', flush=True)
            self.del_remote(f)

    def del_remote(self, relative_file_path):
        """Delete a file using ftp."""
        logging.debug(
            f"del_remote(): \trelative file path name: {relative_file_path:s}")
        # Parse the relative file path into file name and relative directory.
        relative_dir, file_name = os.path.split(relative_file_path)
        logging.debug(f"del_remote(): \tfile name: {file_name:s}")
        logging.debug(f"del_remote(): \trelative dir: {relative_dir:s}")
        logging.debug(
            f"del_remote(): \tremote root dir: {self.ftp_root_dir:s}")
        try:
            # Add the remote root path and go to the remote directory.
            remote_dir = self.append_root_dir(self.ftp_root_dir, relative_dir)
            logging.debug(
                f"del_remote(): \tftp cd remote dir: {remote_dir:s}")
            self.ftp.cwd(remote_dir)
        except Exception as detail:
            logging.error(
                f"del_remote(): \tCannot ftp chdir: {str(detail):s} Skipping...")
        else:
            try:
                logging.debug(f"del_remote(): \tftp rm: {file_name:s}")
                # Don't remove zero length file names.
                if len(file_name) > 0:
                    self.ftp.delete(file_name)
                else:
                    # BUG FIX: this message was a plain string missing its f
                    # prefix (and said "fdel_remote"), so the placeholder was
                    # logged literally instead of the file name.
                    logging.warning(
                        f"del_remote(): skipping ftp delete; file NAME {file_name:s} had zero length")
            except Exception as detail:
                logging.error(
                    f"del_remote(): \tCannot ftp rm: {str(detail):s}")

    def mkdir(self, relative_dir):
        """Create new remote directory using ftp."""
        logging.debug(f"mkdir(): \trelative dir path name: {relative_dir:s}")
        logging.debug(f"mkdir(): \tremote root dir: {self.ftp_root_dir:s}")
        # Parse the relative dir path into prefix dir and suffix dir.
        path, d = os.path.split(relative_dir)
        logging.debug(f"mkdir(): \tremote prefix dir: {path:s}")
        logging.debug(f"mkdir(): \tremote dir: {d:s}")
        try:
            # Add the remote root path and go to the remote directory.
            remote_dir = self.append_root_dir(self.ftp_root_dir, path)
            logging.debug(f"mkdir(): \tftp cd remote dir: {remote_dir:s}")
            self.ftp.cwd(remote_dir)
        except Exception as detail:
            # BUG FIX: message said "chrdir".
            logging.error(
                f"mkdir(): \tCannot ftp chdir: {str(detail):s} Skipping...")
        else:
            try:
                logging.debug(f"mkdir(): \tftp mkd: {d:s}")
                self.ftp.mkd(d)
            except Exception as detail:
                logging.error(f"mkdir(): \tCannot ftp mkdir: {str(detail):s}")

    def rmdir(self, relative_dir):
        """Delete an empty directory using ftp."""
        logging.debug(
            f"rmdir(): \tintermediate dir path name: {relative_dir:s}")
        logging.debug(f"rmdir(): \tremote root dir: {self.ftp_root_dir:s}")
        # Parse the relative dir path into prefix dir and suffix dir.
        path, d = os.path.split(relative_dir)
        logging.debug(f"rmdir(): \tremote prefix dir: {path:s}")
        logging.debug(f"rmdir(): \tremote dir: {d:s}")
        try:
            # Add the remote root path and go to the remote directory.
            remote_dir = self.append_root_dir(self.ftp_root_dir, path)
            logging.debug(f"rmdir(): \tftp cd remote dir: {remote_dir:s}")
            self.ftp.cwd(remote_dir)
        except Exception as detail:
            logging.error(
                f"rmdir(): \tCannot ftp chdir: {str(detail):s} Skipping...")
        else:
            try:
                logging.debug(f"rmdir(): \tftp rmd: {d:s}")
                self.ftp.rmd(d)
            except Exception as detail:
                logging.error(
                    f"rmdir(): \tCannot ftp rmdir dir {d:s}: {str(detail):s}"
                    f" Directory is probably not empty. Do a manual delete.")

    def download(self, relative_file_path):
        """Download a binary file using ftp.

        NOTE(review): the actual retrbinary transfer is commented out below,
        so this method currently only creates/truncates the local file.
        """
        logging.debug(f"download(): \tfile name: {relative_file_path:s}")
        # Parse the relative file path into file name and relative directory.
        relative_dir, file_name = os.path.split(relative_file_path)
        logging.debug(f"download(): \tfile name: {file_name:s}")
        logging.debug(f"download(): \trelative dir: {relative_dir:s}")
        logging.debug(f"download(): \troot dir: {self.ftp_root_dir:s}")
        # Add the remote root path and go to the remote directory.
        remote_dir = self.append_root_dir(self.ftp_root_dir, relative_dir)
        logging.debug(f"download(): \tftp cd remote dir: {remote_dir:s}")
        try:
            self.ftp.cwd(remote_dir)
        except Exception as detail:
            logging.error(
                f"download(): \tCannot ftp chdir: {str(detail):s} Skipping...")
        else:
            # Add the master root path to get the local file name.
            # Open local binary file to write into.
            local_file_name = self.append_root_dir(
                self.master_root_dir, relative_file_path)
            logging.debug(
                f"download(): \topen local file name: {local_file_name:s}")
            try:
                # Context manager guarantees the handle is closed on every
                # path; the old explicit close could be skipped on error.
                with open(local_file_name, "wb") as f:
                    try:
                        # Calls f.write() on each block of the binary file.
                        # self.ftp.retrbinary("RETR " + file_name, f.write)
                        pass
                    except Exception as detail:
                        # BUG FIX: message said "Cannot cannot".
                        logging.error(
                            f"download(): \tCannot ftp retrbinary: {str(detail):s}")
            except IOError as detail:
                logging.error(
                    f"download(): \tCannot open local file {local_file_name:s} for reading: {str(detail):s}")

    def upload(self, relative_file_path):
        """Upload a binary file using ftp."""
        logging.debug(
            f"upload(): \trelative file path name: {relative_file_path:s}")
        # Parse the relative file path into file name and relative directory.
        relative_dir, file_name = os.path.split(relative_file_path)
        logging.debug(f"upload(): \tfile name: {file_name:s}")
        logging.debug(f"upload(): \trelative dir: {relative_dir:s}")
        logging.debug(f"upload(): \tremote root dir: {self.ftp_root_dir:s}")
        # Add the remote root path and go to the remote directory.
        remote_dir = self.append_root_dir(self.ftp_root_dir, relative_dir)
        logging.debug(f"upload(): \tftp cd remote dir: {remote_dir:s}")
        try:
            self.ftp.cwd(remote_dir)
        except Exception as detail:
            logging.error(
                f"upload(): \tCannot ftp chdir: {str(detail):s} Skipping...")
        else:
            # Add the master root path to get the local file name.
            # Open local binary file to read from.
            local_file_name = self.append_root_dir(
                self.master_root_dir, relative_file_path)
            logging.debug(
                f"upload(): \topen local file name: {local_file_name:s}")
            try:
                # Context manager guarantees the handle is closed on every
                # path; the old explicit close could be skipped on error.
                with open(local_file_name, "rb") as f:
                    try:
                        # f.read() is called on each block of the binary file
                        # until EOF.
                        logging.debug(f"upload(): \tftp STOR file {file_name:s}")
                        self.ftp.storbinary("STOR " + file_name, f)
                    except Exception as detail:
                        logging.error(
                            f"upload(): \tCannot ftp storbinary: {str(detail):s}")
            except IOError as detail:
                logging.error(
                    f"upload(): \tCannot open local file {local_file_name:s} for reading: {str(detail):s}")

    def finish(self):
        """Log out of an ftp session"""
        logging.debug("UpdateWeb::finish()")
        try:
            self.ftp.quit()
        except Exception as detail:
            logging.error(f"Cannot ftp quit because {str(detail):s}")
# Script entry point: when executed directly, Python sets __name__ to
# "__main__" and we call main(); when imported as a module, nothing runs.
if __name__ == '__main__':
    """Python executes all code in the file, so all classes and functions get defined first. Finally we come here.
    If we are executing this file as a Python script, the name of the current module is set to main,
    thus we'll call the main() function."""
    main()
else:
    """When using as a module, start python, then import the module and call it:
    python
    import updateweb
    updateweb.main(["--test"])
    Or if you want to debug, do this:
    python
    import pdb
    import updateweb
    pdb.run('updateweb.main(["--test"])')
    b updateweb.main
    c
    """
    # Deliberately no import-time side effects when loaded as a module.
    pass