Implement DownloadTask, DownloadFileTask and DownloadDataTask classes.

Other changes include:

* AsyncTask: use a recursive mutex instead of atomics. Fixes build issues related to non-trivially-copyable elements.
* http: remove CURLOPT_ACCEPT_ENCODING option in httpPerformGetRequest().
This commit is contained in:
Pablo Curiel 2021-07-27 11:00:09 -04:00
parent 882a7581cc
commit eb97ae3de5
7 changed files with 261 additions and 23 deletions

View file

@ -133,7 +133,7 @@ SERVER_START_MSG = 'Please connect a Nintendo Switch console running {}.'.format
SERVER_STOP_MSG = 'Exit {} on your console or disconnect it at any time to stop the server.'.format(USB_DEV_PRODUCT)
# Default directory paths.
INITIAL_DIR = os.path.abspath(os.path.dirname(__file__))
INITIAL_DIR = os.path.abspath(os.path.dirname(sys.executable if getattr(sys, 'frozen', False) else __file__))
DEFAULT_DIR = (INITIAL_DIR + os.path.sep + USB_DEV_PRODUCT)
# Application icon (PNG).
@ -221,7 +221,7 @@ APP_ICON = b'iVBORw0KGgoAAAANSUhEUgAAADAAAAAwCAYAAABXAvmHAAAAAXNSR0IArs4c6QAAAAR
b'43EDnoiNHI8a8FRs5HjMgCdjI8cj7+rp2MhR/Z3p7b5gyzRyjN0ei80cwP+bQrjkWSh1LgAAAABJRU5ErkJggg=='
# Taskbar Type Library (TLB). Used under Windows 7 or greater.
TASKBAR_LIB_NAME = 'TaskbarLib.tlb'
TASKBAR_LIB_PATH = (INITIAL_DIR + os.path.sep + 'TaskbarLib.tlb')
TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAATgAAADMDAAAAAAAA/////xgAAAAgAAAAgAAAAP////8AAAAAAAAAAGQAAADIAAAA' + \
b'LAEAAJABAAD0AQAAWAIAALwCAAAgAwAAhAMAAOgDAABMBAAAsAQAABQFAAB8AQAAeAUAAP////8PAAAA/////wAAAAD/////DwAAAP////8AAAAA/////w8AAABMCAAA' + \
@ -1033,19 +1033,19 @@ def uiInitialize():
try:
import comtypes.client as cc
tlb_fp = open(TASKBAR_LIB_NAME, 'wb')
tlb_fp = open(TASKBAR_LIB_PATH, 'wb')
tlb_fp.write(base64.b64decode(TASKBAR_LIB))
tlb_fp.close()
del_tlb = True
g_tlb = cc.GetModule(TASKBAR_LIB_NAME)
g_tlb = cc.GetModule('TASKBAR_LIB_PATH')
g_taskbar = cc.CreateObject('{56FDF344-FD6D-11D0-958A-006097C9A090}', interface=g_tlb.ITaskbarList3)
g_taskbar.HrInit()
except:
traceback.print_exc()
if del_tlb: os.remove(TASKBAR_LIB_NAME)
if del_tlb: os.remove(TASKBAR_LIB_PATH)
# Create root Tkinter object.
g_tkRoot = tk.Tk()

View file

@ -29,7 +29,7 @@
#include <exception>
#include <future>
#include <atomic>
#include <mutex>
namespace nxdt::utils
{
@ -65,20 +65,25 @@ namespace nxdt::utils
class AsyncTask
{
private:
std::recursive_mutex mtx;
AsyncTaskStatus m_status = AsyncTaskStatus::PENDING;
Result m_result{};
std::future<Result> m_future{};
std::atomic<Progress> m_progress{};
std::atomic_bool m_cancelled = false;
std::atomic_bool m_rethrowException = false;
Progress m_progress{};
bool m_cancelled = false, m_rethrowException = false;
std::exception_ptr m_exceptionPtr{};
/* Runs on the calling thread after doInBackground() finishes execution. */
void finish(Result&& result)
{
std::lock_guard<std::recursive_mutex> lock(this->mtx);
/* Copy result. */
this->m_result = result;
/* Update status. */
this->m_status = AsyncTaskStatus::FINISHED;
/* Call appropiate post-execution function. */
if (this->isCancelled())
{
@ -87,11 +92,8 @@ namespace nxdt::utils
this->onPostExecute(this->m_result);
}
/* Update status. */
this->m_status = AsyncTaskStatus::FINISHED;
/* Rethrow asynchronous task exception (if available). */
if (this->m_rethrowException.load() && this->m_exceptionPtr) std::rethrow_exception(this->m_exceptionPtr);
if (this->m_rethrowException && this->m_exceptionPtr) std::rethrow_exception(this->m_exceptionPtr);
}
protected:
@ -139,11 +141,20 @@ namespace nxdt::utils
/* Stores the current progress inside the class. Runs on the asynchronous task thread. */
virtual void publishProgress(const Progress& progress)
{
std::lock_guard<std::recursive_mutex> lock(this->mtx);
/* Don't proceed if the task isn't running. */
if (this->getStatus() != AsyncTaskStatus::RUNNING || this->isCancelled()) return;
/* Update progress. */
this->m_progress.store(progress);
this->m_progress = progress;
}
/* Returns the current progress. May run on both threads. */
Progress getProgress(void)
{
std::lock_guard<std::recursive_mutex> lock(this->mtx);
return this->m_progress;
}
public:
@ -152,11 +163,13 @@ namespace nxdt::utils
/* Cancels the task. Runs on the calling thread. */
void cancel(void) noexcept
{
std::lock_guard<std::recursive_mutex> lock(this->mtx);
/* Return right away if the task has already completed, or if it has already been cancelled. */
if (this->getStatus() == AsyncTaskStatus::FINISHED || this->isCancelled()) return;
/* Update cancel flag. */
this->m_cancelled.store(true);
this->m_cancelled = true;
}
/* Starts the asynchronous task. Runs on the calling thread. */
@ -188,8 +201,9 @@ namespace nxdt::utils
try {
return this->postResult(this->doInBackground(params...));
} catch(...) {
std::lock_guard<std::recursive_mutex> lock(this->mtx);
this->cancel();
this->m_rethrowException.store(true);
this->m_rethrowException = true;
this->m_exceptionPtr = std::current_exception();
}
@ -264,17 +278,20 @@ namespace nxdt::utils
return this->m_status;
}
/* Returns true if the task was cancelled before it completed normally. Runs on both threads. */
/* Returns true if the task was cancelled before it completed normally. May be used on both threads. */
/* Can be used by the asynchronous task to return prematurely. */
bool isCancelled(void) noexcept
{
return this->m_cancelled.load();
std::lock_guard<std::recursive_mutex> lock(this->mtx);
return this->m_cancelled;
}
/* Used by the calling thread to refresh the task progress, preferrably inside a loop. Returns true if the task finished. */
/* If an exception is thrown by the asynchronous task, it will be rethrown by this function. */
bool loopCallback(void)
{
std::lock_guard<std::recursive_mutex> lock(this->mtx);
auto status = this->getStatus();
/* Return immediately if the task already finished. */
@ -289,8 +306,8 @@ namespace nxdt::utils
{
case std::future_status::timeout:
/* Update progress. */
this->onProgressUpdate(this->m_progress.load());
return false;
this->onProgressUpdate(this->m_progress);
break;
case std::future_status::ready:
/* Finish task. */
this->finish(this->m_future.get());

View file

@ -100,6 +100,7 @@
#define GITHUB_REPOSITORY_URL "https://github.com/DarkMatterCore/nxdumptool"
#define GITHUB_NEW_ISSUE_URL GITHUB_REPOSITORY_URL "/issues/new/choose"
#define NSWDB_XML_URL "http://nswdb.com/xml.php"
#define NSWDB_XML_PATH APP_BASE_PATH "NSWreleases.xml"
#define BOREALIS_URL "https://github.com/natinusala/borealis"

View file

@ -31,6 +31,8 @@
#include "core/ums.h"
#include "core/usb.h"
#include "async_task.hpp"
namespace nxdt::tasks
{
/* Used to hold status info data. */
@ -196,6 +198,220 @@ namespace nxdt::tasks
this->usb_host_event.unsubscribe(subscription);
}
};
typedef struct {
/// Fields set by DownloadTask::HttpProgressCallback().
size_t size; ///< Total download size.
size_t current; ///< Number of bytes downloaded thus far.
int percentage; ///< Progress percentage.
/// Fields set by DownloadTask::onProgressUpdate().
double speed; ///< Download speed expressed in KiB/s.
std::string eta; ///< Formatted ETA string.
} DownloadTaskProgress;
typedef brls::Event<const DownloadTaskProgress&> DownloadProgressEvent;
typedef std::pair<char*, size_t> DownloadDataResult;
/* Class template to asynchronously download data on a background thread. */
/* Automatically allocates and registers a RepeatingTask on its own, which is started along with the actual task when execute() is called. */
/* This internal RepeatingTask is guaranteed to work on the UI thread, and it is also automatically unregistered on object destruction. */
/* Progress updates are pushed through a DownloadProgressEvent. Make sure to register all event listeners before executing the task. */
template<typename Result, typename... Params>
class DownloadTask: public nxdt::utils::AsyncTask<DownloadTaskProgress, Result, Params...>
{
public:
/* Handles task progress updates on the calling thread. */
class DownloadTaskHandler: public brls::RepeatingTask
{
private:
DownloadTask<Result, Params...>* task = nullptr;
protected:
void run(retro_time_t current_time) override final
{
brls::RepeatingTask::run(current_time);
if (this->task) this->task->loopCallback();
}
public:
DownloadTaskHandler(retro_time_t interval, DownloadTask<Result, Params...>* task) : brls::RepeatingTask(interval), task(task) { }
};
private:
DownloadProgressEvent progress_event;
DownloadTaskHandler *task_handler = nullptr;
std::chrono::time_point<std::chrono::steady_clock> start_time{}, prev_time{};
size_t prev_current = 0;
protected:
/* Runs on the calling thread. */
void onCancelled(const Result& result) override final
{
(void)result;
/* Pause task handler. */
this->task_handler->pause();
}
/* Runs on the calling thread. */
void onPostExecute(const Result& result) override final
{
(void)result;
/* Pause task handler. */
this->task_handler->pause();
/* Update progress one last time. */
this->onProgressUpdate(this->getProgress());
}
/* Runs on the calling thread. */
void onPreExecute(void) override final
{
/* Start task handler. */
this->task_handler->start();
/* Set start time. */
this->start_time = this->prev_time = std::chrono::steady_clock::now();
}
/* Runs on the calling thread. */
void onProgressUpdate(const DownloadTaskProgress& progress) override final
{
/* Return immediately if there has been no progress at all, or if it the task has been cancelled. */
bool proceed = (progress.current > prev_current || (progress.current == prev_current && (!progress.size || progress.current >= progress.size)));
if (!proceed || this->isCancelled()) return;
/* Calculate time difference between the last progress update and the current one. */
/* Return immediately if it's less than 1 second, but only if this isn't the last chunk (or if the task is still running, if we don't know the download size). */
std::chrono::time_point<std::chrono::steady_clock> cur_time = std::chrono::steady_clock::now();
std::chrono::duration<double> diff_time = (cur_time - this->prev_time);
double diff_time_conv = diff_time.count();
if (diff_time_conv < 1.0 && ((progress.size && progress.current < progress.size) || this->getStatus() == nxdt::utils::AsyncTaskStatus::RUNNING)) return;
/* Calculate transferred data size difference between the last progress update and the current one. */
double diff_current = static_cast<double>(progress.current - prev_current);
/* Calculate download speed in kibibytes per second (KiB/s). */
double speed = ((diff_current / diff_time_conv) / 1024.0);
/* Calculate remaining data size in kibibytes (KiB) and ETA if we know the download size. */
double eta = 0.0;
if (progress.size)
{
double remaining = (static_cast<double>(progress.size - progress.current) / 1024.0);
eta = (remaining / speed);
}
/* Fill struct. */
DownloadTaskProgress new_progress = progress;
new_progress.speed = speed;
new_progress.eta = (progress.size ? fmt::format("{:02}H{:02}M{:02}S", std::fmod(eta, 86400.0) / 3600.0, std::fmod(eta, 3600.0) / 60.0, std::fmod(eta, 60.0)) : "");
/* Update class variables. */
this->prev_time = cur_time;
this->prev_current = progress.current;
/* Send updated progress to all subscribers. */
this->progress_event.fire(new_progress);
}
public:
/* Runs on the calling thread. */
DownloadTask(retro_time_t interval)
{
/* Create task handler. */
this->task_handler = new DownloadTaskHandler(interval, this);
}
/* Runs on the calling thread. */
~DownloadTask(void)
{
/* Stop task handler. Borealis' task manager will take care of deleting it. */
this->task_handler->stop();
/* Unregister all event listeners. */
this->progress_event.unsubscribeAll();
}
/* Runs on the asynchronous task thread. Required by CURL. */
/* Make sure to pass it to either httpDownloadFile() or httpDownloadData() with 'this' as the user pointer. */
static int HttpProgressCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{
(void)ultotal;
(void)ulnow;
DownloadTaskProgress progress = {0};
DownloadTask<Result, Params...>* task = static_cast<DownloadTask<Result, Params...>*>(clientp);
/* Don't proceed if we're dealing with an invalid task pointer, or if the task has been cancelled. */
if (!task || task->isCancelled()) return 1;
/* Fill struct. */
progress.size = static_cast<size_t>(dltotal);
progress.current = static_cast<size_t>(dlnow);
progress.percentage = static_cast<int>((progress.current * 100) / progress.size);
/* Push progress onto the class. */
task->publishProgress(progress);
return 0;
}
ALWAYS_INLINE DownloadProgressEvent::Subscription RegisterListener(DownloadProgressEvent::Callback cb)
{
return this->progress_event.subscribe(cb);
}
ALWAYS_INLINE void UnregisterListener(DownloadProgressEvent::Subscription subscription)
{
this->progress_event.unsubscribe(subscription);
}
};
/* Asynchronous task to download a file using an output path and a URL. */
class DownloadFileTask: public DownloadTask<bool, std::string, std::string, bool>
{
protected:
bool doInBackground(const std::string& path, const std::string& url, const bool& force_https) override final
{
/* If the process fails or if it's cancelled, httpDownloadFile() will take care of closing the incomplete output file and delete it. */
return httpDownloadFile(path.c_str(), url.c_str(), force_https, DownloadFileTask::HttpProgressCallback, this);
}
public:
DownloadFileTask(retro_time_t interval) : DownloadTask(interval) { }
};
/* Asynchronous task to store downloaded data into a dynamically allocated buffer using a URL. */
class DownloadDataTask: public DownloadTask<DownloadDataResult, std::string, bool>
{
protected:
DownloadDataResult doInBackground(const std::string& url, const bool& force_https)
{
char *buf = NULL;
size_t buf_size = 0;
/* If the process fails or if it's cancelled, httpDownloadData() will take care of freeing up the allocated memory and return NULL. */
buf = httpDownloadData(&buf_size, url.c_str(), force_https, DownloadDataTask::HttpProgressCallback, this);
return std::make_pair(buf, buf_size);
}
public:
DownloadDataTask(retro_time_t interval) : DownloadTask(interval) { }
};
}
#endif /* __TASKS_HPP__ */

@ -1 +1 @@
Subproject commit 097693eb5264941f8697902ea88a89e2efd25c11
Subproject commit ef8e8e96302064e52171204b75cd8f0145adc1ad

View file

@ -123,7 +123,6 @@ bool httpPerformGetRequest(const char *url, bool force_https, size_t *outsize, H
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 50L);
curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L);
curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "");
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, HTTP_CONNECT_TIMEOUT);
curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, HTTP_BUFFER_SIZE);
curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long)(force_https ? CURL_HTTP_VERSION_2TLS : CURL_HTTP_VERSION_1_1));

View file

@ -78,7 +78,12 @@ namespace nxdt::views
/* Update NSWDB XML. */
brls::ListItem *update_nswdb_xml = new brls::ListItem("options_tab/update_nswdb_xml/label"_i18n, "options_tab/update_nswdb_xml/description"_i18n);
update_nswdb_xml->getClickEvent()->subscribe([this](brls::View* view) {
this->DisplayNotification("Not implemented.");
brls::Dialog *dialog = new brls::Dialog("this is a test");
dialog->setCancelable(false);
dialog->addButton("cancel?", [dialog](brls::View *view) {
dialog->close();
});
dialog->open(false);
});
this->addView(update_nswdb_xml);