[ci skip] nxdt_host: update to v0.5

Changes include:

* Fix progress bar window not being automatically closed if a cancel request is received from the console in-between two different NSP file entry transfers.
* Handle unit and divisor retrieval directly inside the ProgressBarWindow class.
* Carry out a separate speed calculation whenever necessary to always display transfer rates in MiB/s.
* Fix CLI mode.
* Minor grammar corrections in the code.
This commit is contained in:
Pablo Curiel 2024-08-26 11:17:23 +02:00
parent 392887c12f
commit 76de7b10fd

View file

@ -69,7 +69,7 @@ WINDOW_WIDTH = 500
WINDOW_HEIGHT = 470 WINDOW_HEIGHT = 470
# Application version. # Application version.
APP_VERSION = '0.4' APP_VERSION = '0.5'
# Copyright year. # Copyright year.
COPYRIGHT_YEAR = '2020-2024' COPYRIGHT_YEAR = '2020-2024'
@ -409,13 +409,14 @@ class ProgressBarWindow:
def __init__(self, bar_format: str = '', tk_parent: Any = None, window_title: str = '', window_resize: bool = False, window_protocol: Callable | None = None) -> None: def __init__(self, bar_format: str = '', tk_parent: Any = None, window_title: str = '', window_resize: bool = False, window_protocol: Callable | None = None) -> None:
self.n: int = 0 self.n: int = 0
self.total: int = 0 self.total: int = 0
self.divider: float = 1.0 self.divisor: float = 1.0
self.total_div: float = 0 self.total_div: float = 0
self.prefix: str = '' self.prefix: str = ''
self.unit: str = 'B' self.unit: str = 'B'
self.bar_format = bar_format self.bar_format = bar_format
self.start_time: float = 0 self.start_time: float = 0
self.elapsed_time: float = 0 self.prev_iter_time: float = 0
self.prev_n: int = 0
self.hwnd: int = 0 self.hwnd: int = 0
self.tk_parent = tk_parent self.tk_parent = tk_parent
@ -455,22 +456,30 @@ class ProgressBarWindow:
if self.tk_window: if self.tk_window:
self.tk_parent.after(0, self.tk_window.destroy) self.tk_parent.after(0, self.tk_window.destroy)
def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None: def start(self, total: int, n: int = 0, prefix: str = '') -> None:
if (total <= 0) or (n < 0) or (divider < 1): if (total <= 0) or (n < 0):
raise Exception('Invalid arguments!') raise Exception('Invalid arguments!')
self.n = n self.n = n
self.total = total self.total = total
self.divider = float(divider)
self.total_div = (float(self.total) / self.divider)
self.prefix = prefix self.prefix = prefix
self.unit = unit
# Get progress bar unit and unit divisor. These will be used to display and calculate size values using a specific size unit (B, KiB, MiB, GiB).
unit_and_divisor = utilsGetSizeUnitAndDivisor(self.total)
self.unit = unit_and_divisor[0]
self.divisor = float(unit_and_divisor[1])
self.total_div = (float(self.total) / self.divisor)
# Replace our custom rate variable with tqdm's rate_fmt if our unit is set to MiB.
if self.unit == 'MiB':
self.bar_format = self.bar_format.replace('__custom_rate_fmt__', '{rate_fmt}')
if self.tk_pbar: if self.tk_pbar:
self.tk_pbar.configure(maximum=self.total_div, mode='determinate') self.tk_pbar.configure(maximum=self.total_div, mode='determinate')
self.start_time = time.time() self.start_time = time.time()
else: else:
n_div = (float(self.n) / self.divider) n_div = (float(self.n) / self.divisor)
self.pbar = tqdm(initial=n_div, total=self.total_div, unit=self.unit, dynamic_ncols=True, desc=self.prefix, bar_format=self.bar_format) self.pbar = tqdm(initial=n_div, total=self.total_div, unit=self.unit, dynamic_ncols=True, desc=self.prefix, bar_format=self.bar_format)
def update(self, n: int) -> None: def update(self, n: int) -> None:
@ -478,14 +487,19 @@ class ProgressBarWindow:
if cur_n > self.total: if cur_n > self.total:
return return
cur_time = time.time()
cur_n_div = (float(cur_n) / self.divisor)
msg = self._format_speed(cur_time, cur_n)
if not msg:
self.n = cur_n
return
if self.tk_window: if self.tk_window:
assert self.tk_text_var is not None assert self.tk_text_var is not None
assert self.tk_n_var is not None assert self.tk_n_var is not None
cur_n_div = (float(cur_n) / self.divider) msg = tqdm.format_meter(n=cur_n_div, total=self.total_div, elapsed=(cur_time - self.start_time), prefix=self.prefix, unit=self.unit, bar_format=msg)
self.elapsed_time = (time.time() - self.start_time)
msg = tqdm.format_meter(n=cur_n_div, total=self.total_div, elapsed=self.elapsed_time, prefix=self.prefix, unit=self.unit, bar_format=self.bar_format)
self.tk_text_var.set(msg) self.tk_text_var.set(msg)
self.tk_n_var.set(cur_n_div) self.tk_n_var.set(cur_n_div)
@ -506,20 +520,43 @@ class ProgressBarWindow:
g_taskbar.SetProgressValue(self.hwnd, cur_n, self.total) g_taskbar.SetProgressValue(self.hwnd, cur_n, self.total)
else: else:
assert self.pbar is not None assert self.pbar is not None
n_div = (float(n) / self.divider)
self.pbar.update(n_div) self.pbar.bar_format = msg
self.pbar.n = (float(self.n) / self.divisor)
self.pbar.update(float(n) / self.divisor)
self.n = cur_n self.n = cur_n
def _format_speed(self, cur_time: float, cur_n: int) -> str:
# Short-circuit: return immediately if our unit is set to MiB. We'll let tqdm do its thing.
if self.unit == 'MiB':
return self.bar_format
# I absolutely hate to roll out my own speed calculation for the UI, but tqdm offers no way to use different units for the progress/total/rate values.
# Please forgive me.
last_iter_time = (cur_time - self.prev_iter_time)
if (self.prev_n > 0) and (last_iter_time != cur_time) and (last_iter_time < 1.0) and (cur_n < self.total):
return ''
rate = (float(cur_n - self.prev_n) / 1048576.0)
if last_iter_time != cur_time:
rate /= last_iter_time
self.prev_n = cur_n
self.prev_iter_time = cur_time
return self.bar_format.replace('__custom_rate_fmt__', f'{rate:.2f} MiB/s')
def end(self) -> None: def end(self) -> None:
self.n = 0 self.n = 0
self.total = 0 self.total = 0
self.divider = 1 self.divisor = 1
self.total_div = 0 self.total_div = 0
self.prefix = '' self.prefix = ''
self.unit = 'B' self.unit = 'B'
self.start_time = 0 self.start_time = 0
self.elapsed_time = 0 self.prev_iter_time = 0
self.prev_n = 0
if self.tk_window: if self.tk_window:
assert self.tk_pbar is not None assert self.tk_pbar is not None
@ -604,7 +641,6 @@ def usbGetDeviceEndpoints() -> bool:
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize
assert g_logger is not None assert g_logger is not None
assert g_stopEvent is not None
cur_dev: Generator[usb.core.Device, Any, None] | None = None cur_dev: Generator[usb.core.Device, Any, None] | None = None
prev_dev: usb.core.Device | None = None prev_dev: usb.core.Device | None = None
@ -613,11 +649,13 @@ def usbGetDeviceEndpoints() -> bool:
usb_version = 0 usb_version = 0
if g_cliMode: if g_cliMode:
g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.') g_logger.info(SERVER_START_MSG)
while True: while True:
# Check if the user decided to stop the server. # Check if the user decided to stop the server.
if not g_cliMode and g_stopEvent.is_set(): if not g_cliMode:
assert g_stopEvent is not None
if g_stopEvent.is_set():
g_stopEvent.clear() g_stopEvent.clear()
return False return False
@ -680,7 +718,7 @@ def usbGetDeviceEndpoints() -> bool:
g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} (USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}).\n') g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} (USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}).\n')
if g_cliMode: if g_cliMode:
g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.') g_logger.info(SERVER_STOP_MSG)
return True return True
@ -772,7 +810,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
if not g_cliMode or (g_cliMode and not g_nspTransferMode): if not g_cliMode or (g_cliMode and not g_nspTransferMode):
g_logger.info(f'Receiving {file_type_str}: "{filename}".') g_logger.info(f'Receiving {file_type_str}: "{filename}".')
# Perform validity checks. # Perform sanity checks.
if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size): if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size):
g_logger.error('NSP header size must be smaller than the full NSP size!\n') g_logger.error('NSP header size must be smaller than the full NSP size!\n')
return USB_STATUS_MALFORMED_CMD return USB_STATUS_MALFORMED_CMD
@ -795,7 +833,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
g_nspFilePath = '' g_nspFilePath = ''
g_logger.debug('NSP transfer mode enabled!\n') g_logger.debug('NSP transfer mode enabled!\n')
# Perform additional validity checks and get a file object to work with. # Perform additional sanity checks and get a file object to work with.
if (not g_nspTransferMode) or (g_nspFile is None): if (not g_nspTransferMode) or (g_nspFile is None):
# Generate full, absolute path to the destination file. # Generate full, absolute path to the destination file.
fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) fullpath = os.path.abspath(g_outputDir + os.path.sep + filename)
@ -852,7 +890,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
usbSendStatus(USB_STATUS_SUCCESS) usbSendStatus(USB_STATUS_SUCCESS)
# Start data transfer stage. # Start data transfer stage.
g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{printable_fullpath}".') g_logger.debug(f'Data transfer started. {"Saving" if file_type_str == "file" else "Writing"} {file_type_str} to: "{printable_fullpath}".')
offset = 0 offset = 0
blksize = USB_TRANSFER_BLOCK_SIZE blksize = USB_TRANSFER_BLOCK_SIZE
@ -877,11 +915,8 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
pbar_n = g_nspHeaderSize pbar_n = g_nspHeaderSize
pbar_file_size = g_nspSize pbar_file_size = g_nspSize
# Get progress bar unit and unit divider. These will be used to display and calculate size values using a specific size unit (B, KiB, MiB, GiB).
(unit, unit_divider) = utilsGetSizeUnitAndDivisor(pbar_file_size)
# Display progress bar window. # Display progress bar window.
g_progressBarWindow.start(pbar_file_size, pbar_n, unit_divider, prefix, unit) g_progressBarWindow.start(pbar_file_size, pbar_n, prefix)
else: else:
# Set current prefix (holds the filename for the current NSP file entry). # Set current prefix (holds the filename for the current NSP file entry).
g_progressBarWindow.set_prefix(prefix) g_progressBarWindow.set_prefix(prefix)
@ -970,7 +1005,11 @@ def usbHandleCancelFileTransfer(cmd_block: bytes) -> int:
g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.')
if g_nspTransferMode: if g_nspTransferMode:
if (g_nspSize > USB_TRANSFER_THRESHOLD) and (g_progressBarWindow is not None):
g_progressBarWindow.end()
utilsResetNspInfo(True) utilsResetNspInfo(True)
g_logger.warning('Transfer cancelled.') g_logger.warning('Transfer cancelled.')
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
else: else:
@ -1344,7 +1383,7 @@ def uiInitialize() -> None:
console = LogConsole(g_tkScrolledTextLog) console = LogConsole(g_tkScrolledTextLog)
# Initialize progress bar window object. # Initialize progress bar window object.
bar_format = '{desc}\n\n{percentage:.2f}% - {n:.2f} / {total:.2f} {unit}\nElapsed time: {elapsed}. Remaining time: {remaining}.\nSpeed: {rate_fmt}.' bar_format = '{desc}\n\n{percentage:.2f}% - {n:.2f} / {total:.2f} {unit}\nElapsed time: {elapsed}. Remaining time: {remaining}.\nSpeed: __custom_rate_fmt__.'
g_progressBarWindow = ProgressBarWindow(bar_format, g_tkRoot, 'File transfer', False, uiHandleExitProtocolStub) g_progressBarWindow = ProgressBarWindow(bar_format, g_tkRoot, 'File transfer', False, uiHandleExitProtocolStub)
# Enter Tkinter main loop. # Enter Tkinter main loop.
@ -1360,7 +1399,7 @@ def cliInitialize() -> None:
console = LogConsole() console = LogConsole()
# Initialize progress bar window object. # Initialize progress bar window object.
bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]' bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} {unit} [{elapsed}<{remaining}, __custom_rate_fmt__]'
g_progressBarWindow = ProgressBarWindow(bar_format) g_progressBarWindow = ProgressBarWindow(bar_format)
# Print info. # Print info.