//====================================================================================================== // Copyright 2025, Rokoko Glove OptiTrack Integration - FIXED VERSION //====================================================================================================== /** * FIXES APPLIED: * 1. ✅ WSACleanup duplicate calls - Added reference counting * 2. ✅ CPU 100% usage - Added select() with timeout instead of busy-wait * 3. ✅ Thread safety - Using lock_guard consistently */ #include "RokokoUDPReceiver.h" #include #include #include namespace RokokoIntegration { // Static members for WSA reference counting std::atomic RokokoUDPReceiver::s_wsaRefCount{0}; std::mutex RokokoUDPReceiver::s_wsaMutex; RokokoUDPReceiver::RokokoUDPReceiver() : mSocket(INVALID_SOCKET) , mIsRunning(false) , mIsListening(false) , mPacketsReceived(0) , mBytesReceived(0) , mLastPacketTime(0.0) , mPort(14043) , mBufferSize(65000) , mWsaInitialized(false) { } RokokoUDPReceiver::~RokokoUDPReceiver() { StopListening(); CloseSocket(); } // FIX 1: WSA Reference Counting bool RokokoUDPReceiver::InitializeWSA() { std::lock_guard lock(s_wsaMutex); if (s_wsaRefCount == 0) { WSADATA wsaData; if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { return false; } } s_wsaRefCount++; mWsaInitialized = true; return true; } void RokokoUDPReceiver::CleanupWSA() { if (!mWsaInitialized) { return; } std::lock_guard lock(s_wsaMutex); s_wsaRefCount--; if (s_wsaRefCount == 0) { WSACleanup(); } mWsaInitialized = false; } bool RokokoUDPReceiver::Initialize(int port) { try { mPort = port; // FIX: Use reference-counted WSA initialization if (!InitializeWSA()) { SetError("WSAStartup failed"); return false; } // UDP 소켓 생성 mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (mSocket == INVALID_SOCKET) { SetError("Failed to create UDP socket"); CleanupWSA(); return false; } // 소켓 옵션 설정 int sendBufferSize = mBufferSize; if (setsockopt(mSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sendBufferSize, sizeof(sendBufferSize)) == SOCKET_ERROR) { SetError("Failed to set send buffer size"); CloseSocket(); return false; } // 주소 구조체 설정 struct sockaddr_in serverAddr; serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(mPort); serverAddr.sin_addr.s_addr = INADDR_ANY; // 소켓 바인딩 if (bind(mSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR) { std::ostringstream oss; oss << "Failed to bind to port " << mPort << ". Port may be in use."; SetError(oss.str()); CloseSocket(); return false; } // 논블로킹 모드는 select()와 함께 사용 u_long mode = 1; if (ioctlsocket(mSocket, FIONBIO, &mode) == SOCKET_ERROR) { SetError("Failed to set non-blocking mode"); CloseSocket(); return false; } return true; } catch (...) { SetError("Exception during UDP initialization"); return false; } } bool RokokoUDPReceiver::StartListening() { if (mSocket == INVALID_SOCKET) { SetError("UDP socket not initialized"); return false; } if (mIsListening) { SetError("Already listening"); return false; } try { mIsRunning = true; mIsListening = true; // 수신 스레드 시작 mReceiveThread = std::thread(&RokokoUDPReceiver::ReceiveThread, this); return true; } catch (...) { SetError("Exception starting receive thread"); mIsRunning = false; mIsListening = false; return false; } } void RokokoUDPReceiver::StopListening() { mIsRunning = false; mIsListening = false; if (mReceiveThread.joinable()) { mReceiveThread.join(); } } void RokokoUDPReceiver::SetDataCallback(std::function&, const std::string&)> callback) { mDataCallback = callback; } bool RokokoUDPReceiver::IsListening() const { return mIsListening; } bool RokokoUDPReceiver::IsConnected() const { return mIsListening && mPacketsReceived > 0; } std::string RokokoUDPReceiver::GetLastError() const { std::lock_guard lock(mErrorMutex); return mLastError; } void RokokoUDPReceiver::GetStatistics(uint64_t& packetsReceived, uint64_t& bytesReceived, double& lastPacketTime) const { std::lock_guard lock(mStatsMutex); packetsReceived = mPacketsReceived; bytesReceived = mBytesReceived; lastPacketTime = mLastPacketTime; } // FIX 2: Use select() to prevent CPU 100% usage void RokokoUDPReceiver::ReceiveThread() { std::vector buffer(mBufferSize); while (mIsRunning) { try { // ✅ FIX: Use select() to wait for data efficiently fd_set readSet; FD_ZERO(&readSet); FD_SET(mSocket, &readSet); // Timeout: 100ms (10 checks per second) timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 100000; // 100ms int selectResult = select(0, &readSet, nullptr, nullptr, &timeout); if (selectResult > 0 && FD_ISSET(mSocket, &readSet)) { // Data available - receive it struct sockaddr_in senderAddr; int senderAddrSize = sizeof(senderAddr); int bytesReceived = recvfrom(mSocket, reinterpret_cast(buffer.data()), mBufferSize, 0, (struct sockaddr*)&senderAddr, &senderAddrSize); if (bytesReceived > 0) { // 발신자 IP 주소 추출 char senderIP[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &senderAddr.sin_addr, senderIP, INET_ADDRSTRLEN); std::string senderIPStr(senderIP); // 데이터 처리 ProcessIncomingData(buffer.data(), bytesReceived, senderIPStr); // 통계 업데이트 UpdateStatistics(bytesReceived); } } else if (selectResult == SOCKET_ERROR) { int error = WSAGetLastError(); std::ostringstream oss; oss << "select() error: " << error; SetError(oss.str()); break; } // selectResult == 0 means timeout - just continue loop (no busy-wait!) } catch (...) { SetError("Exception in receive thread"); break; } } mIsListening = false; } void RokokoUDPReceiver::ProcessIncomingData(const uint8_t* data, int size, const std::string& senderIP) { if (mDataCallback && data && size > 0) { try { // 데이터를 벡터로 복사 std::vector dataCopy(data, data + size); // 콜백 호출 mDataCallback(dataCopy, senderIP); } catch (...) { SetError("Exception in data callback"); } } } void RokokoUDPReceiver::SetError(const std::string& error) { std::lock_guard lock(mErrorMutex); mLastError = error; // 에러 로깅 (OptiTrack에서 확인 가능) std::cerr << "[RokokoUDPReceiver] Error: " << error << std::endl; } void RokokoUDPReceiver::UpdateStatistics(int packetSize) { std::lock_guard lock(mStatsMutex); mPacketsReceived++; mBytesReceived += packetSize; mLastPacketTime = std::chrono::duration_cast( std::chrono::high_resolution_clock::now().time_since_epoch() ).count() / 1000.0; } void RokokoUDPReceiver::CloseSocket() { if (mSocket != INVALID_SOCKET) { closesocket(mSocket); mSocket = INVALID_SOCKET; } // FIX: Use reference-counted cleanup CleanupWSA(); } }