host: switch to Nuitka-based EXE building.

Other changes include:

* host: rename windows_install_deps.py -> install_deps.py and update it to support multiple operating systems.
* host: restore assertions and fix most static analysis errors related to type aliases and checks.
* host: catch exceptions thrown by usb.core.find() in usbGetDeviceEndpoints().
* host: move code to unconditionally enable 32-bit paths under Windows to uiStartServer() and cliInitialize(), respectively.
* host: remove incomplete file if the ongoing transfer was cancelled and we're not dealing with a NSP file.
This commit is contained in:
Pablo Curiel 2023-11-26 22:10:42 +01:00
parent 300c650b8f
commit bd1b56e5c2
4 changed files with 126 additions and 110 deletions

7
.gitignore vendored
View file

@ -1,9 +1,10 @@
.vscode .vscode
build build
host/__pycache__ host/__pycache__
host/installer host/standalone
host/build host/nxdt_host.build
host/dist host/nxdt_host.dist
host/nxdt_host.onefile-build
host/nxdumptool host/nxdumptool
*.elf *.elf
*.nacp *.nacp

View file

@ -7,8 +7,11 @@
from subprocess import run from subprocess import run
from os.path import dirname, join from os.path import dirname, join
from sys import executable from sys import executable
from platform import system
root_dir = dirname(__file__) root_dir = dirname(__file__)
run([executable, '-m', 'pip', 'install', '-r', join(root_dir, 'requirements-win32.txt')]) requirements_file = ('requirements-win32.txt' if system() == 'Windows' else 'requirements.txt')
run([executable, '-m', 'pip', 'install', '-r', join(root_dir, requirements_file)])
input('Press enter to close') input('Press enter to close')

View file

@ -58,7 +58,7 @@ from tqdm import tqdm
from argparse import ArgumentParser from argparse import ArgumentParser
from io import BufferedWriter from io import BufferedWriter
from typing import List, Tuple, Any, Callable, Optional from typing import Generator, Any, Callable
# Scaling factors. # Scaling factors.
WINDOWS_SCALING_FACTOR = 96.0 WINDOWS_SCALING_FACTOR = 96.0
@ -137,8 +137,8 @@ SERVER_START_MSG = f'Please connect a Nintendo Switch console running {USB_DEV_P
SERVER_STOP_MSG = f'Exit {USB_DEV_PRODUCT} on your console or disconnect it at any time to stop the server.' SERVER_STOP_MSG = f'Exit {USB_DEV_PRODUCT} on your console or disconnect it at any time to stop the server.'
# Default directory paths. # Default directory paths.
INITIAL_DIR = os.path.abspath(os.path.dirname(sys.executable if getattr(sys, 'frozen', False) else __file__)) INITIAL_DIR = os.path.dirname(os.path.abspath(os.path.expanduser(os.path.expandvars(sys.argv[0]))))
DEFAULT_DIR = (INITIAL_DIR + os.path.sep + USB_DEV_PRODUCT) DEFAULT_DIR = os.path.join(INITIAL_DIR, USB_DEV_PRODUCT)
# Application icon (PNG). # Application icon (PNG).
# Embedded to load it as the icon for all windows using PhotoImage (which doesn't support ICO files) + wm_iconphoto. # Embedded to load it as the icon for all windows using PhotoImage (which doesn't support ICO files) + wm_iconphoto.
@ -225,7 +225,7 @@ APP_ICON = b'iVBORw0KGgoAAAANSUhEUgAAADAAAAAwCAYAAABXAvmHAAAAAXNSR0IArs4c6QAAAAR
b'43EDnoiNHI8a8FRs5HjMgCdjI8cj7+rp2MhR/Z3p7b5gyzRyjN0ei80cwP+bQrjkWSh1LgAAAABJRU5ErkJggg==' b'43EDnoiNHI8a8FRs5HjMgCdjI8cj7+rp2MhR/Z3p7b5gyzRyjN0ei80cwP+bQrjkWSh1LgAAAABJRU5ErkJggg=='
# Taskbar Type Library (TLB). Used under Windows 7 or greater. # Taskbar Type Library (TLB). Used under Windows 7 or greater.
TASKBAR_LIB_PATH = (INITIAL_DIR + os.path.sep + 'TaskbarLib.tlb') TASKBAR_LIB_PATH = os.path.join(INITIAL_DIR, 'TaskbarLib.tlb')
TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAATgAAADMDAAAAAAAA/////xgAAAAgAAAAgAAAAP////8AAAAAAAAAAGQAAADIAAAA' + \ TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAATgAAADMDAAAAAAAA/////xgAAAAgAAAAgAAAAP////8AAAAAAAAAAGQAAADIAAAA' + \
b'LAEAAJABAAD0AQAAWAIAALwCAAAgAwAAhAMAAOgDAABMBAAAsAQAABQFAAB8AQAAeAUAAP////8PAAAA/////wAAAAD/////DwAAAP////8AAAAA/////w8AAABMCAAA' + \ b'LAEAAJABAAD0AQAAWAIAALwCAAAgAwAAhAMAAOgDAABMBAAAsAQAABQFAAB8AQAAeAUAAP////8PAAAA/////wAAAAD/////DwAAAP////8AAAAA/////w8AAABMCAAA' + \
@ -303,7 +303,7 @@ TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAA
# Global variables used throughout the code. # Global variables used throughout the code.
g_cliMode: bool = False g_cliMode: bool = False
g_outputDir: str = '' g_outputDir: str = ''
g_logLevelIntVar: Optional[tk.IntVar] = None g_logLevelIntVar: tk.IntVar | None = None
g_osType: str = '' g_osType: str = ''
g_osVersion: str = '' g_osVersion: str = ''
@ -312,18 +312,18 @@ g_isWindows: bool = False
g_isWindowsVista: bool = False g_isWindowsVista: bool = False
g_isWindows7: bool = False g_isWindows7: bool = False
g_tkRoot: Optional[tk.Tk] = None g_tkRoot: tk.Tk | None = None
g_tkCanvas: Optional[tk.Canvas] = None g_tkCanvas: tk.Canvas | None = None
g_tkDirText: Optional[tk.Text] = None g_tkDirText: tk.Text | None = None
g_tkChooseDirButton: Optional[tk.Button] = None g_tkChooseDirButton: tk.Button | None = None
g_tkServerButton: Optional[tk.Button] = None g_tkServerButton: tk.Button | None = None
g_tkTipMessage: Any = None g_tkTipMessage: int = 0
g_tkScrolledTextLog: Optional[scrolledtext.ScrolledText] = None g_tkScrolledTextLog: scrolledtext.ScrolledText | None = None
g_tkVerboseCheckbox: Optional[tk.Checkbutton] = None g_tkVerboseCheckbox: tk.Checkbutton | None = None
g_logger: Optional[logging.Logger] = None g_logger: logging.Logger | None = None
g_stopEvent: Optional[threading.Event] = None g_stopEvent: threading.Event | None = None
g_tlb: Any = None g_tlb: Any = None
g_taskbar: Any = None g_taskbar: Any = None
@ -343,12 +343,12 @@ g_nspTransferMode: bool = False
g_nspSize: int = 0 g_nspSize: int = 0
g_nspHeaderSize: int = 0 g_nspHeaderSize: int = 0
g_nspRemainingSize: int = 0 g_nspRemainingSize: int = 0
g_nspFile: Optional[BufferedWriter] = None g_nspFile: BufferedWriter | None = None
g_nspFilePath: str = '' g_nspFilePath: str = ''
# Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget.
class LogQueueHandler(logging.Handler): class LogQueueHandler(logging.Handler):
def __init__(self, log_queue: queue.Queue): def __init__(self, log_queue: queue.Queue) -> None:
super().__init__() super().__init__()
self.log_queue = log_queue self.log_queue = log_queue
@ -361,8 +361,8 @@ class LogQueueHandler(logging.Handler):
# Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget.
class LogConsole: class LogConsole:
def __init__(self, scrolled_text: Optional[scrolledtext.ScrolledText] = None): def __init__(self, scrolled_text: scrolledtext.ScrolledText | None = None) -> None:
#assert g_logger is not None assert g_logger is not None
self.scrolled_text = scrolled_text self.scrolled_text = scrolled_text
self.frame = (self.scrolled_text.winfo_toplevel() if self.scrolled_text else None) self.frame = (self.scrolled_text.winfo_toplevel() if self.scrolled_text else None)
@ -406,7 +406,7 @@ class LogConsole:
class ProgressBarWindow: class ProgressBarWindow:
global g_tlb, g_taskbar global g_tlb, g_taskbar
def __init__(self, bar_format: str = '', tk_parent: Any = None, window_title: str = '', window_resize: bool = False, window_protocol: Optional[Callable] = None): def __init__(self, bar_format: str = '', tk_parent: Any = None, window_title: str = '', window_resize: bool = False, window_protocol: Callable | None = None) -> None:
self.n: int = 0 self.n: int = 0
self.total: int = 0 self.total: int = 0
self.divider: float = 1.0 self.divider: float = 1.0
@ -421,11 +421,11 @@ class ProgressBarWindow:
self.tk_parent = tk_parent self.tk_parent = tk_parent
self.tk_window = (tk.Toplevel(self.tk_parent) if self.tk_parent else None) self.tk_window = (tk.Toplevel(self.tk_parent) if self.tk_parent else None)
self.withdrawn = False self.withdrawn = False
self.tk_text_var: Optional[tk.StringVar] = None self.tk_text_var: tk.StringVar | None = None
self.tk_n_var: Optional[tk.DoubleVar] = None self.tk_n_var: tk.DoubleVar | None = None
self.tk_pbar: Optional[ttk.Progressbar] = None self.tk_pbar: ttk.Progressbar | None = None
self.pbar: Optional[tqdm] = None self.pbar: tqdm | None = None
if self.tk_window: if self.tk_window:
self.tk_window.withdraw() self.tk_window.withdraw()
@ -451,8 +451,8 @@ class ProgressBarWindow:
self.tk_pbar.configure(maximum=100, mode='indeterminate') self.tk_pbar.configure(maximum=100, mode='indeterminate')
self.tk_pbar.pack() self.tk_pbar.pack()
def __del__(self): def __del__(self) -> None:
if self.tk_parent: if self.tk_window:
self.tk_parent.after(0, self.tk_window.destroy) self.tk_parent.after(0, self.tk_window.destroy)
def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None: def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None:
@ -479,8 +479,8 @@ class ProgressBarWindow:
return return
if self.tk_window: if self.tk_window:
#assert self.tk_text_var is not None assert self.tk_text_var is not None
#assert self.tk_n_var is not None assert self.tk_n_var is not None
cur_n_div = (float(cur_n) / self.divider) cur_n_div = (float(cur_n) / self.divider)
self.elapsed_time = (time.time() - self.start_time) self.elapsed_time = (time.time() - self.start_time)
@ -505,7 +505,7 @@ class ProgressBarWindow:
if g_taskbar: if g_taskbar:
g_taskbar.SetProgressValue(self.hwnd, cur_n, self.total) g_taskbar.SetProgressValue(self.hwnd, cur_n, self.total)
else: else:
#assert self.pbar is not None assert self.pbar is not None
n_div = (float(n) / self.divider) n_div = (float(n) / self.divider)
self.pbar.update(n_div) self.pbar.update(n_div)
@ -522,7 +522,7 @@ class ProgressBarWindow:
self.elapsed_time = 0 self.elapsed_time = 0
if self.tk_window: if self.tk_window:
#assert self.tk_pbar is not None assert self.tk_pbar is not None
if g_taskbar: if g_taskbar:
g_taskbar.SetProgressState(self.hwnd, g_tlb.TBPF_NOPROGRESS) g_taskbar.SetProgressState(self.hwnd, g_tlb.TBPF_NOPROGRESS)
@ -535,7 +535,7 @@ class ProgressBarWindow:
self.tk_pbar.configure(maximum=100, mode='indeterminate') self.tk_pbar.configure(maximum=100, mode='indeterminate')
else: else:
#assert self.pbar is not None assert self.pbar is not None
self.pbar.close() self.pbar.close()
self.pbar = None self.pbar = None
print() print()
@ -543,7 +543,7 @@ class ProgressBarWindow:
def set_prefix(self, prefix) -> None: def set_prefix(self, prefix) -> None:
self.prefix = prefix self.prefix = prefix
g_progressBarWindow: Optional[ProgressBarWindow] = None g_progressBarWindow: ProgressBarWindow | None = None
def eprint(*args, **kwargs) -> None: def eprint(*args, **kwargs) -> None:
print(*args, file=sys.stderr, **kwargs) print(*args, file=sys.stderr, **kwargs)
@ -586,7 +586,7 @@ def utilsResetNspInfo(delete: bool = False) -> None:
g_nspFile = None g_nspFile = None
g_nspFilePath = '' g_nspFilePath = ''
def utilsGetSizeUnitAndDivisor(size: int) -> Tuple[str, int]: def utilsGetSizeUnitAndDivisor(size: int) -> tuple[str, int]:
size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ] size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ]
size_suffixes_count = len(size_suffixes) size_suffixes_count = len(size_suffixes)
@ -603,10 +603,11 @@ def utilsGetSizeUnitAndDivisor(size: int) -> Tuple[str, int]:
def usbGetDeviceEndpoints() -> bool: def usbGetDeviceEndpoints() -> bool:
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize
#assert g_logger is not None assert g_logger is not None
#assert g_stopEvent is not None assert g_stopEvent is not None
prev_dev = cur_dev = None cur_dev: Generator[usb.core.Device, Any, None] | None = None
prev_dev: usb.core.Device | None = None
usb_ep_in_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN usb_ep_in_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN
usb_ep_out_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT usb_ep_out_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT
usb_version = 0 usb_version = 0
@ -622,8 +623,20 @@ def usbGetDeviceEndpoints() -> bool:
# Find a connected USB device with a matching VID/PID pair. # Find a connected USB device with a matching VID/PID pair.
# Using == here to compare both device instances would also compare the backend, so we'll just compare certain elements manually. # Using == here to compare both device instances would also compare the backend, so we'll just compare certain elements manually.
cur_dev = usb.core.find(idVendor=USB_DEV_VID, idProduct=USB_DEV_PID) try:
if (cur_dev is None) or ((prev_dev is not None) and (cur_dev.bus == prev_dev.bus) and (cur_dev.address == prev_dev.address)): cur_dev = usb.core.find(find_all=False, idVendor=USB_DEV_VID, idProduct=USB_DEV_PID)
except:
if not g_cliMode:
utilsLogException(traceback.format_exc())
g_logger.error('\nFatal error ocurred while enumerating USB devices.')
if g_isWindows:
g_logger.error('\nTry reinstalling the libusbK driver using Zadig.')
return False
if (not isinstance(cur_dev, usb.core.Device)) or (isinstance(prev_dev, usb.core.Device) and (cur_dev.bus == prev_dev.bus) and (cur_dev.address == prev_dev.address)):
time.sleep(0.1) time.sleep(0.1)
continue continue
@ -672,8 +685,6 @@ def usbGetDeviceEndpoints() -> bool:
return True return True
def usbRead(size: int, timeout: int = -1) -> bytes: def usbRead(size: int, timeout: int = -1) -> bytes:
#assert g_logger is not None
rd = b'' rd = b''
try: try:
@ -682,13 +693,13 @@ def usbRead(size: int, timeout: int = -1) -> bytes:
except usb.core.USBError: except usb.core.USBError:
if not g_cliMode: if not g_cliMode:
utilsLogException(traceback.format_exc()) utilsLogException(traceback.format_exc())
g_logger.error('\nUSB timeout triggered or console disconnected.')
if g_logger is not None:
g_logger.error('\nUSB timeout triggered or console disconnected.')
return rd return rd
def usbWrite(data: bytes, timeout: int = -1) -> int: def usbWrite(data: bytes, timeout: int = -1) -> int:
#assert g_logger is not None
wr = 0 wr = 0
try: try:
@ -696,18 +707,20 @@ def usbWrite(data: bytes, timeout: int = -1) -> int:
except usb.core.USBError: except usb.core.USBError:
if not g_cliMode: if not g_cliMode:
utilsLogException(traceback.format_exc()) utilsLogException(traceback.format_exc())
g_logger.error('\nUSB timeout triggered or console disconnected.')
if g_logger is not None:
g_logger.error('\nUSB timeout triggered or console disconnected.')
return wr return wr
def usbSendStatus(code: int) -> bool: def usbSendStatus(code: int) -> bool:
status = struct.pack('<4sIH6p', USB_MAGIC_WORD, code, g_usbEpMaxPacketSize, b'') status = struct.pack('<4sIH6x', USB_MAGIC_WORD, code, g_usbEpMaxPacketSize)
return bool(usbWrite(status, USB_TRANSFER_TIMEOUT) == len(status)) return bool(usbWrite(status, USB_TRANSFER_TIMEOUT) == len(status))
def usbHandleStartSession(cmd_block: bytes) -> int: def usbHandleStartSession(cmd_block: bytes) -> int:
global g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, g_nxdtAbiVersionMajor, g_nxdtAbiVersionMinor, g_nxdtGitCommit global g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, g_nxdtAbiVersionMajor, g_nxdtAbiVersionMinor, g_nxdtGitCommit
#assert g_logger is not None assert g_logger is not None
if g_cliMode: if g_cliMode:
print() print()
@ -736,8 +749,8 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow
#assert g_logger is not None assert g_logger is not None
#assert g_progressBarWindow is not None assert g_progressBarWindow is not None
if g_cliMode and not g_nspTransferMode: if g_cliMode and not g_nspTransferMode:
print() print()
@ -786,10 +799,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
if (not g_nspTransferMode) or (g_nspFile is None): if (not g_nspTransferMode) or (g_nspFile is None):
# Generate full, absolute path to the destination file. # Generate full, absolute path to the destination file.
fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) fullpath = os.path.abspath(g_outputDir + os.path.sep + filename)
printable_fullpath = (fullpath[4:] if g_isWindows else fullpath)
# Unconditionally enable 32-bit paths on Windows.
if g_isWindows:
fullpath = '\\\\?\\' + fullpath.replace("/", "\\")
# Get parent directory path. # Get parent directory path.
dirpath = os.path.dirname(fullpath) dirpath = os.path.dirname(fullpath)
@ -800,11 +810,11 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Make sure the output filepath doesn't point to an existing directory. # Make sure the output filepath doesn't point to an existing directory.
if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): if os.path.exists(fullpath) and (not os.path.isfile(fullpath)):
utilsResetNspInfo() utilsResetNspInfo()
g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n') g_logger.error(f'Output filepath points to an existing directory! ("{printable_fullpath}").\n')
return USB_STATUS_HOST_IO_ERROR return USB_STATUS_HOST_IO_ERROR
# Make sure we have enough free space. # Make sure we have enough free space.
(total_space, used_space, free_space) = shutil.disk_usage(dirpath) (_, _, free_space) = shutil.disk_usage(dirpath)
if free_space <= file_size: if free_space <= file_size:
utilsResetNspInfo() utilsResetNspInfo()
g_logger.error('Not enough free space available in output volume!\n') g_logger.error('Not enough free space available in output volume!\n')
@ -826,12 +836,8 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Retrieve what we need using global variables. # Retrieve what we need using global variables.
file = g_nspFile file = g_nspFile
fullpath = g_nspFilePath fullpath = g_nspFilePath
# Unconditionally enable 32-bit paths on Windows.
if g_isWindows:
fullpath = '\\\\?\\' + fullpath.replace("/", "\\")
dirpath = os.path.dirname(fullpath) dirpath = os.path.dirname(fullpath)
printable_fullpath = (fullpath[4:] if g_isWindows else fullpath)
# Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP.
if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): if (not file_size) or (g_nspTransferMode and file_size == g_nspSize):
@ -846,7 +852,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
usbSendStatus(USB_STATUS_SUCCESS) usbSendStatus(USB_STATUS_SUCCESS)
# Start data transfer stage. # Start data transfer stage.
g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{printable_fullpath}".')
offset = 0 offset = 0
blksize = USB_TRANSFER_BLOCK_SIZE blksize = USB_TRANSFER_BLOCK_SIZE
@ -858,10 +864,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# We're not using dynamic tqdm prefixes under CLI mode. # We're not using dynamic tqdm prefixes under CLI mode.
prefix = '' prefix = ''
else: else:
idx = filename.rfind(os.path.sep) prefix = f'Current {file_type_str}: "{os.path.basename(filename)}".\n'
prefix_filename = (filename[idx+1:] if (idx >= 0) else filename)
prefix = f'Current {file_type_str}: "{prefix_filename}".\n'
prefix += 'Use your console to cancel the file transfer if you wish to do so.' prefix += 'Use your console to cancel the file transfer if you wish to do so.'
if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize): if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize):
@ -885,8 +888,13 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
def cancelTransfer(): def cancelTransfer():
# Cancel file transfer. # Cancel file transfer.
utilsResetNspInfo(True) if g_nspTransferMode:
if use_pbar: utilsResetNspInfo(True)
else:
file.close()
os.remove(fullpath)
if use_pbar and (g_progressBarWindow is not None):
g_progressBarWindow.end() g_progressBarWindow.end()
# Start transfer process. # Start transfer process.
@ -917,7 +925,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Check if we're dealing with a CancelFileTransfer command. # Check if we're dealing with a CancelFileTransfer command.
if chunk_size == USB_CMD_HEADER_SIZE: if chunk_size == USB_CMD_HEADER_SIZE:
(magic, cmd_id, cmd_block_size) = struct.unpack_from('<4sII', chunk, 0) (magic, cmd_id, _) = struct.unpack_from('<4sII', chunk, 0)
if (magic == USB_MAGIC_WORD) and (cmd_id == USB_CMD_CANCEL_FILE_TRANSFER): if (magic == USB_MAGIC_WORD) and (cmd_id == USB_CMD_CANCEL_FILE_TRANSFER):
# Cancel file transfer. # Cancel file transfer.
cancelTransfer() cancelTransfer()
@ -957,7 +965,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: def usbHandleCancelFileTransfer(cmd_block: bytes) -> int:
#assert g_logger is not None assert g_logger is not None
g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.')
@ -972,8 +980,8 @@ def usbHandleCancelFileTransfer(cmd_block: bytes) -> int:
def usbHandleSendNspHeader(cmd_block: bytes) -> int: def usbHandleSendNspHeader(cmd_block: bytes) -> int:
global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath
#assert g_logger is not None assert g_logger is not None
#assert g_nspFile is not None assert g_nspFile is not None
nsp_header_size = len(cmd_block) nsp_header_size = len(cmd_block)
@ -1004,12 +1012,12 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int:
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
def usbHandleEndSession(cmd_block: bytes) -> int: def usbHandleEndSession(cmd_block: bytes) -> int:
#assert g_logger is not None assert g_logger is not None
g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.') g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.')
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
#assert g_logger is not None assert g_logger is not None
g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.')
@ -1027,13 +1035,13 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int: def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int:
#assert g_logger is not None assert g_logger is not None
g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.')
g_logger.info(f'Finished extracted FS dump.') g_logger.info(f'Finished extracted FS dump.')
return USB_STATUS_SUCCESS return USB_STATUS_SUCCESS
def usbCommandHandler() -> None: def usbCommandHandler() -> None:
#assert g_logger is not None assert g_logger is not None
cmd_dict = { cmd_dict = {
USB_CMD_START_SESSION: usbHandleStartSession, USB_CMD_START_SESSION: usbHandleStartSession,
@ -1054,8 +1062,8 @@ def usbCommandHandler() -> None:
if not g_cliMode: if not g_cliMode:
# Update UI. # Update UI.
#assert g_tkCanvas is not None assert g_tkCanvas is not None
#assert g_tkServerButton is not None assert g_tkServerButton is not None
g_tkCanvas.itemconfigure(g_tkTipMessage, state='normal', text=SERVER_STOP_MSG) g_tkCanvas.itemconfigure(g_tkTipMessage, state='normal', text=SERVER_STOP_MSG)
g_tkServerButton.configure(state='disabled') g_tkServerButton.configure(state='disabled')
@ -1123,13 +1131,13 @@ def usbCommandHandler() -> None:
def uiStopServer() -> None: def uiStopServer() -> None:
# Signal the shared stop event. # Signal the shared stop event.
#assert g_stopEvent is not None assert g_stopEvent is not None
g_stopEvent.set() g_stopEvent.set()
def uiStartServer() -> None: def uiStartServer() -> None:
global g_outputDir global g_outputDir
#assert g_tkDirText is not None assert g_tkDirText is not None
g_outputDir = g_tkDirText.get('1.0', tk.END).strip() g_outputDir = g_tkDirText.get('1.0', tk.END).strip()
if not g_outputDir: if not g_outputDir:
@ -1137,6 +1145,10 @@ def uiStartServer() -> None:
messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot)
return return
# Unconditionally enable 32-bit paths on Windows.
if g_isWindows:
g_outputDir = '\\\\?\\' + g_outputDir
# Make sure the full directory tree exists. # Make sure the full directory tree exists.
try: try:
os.makedirs(g_outputDir, exist_ok=True) os.makedirs(g_outputDir, exist_ok=True)
@ -1153,11 +1165,11 @@ def uiStartServer() -> None:
server_thread.start() server_thread.start()
def uiToggleElements(flag: bool) -> None: def uiToggleElements(flag: bool) -> None:
#assert g_tkRoot is not None assert g_tkRoot is not None
#assert g_tkChooseDirButton is not None assert g_tkChooseDirButton is not None
#assert g_tkServerButton is not None assert g_tkServerButton is not None
#assert g_tkCanvas is not None assert g_tkCanvas is not None
#assert g_tkVerboseCheckbox is not None assert g_tkVerboseCheckbox is not None
if flag: if flag:
g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocol) g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocol)
@ -1168,7 +1180,7 @@ def uiToggleElements(flag: bool) -> None:
g_tkVerboseCheckbox.configure(state='normal') g_tkVerboseCheckbox.configure(state='normal')
else: else:
#assert g_tkScrolledTextLog is not None assert g_tkScrolledTextLog is not None
g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocolStub) g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocolStub)
@ -1188,14 +1200,14 @@ def uiChooseDirectory() -> None:
uiUpdateDirectoryField(os.path.abspath(dir)) uiUpdateDirectoryField(os.path.abspath(dir))
def uiUpdateDirectoryField(path: str) -> None: def uiUpdateDirectoryField(path: str) -> None:
#assert g_tkDirText is not None assert g_tkDirText is not None
g_tkDirText.configure(state='normal') g_tkDirText.configure(state='normal')
g_tkDirText.delete('1.0', tk.END) g_tkDirText.delete('1.0', tk.END)
g_tkDirText.insert('1.0', path) g_tkDirText.insert('1.0', path)
g_tkDirText.configure(state='disabled') g_tkDirText.configure(state='disabled')
def uiHandleExitProtocol() -> None: def uiHandleExitProtocol() -> None:
#assert g_tkRoot is not None assert g_tkRoot is not None
g_tkRoot.destroy() g_tkRoot.destroy()
def uiHandleExitProtocolStub() -> None: def uiHandleExitProtocolStub() -> None:
@ -1205,8 +1217,8 @@ def uiScaleMeasure(measure: int) -> int:
return round(float(measure) * SCALE) return round(float(measure) * SCALE)
def uiHandleVerboseCheckbox() -> None: def uiHandleVerboseCheckbox() -> None:
#assert g_logger is not None assert g_logger is not None
#assert g_logLevelIntVar is not None assert g_logLevelIntVar is not None
g_logger.setLevel(g_logLevelIntVar.get()) g_logger.setLevel(g_logLevelIntVar.get())
def uiInitialize() -> None: def uiInitialize() -> None:
@ -1340,9 +1352,9 @@ def uiInitialize() -> None:
g_tkRoot.mainloop() g_tkRoot.mainloop()
def cliInitialize() -> None: def cliInitialize() -> None:
global g_progressBarWindow global g_progressBarWindow, g_outputDir
#assert g_logger is not None assert g_logger is not None
# Initialize console logger. # Initialize console logger.
console = LogConsole() console = LogConsole()
@ -1352,8 +1364,12 @@ def cliInitialize() -> None:
g_progressBarWindow = ProgressBarWindow(bar_format) g_progressBarWindow = ProgressBarWindow(bar_format)
# Print info. # Print info.
g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') g_logger.info(f'\n{SCRIPT_TITLE}. {COPYRIGHT_TEXT}.')
g_logger.info('Output directory: "' + g_outputDir + '".\n') g_logger.info(f'Output directory: "{g_outputDir}".\n')
# Unconditionally enable 32-bit paths on Windows.
if g_isWindows:
g_outputDir = '\\\\?\\' + g_outputDir
# Start USB command handler directly. # Start USB command handler directly.
usbCommandHandler() usbCommandHandler()
@ -1365,9 +1381,9 @@ def main() -> int:
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
# Parse command line arguments. # Parse command line arguments.
parser = ArgumentParser(description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') parser = ArgumentParser(description=f'{SCRIPT_TITLE}. {COPYRIGHT_TEXT}.')
parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.') parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.')
parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory. Defaults to "' + DEFAULT_DIR + '".') parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help=f'Path to output directory. Defaults to "{DEFAULT_DIR}".')
parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.') parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.')
args = parser.parse_args() args = parser.parse_args()
@ -1415,7 +1431,7 @@ if __name__ == "__main__":
except KeyboardInterrupt: except KeyboardInterrupt:
time.sleep(0.2) time.sleep(0.2)
print('\nScript interrupted.') print('\nScript interrupted.')
except Exception as e: except:
utilsLogException(traceback.format_exc()) utilsLogException(traceback.format_exc())
try: try:

View file

@ -3,26 +3,22 @@
set scriptdir=%~dp0 set scriptdir=%~dp0
set scriptdir=%scriptdir:~0,-1% set scriptdir=%scriptdir:~0,-1%
set venvname=installer set venvname=standalone
set venvscripts=%scriptdir%\%venvname%\Scripts set venvscripts=%scriptdir%\%venvname%\Scripts
set venvpython=%venvscripts%\python.exe set venvpython=%venvscripts%\python.exe
set venvpyinstaller=%venvscripts%\pyinstaller.exe
cd /D "%scriptdir%" cd /D "%scriptdir%"
python -m venv "%venvname%" python -m venv "%venvname%"
"%venvpython%" -m pip install --upgrade pyinstaller -r requirements-win32.txt "%venvpython%" -m pip install --upgrade nuitka -r requirements-win32.txt
"%venvpyinstaller%" -y --clean --log-level WARN -F --add-binary "C:\Windows\System32\libusb0.dll;." -w -i nxdt.ico nxdt_host.py "%venvpython%" -m nuitka --standalone --onefile --deployment --disable-console --windows-icon-from-ico=nxdt.ico --enable-plugin=tk-inter nxdt_host.py
move dist\nxdt_host.exe nxdt_host.exe rmdir /s /q nxdt_host.build
rmdir /s /q nxdt_host.dist
rmdir /s /q __pycache__ rmdir /s /q nxdt_host.onefile-build
rmdir /s /q build rmdir /s /q standalone
rmdir /s /q dist
rmdir /s /q installer
del nxdt_host.spec
pause pause