Host script changes.

* Use Python keywords in conditional blocks.
* Use one-liners whenever possible.
* Removed unused code.
* Only use a progress bar if the overall file size is greater than the hardcoded threshold value.
This commit is contained in:
Pablo Curiel 2021-03-17 13:25:30 -04:00
parent 2103c08582
commit a291408b9d

View file

@ -84,6 +84,9 @@ USB_TRANSFER_TIMEOUT = 5000
# USB transfer block size.
USB_TRANSFER_BLOCK_SIZE = 0x800000
# USB transfer threshold. Used to determine whether a progress bar should be displayed or not.
USB_TRANSFER_THRESHOLD = round(float(USB_TRANSFER_BLOCK_SIZE) * 2.5)
# USB command header/status magic word.
USB_MAGIC_WORD = b'NXDT'
@ -254,8 +257,7 @@ class LogConsole:
# Loosely based on tk.py from tqdm.
class ProgressBar:
def __init__(self, bar_format=None, tk_parent=None, window_title='', window_resize=False, window_protocol=None):
if tk_parent is None:
raise Exception('`tk_parent` must be provided!')
if tk_parent is None: raise Exception('`tk_parent` must be provided!')
self.n = 0
self.total = 0
@ -272,13 +274,11 @@ class ProgressBar:
self.tk_window.withdraw()
self.withdrawn = True
if window_title:
self.tk_window.title(window_title)
if window_title: self.tk_window.title(window_title)
self.tk_window.resizable(window_resize, window_resize)
if window_protocol:
self.tk_window.protocol('WM_DELETE_WINDOW', window_protocol)
if window_protocol: self.tk_window.protocol('WM_DELETE_WINDOW', window_protocol)
pbar_frame = ttk.Frame(self.tk_window, padding=5)
pbar_frame.pack()
@ -296,8 +296,7 @@ class ProgressBar:
self.tk_parent.after(0, self.tk_window.destroy)
def start(self, n, total, prefix='', unit='B'):
if (n < 0) or (total <= 0):
raise Exception('Invalid arguments!')
if (n < 0) or (total <= 0): raise Exception('Invalid arguments!')
self.total = total
self.prefix = prefix
@ -309,8 +308,7 @@ class ProgressBar:
def update(self, n):
cur_n = (self.n + n)
if cur_n > self.total:
return
if cur_n > self.total: return
self.elapsed_time = (time.time() - self.start_time)
@ -492,28 +490,27 @@ def usbHandleSendFileProperties(cmd_block):
# Print info.
info_str = ('File size: 0x%X | Filename length: 0x%X' % (file_size, filename_length))
if nsp_header_size > 0:
info_str += (' | NSP header size: 0x%X' % (nsp_header_size))
if nsp_header_size > 0: info_str += (' | NSP header size: 0x%X' % (nsp_header_size))
info_str += '.'
g_Logger.info(info_str)
g_Logger.info('Filename: "%s".' % (filename))
# Perform integrity checks
if (g_nspTransferMode == False) and (file_size > 0) and (nsp_header_size >= file_size):
if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size):
g_Logger.error('NSP header size must be smaller than the full NSP size!\n')
return USB_STATUS_MALFORMED_CMD
if (g_nspTransferMode == True) and (nsp_header_size > 0):
if g_nspTransferMode and nsp_header_size:
g_Logger.error('Received non-zero NSP header size during NSP transfer mode!\n')
return USB_STATUS_MALFORMED_CMD
if (filename_length <= 0) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH):
if (not filename_length) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH):
g_Logger.error('Invalid filename length!\n')
return USB_STATUS_MALFORMED_CMD
# Enable NSP transfer mode (if needed).
if (g_nspTransferMode == False) and (file_size > 0) and (nsp_header_size > 0):
if (not g_nspTransferMode) and file_size and nsp_header_size:
g_nspTransferMode = True
g_nspSize = file_size
g_nspHeaderSize = nsp_header_size
@ -523,14 +520,13 @@ def usbHandleSendFileProperties(cmd_block):
g_Logger.debug('NSP transfer mode enabled!\n')
# Perform additional integrity checks and get a file object to work with.
if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspFile is None)):
if (not g_nspTransferMode) or (g_nspFile is None):
# Check if we're dealing with an absolute path.
if filename[0] == '/':
filename = filename[1:]
# Replace all slashes with backslashes if we're running under Windows.
if os.name == 'nt':
filename = filename.replace('/', '\\')
if os.name == 'nt': filename = filename.replace('/', '\\')
# Generate full, absolute path to the destination file.
fullpath = os.path.abspath(g_outputDir + os.path.sep + filename)
@ -542,7 +538,7 @@ def usbHandleSendFileProperties(cmd_block):
os.makedirs(dirpath, exist_ok=True)
# Make sure the output filepath doesn't point to an existing directory.
if (os.path.exists(fullpath) == True) and (os.path.isfile(fullpath) == False):
if os.path.exists(fullpath) and (not os.path.isfile(fullpath)):
utilsResetNspInfo()
g_Logger.error('Output filepath points to an existing directory! ("%s").\n' % (fullpath))
return USB_STATUS_HOST_IO_ERROR
@ -557,7 +553,7 @@ def usbHandleSendFileProperties(cmd_block):
# Get file object.
file = open(fullpath, "wb")
if g_nspTransferMode == True:
if g_nspTransferMode:
# Update NSP file object.
g_nspFile = file
@ -573,10 +569,9 @@ def usbHandleSendFileProperties(cmd_block):
dirpath = os.path.dirname(fullpath)
# Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP.
if (file_size == 0) or ((g_nspTransferMode == True) and (file_size == g_nspSize)):
if (not file_size) or (g_nspTransferMode and file_size == g_nspSize):
# Close file (if needed).
if g_nspTransferMode == False:
file.close()
if not g_nspTransferMode: file.close()
# Let the command handler take care of sending the status response for us.
return USB_STATUS_SUCCESS
@ -585,61 +580,67 @@ def usbHandleSendFileProperties(cmd_block):
usbSendStatus(USB_STATUS_SUCCESS)
# Start data transfer stage.
file_type_str = ('file' if (g_nspTransferMode == False) else 'NSP file entry')
file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry')
g_Logger.info('Data transfer started. Saving %s to: "%s".' % (file_type_str, fullpath))
offset = 0
blksize = USB_TRANSFER_BLOCK_SIZE
# Initialize progress bar.
idx = filename.rfind(os.path.sep)
prefix_filename = (filename[idx+1:] if (idx >= 0) else filename)
prefix = ('Current %s: "%s".\n' % (file_type_str, prefix_filename))
prefix += 'Use your console to cancel the file transfer if you wish to do so.\n\n'
if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspRemainingSize == (g_nspSize - g_nspHeaderSize))):
if g_nspTransferMode == False:
pbar_n = 0
pbar_file_size = file_size
use_pbar = (((not g_nspTransferMode) and (file_size > USB_TRANSFER_THRESHOLD)) or (g_nspTransferMode and (g_nspSize > USB_TRANSFER_THRESHOLD)))
if use_pbar:
idx = filename.rfind(os.path.sep)
prefix_filename = (filename[idx+1:] if (idx >= 0) else filename)
prefix = ('Current %s: "%s".\n' % (file_type_str, prefix_filename))
prefix += 'Use your console to cancel the file transfer if you wish to do so.\n\n'
if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize):
if not g_nspTransferMode:
# Set current progress to zero and the maximum value to the provided file size.
pbar_n = 0
pbar_file_size = file_size
else:
# Set current progress to the NSP header size and the maximum value to the provided NSP size.
pbar_n = g_nspHeaderSize
pbar_file_size = g_nspSize
# Get progress bar unit and unit divider. These will be used to display and calculate size values using a specific size unit (B, KiB, MiB, GiB).
(unit, unit_divider) = utilsGetSizeUnitAndDivisor(pbar_file_size)
total = (float(pbar_file_size) / unit_divider)
# Start the progress bar.
g_progressBar.start(pbar_n, total, prefix, unit)
# Update the NSP unit divider (if needed).
if g_nspTransferMode: g_nspSizeUnitDivider = unit_divider
else:
pbar_n = g_nspHeaderSize
pbar_file_size = g_nspSize
(unit, unit_divider) = utilsGetSizeUnitAndDivisor(pbar_file_size)
total = (float(pbar_file_size) / unit_divider)
g_progressBar.start(pbar_n, total, prefix, unit)
if g_nspTransferMode == True:
g_nspSizeUnitDivider = unit_divider
else:
unit_divider = g_nspSizeUnitDivider
g_progressBar.set_prefix(prefix)
# Set unit divider for the whole NSP size and the current prefix (which holds the filename for the current NSP file entry).
unit_divider = g_nspSizeUnitDivider
g_progressBar.set_prefix(prefix)
def cancelTransfer():
# Cancel file transfer.
file.close()
os.remove(fullpath)
utilsResetNspInfo()
g_progressBar.end()
if use_pbar: g_progressBar.end()
# Start transfer process.
start_time = time.time()
while offset < file_size:
# Update block size (if needed).
diff = (file_size - offset)
if blksize > diff:
blksize = diff
if blksize > diff: blksize = diff
# Handle Zero-Length Termination packet (if needed).
if ((offset + blksize) >= file_size) and (utilsIsValueAlignedToEndpointPacketSize(blksize) == True):
rd_size = (blksize + 1)
else:
rd_size = blksize
# Set block size and handle Zero-Length Termination packet (if needed).
rd_size = blksize
if ((offset + blksize) >= file_size) and utilsIsValueAlignedToEndpointPacketSize(blksize): rd_size += 1
# Read current chunk.
chunk = usbRead(rd_size, USB_TRANSFER_TIMEOUT)
if chunk == None:
if chunk is None:
g_Logger.error('Failed to read 0x%X-byte long data chunk!' % (rd_size))
# Cancel file transfer.
@ -670,22 +671,19 @@ def usbHandleSendFileProperties(cmd_block):
offset = (offset + chunk_size)
# Update remaining NSP data size.
if g_nspTransferMode == True:
g_nspRemainingSize -= chunk_size
if g_nspTransferMode: g_nspRemainingSize -= chunk_size
# Update progress bar once per second.
g_progressBar.update(float(chunk_size) / unit_divider)
# Update progress bar (if needed).
if use_pbar: g_progressBar.update(float(chunk_size) / unit_divider)
elapsed_time = round(g_progressBar.elapsed_time)
elapsed_time = round(time.time() - start_time)
g_Logger.info('File transfer successfully completed in %s!\n' % (tqdm.format_interval(elapsed_time)))
# Close file handle (if needed).
if g_nspTransferMode == False:
file.close()
if not g_nspTransferMode: file.close()
# Hide progress bar (if needed).
if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspRemainingSize == 0)):
g_progressBar.end()
if use_pbar and ((not g_nspTransferMode) or (not g_nspRemainingSize)): g_progressBar.end()
return USB_STATUS_SUCCESS
@ -697,11 +695,11 @@ def usbHandleSendNspHeader(cmd_block):
g_Logger.debug('Received SendNspHeader (%02X) command.' % (USB_CMD_SEND_NSP_HEADER))
# Integrity checks.
if g_nspTransferMode == False:
if not g_nspTransferMode:
g_Logger.error('Received NSP header out of NSP transfer mode!\n')
return USB_STATUS_MALFORMED_CMD
if g_nspRemainingSize > 0:
if g_nspRemainingSize:
g_Logger.error('Received NSP header before receiving all NSP data! (missing 0x%X byte[s]).\n' % (g_nspRemainingSize))
return USB_STATUS_MALFORMED_CMD
@ -735,7 +733,7 @@ def usbCommandHandler():
}
# Get device endpoints.
if usbGetDeviceEndpoints() == False:
if not usbGetDeviceEndpoints():
# Update UI and return.
uiToggleElements(True)
return
@ -759,9 +757,9 @@ def usbCommandHandler():
# Read command block right away (if needed).
# nxdumptool expects us to read it right after sending the command header.
cmd_block = None
if cmd_block_size > 0:
if cmd_block_size:
# Handle Zero-Length Termination packet (if needed).
if utilsIsValueAlignedToEndpointPacketSize(cmd_block_size) == True:
if utilsIsValueAlignedToEndpointPacketSize(cmd_block_size):
rd_size = (cmd_block_size + 1)
else:
rd_size = cmd_block_size
@ -785,9 +783,9 @@ def usbCommandHandler():
continue
# Verify command block size.
if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \
((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \
((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)):
if (cmd_id == USB_CMD_START_SESSION and cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION) or \
(cmd_id == USB_CMD_SEND_FILE_PROPERTIES and cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES) or \
(cmd_id == USB_CMD_SEND_NSP_HEADER and not cmd_block_size):
g_Logger.error('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size))
usbSendStatus(USB_STATUS_MALFORMED_COMMAND)
continue
@ -795,7 +793,7 @@ def usbCommandHandler():
# Run command handler function.
# Send status response afterwards. Bail out if requested.
status = cmd_func(cmd_block)
if (status == None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION):
if (status is None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION):
break
g_Logger.info('Stopping server.')
@ -832,7 +830,7 @@ def uiStartServer():
server_thread.start()
def uiToggleElements(enable):
if enable == True:
if enable:
g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocol)
g_tkChooseDirButton.configure(state='normal')
@ -851,9 +849,7 @@ def uiToggleElements(enable):
def uiChooseDirectory():
dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=INITIAL_DIR, mustexist=True)
if dir:
dir = os.path.abspath(dir)
uiUpdateDirectoryField(dir)
if dir: uiUpdateDirectoryField(os.path.abspath(dir))
def uiUpdateDirectoryField(dir):
g_tkDirText.configure(state='normal')
@ -867,14 +863,6 @@ def uiHandleExitProtocol():
def uiHandleExitProtocolStub():
pass
def uiHandleTabInput(event):
event.widget.tk_focusNext().focus()
return 'break'
def uiHandleShiftTabInput(event):
event.widget.tk_focusPrev().focus()
return 'break'
def uiScaleMeasure(measure):
return round(float(measure) * SCALE)
@ -891,12 +879,11 @@ def main():
# Check if we're running under Windows Vista or later.
dpi_aware = False
win_vista = ((os_type == 'Windows') and (int(os_version[:os_version.find('.')]) >= 6))
if win_vista == True:
if win_vista:
try:
# Enable high DPI scaling.
dpi_aware = (ctypes.windll.user32.SetProcessDPIAware() == 1)
if dpi_aware == False:
dpi_aware = (ctypes.windll.shcore.SetProcessDpiAwareness(1) == 0)
if not dpi_aware: dpi_aware = (ctypes.windll.shcore.SetProcessDpiAwareness(1) == 0)
except:
traceback.print_exc()
@ -911,8 +898,7 @@ def main():
screen_dpi = round(g_tkRoot.winfo_fpixels('1i'))
# Update scaling factor (if needed).
if (win_vista == True) and (dpi_aware == True):
SCALE = (float(screen_dpi) / WINDOWS_SCALING_FACTOR)
if win_vista and dpi_aware: SCALE = (float(screen_dpi) / WINDOWS_SCALING_FACTOR)
# Decode embedded icon and set it.
try:
@ -976,11 +962,12 @@ def main():
# Configure logging mechanism.
logging.basicConfig(level=logging.DEBUG)
if len(g_Logger.handlers) > 0:
if len(g_Logger.handlers):
# Remove stderr output handler from logger.
log_stderr = g_Logger.handlers[0]
g_Logger.removeHandler(log_stderr)
# Initialize console g_Logger.
# Initialize console logger.
console = LogConsole(g_tkScrolledTextLog)
# Create hidden progress bar child window.