added file counter; win32 longfile mod reverted to 'just-in-time'

This commit is contained in:
bilditup1 2023-12-08 23:44:03 -05:00
parent 32de77e8de
commit 360ebc6245

View file

@ -340,6 +340,7 @@ g_tkServerButton: tk.Button | None = None
g_tkTipMessage: int = 0
g_tkScrolledTextLog: scrolledtext.ScrolledText | None = None
g_tkVerboseCheckbox: tk.Checkbutton | None = None
g_tkLogToFileCheckbox: tk.Checkbutton | None = None
g_logger: logging.Logger | None = None
@ -544,7 +545,6 @@ class ProgressBarWindow:
self.unit = unit
if self.tk_pbar:
#print()
self.tk_pbar.configure(maximum=self.total_div, mode='determinate')
self.start_time = time.time()
else:
@ -633,12 +633,13 @@ def utilsLogException(exception_str: str) -> None:
if (not g_cliMode) and (g_logger is not None):
g_logger.debug(exception_str)
# Returns the extension of the given file without leading dot
def utilsGetExt(path_arg: str) -> str:
return os.path.splitext(path_arg)[1][1:]
def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool = False) -> str:
path = os.path.abspath(os.path.expanduser(os.path.expandvars(path_arg if path_arg else fallback_path)))
if not is_file and create:
os.makedirs(path, exist_ok=True)
@ -647,18 +648,23 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool
return path
# MSYS2 uses regular slashes by default, so we unconditionally
# change this to backslashes for consistency
def utilsGetWinSeparatedPath(path_arg: str) -> str:
global g_isWindows
return path_arg.replace('/', '\\') if g_isWindows else path_arg
# Prepends `\\?\` to enable ~64KiB long paths in Windows.
# ref0: https://learn.microsoft.com/en-us/windows/win32/fileio/naming-a-file?redirectedfrom=MSDN#win32-file-namespaces
# ref1: https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry
# ref2: https://stackoverflow.com/a/15373771
# Also replaces '/' separator with proper '\' in case running under MSYS2 env.
# Also replaces '/' separator in case running under MSYS2 env.
def utilsGetWinFullPath(path_arg: str) -> str:
global g_isWindows
return ('\\\\?\\' + path_arg.replace("/", "\\")) if g_isWindows else path_arg
return ('\\\\?\\' + utilsGetWinSeparatedPath(path_arg)) if g_isWindows else path_arg
# Strips the preceding prefix when we want to print
# Does /not/ actually check if it's there, do not call if prefix absent!
def utilsStripWinPrefix(path_arg: str) -> str:
global g_isWindows
return path_arg[4:] if g_isWindows else path_arg
@ -666,12 +672,10 @@ def utilsStripWinPrefix(path_arg: str) -> str:
# Updates the path of the log
def utilsUpdateLogPath() -> None:
global g_logPath
g_logPath = os.path.abspath(g_outputDir + os.path.sep + \
"nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log')
return
g_logPath = utilsGetWinFullPath(os.path.abspath(g_outputDir + os.path.sep + \
"nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log'))
# Enable terminal colors on *nix and supported Windows (10.0.10586+)
def utilsSetupTerminal() -> None:
global g_terminalColors
@ -703,11 +707,8 @@ def utilsLogBasicScriptInfo() -> None:
g_logger.info('\nServer started...\n')
g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion)
# if g_isWindows:
# g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir))
# else:
# g_logger.info('Dst:\t' + g_outputDir)
g_logger.info('Dst:\t' + utilsStripWinPrefix(g_outputDir))
# Forward slashes changed to back in case running inside MSYS2
g_logger.info('Dst:\t' + utilsGetWinSeparatedPath(g_outputDir))
if g_logToFile:
g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1])
@ -719,14 +720,15 @@ def utilsLogBasicScriptInfo() -> None:
else:
g_logger.info('Verbose logging is disabled.\n')
return
# On successful transfer, log elapsed time and (within reason) average transfer speed
def utilsLogTransferStats(elapsed_time: float) -> None:
def utilsLogTransferStats(elapsed_time: float, file_counter = 0) -> None:
global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB
if g_formattedFileUnit != "B":
formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time)
g_logger.info(f'{g_formattedFileSize:.2f} {g_formattedFileUnit} transferred in {formatted_time}.')
formatted_info = f'{g_formattedFileSize:.2f} {g_formattedFileUnit}'
formatted_info += f' in {file_counter} files' if file_counter > 1 else ' in one file'
formatted_info += f' transferred in {formatted_time}.'
g_logger.info(formatted_info)
if elapsed_time > float(1):
g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f} MiB/s\n')
else:
@ -736,7 +738,7 @@ def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool:
return bool((value & (g_usbEpMaxPacketSize - 1)) == 0)
def utilsResetNspInfo(delete: bool = False) -> None:
global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath
global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_fileCounter
if g_nspFile:
g_nspFile.close()
@ -750,6 +752,8 @@ def utilsResetNspInfo(delete: bool = False) -> None:
g_nspRemainingSize = 0
g_nspFile = None
g_nspFilePath = ''
g_fileCounter = 0
def utilsGetSizeUnitAndDivisor(size: int) -> tuple[str, int]:
size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ]
@ -777,7 +781,7 @@ def usbGetDeviceEndpoints() -> bool:
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer
assert g_logger is not None
#assert g_stopEvent is not None
#assert g_stopEvent is not None
cur_dev: Generator[usb.core.Device, Any, None] | None = None
prev_dev: usb.core.Device | None = None
@ -891,9 +895,6 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
global g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, g_nxdtAbiVersionMajor, g_nxdtAbiVersionMinor, g_nxdtGitCommit
assert g_logger is not None
if g_cliMode:
print()
g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.')
@ -924,7 +925,7 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow
global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime
global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime, g_fileCounter
assert g_logger is not None
assert g_progressBarWindow is not None
@ -948,7 +949,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
if not g_nspTransferMode and not g_extractedFsDumpMode:
fp = filename.split('/') # fp[0] is '/' character
fp_len = len(fp)
ext = utilsGetExt(filename)
ext = utilsGetExt(filename) ##fp[fp_len-1][fp[fp_len-1].index(".")+1:]
utilsInitTransferVars(file_size)
g_logger.info("\nTransfer started!")
if (fp_len >= 2): # if file has parent directories
@ -977,8 +978,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
g_logger.info(f'\t{fp[4][:fp[4].index(".")]}')
g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}\n')
case 'NCA' | 'HFS':
printable_ext = ext#fp[fp_len-1][fp[fp_len-1].index(".")+1:] if fp[1] == 'HFS' else ext
g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{printable_ext.upper()}]')
g_logger.info(f'\nType:\t{fp[1]} ({fp[2]}) [{ext.upper()}]')
g_logger.info(f'Src:\t{fp[3][:fp[3].index("[")]}')
g_logger.info(f'\t{fp[3][fp[3].index("["):]}')
if fp[1] == 'NCA':
@ -999,6 +999,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
g_logger.warning(f'\n\tNovel source!!!')
g_logger.info:(f'"{filename}"')
else:
g_fileCounter += 1
(unit,div) = utilsGetSizeUnitAndDivisor(file_size)
fs = file_size / div
if g_extractedFsDumpMode:
@ -1038,19 +1039,19 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Perform additional validity checks and get a file object to work with.
if (not g_nspTransferMode) or (g_nspFile is None):
# Generate full, absolute path to the destination file.
fullpath = os.path.abspath(g_outputDir + os.path.sep + filename)
printable_fullpath = utilsStripWinPrefix(fullpath)
fullpath = utilsGetWinFullPath(os.path.abspath(g_outputDir + os.path.sep + filename))
#printable_fullpath = utilsStripWinPrefix(fullpath)
# Get parent directory path.
dirpath = os.path.dirname(fullpath)
# Create full directory tree.
os.makedirs(dirpath, exist_ok=True)
# Make sure the output filepath doesn't point to an existing directory.
if os.path.exists(fullpath) and (not os.path.isfile(fullpath)):
utilsResetNspInfo()
g_logger.error(f'Output filepath points to an existing directory! ("{printable_fullpath}").\n')
g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n') #printable_fullpath
return USB_STATUS_HOST_IO_ERROR
# Make sure we have enough free space.
@ -1078,7 +1079,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
fullpath = g_nspFilePath
dirpath = os.path.dirname(fullpath)
printable_fullpath = utilsStripWinPrefix(fullpath)
#printable_fullpath = utilsStripWinPrefix(fullpath)
# Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP.
if (not file_size) or (g_nspTransferMode and file_size == g_nspSize):
@ -1092,7 +1093,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
usbSendStatus(USB_STATUS_SUCCESS)
# Start data transfer stage.
g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{printable_fullpath}".')
g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') # printable_fullpath
offset = 0
blksize = USB_TRANSFER_BLOCK_SIZE
@ -1105,7 +1106,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
prefix = ''
else:
prefix = f'Current {file_type_str}: "{os.path.basename(filename)}".\n'
prefix += 'Use your console to cancel the file transfer if you wish to do so.'
prefix += 'Hold \'B\' on your console to cancel the file transfer if you wish to do so.'
if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize):
if not g_nspTransferMode:
@ -1139,9 +1140,6 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Start transfer process.
start_time = time.time()
# if not g_nspTransferMode:
# g_startTime = start_time
# print(f'1: {g_startTime}')
while offset < file_size:
# Update block size (if needed).
@ -1208,7 +1206,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
g_logger.info('\nTransfer complete!\n')
utilsLogTransferStats(elapsed_time)
g_logger.info('Your file may be found here:')
g_logger.info(f'{printable_fullpath}\n')
g_logger.info(f'{utilsStripWinPrefix(fullpath)}\n')
return USB_STATUS_SUCCESS
@ -1226,7 +1224,7 @@ def usbHandleCancelFileTransfer(cmd_block: bytes) -> int:
return USB_STATUS_MALFORMED_CMD
def usbHandleSendNspHeader(cmd_block: bytes) -> int:
global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_isWindows
global g_nspTransferMode, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_isWindows, g_fileCounter
assert g_logger is not None
assert g_nspFile is not None
@ -1252,16 +1250,14 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int:
g_nspFile.seek(0)
g_nspFile.write(cmd_block)
# Log successful NSP transfer (header distinguishes it from constituent NCA transfers)
# Log successful NSP transfer (header distinguishes it from constituent transfers)
g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".')
if not g_logVerbose:
g_logger.info('\nTransfer complete!\n')
utilsLogTransferStats(time.time() - g_startTime)
utilsLogTransferStats(time.time() - g_startTime, g_fileCounter)
g_logger.info('Your file may be found here:')
g_nspFilePath = utilsStripWinPrefix(g_nspFilePath) #if g_isWindows else g_nspFilePath
g_logger.info(f'{g_nspFilePath}\n')
g_logger.info(f'{utilsStripWinPrefix(g_nspFilePath)}\n')
# Disable NSP transfer mode.
utilsResetNspInfo()
@ -1275,7 +1271,7 @@ def usbHandleEndSession(cmd_block: bytes) -> int:
def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
assert g_logger is not None
global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime
global g_isWindows, g_outputDir, g_extractedFsDumpMode, g_extractedFsAbsRoot, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime, g_fileCounter
g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.')
@ -1317,8 +1313,9 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
else:
g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").')
g_extractedFsAbsRoot = utilsStripWinPrefix(os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path))
g_extractedFsAbsRoot = utilsGetWinSeparatedPath(os.path.abspath(g_outputDir + os.path.sep + extracted_fs_root_path)) + g_pathSep
g_extractedFsDumpMode = True
g_fileCounter = 0
g_startTime = time.time()
# Return status code.
@ -1326,16 +1323,17 @@ def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int:
assert g_logger is not None
global g_extractedFsDumpMode, g_extractedFsAbsRoot
global g_extractedFsDumpMode, g_extractedFsAbsRoot, g_fileCounter
g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.')
g_logger.debug(f'\nFinished extracted FS dump.')
if not g_logVerbose:
g_logger.info(f'\nExtracted FS dump complete!\n')
utilsLogTransferStats(time.time() - g_startTime)
utilsLogTransferStats(time.time() - g_startTime, g_fileCounter)
g_logger.info(f'Your files may be found here:')
g_logger.info(f'{g_extractedFsAbsRoot}{g_pathSep}\n')
g_logger.info(f'{g_extractedFsAbsRoot}\n')
g_extractedFsDumpMode = False
g_extractedFsAbsRoot = ""
g_fileCounter = 0
return USB_STATUS_SUCCESS
def usbCommandHandler() -> None:
@ -1437,7 +1435,7 @@ def uiStopServer() -> None:
def uiStartServer() -> None:
uiUpdateOutputDir()
uiUpdateOutputDirectory()
# Set new log path for this session if logging to file is turned on.
if g_logToFile:
utilsUpdateLogPath()
@ -1484,11 +1482,12 @@ def uiToggleElements(flag: bool) -> None:
g_tkVerboseCheckbox.configure(state='disabled')
def uiChooseDirectory() -> None:
assert g_tkDirText is not None
dirtext = g_tkDirText.get('1.0', tk.END).strip()
initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR
# Using \\?\ longfile syntax for Windows will not work for tkinter.filedialog.askdirectory's initialdir parameter.
# User must choose a directory considered the 'normal' length on their system if using the UI.
# Thus, win32 users must choose a directory considered the 'normal' length on their system if using the UI.
dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True)
if dir:
@ -1499,16 +1498,18 @@ def uiUpdateDirectoryField(path: str) -> None:
assert g_tkDirText is not None
g_tkDirText.configure(state='normal')
g_tkDirText.delete('1.0', tk.END)
g_tkDirText.insert('1.0', path)
# For consistency, change path separator on Windows under MSYS2, since
# tkinter.filedialog.askdirectory will return a fwd-slashed path in that case.
g_tkDirText.insert('1.0', utilsGetWinSeparatedPath(path))
g_tkDirText.configure(state='disabled')
def uiUpdateOutputDir() -> None:
def uiUpdateOutputDirectory() -> None:
global g_outputDir
assert g_tkDirText is not None
g_outputDir = utilsGetWinFullPath(g_tkDirText.get('1.0', tk.END).strip())
# g_outputDir = utilsGetWinFullPath(g_outputDir) if g_isWindows else g_outputDir
# Note: longpaths for Windows are now set at the file rather than dir level
g_outputDir = g_tkDirText.get('1.0', tk.END).strip()
if not g_outputDir:
# We should never reach this, honestly.
@ -1544,7 +1545,7 @@ def uiHandleVerboseCheckbox() -> None:
assert g_logger is not None
assert g_logLevelIntVar is not None
global g_logVerbose
logLevel=g_logLevelIntVar.get()
logLevel = g_logLevelIntVar.get()
g_logger.setLevel(logLevel)
g_logVerbose = True if(logLevel == logging.DEBUG) else False
@ -1692,16 +1693,15 @@ def cliInitialize() -> None:
global g_progressBarWindow, g_outputDir, g_logToFile
assert g_logger is not None
# Unconditionally enable long paths on Windows.
g_outputDir = utilsGetWinFullPath(g_outputDir)
# Note: longpaths for Windows are now set at the file rather than dir level
# Determines whether to use colors in terminal and sets up accordingly.
utilsSetupTerminal()
# Set log path if logging to file specified at cmd line.
# NB, g_outputDir should be adjusted for Windows prior.
if g_logToFile:
utilsUpdateLogPath()
# Initialize console logger.
console = LogConsole()
@ -1729,9 +1729,8 @@ def main() -> int:
args = parser.parse_args()
# Update global flags.
g_cliMode = args.cli
g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True)
# Update global flags, other than g_outputDir, which should be set only after platform seciton
g_cliMode = args.cli
g_logToFile = args.log
g_logVerbose = args.verbose
@ -1752,7 +1751,18 @@ def main() -> int:
# ANSI colors in cmd.exe min. build
# ref: https://github.com/dart-lang/sdk/issues/28614#issuecomment-287282970
g_isWindows10 = (win_ver_major >= 10 and win_build >= 10586)
g_pathSep = os.path.sep if not g_isWindows else '\\'
g_pathSep = '\\'
else:
g_pathSep = os.path.sep
# utilsGetWinFullPath() enables output directories with longfile names on Windows.
# After creation by utilsGetPath(), the Windows prefix is stripped, as setting
# the long-path on a per-file basis in GUI mode, which uses the same codepath
# for actual file dumping, is more consistent across Windows environments,
# and needs comparatively less management during logging.
# g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True)
g_outputDir = utilsStripWinPrefix(utilsGetPath(utilsGetWinFullPath(args.outdir), utilsGetWinFullPath(DEFAULT_DIR), False, True))
# Setup logging mechanism.
logging.basicConfig(level=(logging.DEBUG if g_logVerbose else logging.INFO))