上一篇文章我详细介绍了如何开发一款高性能的TCP服务器的网络传输层.本章我将谈谈如何开发一个高性能的UDP服务器的网络层.UDP服务器的网络层开发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成端口上并投递一定数量的recv操作.当有数据到来时从完成队列中取出数据发送到接收队列中即可。 测试结果如下: WindowsXP Professional,Intel Core Duo E4600 双核2.4G , 2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。 下面详细介绍该服务器的架构及流程: 1. 首先介绍服务器的接收和发送缓存UDP_CONTEXT。
1 class UDP_CONTEXT : protected NET_CONTEXT 2 { 3 friend class UdpSer; 4 protected: 5 IP_ADDR m_RemoteAddr; //对端地址 6 7 enum 8 { 9 HEAP_SIZE = 1024 * 1024 * 5, 10 MAX_IDL_DATA = 10000, 11 }; 12 13 public: 14 UDP_CONTEXT() {} 15 virtual ~UDP_CONTEXT() {} 16 17 void* operator new(size_t nSize); 18 void operator delete(void* p); 19 20 private: 21 static vector<UDP_CONTEXT* > s_IDLQue; 22 static CRITICAL_SECTION s_IDLQueLock; 23 static HANDLE s_hHeap; 24 };
UDP_CONTEXT的实现流程和TCP_CONTEXT的实现流程大致相同,此处就不进行详细介绍。 2. UDP_RCV_DATA,当服务器收到客户端发来的数据时会将数据以UDP_RCV_DATA的形式放入到数据接收队列中,其声明如下:
1 class DLLENTRY UDP_RCV_DATA 2 { 3 friend class UdpSer; 4 public: 5 CHAR* m_pData; //数据缓冲区 6 INT m_nLen; //数据的长度 7 IP_ADDR m_PeerAddr; //发送报文的地址 8 9 UDP_RCV_DATA(const CHAR* szBuf, int nLen, const IP_ADDR& PeerAddr); 10 ~UDP_RCV_DATA(); 11 12 void* operator new(size_t nSize); 13 void operator delete(void* p); 14 15 enum 16 { 17 RCV_HEAP_SIZE = 1024 * 1024 *50, //s_Heap堆的大小 18 DATA_HEAP_SIZE = 100 * 1024* 1024, //s_DataHeap堆的大小 19 MAX_IDL_DATA = 250000, 20 }; 21 22 private: 23 static vector<UDP_RCV_DATA* > s_IDLQue; 24 static CRITICAL_SECTION s_IDLQueLock; 25 static HANDLE s_DataHeap; //数据缓冲区的堆 26 static HANDLE s_Heap; //RCV_DATA的堆 27 };
UDP_RCV_DATA的实现和TCP_RCV_DATA大致相同, 此处不在详细介绍. 下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:
1 class DLLENTRY UdpSer 2 { 3 public: 4 UdpSer(); 5 ~UdpSer(); 6 7 /**//************************************************************************ 8 * Desc : 初始化静态资源,在申请UDP实例对象之前应先调用该函数, 否则程序无法正常运行 9 ************************************************************************/ 10 static void InitReource(); 11 12 /**//************************************************************************ 13 * Desc : 在释放UDP实例以后, 掉用该函数释放相关静态资源 14 ************************************************************************/ 15 static void ReleaseReource(); 16 17 //用指定本地地址和端口进行初始化 18 BOOL StartServer(const CHAR* szIp = "0.0.0.0", INT nPort = 0); 19 20 //从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度 21 UDP_RCV_DATA* GetRcvData(DWORD* pCount); 22 23 //向对端发送数据 24 BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen); 25 26 /**//**************************************************** 27 * Name : CloseServer() 28 * Desc : 关闭服务器 29 ****************************************************/ 30 void CloseServer(); 31 32 protected: 33 SOCKET m_hSock; 34 vector<UDP_RCV_DATA* > m_RcvDataQue; //接收数据队列 35 CRITICAL_SECTION m_RcvDataLock; //访问m_RcvDataQue的互斥锁 36 long volatile m_bThreadRun; //是否允许后台线程继续运行 37 BOOL m_bSerRun; //服务器是否正在运行 38 39 HANDLE *m_pThreads; //线程数组 40 HANDLE m_hCompletion; //完成端口句柄 41 42 void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped); 43 44 /**//**************************************************** 45 * Name : WorkThread() 46 * Desc : I/O 后台管理线程 47 ****************************************************/ 48 static UINT WINAPI WorkThread(LPVOID lpParam); 49 };
1. InitReource() 主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行静态资源的初始化, 否则服务器无法正常使用. 2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放. 3. StartServer() 该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket, 将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:
1 BOOL UdpSer::StartServer(const CHAR* szIp /**//* = */, INT nPort /**//* = 0 */) 2 { 3 BOOL bRet = TRUE; 4 const int RECV_COUNT = 500; 5 WSABUF RcvBuf = { NULL, 0 }; 6 DWORD dwBytes = 0; 7 DWORD dwFlag = 0; 8 INT nAddrLen = sizeof(IP_ADDR); 9 INT iErrCode = 0; 10 11 try 12 { 13 if (m_bSerRun) 14 { 15 THROW_LINE; 16 } 17 18 m_bSerRun = TRUE; 19 m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); 20 if (INVALID_SOCKET == m_hSock) 21 { 22 THROW_LINE; 23 } 24 ULONG ul = 1; 25 ioctlsocket(m_hSock, FIONBIO, &ul); 26 27 //设置为地址重用,优点在于服务器关闭后可以立即启用 28 int nOpt = 1; 29 setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt)); 30 31 //关闭系统缓存,使用自己的缓存以防止数据的复制操作 32 INT nZero = 0; 33 setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero)); 34 setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero)); 35 36 IP_ADDR addr(szIp, nPort); 37 if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr))) 38 { 39 closesocket(m_hSock); 40 THROW_LINE; 41 } 42 43 //将SOCKET绑定到完成端口上 44 CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0); 45 46 //投递读操作 47 for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++) 48 { 49 UDP_CONTEXT* pRcvContext = new UDP_CONTEXT(); 50 if (pRcvContext && pRcvContext->m_pBuf) 51 { 52 dwFlag = 0; 53 dwBytes = 0; 54 nAddrLen = sizeof(IP_ADDR); 55 RcvBuf.buf = pRcvContext->m_pBuf; 56 RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE; 57 58 pRcvContext->m_hSock = m_hSock; 59 pRcvContext->m_nOperation = OP_READ; 60 iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr) 61 , &nAddrLen, &(pRcvContext->m_ol), NULL); 62 if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError()) 63 { 64 delete pRcvContext; 65 pRcvContext = NULL; 66 } 67 } 68 else 69 { 70 delete pRcvContext; 71 } 72 } 73 } 74 catch (const long &lErrLine) 75 { 76 bRet = FALSE; 77 _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine); 78 } 79 80 return bRet; 81 }
4. GetRcvData(), 从接收队列中取出一个数据包.
1 UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount) 2 { 3 UDP_RCV_DATA* pRcvData = NULL; 4 5 EnterCriticalSection(&m_RcvDataLock); 6 vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin(); 7 if (iterRcv != m_RcvDataQue.end()) 8 { 9 pRcvData = *iterRcv; 10 m_RcvDataQue.erase(iterRcv); 11 } 12 13 if (pCount) 14 { 15 *pCount = (DWORD)(m_RcvDataQue.size()); 16 } 17 LeaveCriticalSection(&m_RcvDataLock); 18 19 return pRcvData; 20 }
5. SendData() 发送指定长度的数据包.
1 BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen) 2 { 3 BOOL bRet = TRUE; 4 try 5 { 6 if (nLen >= 1500) 7 { 8 THROW_LINE; 9 } 10 11 UDP_CONTEXT* pSendContext = new UDP_CONTEXT(); 12 if (pSendContext && pSendContext->m_pBuf) 13 { 14 pSendContext->m_nOperation = OP_WRITE; 15 pSendContext->m_RemoteAddr = PeerAddr; 16 17 memcpy(pSendContext->m_pBuf, szData, nLen); 18 19 WSABUF SendBuf = { NULL, 0 }; 20 DWORD dwBytes = 0; 21 SendBuf.buf = pSendContext->m_pBuf; 22 SendBuf.len = nLen; 23 24 INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1, &dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL); 25 if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError()) 26 { 27 delete pSendContext; 28 THROW_LINE; 29 } 30 } 31 else 32 { 33 delete pSendContext; 34 THROW_LINE; 35 } 36 } 37 catch (const long &lErrLine) 38 { 39 bRet = FALSE; 40 _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine); 41 } 42 43 return bRet; 44 }
6. CloseServer() 关闭服务
1 void UdpSer::CloseServer() 2 { 3 m_bSerRun = FALSE; 4 closesocket(m_hSock); 5 }
7. WorkThread() 在完成端口上工作的后台线程
1 UINT WINAPI UdpSer::WorkThread(LPVOID lpParam) 2 { 3 UdpSer *pThis = (UdpSer *)lpParam; 4 DWORD dwTrans = 0, dwKey = 0; 5 LPOVERLAPPED pOl = NULL; 6 UDP_CONTEXT *pContext = NULL; 7 8 while (TRUE) 9 { 10 BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE); 11 12 pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol); 13 if (pContext) 14 { 15 switch (pContext->m_nOperation) 16 { 17 case OP_READ: 18 pThis->ReadCompletion(bOk, dwTrans, pOl); 19 break; 20 case OP_WRITE: 21 delete pContext; 22 pContext = NULL; 23 break; 24 } 25 } 26 27 if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0)) 28 { 29 break; 30 } 31 } 32 33 return 0; 34 }
8.ReadCompletion(), 接收操作完成后的回调函数
1 void UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) 2 { 3 UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol); 4 WSABUF RcvBuf = { NULL, 0 }; 5 DWORD dwBytes = 0; 6 DWORD dwFlag = 0; 7 INT nAddrLen = sizeof(IP_ADDR); 8 INT iErrCode = 0; 9 10 if (TRUE == bSuccess && dwNumberOfBytesTransfered <= UDP_CONTEXT::S_PAGE_SIZE) 11 { 12#ifdef _XML_NET_ 13 EnterCriticalSection(&m_RcvDataLock); 14 15 UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr); 16 if (pRcvData && pRcvData->m_pData) 17 { 18 m_RcvDataQue.push_back(pRcvData); 19 } 20 else 21 { 22 delete pRcvData; 23 } 24 25 LeaveCriticalSection(&m_RcvDataLock); 26#else 27 if (dwNumberOfBytesTransfered >= sizeof(PACKET_HEAD)) 28 { 29 EnterCriticalSection(&m_RcvDataLock); 30 31 UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr); 32 if (pRcvData && pRcvData->m_pData) 33 { 34 m_RcvDataQue.push_back(pRcvData); 35 } 36 else 37 { 38 delete pRcvData; 39 } 40 41 LeaveCriticalSection(&m_RcvDataLock); 42 } 43#endif 44 45 //投递下一个接收操作 46 RcvBuf.buf = pRcvContext->m_pBuf; 47 RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE; 48 49 iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr) 50 , &nAddrLen, &(pRcvContext->m_ol), NULL); 51 if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError()) 52 { 53 ATLTRACE("\r\n%s -- %ld dwNumberOfBytesTransfered = %ld, LAST_ERR = %ld" 54 , __FILE__, __LINE__, dwNumberOfBytesTransfered, WSAGetLastError()); 55 delete pRcvContext; 56 pRcvContext = NULL; 57 } 58 } 59 else 60 { 61 delete pRcvContext; 62 } 63 }
|
|
CALENDER
| 日 | 一 | 二 | 三 | 四 | 五 | 六 |
---|
29 | 30 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|
常用链接
留言簿(3)
随笔分类
随笔档案
文章分类
搜索
最新评论
阅读排行榜
评论排行榜
Powered By: 博客园 模板提供:沪江博客
|