// DataExchangePipe.cpp (4.05 KB)
#include <stdlib.h>
#include <string.h>
#include "DataExchangePipe.h"

// Builds the pipe's storage: lDataBufferCount ring blocks plus one extra
// block kept at index lDataBufferCount of the pointer table.
//
// All blocks live in one contiguous allocation (m_pBuffer); m_ppDataBuffers
// is a separate table of pointers into it, so blocks can later be exchanged
// by swapping pointers instead of copying payloads.
//
// NOTE(review): neither malloc result is checked before it is dereferenced,
// and lDataBufferCount is not validated (other methods use it as a modulus).
CDataExchangePipe::CDataExchangePipe(long lDataBufferCount)
{
	m_lDataBufferCount = lDataBufferCount;

	m_pBuffer = malloc(sizeof(DataExchangeBlock) * (m_lDataBufferCount + 1));
	m_ppDataBuffers = static_cast<DataExchangeBlock**>(
		malloc(sizeof(DataExchangeBlock*) * (m_lDataBufferCount + 1)));

	// Slot k of the table points at the k-th block of the contiguous slab.
	DataExchangeBlock* pSlab = static_cast<DataExchangeBlock*>(m_pBuffer);
	m_ppDataBuffers[0] = pSlab;
	for (int slot = 1; slot <= m_lDataBufferCount; ++slot)
	{
		m_ppDataBuffers[slot] = pSlab + slot;
		m_ppDataBuffers[slot]->wDataLen = 0;
	}

	// Read and write positions both start at slot 0, which is marked empty.
	m_lFirstIndex = 0;
	m_lCurrentIndex = 0;
	m_ppDataBuffers[0]->wDataLen = 0;
}

// Releases the block storage and the pointer table allocated by the
// constructor, then clears both members.
CDataExchangePipe::~CDataExchangePipe()
{
	// free() is defined to do nothing for a null pointer, so no guard is
	// required around either call.
	free(m_pBuffer);
	m_pBuffer = NULL;

	free(m_ppDataBuffers);
	m_ppDataBuffers = NULL;
}

// Hands the oldest filled block to the caller.
//
// nTimeOut is passed to m_DataArriveEvent.Wait() before the lock is taken.
//
// Returns the block at the read position, or NULL when the arrival event is
// not signalled or the block at the read position is empty.
//
// The returned block is exchanged with the spare block stored at index
// m_lDataBufferCount: the (emptied) spare takes its place in the ring and the
// returned block becomes the new spare. The next successful call therefore
// zeroes that block's length and puts it back into the ring.
DataExchangeBlock* CDataExchangePipe::GetDataExchangeBlock(DWORD nTimeOut)
{
	// The wait outside the lock is kept on purpose: waiting while holding the
	// lock would waste time and reduce the system's concurrency.
	m_DataArriveEvent.Wait(nTimeOut);
	Lock();

	// Added by wg 2016-02-29: re-check the event under the lock. If it is not
	// signalled, no data has arrived, so just return a NULL pointer.
	INT32 iRes = m_DataArriveEvent.Wait(0);
	if (iRes == CON_EVENT_TIME_OUT)
	{
		Unlock();
		return NULL;
	}

	// Event signalled but nothing stored at the read position: clear the
	// event and report "no data".
	if (m_ppDataBuffers[m_lFirstIndex]->wDataLen == 0)
	{
		m_DataArriveEvent.ResetEvent();
		Unlock();
		return NULL;
	}

	// Exchange the filled block with the spare one by swapping pointers.
	m_ppDataBuffers[m_lDataBufferCount]->wDataLen = 0;
	DataExchangeBlock* pFilled = m_ppDataBuffers[m_lFirstIndex];
	m_ppDataBuffers[m_lFirstIndex] = m_ppDataBuffers[m_lDataBufferCount];
	m_ppDataBuffers[m_lDataBufferCount] = pFilled;

	if (m_lCurrentIndex == m_lFirstIndex)
	{
		// The block just taken was also the one being written: the ring is
		// now empty, so rewind both positions and clear the arrival event.
		m_lCurrentIndex = 0;
		m_lFirstIndex = 0;
		m_DataArriveEvent.ResetEvent();
	}
	else
	{
		m_lFirstIndex = (m_lFirstIndex + 1) % m_lDataBufferCount;
	}

	m_DataBufferAvailableEvent.SetEvent();
	Unlock();

	// Return the pointer captured while the lock was held. Re-reading
	// m_ppDataBuffers[m_lDataBufferCount] here would touch shared state after
	// Unlock() and could observe a swap made by another caller.
	return pFilled;
}

// Appends wDataLen bytes from pData to the block at the write position,
// moving on to the next ring block when the current one cannot hold them.
//
// pData            source bytes; may be NULL only when wDataLen is 0.
// wDataLen         number of bytes to append; must not be negative.
// nTimeOut         when greater than 0, passed to
//                  m_DataBufferAvailableEvent.Wait() before the lock is
//                  taken (the wait result is not inspected).
// bNotifyImmediate when non-zero, m_DataArriveEvent is set after the copy.
//
// Returns TRUE when the bytes were stored. Returns FALSE when the arguments
// are invalid, when the next ring block is the read position (ring full; the
// buffer-available event is reset), or when wDataLen exceeds
// DATA_EXCHANGE_BLOCK_SIZE.
BOOL CDataExchangePipe::PutData(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate)
{
	// A negative length would be converted to a huge unsigned count by
	// memcpy, and a NULL source with a non-zero length would be dereferenced.
	// Refuse both before touching any shared state.
	if (wDataLen < 0 || (pData == NULL && wDataLen > 0))
	{
		return FALSE;
	}

	if (nTimeOut > 0)
	{
		m_DataBufferAvailableEvent.Wait(nTimeOut);
	}
	Lock();
	if (m_ppDataBuffers[m_lCurrentIndex]->wDataLen + wDataLen > DATA_EXCHANGE_BLOCK_SIZE)
	{
		// The arrival event is deliberately not set here
		// (SetEvent call removed by wg, 2016-03-23).
		if ((m_lCurrentIndex + 1) % m_lDataBufferCount == m_lFirstIndex)
		{
			// Advancing would run into the read position: the ring is full.
			m_DataBufferAvailableEvent.ResetEvent();
			Unlock();
			return FALSE;
		}

		if (wDataLen > DATA_EXCHANGE_BLOCK_SIZE)
		{
			// Can never fit, even in an empty block.
			Unlock();
			return FALSE;
		}

		m_lCurrentIndex = (m_lCurrentIndex + 1) % m_lDataBufferCount;
	}

	// Skip the copy for an empty payload so memcpy is never handed a NULL
	// source; appending zero bytes leaves the block unchanged anyway.
	if (wDataLen > 0)
	{
		memcpy(m_ppDataBuffers[m_lCurrentIndex]->lpData + m_ppDataBuffers[m_lCurrentIndex]->wDataLen, pData, wDataLen);
		m_ppDataBuffers[m_lCurrentIndex]->wDataLen += wDataLen;
	}

	if (bNotifyImmediate)
	{
		m_DataArriveEvent.SetEvent();
	}
	Unlock();
	return TRUE;
}

// Discards everything stored in the pipe: marks every block (ring blocks and
// the spare at index m_lDataBufferCount) as empty, rewinds the read and write
// positions to slot 0, clears the arrival event and signals that buffer
// space is available. Runs entirely under the pipe lock.
void CDataExchangePipe::ClearData()
{
	Lock();

	for (int slot = 0; slot <= m_lDataBufferCount; ++slot)
	{
		m_ppDataBuffers[slot]->wDataLen = 0;
	}

	m_lCurrentIndex = 0;
	m_lFirstIndex = 0;

	m_DataArriveEvent.ResetEvent();
	m_DataBufferAvailableEvent.SetEvent();

	Unlock();
}

// Stores data with a fast path: first tries PutData() with a timeout of 0,
// and only if that fails retries once with the caller's nTimeOut.
// Returns TRUE if either attempt stored the data, FALSE otherwise.
BOOL CDataExchangePipe::PutDataEx(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate)
{
	// Immediate attempt.
	if (PutData(pData, wDataLen, 0, bNotifyImmediate))
	{
		return TRUE;
	}

	// Retry with the caller-supplied timeout.
	return PutData(pData, wDataLen, nTimeOut, bNotifyImmediate) ? TRUE : FALSE;
}

// Acquires m_MultiPutObject by calling its Lock(). MultiPut() takes this
// lock around its PutData() attempts; pair every call with UnlockPut().
void CDataExchangePipe::LockPut()
{
	m_MultiPutObject.Lock();
}

// Releases m_MultiPutObject by calling its Unlock(); the counterpart of
// LockPut().
void CDataExchangePipe::UnlockPut()
{
	m_MultiPutObject.Unlock();
}

// Same two-step store as PutDataEx() (try with timeout 0, then retry with
// nTimeOut), but performed while holding the put lock taken by LockPut().
// The lock is held across both attempts, including any wait inside the
// second one, and is released before returning.
// Returns TRUE if either attempt stored the data, FALSE otherwise.
BOOL CDataExchangePipe::MultiPut(void *pData, INT32 wDataLen, DWORD nTimeOut, BOOL bNotifyImmediate)
{
	LockPut();

	BOOL bStored = PutData(pData, wDataLen, 0, bNotifyImmediate);
	if (!bStored)
	{
		bStored = PutData(pData, wDataLen, nTimeOut, bNotifyImmediate);
	}

	UnlockPut();
	return bStored ? TRUE : FALSE;
}