12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330 |
- /****************************************************************************
- Copyright (c) 2010-2012 cocos2d-x.org
- Copyright (c) 2013-2016 Chukong Technologies Inc.
- Copyright (c) 2017-2018 Xiamen Yaji Software Co., Ltd.
- http://www.cocos2d-x.org
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
- "[WebSocket module] is based in part on the work of the libwebsockets project
- (http://libwebsockets.org)"
- ****************************************************************************/
- #include "network/WebSocket.h"
- #include "network/Uri.h"
- #include "base/CCDirector.h"
- #include "base/CCScheduler.h"
- #include "base/CCEventDispatcher.h"
- #include "base/CCEventListenerCustom.h"
- #include "platform/CCFileUtils.h"
- #include <thread>
- #include <mutex>
- #include <queue>
- #include <list>
- #include <signal.h>
- #include <errno.h>
- #include "libwebsockets.h"
- #define NS_NETWORK_BEGIN namespace cocos2d { namespace network {
- #define NS_NETWORK_END }}
- #define WS_RX_BUFFER_SIZE (65536)
- #define WS_RESERVE_RECEIVE_BUFFER_SIZE (4096)
- #define LOG_TAG "WebSocket.cpp"
- #if (CC_TARGET_PLATFORM == CC_PLATFORM_WIN32)
- // log, CCLOG aren't threadsafe, since we uses sub threads for parsing pcm data, threadsafe log output
- // is needed. Define the following macros (ALOGV, ALOGD, ALOGI, ALOGW, ALOGE) for threadsafe log output.
- //FIXME: Move _winLog, winLog to a separated file
- static void _winLog(const char *format, va_list args)
- {
- static const int MAX_LOG_LENGTH = 16 * 1024;
- int bufferSize = MAX_LOG_LENGTH;
- char* buf = nullptr;
- do
- {
- buf = new (std::nothrow) char[bufferSize];
- if (buf == nullptr)
- return; // not enough memory
- int ret = vsnprintf(buf, bufferSize - 3, format, args);
- if (ret < 0)
- {
- bufferSize *= 2;
- delete[] buf;
- }
- else
- break;
- } while (true);
- strcat(buf, "\n");
- int pos = 0;
- int len = strlen(buf);
- char tempBuf[MAX_LOG_LENGTH + 1] = { 0 };
- WCHAR wszBuf[MAX_LOG_LENGTH + 1] = { 0 };
- do
- {
- std::copy(buf + pos, buf + pos + MAX_LOG_LENGTH, tempBuf);
- tempBuf[MAX_LOG_LENGTH] = 0;
- MultiByteToWideChar(CP_UTF8, 0, tempBuf, -1, wszBuf, sizeof(wszBuf));
- OutputDebugStringW(wszBuf);
- pos += MAX_LOG_LENGTH;
- } while (pos < len);
- delete[] buf;
- }
- static void wsLog(const char * format, ...)
- {
- va_list args;
- va_start(args, format);
- _winLog(format, args);
- va_end(args);
- }
- #else
- #define wsLog printf
- #endif
- #define QUOTEME_(x) #x
- #define QUOTEME(x) QUOTEME_(x)
- // Since CCLOG isn't thread safe, we uses LOGD for multi-thread logging.
- #ifdef ANDROID
- #if COCOS2D_DEBUG > 0
- #define LOGD(...) __android_log_print(ANDROID_LOG_DEBUG, LOG_TAG,__VA_ARGS__)
- #else
- #define LOGD(...)
- #endif
- #define LOGE(...) __android_log_print(ANDROID_LOG_ERROR, LOG_TAG,__VA_ARGS__)
- #else
- #if COCOS2D_DEBUG > 0
- #define LOGD(fmt, ...) wsLog("D/" LOG_TAG " (" QUOTEME(__LINE__) "): " fmt "", ##__VA_ARGS__)
- #else
- #define LOGD(fmt, ...)
- #endif
- #define LOGE(fmt, ...) wsLog("E/" LOG_TAG " (" QUOTEME(__LINE__) "): " fmt "", ##__VA_ARGS__)
- #endif
/**
 * Log emitter registered with lws_set_log_level. In debug builds it prefixes the line
 * with the name of its log level and prints it through LOGD; otherwise it does nothing.
 *
 * @param level Single-bit log level value (1 << n).
 * @param line  Message text.
 */
static void printWebSocketLog(int level, const char *line)
{
#if COCOS2D_DEBUG > 0
    static const char * const log_level_names[] = {
        "ERR",
        "WARN",
        "NOTICE",
        "INFO",
        "DEBUG",
        "PARSER",
        "HEADER",
        "EXTENSION",
        "CLIENT",
        "LATENCY",
    };
    static const int knownLevelCount = static_cast<int>(sizeof(log_level_names) / sizeof(log_level_names[0]));

    char buf[30] = {0};
    // LLL_COUNT comes from the library header and may be larger than this table,
    // so the index is bounded by both. Unknown levels are printed without a prefix.
    for (int n = 0; n < LLL_COUNT && n < knownLevelCount; n++) {
        if (level != (1 << n))
            continue;
        snprintf(buf, sizeof(buf), "%s: ", log_level_names[n]);
        break;
    }
    LOGD("%s%s\n", buf, line);
#endif // #if COCOS2D_DEBUG > 0
}
- NS_NETWORK_BEGIN
// Kinds of message posted to the websocket thread (stored in WsMessage::what).
enum WS_MSG {
    WS_MSG_TO_SUBTRHEAD_SENDING_STRING    = 0, // payload is sent as a text frame
    WS_MSG_TO_SUBTRHEAD_SENDING_BINARY    = 1, // payload is sent as a binary frame
    WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION = 2  // request to open the connection
};
- static std::vector<WebSocket*>* __websocketInstances = nullptr;
- static std::mutex __instanceMutex;
- static struct lws_context* __wsContext = nullptr;
- static WsThreadHelper* __wsHelper = nullptr;
- #if (CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
/**
 * Returns the part of filePath after its last '/' or '\\' separator.
 * A path without any separator is returned unchanged.
 */
static std::string getFileNameForPath(const std::string& filePath)
{
    const std::string::size_type separatorPos = filePath.find_last_of("\\/");
    if (separatorPos == std::string::npos)
    {
        return filePath;
    }
    return filePath.substr(separatorPos + 1);
}
- #endif
- static struct lws_protocols __defaultProtocols[2];
- static lws_context_creation_info convertToContextCreationInfo(const struct lws_protocols* protocols, bool peerServerCert)
- {
- lws_context_creation_info info;
- memset(&info, 0, sizeof(info));
- /*
- * create the websocket context. This tracks open connections and
- * knows how to route any traffic and which protocol version to use,
- * and if each connection is client or server side.
- *
- * For this client-only demo, we tell it to not listen on any port.
- */
- info.port = CONTEXT_PORT_NO_LISTEN;
- info.protocols = protocols;
- // FIXME: Disable 'permessage-deflate' extension temporarily because of issues:
- // https://github.com/cocos2d/cocos2d-x/issues/16045, https://github.com/cocos2d/cocos2d-x/issues/15767
- // libwebsockets issue: https://github.com/warmcat/libwebsockets/issues/593
- // Currently, we couldn't find out the exact reason.
- // libwebsockets official said it's probably an issue of user code
- // since 'libwebsockets' passed AutoBahn stressed Test.
- // info.extensions = exts;
- info.gid = -1;
- info.uid = -1;
- if (peerServerCert)
- {
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
- }
- else
- {
- info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_PEER_CERT_NOT_REQUIRED;
- }
- info.user = nullptr;
- return info;
- }
/**
 * Message queued for the websocket thread. Each instance receives the next value
 * of a class-wide counter as its id.
 */
class WsMessage
{
public:
    WsMessage() : id(++__id) {}

    unsigned int id;        // sequence number assigned at construction
    unsigned int what = 0;  // message type
    void* data = nullptr;   // payload, set by the sender
    void* user = nullptr;   // sender-defined pointer

private:
    static unsigned int __id; // last id handed out
};

unsigned int WsMessage::__id = 0;
- /**
- * @brief Websocket thread helper, it's used for sending message between UI thread and websocket thread.
- */
- class WsThreadHelper
- {
- public:
- WsThreadHelper();
- ~WsThreadHelper();
- // Creates a new thread
- bool createWebSocketThread();
- // Quits websocket thread.
- void quitWebSocketThread();
- // Sends message to Cocos thread. It's needed to be invoked in Websocket thread.
- void sendMessageToCocosThread(const std::function<void()>& cb);
- // Sends message to Websocket thread. It's needs to be invoked in Cocos thread.
- void sendMessageToWebSocketThread(WsMessage *msg);
- // Waits the sub-thread (websocket thread) to exit,
- void joinWebSocketThread();
- void onSubThreadStarted();
- void onSubThreadLoop();
- void onSubThreadEnded();
- protected:
- void wsThreadEntryFunc();
- public:
- std::list<WsMessage*>* _subThreadWsMessageQueue;
- std::mutex _subThreadWsMessageQueueMutex;
- std::thread* _subThreadInstance;
- private:
- bool _needQuit;
- };
- // Wrapper for converting websocket callback from static function to member function of WebSocket class.
- class WebSocketCallbackWrapper {
- public:
- static int onSocketCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
- {
- // Gets the user data from context. We know that it's a 'WebSocket' instance.
- if (wsi == nullptr) {
- return 0;
- }
- int ret = 0;
- WebSocket* ws = (WebSocket*)lws_wsi_user(wsi);
- if (ws != nullptr && __websocketInstances != nullptr)
- {
- if (std::find(__websocketInstances->begin(), __websocketInstances->end(), ws) != __websocketInstances->end())
- {
- ret = ws->onSocketCallback(wsi, reason, in, len);
- }
- }
- else
- {
- // LOGD("ws instance is nullptr.\n");
- }
- return ret;
- }
- };
- // Implementation of WsThreadHelper
- WsThreadHelper::WsThreadHelper()
- : _subThreadInstance(nullptr)
- , _needQuit(false)
- {
- _subThreadWsMessageQueue = new (std::nothrow) std::list<WsMessage*>();
- }
- WsThreadHelper::~WsThreadHelper()
- {
- joinWebSocketThread();
- CC_SAFE_DELETE(_subThreadInstance);
- delete _subThreadWsMessageQueue;
- }
- bool WsThreadHelper::createWebSocketThread()
- {
- // Creates websocket thread
- _subThreadInstance = new (std::nothrow) std::thread(&WsThreadHelper::wsThreadEntryFunc, this);
- return true;
- }
- void WsThreadHelper::quitWebSocketThread()
- {
- _needQuit = true;
- }
- void WsThreadHelper::onSubThreadLoop()
- {
- if (__wsContext)
- {
- // _readyStateMutex.unlock();
- __wsHelper->_subThreadWsMessageQueueMutex.lock();
- bool isEmpty = __wsHelper->_subThreadWsMessageQueue->empty();
- if (!isEmpty)
- {
- auto iter = __wsHelper->_subThreadWsMessageQueue->begin();
- for (; iter != __wsHelper->_subThreadWsMessageQueue->end(); )
- {
- auto msg = (*iter);
- auto ws = (WebSocket*)msg->user;
- // TODO: ws may be a invalid pointer
- if (msg->what == WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION)
- {
- ws->onClientOpenConnectionRequest();
- delete *iter;
- iter = __wsHelper->_subThreadWsMessageQueue->erase(iter);
- }
- else
- {
- ++iter;
- }
- }
- }
- __wsHelper->_subThreadWsMessageQueueMutex.unlock();
- // The second parameter passed to 'lws_service' means the timeout in milliseconds while polling websocket events.
- // The lower value the better, otherwise, it may trigger high CPU usage.
- // We set 2ms in 'lws_service' then sleep 3ms to make lower CPU cost.
- // Since messages are received in websocket thread and user code is in cocos thread, we need to post event to
- // cocos thread and trigger user callbacks by 'Scheduler::performFunctionInCocosThread'. If game's fps is set
- // to 60 (16.66ms), the latency will be (2ms + 3ms + 16.66ms + internet delay) > 21ms
- lws_service(__wsContext, 2);
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
- }
- }
- void WsThreadHelper::onSubThreadStarted()
- {
- int log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO/* | LLL_DEBUG | LLL_PARSER | LLL_HEADER*/ | LLL_EXT | LLL_CLIENT | LLL_LATENCY;
- lws_set_log_level(log_level, printWebSocketLog);
- memset(__defaultProtocols, 0, sizeof(2 * sizeof(struct lws_protocols)));
- __defaultProtocols[0].name = "";
- __defaultProtocols[0].callback = WebSocketCallbackWrapper::onSocketCallback;
- __defaultProtocols[0].rx_buffer_size = WS_RX_BUFFER_SIZE;
- __defaultProtocols[0].id = std::numeric_limits<uint32_t>::max();
- lws_context_creation_info creationInfo = convertToContextCreationInfo(__defaultProtocols, true);
- __wsContext = lws_create_context(&creationInfo);
- }
- void WsThreadHelper::onSubThreadEnded()
- {
- if (__wsContext != nullptr)
- {
- lws_context_destroy(__wsContext);
- }
- }
- void WsThreadHelper::wsThreadEntryFunc()
- {
- LOGD("WebSocket thread start, helper instance: %p\n", this);
- onSubThreadStarted();
- while (!_needQuit)
- {
- onSubThreadLoop();
- }
- onSubThreadEnded();
- LOGD("WebSocket thread exit, helper instance: %p\n", this);
- }
- void WsThreadHelper::sendMessageToCocosThread(const std::function<void()>& cb)
- {
- Director::getInstance()->getScheduler()->performFunctionInCocosThread(cb);
- }
- void WsThreadHelper::sendMessageToWebSocketThread(WsMessage *msg)
- {
- std::lock_guard<std::mutex> lk(_subThreadWsMessageQueueMutex);
- _subThreadWsMessageQueue->push_back(msg);
- }
- void WsThreadHelper::joinWebSocketThread()
- {
- if (_subThreadInstance->joinable())
- {
- _subThreadInstance->join();
- }
- }
- // Define a WebSocket frame
- class WebSocketFrame
- {
- public:
- WebSocketFrame()
- : _payload(nullptr)
- , _payloadLength(0)
- , _frameLength(0)
- {
- }
- bool init(unsigned char* buf, ssize_t len)
- {
- if (buf == nullptr && len > 0)
- return false;
- if (!_data.empty())
- {
- LOGD("WebSocketFrame was initialized, should not init it again!\n");
- return false;
- }
- _data.reserve(LWS_PRE + len);
- _data.resize(LWS_PRE, 0x00);
- if (len > 0)
- {
- _data.insert(_data.end(), buf, buf + len);
- }
- _payload = _data.data() + LWS_PRE;
- _payloadLength = len;
- _frameLength = len;
- return true;
- }
- void update(ssize_t issued)
- {
- _payloadLength -= issued;
- _payload += issued;
- }
- unsigned char* getPayload() const { return _payload; }
- ssize_t getPayloadLength() const { return _payloadLength; }
- ssize_t getFrameLength() const { return _frameLength; }
- private:
- unsigned char* _payload;
- ssize_t _payloadLength;
- ssize_t _frameLength;
- std::vector<unsigned char> _data;
- };
- //
- void WebSocket::closeAllConnections()
- {
- if (__websocketInstances != nullptr)
- {
- ssize_t count = __websocketInstances->size();
- for (ssize_t i = count-1; i >=0 ; i--)
- {
- WebSocket* instance = __websocketInstances->at(i);
- instance->close();
- }
- std::lock_guard<std::mutex> lk(__instanceMutex);
- __websocketInstances->clear();
- delete __websocketInstances;
- __websocketInstances = nullptr;
- }
- }
- WebSocket::WebSocket()
- : _readyState(State::CONNECTING)
- , _wsInstance(nullptr)
- , _lwsProtocols(nullptr)
- , _isDestroyed(std::make_shared<std::atomic<bool>>(false))
- , _delegate(nullptr)
- , _closeState(CloseState::NONE)
- {
- // reserve data buffer to avoid allocate memory frequently
- _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE);
- if (__websocketInstances == nullptr)
- {
- __websocketInstances = new (std::nothrow) std::vector<WebSocket*>();
- }
- __websocketInstances->push_back(this);
-
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- _resetDirectorListener = Director::getInstance()->getEventDispatcher()->addCustomEventListener(Director::EVENT_RESET, [this, isDestroyed](EventCustom*){
- if (*isDestroyed)
- return;
- close();
- });
- }
- WebSocket::~WebSocket()
- {
- LOGD("In the destructor of WebSocket (%p)\n", this);
- std::lock_guard<std::mutex> lk(__instanceMutex);
- if (__websocketInstances != nullptr)
- {
- auto iter = std::find(__websocketInstances->begin(), __websocketInstances->end(), this);
- if (iter != __websocketInstances->end())
- {
- __websocketInstances->erase(iter);
- }
- else
- {
- LOGD("ERROR: WebSocket instance (%p) wasn't added to the container which saves websocket instances!\n", this);
- }
- }
- if (__websocketInstances == nullptr || __websocketInstances->empty())
- {
- __wsHelper->quitWebSocketThread();
- LOGD("before join ws thread\n");
- __wsHelper->joinWebSocketThread();
- LOGD("after join ws thread\n");
- CC_SAFE_DELETE(__wsHelper);
- }
- Director::getInstance()->getEventDispatcher()->removeEventListener(_resetDirectorListener);
-
- *_isDestroyed = true;
- }
- bool WebSocket::init(const Delegate& delegate,
- const std::string& url,
- const std::vector<std::string>* protocols/* = nullptr*/,
- const std::string& caFilePath/* = ""*/)
- {
- _delegate = const_cast<Delegate*>(&delegate);
- _url = url;
- _caFilePath = caFilePath;
- if (_url.empty())
- return false;
- if (protocols != nullptr && !protocols->empty())
- {
- size_t size = protocols->size();
- _lwsProtocols = (struct lws_protocols*)malloc((size + 1) * sizeof(struct lws_protocols));
- memset(_lwsProtocols, 0, (size + 1) * sizeof(struct lws_protocols));
- static uint32_t __wsId = 0;
- for (size_t i = 0; i < size; ++i)
- {
- _lwsProtocols[i].callback = WebSocketCallbackWrapper::onSocketCallback;
- size_t nameLen = protocols->at(i).length();
- char* name = (char*)malloc(nameLen + 1);
- name[nameLen] = '\0';
- strcpy(name, protocols->at(i).c_str());
- _lwsProtocols[i].name = name;
- _lwsProtocols[i].id = ++__wsId;
- _lwsProtocols[i].rx_buffer_size = WS_RX_BUFFER_SIZE;
- _lwsProtocols[i].per_session_data_size = 0;
- _lwsProtocols[i].user = nullptr;
- _clientSupportedProtocols += name;
- if (i < (size - 1))
- {
- _clientSupportedProtocols += ",";
- }
- }
- }
- bool isWebSocketThreadCreated = true;
- if (__wsHelper == nullptr)
- {
- __wsHelper = new (std::nothrow) WsThreadHelper();
- isWebSocketThreadCreated = false;
- }
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- // fixed https://github.com/cocos2d/cocos2d-x/issues/17433
- // createWebSocketThread has to be after message WS_MSG_TO_SUBTHREAD_CREATE_CONNECTION was sent.
- // And websocket thread should only be created once.
- if (!isWebSocketThreadCreated)
- {
- __wsHelper->createWebSocketThread();
- }
- return true;
- }
- void WebSocket::send(const std::string& message)
- {
- if (_readyState == State::OPEN)
- {
- // In main thread
- Data* data = new (std::nothrow) Data();
- data->bytes = (char*)malloc(message.length() + 1);
- // Make sure the last byte is '\0'
- data->bytes[message.length()] = '\0';
- strcpy(data->bytes, message.c_str());
- data->len = static_cast<ssize_t>(message.length());
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_STRING;
- msg->data = data;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- }
- else
- {
- LOGD("Couldn't send message since websocket wasn't opened!\n");
- }
- }
- void WebSocket::send(const unsigned char* binaryMsg, unsigned int len)
- {
- if (_readyState == State::OPEN)
- {
- // In main thread
- Data* data = new (std::nothrow) Data();
- if (len == 0)
- {
- // If data length is zero, allocate 1 byte for safe.
- data->bytes = (char*)malloc(1);
- data->bytes[0] = '\0';
- }
- else
- {
- data->bytes = (char*)malloc(len);
- memcpy((void*)data->bytes, (void*)binaryMsg, len);
- }
- data->len = len;
- WsMessage* msg = new (std::nothrow) WsMessage();
- msg->what = WS_MSG_TO_SUBTRHEAD_SENDING_BINARY;
- msg->data = data;
- msg->user = this;
- __wsHelper->sendMessageToWebSocketThread(msg);
- }
- else
- {
- LOGD("Couldn't send message since websocket wasn't opened!\n");
- }
- }
- void WebSocket::close()
- {
- if (_closeState != CloseState::NONE)
- {
- LOGD("close was invoked, don't invoke it again!\n");
- return;
- }
- _closeState = CloseState::SYNC_CLOSING;
- LOGD("close: WebSocket (%p) is closing...\n", this);
- {
- _readyStateMutex.lock();
- if (_readyState == State::CLOSED)
- {
- // If readState is closed, it means that onConnectionClosed was invoked in websocket thread,
- // but the callback of performInCocosThread has not been triggered. We need to invoke
- // onClose to release the websocket instance.
- _readyStateMutex.unlock();
- _delegate->onClose(this);
- return;
- }
- _readyState = State::CLOSING;
- _readyStateMutex.unlock();
- }
- {
- std::unique_lock<std::mutex> lkClose(_closeMutex);
- _closeCondition.wait(lkClose);
- _closeState = CloseState::SYNC_CLOSED;
- }
- // Wait 5 milliseconds for onConnectionClosed to exit!
- std::this_thread::sleep_for(std::chrono::milliseconds(5));
- _delegate->onClose(this);
- }
- void WebSocket::closeAsync()
- {
- if (_closeState != CloseState::NONE)
- {
- LOGD("close was invoked, don't invoke it again!\n");
- return;
- }
- _closeState = CloseState::ASYNC_CLOSING;
- LOGD("closeAsync: WebSocket (%p) is closing...\n", this);
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- if (_readyState == State::CLOSED || _readyState == State::CLOSING)
- {
- LOGD("closeAsync: WebSocket (%p) was closed, no need to close it again!\n", this);
- return;
- }
- _readyState = State::CLOSING;
- }
- WebSocket::State WebSocket::getReadyState()
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- return _readyState;
- }
- struct lws_vhost* WebSocket::createVhost(struct lws_protocols* protocols, int& sslConnection)
- {
- auto fileUtils = FileUtils::getInstance();
- bool isCAFileExist = fileUtils->isFileExist(_caFilePath);
- if (isCAFileExist)
- {
- _caFilePath = fileUtils->fullPathForFilename(_caFilePath);
- }
- lws_context_creation_info info = convertToContextCreationInfo(protocols, isCAFileExist);
- if (sslConnection != 0)
- {
- if (isCAFileExist)
- {
- #if (CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
- // if ca file is in the apk, try to extract it to writable path
- std::string writablePath = fileUtils->getWritablePath();
- std::string caFileName = getFileNameForPath(_caFilePath);
- std::string newCaFilePath = writablePath + caFileName;
- if (fileUtils->isFileExist(newCaFilePath))
- {
- LOGD("CA file (%s) in writable path exists!", newCaFilePath.c_str());
- _caFilePath = newCaFilePath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- else
- {
- if (fileUtils->isFileExist(_caFilePath))
- {
- std::string fullPath = fileUtils->fullPathForFilename(_caFilePath);
- LOGD("Found CA file: %s", fullPath.c_str());
- if (fullPath[0] != '/')
- {
- LOGD("CA file is in APK");
- auto caData = fileUtils->getDataFromFile(fullPath);
- if (!caData.isNull())
- {
- FILE* fp = fopen(newCaFilePath.c_str(), "wb");
- if (fp != nullptr)
- {
- LOGD("New CA file path: %s", newCaFilePath.c_str());
- fwrite(caData.getBytes(), caData.getSize(), 1, fp);
- fclose(fp);
- _caFilePath = newCaFilePath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- else
- {
- CCASSERT(false, "Open new CA file failed");
- }
- }
- else
- {
- CCASSERT(false, "CA file is empty!");
- }
- }
- else
- {
- LOGD("CA file isn't in APK!");
- _caFilePath = fullPath;
- info.ssl_ca_filepath = _caFilePath.c_str();
- }
- }
- else
- {
- CCASSERT(false, "CA file doesn't exist!");
- }
- }
- #else
- info.ssl_ca_filepath = _caFilePath.c_str();
- #endif
- }
- else
- {
- LOGD("WARNING: CA Root file isn't set. SSL connection will not peer server certificate\n");
- sslConnection = sslConnection | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
- }
- }
- lws_vhost* vhost = lws_create_vhost(__wsContext, &info);
- return vhost;
- }
- void WebSocket::onClientOpenConnectionRequest()
- {
- if (nullptr != __wsContext)
- {
- static const struct lws_extension exts[] = {
- {
- "permessage-deflate",
- lws_extension_callback_pm_deflate,
- // client_no_context_takeover extension is not supported in the current version, it will cause connection fail
- // It may be a bug of lib websocket build
- // "permessage-deflate; client_no_context_takeover; client_max_window_bits"
- "permessage-deflate; client_max_window_bits"
- },
- {
- "deflate-frame",
- lws_extension_callback_pm_deflate,
- "deflate_frame"
- },
- { nullptr, nullptr, nullptr /* terminator */ }
- };
- _readyStateMutex.lock();
- _readyState = State::CONNECTING;
- _readyStateMutex.unlock();
- Uri uri = Uri::parse(_url);
- LOGD("scheme: %s, host: %s, port: %d, path: %s\n", uri.getScheme().c_str(), uri.getHostName().c_str(), static_cast<int>(uri.getPort()), uri.getPathEtc().c_str());
- int sslConnection = 0;
- if (uri.isSecure())
- sslConnection = LCCSCF_USE_SSL;
- struct lws_vhost* vhost = nullptr;
- if (_lwsProtocols != nullptr)
- {
- vhost = createVhost(_lwsProtocols, sslConnection);
- }
- else
- {
- vhost = createVhost(__defaultProtocols, sslConnection);
- }
- int port = static_cast<int>(uri.getPort());
- if (port == 0)
- port = uri.isSecure() ? 443 : 80;
- const std::string& hostName = uri.getHostName();
- std::string path = uri.getPathEtc();
- const std::string& authority = uri.getAuthority();
- if (path.empty())
- path = "/";
- struct lws_client_connect_info connectInfo;
- memset(&connectInfo, 0, sizeof(connectInfo));
- connectInfo.context = __wsContext;
- connectInfo.address = hostName.c_str();
- connectInfo.port = port;
- connectInfo.ssl_connection = sslConnection;
- connectInfo.path = path.c_str();
- connectInfo.host = hostName.c_str();
- connectInfo.origin = authority.c_str();
- connectInfo.protocol = _clientSupportedProtocols.empty() ? nullptr : _clientSupportedProtocols.c_str();
- connectInfo.ietf_version_or_minus_one = -1;
- connectInfo.userdata = this;
- connectInfo.client_exts = exts;
- connectInfo.vhost = vhost;
- _wsInstance = lws_client_connect_via_info(&connectInfo);
- if (nullptr == _wsInstance)
- {
- onConnectionError();
- return;
- }
- }
- else
- {
- LOGE("Create websocket context failed!");
- }
- }
- int WebSocket::onClientWritable()
- {
- // LOGD("onClientWritable ... \n");
- {
- std::lock_guard<std::mutex> readMutex(_readyStateMutex);
- if (_readyState == State::CLOSING)
- {
- LOGD("Closing websocket (%p) connection.\n", this);
- return -1;
- }
- }
- do
- {
- std::lock_guard<std::mutex> lk(__wsHelper->_subThreadWsMessageQueueMutex);
- if (__wsHelper->_subThreadWsMessageQueue->empty())
- {
- break;
- }
- std::list<WsMessage*>::iterator iter = __wsHelper->_subThreadWsMessageQueue->begin();
- while (iter != __wsHelper->_subThreadWsMessageQueue->end())
- {
- WsMessage* msg = *iter;
- if (msg->user == this)
- {
- break;
- }
- else
- {
- ++iter;
- }
- }
- ssize_t bytesWrite = 0;
- if (iter != __wsHelper->_subThreadWsMessageQueue->end())
- {
- WsMessage* subThreadMsg = *iter;
- Data* data = (Data*)subThreadMsg->data;
- const ssize_t c_bufferSize = WS_RX_BUFFER_SIZE;
- const ssize_t remaining = data->len - data->issued;
- const ssize_t n = std::min(remaining, c_bufferSize);
- WebSocketFrame* frame = nullptr;
- if (data->ext)
- {
- frame = (WebSocketFrame*)data->ext;
- }
- else
- {
- frame = new (std::nothrow) WebSocketFrame();
- bool success = frame && frame->init((unsigned char*)(data->bytes + data->issued), n);
- if (success)
- {
- data->ext = frame;
- }
- else
- { // If frame initialization failed, delete the frame and drop the sending data
- // These codes should never be called.
- LOGD("WebSocketFrame initialization failed, drop the sending data, msg(%d)\n", (int)subThreadMsg->id);
- delete frame;
- CC_SAFE_FREE(data->bytes);
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- break;
- }
- }
- int writeProtocol;
- if (data->issued == 0)
- {
- if (WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what)
- {
- writeProtocol = LWS_WRITE_TEXT;
- }
- else
- {
- writeProtocol = LWS_WRITE_BINARY;
- }
- // If we have more than 1 fragment
- if (data->len > c_bufferSize)
- writeProtocol |= LWS_WRITE_NO_FIN;
- } else {
- // we are in the middle of fragments
- writeProtocol = LWS_WRITE_CONTINUATION;
- // and if not in the last fragment
- if (remaining != n)
- writeProtocol |= LWS_WRITE_NO_FIN;
- }
- bytesWrite = lws_write(_wsInstance, frame->getPayload(), frame->getPayloadLength(), (lws_write_protocol)writeProtocol);
- // Handle the result of lws_write
- // Buffer overrun?
- if (bytesWrite < 0)
- {
- LOGD("ERROR: msg(%u), lws_write return: %d, but it should be %d, drop this message.\n", subThreadMsg->id, (int)bytesWrite, (int)n);
- // socket error, we need to close the socket connection
- CC_SAFE_FREE(data->bytes);
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- closeAsync();
- }
- else if (bytesWrite < frame->getPayloadLength())
- {
- frame->update(bytesWrite);
- LOGD("frame wasn't sent completely, bytesWrite: %d, remain: %d\n", (int)bytesWrite, (int)frame->getPayloadLength());
- }
- // Do we have another fragments to send?
- else if (remaining > frame->getFrameLength() && bytesWrite == frame->getPayloadLength())
- {
- // A frame was totally sent, plus data->issued to send next frame
- LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength()));
- data->issued += frame->getFrameLength();
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- }
- // Safely done!
- else
- {
- LOGD("Safely done, msg(%d)!\n", subThreadMsg->id);
- if (remaining == frame->getFrameLength())
- {
- LOGD("msg(%u) append: %d + %d = %d\n", subThreadMsg->id, (int)data->issued, (int)frame->getFrameLength(), (int)(data->issued + frame->getFrameLength()));
- LOGD("msg(%u) was totally sent!\n", subThreadMsg->id);
- }
- else
- {
- LOGD("ERROR: msg(%u), remaining(%d) < bytesWrite(%d)\n", subThreadMsg->id, (int)remaining, (int)frame->getFrameLength());
- LOGD("Drop the msg(%u)\n", subThreadMsg->id);
- closeAsync();
- }
- CC_SAFE_FREE(data->bytes);
- delete ((WebSocketFrame*)data->ext);
- data->ext = nullptr;
- CC_SAFE_DELETE(data);
- __wsHelper->_subThreadWsMessageQueue->erase(iter);
- CC_SAFE_DELETE(subThreadMsg);
- LOGD("-----------------------------------------------------------\n");
- }
- }
- } while(false);
- if (_wsInstance != nullptr)
- {
- lws_callback_on_writable(_wsInstance);
- }
- return 0;
- }
- int WebSocket::onClientReceivedData(void* in, ssize_t len)
- {
- // In websocket thread
- static int packageIndex = 0;
- packageIndex++;
- if (in != nullptr && len > 0)
- {
- LOGD("Receiving data:index:%d, len=%d\n", packageIndex, (int)len);
- unsigned char* inData = (unsigned char*)in;
- _receivedData.insert(_receivedData.end(), inData, inData + len);
- }
- else
- {
- LOGD("Empty message received, index=%d!\n", packageIndex);
- }
- // If no more data pending, send it to the client thread
- size_t remainingSize = lws_remaining_packet_payload(_wsInstance);
- int isFinalFragment = lws_is_final_fragment(_wsInstance);
- // LOGD("remainingSize: %d, isFinalFragment: %d\n", (int)remainingSize, isFinalFragment);
- if (remainingSize == 0 && isFinalFragment)
- {
- std::vector<char>* frameData = new (std::nothrow) std::vector<char>(std::move(_receivedData));
- // reset capacity of received data buffer
- _receivedData.reserve(WS_RESERVE_RECEIVE_BUFFER_SIZE);
- ssize_t frameSize = frameData->size();
- bool isBinary = (lws_frame_is_binary(_wsInstance) != 0);
- if (!isBinary)
- {
- frameData->push_back('\0');
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, frameData, frameSize, isBinary, isDestroyed](){
- // In UI thread
- LOGD("Notify data len %d to Cocos thread.\n", (int)frameSize);
- Data data;
- data.isBinary = isBinary;
- data.bytes = (char*)frameData->data();
- data.len = frameSize;
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onMessage(this, data);
- }
- delete frameData;
- });
- }
- return 0;
- }
- int WebSocket::onConnectionOpened()
- {
- const lws_protocols* lwsSelectedProtocol = lws_get_protocol(_wsInstance);
- _selectedProtocol = lwsSelectedProtocol->name;
- LOGD("onConnectionOpened...: %p, client protocols: %s, server selected protocol: %s\n", this, _clientSupportedProtocols.c_str(), _selectedProtocol.c_str());
- /*
- * start the ball rolling,
- * LWS_CALLBACK_CLIENT_WRITEABLE will come next service
- */
- lws_callback_on_writable(_wsInstance);
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- if (_readyState == State::CLOSING || _readyState == State::CLOSED)
- {
- return 0;
- }
- _readyState = State::OPEN;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onOpen(this);
- }
- });
- return 0;
- }
- int WebSocket::onConnectionError()
- {
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- LOGD("WebSocket (%p) onConnectionError, state: %d ...\n", this, (int)_readyState);
- if (_readyState == State::CLOSED)
- {
- return 0;
- }
- _readyState = State::CLOSING;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance was destroyed!\n");
- }
- else
- {
- _delegate->onError(this, ErrorCode::CONNECTION_FAILURE);
- }
- });
- onConnectionClosed();
- return 0;
- }
- int WebSocket::onConnectionClosed()
- {
- {
- std::lock_guard<std::mutex> lk(_readyStateMutex);
- LOGD("WebSocket (%p) onConnectionClosed, state: %d ...\n", this, (int)_readyState);
- if (_readyState == State::CLOSED)
- {
- return 0;
- }
- if (_readyState == State::CLOSING)
- {
- if (_closeState == CloseState::SYNC_CLOSING)
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by client synchronously.\n", this);
- for(;;)
- {
- std::lock_guard<std::mutex> lkClose(_closeMutex);
- _closeCondition.notify_one();
- if (_closeState == CloseState::SYNC_CLOSED)
- {
- break;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- return 0;
- }
- else if (_closeState == CloseState::ASYNC_CLOSING)
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by client asynchronously.\n", this);
- }
- else
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by server.\n", this);
- }
- }
- else
- {
- LOGD("onConnectionClosed, WebSocket (%p) is closing by server.\n", this);
- }
- _readyState = State::CLOSED;
- }
- std::shared_ptr<std::atomic<bool>> isDestroyed = _isDestroyed;
- __wsHelper->sendMessageToCocosThread([this, isDestroyed](){
- if (*isDestroyed)
- {
- LOGD("WebSocket instance (%p) was destroyed!\n", this);
- }
- else
- {
- _delegate->onClose(this);
- }
- });
- LOGD("WebSocket (%p) onConnectionClosed DONE!\n", this);
- return 0;
- }
- int WebSocket::onSocketCallback(struct lws *wsi,
- int reason,
- void *in, ssize_t len)
- {
- //LOGD("socket callback for %d reason\n", reason);
- int ret = 0;
- switch (reason)
- {
- case LWS_CALLBACK_CLIENT_ESTABLISHED:
- ret = onConnectionOpened();
- break;
- case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
- ret = onConnectionError();
- break;
- case LWS_CALLBACK_WSI_DESTROY:
- ret = onConnectionClosed();
- break;
- case LWS_CALLBACK_CLIENT_RECEIVE:
- ret = onClientReceivedData(in, len);
- break;
- case LWS_CALLBACK_CLIENT_WRITEABLE:
- ret = onClientWritable();
- break;
- case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
- case LWS_CALLBACK_LOCK_POLL:
- case LWS_CALLBACK_UNLOCK_POLL:
- break;
- case LWS_CALLBACK_PROTOCOL_INIT:
- LOGD("protocol init...");
- break;
- case LWS_CALLBACK_PROTOCOL_DESTROY:
- LOGD("protocol destroy...");
- break;
- default:
- LOGD("WebSocket (%p) Unhandled websocket event: %d\n", this, reason);
- break;
- }
- return ret;
- }
- NS_NETWORK_END
|