#include #include #include "DataExchangePipe.h" CDataExchangePipe::CDataExchangePipe(long lDataBufferCount) { m_lDataBufferCount = lDataBufferCount; m_pBuffer = malloc(sizeof(DataExchangeBlock) * (m_lDataBufferCount + 1)); m_ppDataBuffers = (DataExchangeBlock**) malloc(sizeof(DataExchangeBlock*) * (m_lDataBufferCount + 1)); m_ppDataBuffers[0] = (DataExchangeBlock*) m_pBuffer; for (int i = 1; i <= m_lDataBufferCount; i++) { m_ppDataBuffers[i] = m_ppDataBuffers[i - 1] + 1; m_ppDataBuffers[i]->wDataLen = 0; } m_lFirstIndex = 0; m_lCurrentIndex = 0; m_ppDataBuffers[m_lCurrentIndex]->wDataLen = 0; } CDataExchangePipe::~CDataExchangePipe() { if (m_pBuffer) { free(m_pBuffer); m_pBuffer = NULL; } if (m_ppDataBuffers) { free(m_ppDataBuffers); m_ppDataBuffers = NULL; } } DataExchangeBlock* CDataExchangePipe::GetDataExchangeBlock(DWORD nTimeOut) { m_DataArriveEvent.Wait(nTimeOut); Lock(); //add by wg 2016-02-29 //if not receive data arrive event, means no data arrived, just return NULL pointer //不删除锁之外的wait,是为了节省因锁定后等待而带来的时间浪费,降低了系统的并发度 INT32 iRes = m_DataArriveEvent.Wait(0); if (iRes == CON_EVENT_TIME_OUT) { Unlock(); return NULL; } //add over if (m_ppDataBuffers[m_lFirstIndex]->wDataLen == 0) { m_DataArriveEvent.ResetEvent(); Unlock(); return NULL; } DataExchangeBlock* p; m_ppDataBuffers[m_lDataBufferCount]->wDataLen = 0; p = m_ppDataBuffers[m_lFirstIndex]; m_ppDataBuffers[m_lFirstIndex] = m_ppDataBuffers[m_lDataBufferCount]; m_ppDataBuffers[m_lDataBufferCount] = p; if (m_lCurrentIndex == m_lFirstIndex) { m_lCurrentIndex = 0; m_lFirstIndex = 0; m_DataArriveEvent.ResetEvent(); } else { m_lFirstIndex = (m_lFirstIndex + 1) % m_lDataBufferCount; } m_DataBufferAvailableEvent.SetEvent(); Unlock(); return m_ppDataBuffers[m_lDataBufferCount]; } BOOL CDataExchangePipe::PutData(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate) { if (nTimeOut > 0) { m_DataBufferAvailableEvent.Wait(nTimeOut); } Lock(); if (m_ppDataBuffers[m_lCurrentIndex]->wDataLen + wDataLen > DATA_EXCHANGE_BLOCK_SIZE) { // m_DataArriveEvent.SetEvent();//delete by wg: 2016-03-23 if ((m_lCurrentIndex + 1) % m_lDataBufferCount == m_lFirstIndex) { m_DataBufferAvailableEvent.ResetEvent(); Unlock(); return FALSE; } else { if (wDataLen > DATA_EXCHANGE_BLOCK_SIZE) { Unlock(); return FALSE; } m_lCurrentIndex = (m_lCurrentIndex + 1) % m_lDataBufferCount; } } memcpy(m_ppDataBuffers[m_lCurrentIndex]->lpData + m_ppDataBuffers[m_lCurrentIndex]->wDataLen, pData, wDataLen); m_ppDataBuffers[m_lCurrentIndex]->wDataLen += wDataLen; if (bNotifyImmediate) { m_DataArriveEvent.SetEvent(); } Unlock(); return TRUE; } void CDataExchangePipe::ClearData() { Lock(); for (int i = 1; i <= m_lDataBufferCount; i++) { m_ppDataBuffers[i]->wDataLen = 0; } m_lFirstIndex = 0; m_lCurrentIndex = 0; m_ppDataBuffers[m_lCurrentIndex]->wDataLen = 0; m_DataArriveEvent.ResetEvent(); m_DataBufferAvailableEvent.SetEvent(); Unlock(); } BOOL CDataExchangePipe::PutDataEx(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate) { if (PutData(pData, wDataLen, 0, bNotifyImmediate)) { return TRUE; } else { if (PutData(pData, wDataLen, nTimeOut, bNotifyImmediate)) { return TRUE; } else { return FALSE; } } } void CDataExchangePipe::LockPut() { m_MultiPutObject.Lock(); } void CDataExchangePipe::UnlockPut() { m_MultiPutObject.Unlock(); } BOOL CDataExchangePipe::MultiPut(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate) { LockPut(); if (PutData(pData, wDataLen, 0, bNotifyImmediate)) { UnlockPut(); return TRUE; } else { if (PutData(pData, wDataLen, nTimeOut, bNotifyImmediate)) { UnlockPut(); return TRUE; } else { UnlockPut(); return FALSE; } } }