Fix crashes related to tqdm.

Use our own child window with strings generated by tqdm.format_meter() instead of relying on tqdm_tk.
This commit is contained in:
Pablo Curiel 2021-03-15 23:28:06 -04:00
parent 651e5d5f87
commit e70979248a
2 changed files with 96 additions and 29 deletions

10
.gitignore vendored
View file

@ -1,4 +1,6 @@
build
__pycache__
dist
*.elf
*.nacp
*.nro
@ -7,11 +9,7 @@ build
*.pfs0
*.lst
*.tar.bz2
*.log
*.spec
/code_templates/tmp/*
/source/main.c
*.log
# Clion files
.idea
cmake-build-debug
CMakeLists.txt

View file

@ -40,11 +40,13 @@ import time
import struct
import usb.core
import usb.util
import warnings
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import filedialog, messagebox, font, scrolledtext
from tqdm.tk import trange, tqdm_tk
from tqdm import tqdm
import base64
import io
@ -249,6 +251,76 @@ class LogConsole:
self.frame.after(100, self.poll_log_queue)
# Loosely based on tk.py from tqdm.
class ProgressBar:
def __init__(self, total, prefix='', unit='B', bar_format=None, tk_parent=None, window_title='', window_resize=False, window_protocol=None):
if (tk_parent is None) or (total <= 0):
raise Exception('Invalid arguments!')
self.n = 0
self.total = total
self.prefix = prefix
self.unit = unit
self.bar_format = bar_format
self.tk_parent = tk_parent
self.tk_window = tk.Toplevel(self.tk_parent)
if window_title:
self.tk_window.title(window_title)
self.tk_window.resizable(window_resize, window_resize)
if window_protocol:
self.tk_window.protocol('WM_DELETE_WINDOW', window_protocol)
self.tk_window.attributes('-topmost', True)
self.tk_window.after(0, lambda: self.tk_window.attributes('-topmost', False))
pbar_frame = ttk.Frame(self.tk_window, padding=5)
pbar_frame.pack()
self.tk_text_var = tk.StringVar(self.tk_window)
tk_label = ttk.Label(pbar_frame, textvariable=self.tk_text_var, wraplength=600, anchor='center', justify='center')
tk_label.pack()
self.tk_n_var = tk.DoubleVar(self.tk_window, value=0)
self.tk_pbar = ttk.Progressbar(pbar_frame, variable=self.tk_n_var, length=450)
self.tk_pbar.configure(maximum=self.total, mode='determinate')
self.tk_pbar.pack()
self.start = time.time()
self.update(0)
def close(self):
self.tk_parent.after(0, self.tk_window.destroy)
def update(self, n):
if (self.n + n) > self.total:
return
self.n += n
elapsed = (time.time() - self.start)
msg = tqdm.format_meter(n=self.n, total=self.total, elapsed=elapsed, prefix=self.prefix, unit=self.unit, bar_format=self.bar_format)
self.tk_text_var.set(msg)
self.tk_n_var.set(self.n)
def reset(self, total):
if (total <= 0):
raise Exception('Invalid total value!')
self.n = 0
self.total = total
self.tk_pbar.configure(maximum=self.total, mode='determinate')
self.start = time.time()
self.update(0)
def utilsIsValueAlignedToEndpointPacketSize(value):
return bool((value & (g_usbEpMaxPacketSize - 1)) == 0)
@ -496,20 +568,17 @@ def usbHandleSendFileProperties(cmd_block):
# Initialize progress bar.
(unit, unit_divisor) = utilsGetSizeUnitAndDivisor(file_size)
total = (float(file_size) / unit_divisor)
idx = filename.rfind(os.path.sep)
bar_format_filename = (filename[idx+1:] if (idx >= 0) else filename)
prefix_filename = (filename[idx+1:] if (idx >= 0) else filename)
bar_format = ('Current file: "%s".\n' % (bar_format_filename))
bar_format += 'Use your console to cancel the file transfer if you wish to do so.\n\n'
bar_format += '{percentage:.2f}% - {n:.2f} / {total:.2f} {unit}\nElapsed time: {elapsed}. Remaining time: {remaining}.\nSpeed: {rate_fmt}.'
prefix = ('Current file: "%s".\n' % (prefix_filename))
prefix += 'Use your console to cancel the file transfer if you wish to do so.\n\n'
pbar = tqdm_tk(total=(float(file_size) / unit_divisor), unit=unit, bar_format=bar_format, grab=False, tk_parent=g_tkRoot)
pbar._tk_window.title('File transfer')
pbar._tk_window.resizable(False, False)
pbar._tk_window.protocol('WM_DELETE_WINDOW', uiHandleExitProtocolStub)
bar_format = '{desc}{percentage:.2f}% - {n:.2f} / {total:.2f} {unit}\nElapsed time: {elapsed}. Remaining time: {remaining}.\nSpeed: {rate_fmt}.'
start_time = time.time()
pbar = ProgressBar(total, prefix, unit, bar_format, g_tkRoot, 'File transfer', False, uiHandleExitProtocolStub)
def cancelTransfer():
# Cancel file transfer.
@ -517,7 +586,6 @@ def usbHandleSendFileProperties(cmd_block):
os.remove(fullpath)
utilsResetNspInfo()
pbar.close()
pbar._tk_window.destroy()
while offset < file_size:
# Update block size (if needed).
@ -569,20 +637,18 @@ def usbHandleSendFileProperties(cmd_block):
# Update progress bar once per second.
pbar.update(float(chunk_size) / unit_divisor)
pbar.refresh()
#pbar.refresh()
elapsed_time = round(time.time() - pbar.start)
g_Logger.info('File transfer successfully completed in %s!\n' % (tqdm.format_interval(elapsed_time)))
# Close progress bar
pbar.close()
pbar._tk_window.destroy()
# Close file handle (if needed).
if g_nspTransferMode == False:
file.close()
# I'd like to get this info from tqdm but it's not possible.
elapsed_time = round(time.time() - start_time)
g_Logger.info('File transfer successfully completed in %us!\n' % (elapsed_time))
return USB_STATUS_SUCCESS
def usbHandleSendNspHeader(cmd_block):
@ -610,7 +676,7 @@ def usbHandleSendNspHeader(cmd_block):
g_nspFile.write(cmd_block)
g_nspFile.close()
g_Logger.debug('Successfully wrote 0x%X byte-long NSP header to "%s".' % (nsp_header_size, g_nspFilePath))
g_Logger.debug('Successfully wrote 0x%X byte-long NSP header to "%s".\n' % (nsp_header_size, g_nspFilePath))
# Disable NSP transfer mode.
utilsResetNspInfo()
@ -778,11 +844,8 @@ def uiScaleMeasure(measure):
def main():
global SCALE, g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog
# Configure logging mechanism.
logging.basicConfig(level=logging.DEBUG)
"""if len(g_Logger.handlers) > 0:
log_stderr = g_Logger.handlers[0]
g_Logger.removeHandler(log_stderr)"""
# Disable warnings.
warnings.filterwarnings("ignore")
# Get OS information.
os_type = platform.system()
@ -874,6 +937,12 @@ def main():
g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 10), text="Copyright (c) {}, {}".format(COPYRIGHT_YEAR, USB_DEV_MANUFACTURER), anchor=tk.W)
# Configure logging mechanism.
logging.basicConfig(level=logging.DEBUG)
if len(g_Logger.handlers) > 0:
log_stderr = g_Logger.handlers[0]
g_Logger.removeHandler(log_stderr)
# Initialize console g_Logger.
console = LogConsole(g_tkScrolledTextLog)