From 0ea3e6dbe2288854d9d4a971fc6539c2e740a95a Mon Sep 17 00:00:00 2001
From: Kahrl
Date: Thu, 29 Aug 2013 05:04:56 +0200
Subject: Implement httpfetch module and initialize it from main()

Add curl_parallel_limit setting that will replace media_fetch_threads
in a later commit.

Fix a typo in MutexedQueue::pop_back() that made it impossible to
compile code that used this function. (Noticed this while implementing
httpfetch.)
---
 src/httpfetch.cpp | 718 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 718 insertions(+)
 create mode 100644 src/httpfetch.cpp

diff --git a/src/httpfetch.cpp b/src/httpfetch.cpp
new file mode 100644
index 000000000..4342a8b2a
--- /dev/null
+++ b/src/httpfetch.cpp
@@ -0,0 +1,718 @@
+/*
+Minetest
+Copyright (C) 2013 celeron55, Perttu Ahola
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include "httpfetch.h"
+#include <iostream>
+#include <sstream>
+#include <list>
+#include <map>
+#include <errno.h>
+#include "jthread/jevent.h"
+#include "config.h"
+#include "exceptions.h"
+#include "debug.h"
+#include "log.h"
+#include "util/container.h"
+#include "util/thread.h"
+#include "socket.h" // for select()
+
+JMutex g_httpfetch_mutex;
+std::map<unsigned long, std::list<HTTPFetchResult> > g_httpfetch_results;
+
+static void httpfetch_deliver_result(const HTTPFetchResult &fetchresult)
+{
+	unsigned long caller = fetchresult.caller;
+	if (caller != HTTPFETCH_DISCARD) {
+		JMutexAutoLock lock(g_httpfetch_mutex);
+		g_httpfetch_results[caller].push_back(fetchresult);
+	}
+}
+
+static void httpfetch_request_clear(unsigned long caller);
+
+unsigned long httpfetch_caller_alloc()
+{
+	JMutexAutoLock lock(g_httpfetch_mutex);
+
+	// Check each caller ID except HTTPFETCH_DISCARD
+	const unsigned long discard = HTTPFETCH_DISCARD;
+	for (unsigned long caller = discard + 1; caller != discard; ++caller) {
+		std::map<unsigned long, std::list<HTTPFetchResult> >::iterator
+			it = g_httpfetch_results.find(caller);
+		if (it == g_httpfetch_results.end()) {
+			verbosestream<<"httpfetch_caller_alloc: allocating "
+					<<caller<<std::endl;
+			// Access element to create it
+			g_httpfetch_results[caller];
+			return caller;
+		}
+	}
+
+	// Should not happen
+	assert(false);
+	return discard;
+}
+
+void httpfetch_caller_free(unsigned long caller)
+{
+	verbosestream<<"httpfetch_caller_free: freeing "
+			<<caller<<std::endl;
+
+	httpfetch_request_clear(caller);
+	if (caller != HTTPFETCH_DISCARD) {
+		JMutexAutoLock lock(g_httpfetch_mutex);
+		g_httpfetch_results.erase(caller);
+	}
+}
+
+bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetchresult)
+{
+	JMutexAutoLock lock(g_httpfetch_mutex);
+
+	// Check that caller exists
+	std::map<unsigned long, std::list<HTTPFetchResult> >::iterator
+		it = g_httpfetch_results.find(caller);
+	if (it == g_httpfetch_results.end())
+		return false;
+
+	// Check that result queue is nonempty
+	std::list<HTTPFetchResult> &callerresults = it->second;
+	if (callerresults.empty())
+		return false;
+
+	// Pop first result
+	fetchresult = callerresults.front();
+	callerresults.pop_front();
+	return true;
+}
+
+#if USE_CURL
+#include <curl/curl.h>
+
+/*
+	USE_CURL is on: use cURL based httpfetch implementation
+*/
+
+static size_t httpfetch_writefunction(
+		char *ptr, size_t size, size_t nmemb, void *userdata)
+{
+	std::ostringstream *stream = (std::ostringstream*)userdata;
+	size_t count = size * nmemb;
+	stream->write(ptr, count);
+	return count;
+}
+
+static size_t httpfetch_discardfunction(
+		char *ptr, size_t size, size_t nmemb, void *userdata)
+{
+	return size * nmemb;
+}
+
+class CurlHandlePool
+{
+	std::list<CURL*> handles;
+
+public:
+	CurlHandlePool() {}
+	~CurlHandlePool()
+	{
+		for (std::list<CURL*>::iterator it = handles.begin();
+				it != handles.end(); ++it) {
+			curl_easy_cleanup(*it);
+		}
+	}
+	CURL * alloc()
+	{
+		CURL *curl;
+		if (handles.empty()) {
+			curl = curl_easy_init();
+			if (curl == NULL) {
+				errorstream<<"curl_easy_init returned NULL"
+						<<std::endl;
+			}
+		}
+		else {
+			curl = handles.front();
+			handles.pop_front();
+		}
+		return curl;
+	}
+	void free(CURL *handle)
+	{
+		if (handle)
+			handles.push_back(handle);
+	}
+};
+
+struct HTTPFetchOngoing
+{
+	CurlHandlePool *pool;
+	CURL *curl;
+	CURLM *multi;
+	HTTPFetchRequest request;
+	HTTPFetchResult result;
+	std::ostringstream oss;
+	struct curl_slist *httpheader;
+
+	HTTPFetchOngoing(HTTPFetchRequest request_, CurlHandlePool *pool_):
+		pool(pool_),
+		curl(NULL),
+		multi(NULL),
+		request(request_),
+		result(request_),
+		oss(std::ios::binary),
+		httpheader(NULL)
+	{
+		curl = pool->alloc();
+		if (curl != NULL) {
+			// Set static cURL options
+			curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+			curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
+			curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+			curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 1);
+
+#if LIBCURL_VERSION_NUM >= 0x071304
+			// Restrict protocols so that curl vulnerabilities in
+			// other protocols don't affect us.
+			// These settings were introduced in curl 7.19.4.
+			long protocols =
+				CURLPROTO_HTTP |
+				CURLPROTO_HTTPS |
+				CURLPROTO_FTP |
+				CURLPROTO_FTPS;
+			curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
+			curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
+#endif
+
+			// Set cURL options based on HTTPFetchRequest
+			curl_easy_setopt(curl, CURLOPT_URL,
+					request.url.c_str());
+			curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
+					request.timeout);
+			curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
+					request.connect_timeout);
+			// Set up a write callback that writes to the
+			// ostringstream ongoing->oss, unless the data
+			// is to be discarded
+			if (request.caller == HTTPFETCH_DISCARD) {
+				curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+						httpfetch_discardfunction);
+				curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
+			}
+			else {
+				curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+						httpfetch_writefunction);
+				curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss);
+			}
+			// Set POST (or GET) data
+			if (request.post_fields.empty()) {
+				curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
+			}
+			else {
+				curl_easy_setopt(curl, CURLOPT_POST, 1);
+				curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
+						request.post_fields.size());
+				curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
+						request.post_fields.c_str());
+				// request.post_fields must now *never* be
+				// modified until CURLOPT_POSTFIELDS is cleared
+			}
+			// Set additional HTTP headers
+			for (size_t i = 0; i < request.extra_headers.size(); ++i) {
+				httpheader = curl_slist_append(
+						httpheader,
+						request.extra_headers[i].c_str());
+			}
+			curl_easy_setopt(curl, CURLOPT_HTTPHEADER, httpheader);
+		}
+	}
+
+	CURLcode start(CURLM *multi_)
+	{
+		if (curl == NULL)
+			return CURLE_FAILED_INIT;
+
+		if (multi_) {
+			// Multi interface (async)
+			CURLMcode mres = curl_multi_add_handle(multi_, curl);
+			if (mres != CURLM_OK) {
+				errorstream<<"curl_multi_add_handle"
+					<<" returned error code "<<mres
+					<<std::endl;
+				return CURLE_FAILED_INIT;
+			}
+			multi = multi_; // store for curl_multi_remove_handle
+		}
+		else {
+			// Easy interface (sync)
+			return curl_easy_perform(curl);
+		}
+
+		return CURLE_OK;
+	}
+
+	void complete(CURLcode res)
+	{
+		result.succeeded = (res == CURLE_OK);
+		result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
+		result.data = oss.str();
+
+		// Get HTTP/FTP response code
+		result.response_code = 0;
+		if (curl != NULL) {
+			if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
+					&result.response_code) != CURLE_OK) {
+				result.response_code = 0;
+			}
+		}
+
+		if (res != CURLE_OK) {
+			infostream<<request.url<<" not found ("
+				<<curl_easy_strerror(res)<<")"
+				<<" (response code "<<result.response_code<<")"
+				<<std::endl;
+		}
+	}
+
+	~HTTPFetchOngoing()
+	{
+		if (multi != NULL) {
+			CURLMcode mres = curl_multi_remove_handle(multi, curl);
+			if (mres != CURLM_OK) {
+				errorstream<<"curl_multi_remove_handle"
+					<<" returned error code "<<mres
+					<<std::endl;
+			}
+		}
+
+		// Set safe options for the reusable cURL handle
+		curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+				httpfetch_discardfunction);
+		curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
+		if (httpheader != NULL) {
+			curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
+			curl_slist_free_all(httpheader);
+		}
+
+		// Store the cURL handle for reuse
+		pool->free(curl);
+	}
+};
+
+class CurlFetchThread : public SimpleThread
+{
+protected:
+	enum RequestType {
+		RT_FETCH,
+		RT_CLEAR,
+		RT_WAKEUP,
+	};
+
+	struct Request {
+		RequestType type;
+		HTTPFetchRequest fetchrequest;
+		Event *event;
+	};
+
+	CURLM *m_multi;
+	MutexedQueue<Request> m_requests;
+	size_t m_parallel_limit;
+
+	// Variables exclusively used within thread
+	std::vector<HTTPFetchOngoing*> m_all_ongoing;
+	std::list<HTTPFetchRequest> m_queued_fetches;
+
+public:
+	CurlFetchThread(int parallel_limit)
+	{
+		if (parallel_limit >= 1)
+			m_parallel_limit = parallel_limit;
+		else
+			m_parallel_limit = 1;
+	}
+
+	void requestFetch(const HTTPFetchRequest &fetchrequest)
+	{
+		Request req;
+		req.type = RT_FETCH;
+		req.fetchrequest = fetchrequest;
+		req.event = NULL;
+		m_requests.push_back(req);
+	}
+
+	void requestClear(unsigned long caller, Event *event)
+	{
+		Request req;
+		req.type = RT_CLEAR;
+		req.fetchrequest.caller = caller;
+		req.event = event;
+		m_requests.push_back(req);
+	}
+
+	void requestWakeUp()
+	{
+		Request req;
+		req.type = RT_WAKEUP;
+		req.event = NULL;
+		m_requests.push_back(req);
+	}
+
+protected:
+	// Handle a request from some other thread
+	// E.g. new fetch; clear fetches for one caller; wake up
+	void processRequest(const Request &req)
+	{
+		if (req.type == RT_FETCH) {
+			// New fetch, queue until there are fewer
+			// than m_parallel_limit ongoing fetches
+			m_queued_fetches.push_back(req.fetchrequest);
+
+			// see processQueued() for what happens next
+
+		}
+		else if (req.type == RT_CLEAR) {
+			unsigned long caller = req.fetchrequest.caller;
+
+			// Abort all ongoing fetches for the caller
+			for (std::vector<HTTPFetchOngoing*>::iterator
+					it = m_all_ongoing.begin();
+					it != m_all_ongoing.end();) {
+				if ((*it)->request.caller == caller) {
+					delete (*it);
+					it = m_all_ongoing.erase(it);
+				}
+				else
+					++it;
+			}
+
+			// Also abort all queued fetches for the caller
+			for (std::list<HTTPFetchRequest>::iterator
+					it = m_queued_fetches.begin();
+					it != m_queued_fetches.end();) {
+				if ((*it).caller == caller)
+					it = m_queued_fetches.erase(it);
+				else
+					++it;
+			}
+		}
+		else if (req.type == RT_WAKEUP) {
+			// Wakeup: Nothing to do, thread is awake at this point
+		}
+
+		if (req.event != NULL)
+			req.event->signal();
+	}
+
+	// Start new ongoing fetches if m_parallel_limit allows
+	void processQueued(CurlHandlePool *pool)
+	{
+		while (m_all_ongoing.size() < m_parallel_limit &&
+				!m_queued_fetches.empty()) {
+			HTTPFetchRequest request = m_queued_fetches.front();
+			m_queued_fetches.pop_front();
+
+			// Create ongoing fetch data and make a cURL handle
+			// Set cURL options based on HTTPFetchRequest
+			HTTPFetchOngoing *ongoing =
+				new HTTPFetchOngoing(request, pool);
+
+			// Initiate the connection (curl_multi_add_handle)
+			CURLcode res = ongoing->start(m_multi);
+			if (res == CURLE_OK) {
+				m_all_ongoing.push_back(ongoing);
+			}
+			else {
+				ongoing->complete(res);
+				httpfetch_deliver_result(ongoing->result);
+				delete ongoing;
+			}
+		}
+	}
+
+	// Process CURLMsg (indicates completion of a fetch)
+	void processCurlMessage(CURLMsg *msg)
+	{
+		// Determine which ongoing fetch the message pertains to
+		size_t i = 0;
+		bool found = false;
+		for (i = 0; i < m_all_ongoing.size(); ++i) {
+			if (m_all_ongoing[i]->curl == msg->easy_handle) {
+				found = true;
+				break;
+			}
+		}
+		if (msg->msg == CURLMSG_DONE && found) {
+			// m_all_ongoing[i] succeeded or failed.
+			HTTPFetchOngoing *ongoing = m_all_ongoing[i];
+			ongoing->complete(msg->data.result);
+			httpfetch_deliver_result(ongoing->result);
+			delete ongoing;
+			m_all_ongoing.erase(m_all_ongoing.begin() + i);
+		}
+	}
+
+	// Wait for a request from another thread, or timeout elapses
+	void waitForRequest(long timeout)
+	{
+		if (m_queued_fetches.empty()) {
+			try {
+				Request req = m_requests.pop_front(timeout);
+				processRequest(req);
+			}
+			catch (ItemNotFoundException &e) {}
+		}
+	}
+
+	// Wait until some IO happens, or timeout elapses
+	void waitForIO(long timeout)
+	{
+		fd_set read_fd_set;
+		fd_set write_fd_set;
+		fd_set exc_fd_set;
+		int max_fd;
+		long select_timeout = -1;
+		struct timeval select_tv;
+		CURLMcode mres;
+
+		FD_ZERO(&read_fd_set);
+		FD_ZERO(&write_fd_set);
+		FD_ZERO(&exc_fd_set);
+
+		mres = curl_multi_fdset(m_multi, &read_fd_set,
+				&write_fd_set, &exc_fd_set, &max_fd);
+		if (mres != CURLM_OK) {
+			errorstream<<"curl_multi_fdset"
+				<<" returned error code "<<mres
+				<<std::endl;
+			select_timeout = 0;
+		}
+
+		mres = curl_multi_timeout(m_multi, &select_timeout);
+		if (mres != CURLM_OK) {
+			errorstream<<"curl_multi_timeout"
+				<<" returned error code "<<mres
+				<<std::endl;
+			select_timeout = 0;
+		}
+
+		// Limit timeout so new requests get through
+		if (select_timeout < 0 || select_timeout > timeout)
+			select_timeout = timeout;
+
+		if (select_timeout > 0) {
+			select_tv.tv_sec = select_timeout / 1000;
+			select_tv.tv_usec = (select_timeout % 1000) * 1000;
+			int retval = select(max_fd + 1, &read_fd_set,
+					&write_fd_set, &exc_fd_set,
+					&select_tv);
+			if (retval == -1) {
+				#ifdef _WIN32
+				errorstream<<"select returned error code "
+						<<WSAGetLastError()<<std::endl;