Use IO Completion port to implement thread safe FIFO

Here is an example that implements a thread safe FIFO using an IO Completion port as the underlying queue. The implementation assumes there is only one thread pushing and another thread pulling.

/*! A thread safe FIFO object with single thread push / single
 *  thread pull restriction. The element must be an integral type
 *  compatible with ULONG_PTR.
 */
template <class T>
class IocQueue {
public:
    //! Constructs a queue with a default upper bound of 16 elements.
    IocQueue() : m_queueSize(16) {
        Initialize();
    }

    /*!  param queueSize The maximum number of objects in the
      Queue. Use 0 if you need infinite queue.
    */
    IocQueue(size_t queueSize) : m_queueSize(static_cast<LONG>(queueSize)){
        Initialize();
    }


    ~IocQueue(){
        // Close the completion port too: it leaked when the owner
        // forgot to call Inactivate() before destruction.
        if(m_hCompletionPort != NULL){
            CloseHandle(m_hCompletionPort);
        }
        CloseHandle(m_hCancelEvent);
        CloseHandle(m_hOverflowWaiting);
    }

    /*! The Push method pushes an element to the queue. The call is
     *  blocked when the queue is full (upper bound). Must only be
     *  called from the single producer thread.
     *  \return false on cancellation (Inactivate) or failure.
     */
    bool Push(const T& obj){
        // Always account for the element, even in unbounded mode,
        // so Pull's bookkeeping stays consistent.
        LONG count = InterlockedIncrement(&m_elmCount);
        // m_queueSize == 0 means "infinite": never block.
        if(m_queueSize != 0 && count > m_queueSize){
            // The queue is full: wait until the consumer frees a
            // slot, or until Inactivate signals cancellation.
            HANDLE waitList[] = {m_hOverflowWaiting, m_hCancelEvent};
            DWORD wret = ::WaitForMultipleObjects(2, waitList, FALSE, INFINITE);
            if(wret != WAIT_OBJECT_0){
                // Cancellation (WAIT_OBJECT_0 + 1) or a wait failure.
                return false;
            }
        }

        // The element travels as the completion key of the packet.
        return ::PostQueuedCompletionStatus(m_hCompletionPort, 0, obj, NULL) != FALSE;
    }

    /*! The Pull method pulls an element from the queue. The call
     *  is blocked when the queue is empty. Must only be called
     *  from the single consumer thread.
     *  \return false on cancellation (Inactivate) or failure.
     */
    bool Pull(T& obj){
        LONG remaining = InterlockedDecrement(&m_elmCount);
        // remaining == m_queueSize means the (single) producer is
        // blocked in Push waiting for a slot; wake it. Skipped in
        // unbounded mode where Push never waits.
        if(m_queueSize != 0 && remaining == m_queueSize){
            if(!SetEvent(m_hOverflowWaiting)){
                // Not expected ...
                return false;
            }
        }

        DWORD numberOfBytes;
        ULONG_PTR key;
        OVERLAPPED* overlapped = NULL;
        // Blocks until Push posts a packet, or fails when
        // Inactivate closes the port.
        BOOL bResult = GetQueuedCompletionStatus(m_hCompletionPort, &numberOfBytes, &key, &overlapped, INFINITE);
        if(!bResult){
            return false;
        }
        obj = static_cast<T>(key);
        return true;
    }

    /*! The Activate method activates the FIFO. The FIFO must be
     *  activated to call any Push/Pull function.
     *  \throws std::runtime_error if the completion port cannot be
     *  created.
     */
    void Activate(){
        m_hCompletionPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if(m_hCompletionPort == NULL){
            throw std::runtime_error("Failed to CreateIoCompletionPort");
        }

        ResetEvent(m_hCancelEvent);
        // Was a duplicated reset of m_hCancelEvent: a stale signal
        // left in m_hOverflowWaiting from a previous activation
        // would let Push skip one full-queue wait.
        ResetEvent(m_hOverflowWaiting);
        m_elmCount = 0;
    }

    /*! The Inactivate method inactivates the FIFO. When FIFO is
     *  inactivated, Push/Pull calls just return false.
     *
     *  The Inactivate method can be called from any thread to
     *  cancel any blocking operation.
     */
    void Inactivate(){
        // Releases a producer blocked in Push.
        SetEvent(m_hCancelEvent);
        if(m_hCompletionPort != NULL){
            // Closing the port makes a blocked
            // GetQueuedCompletionStatus fail, releasing the
            // consumer. NOTE(review): closing a handle another
            // thread may be waiting on is racy per Win32 rules; a
            // dedicated "stop" completion packet would be safer.
            CloseHandle(m_hCompletionPort);
            m_hCompletionPort = NULL;
        }
    }
private:
    //! Creates the event objects; throws on failure.
    void Initialize(){
        m_hCompletionPort = NULL;
        m_elmCount = 0;

        // The event object is set to cancel a blocking push call
        // due to queue size upper bound.
        m_hCancelEvent = ::CreateEvent(NULL,
                                       TRUE,  // manual reset
                                       FALSE, // not signaled.
                                       NULL);
        if(m_hCancelEvent == NULL){
            throw std::runtime_error("Failed to CreateEvent (cancel)");
        }

        // The event object is set when queue size is less than
        // upper bound. It's reset when queue size is at the upper
        // bound.
        m_hOverflowWaiting = ::CreateEvent(NULL,
                                           FALSE,  // auto reset
                                           FALSE,  // not signaled.
                                           NULL);
        if(m_hOverflowWaiting == NULL){
            CloseHandle(m_hCancelEvent);
            throw std::runtime_error("Failed to CreateEvent (overflow)");
        }
    }

private:
    // The IO Completion port is used as a FIFO object. NULL while
    // the queue is inactive.
    HANDLE m_hCompletionPort;

    // The event object is set when queue size is less than upper
    // bound. It's reset when queue size is at the upper bound.
    HANDLE m_hOverflowWaiting;

    // The event object is set to cancel a blocking push call due
    // to queue size upper bound.
    HANDLE m_hCancelEvent;

    // The queue size upper bound (0 = unbounded).
    LONG m_queueSize;

    // The current queue size.
    volatile LONG m_elmCount;
};

This implementation is much faster than a generic monitor-pattern-based FIFO implementation, although the monitor-pattern FIFO allows any number of threads to access the object concurrently. The performance benchmark shows:

Boost monitor pattern based FIFO
Elapsed Time 14.578

IO Completion FIFO
Elapsed Time 1.375

The IO completion version is more than 10 times faster than the Boost monitor pattern FIFO.

Here is the full source code of the boost monitor pattern based FIFO and the test program:

 

// Boost monitor pattern based FIFO

/*! A monitor-pattern bounded FIFO built on a boost mutex and two
 *  condition variables. Safe for any number of concurrent pushers
 *  and pullers.
 */
class BoostQueue {
public:
    /*! \param queueSize The maximum number of queued elements. */
    BoostQueue(size_t queueSize) :
        // m_isActivated was left uninitialized: reading it in
        // Push/Pull before Activate() was undefined behavior.
        m_isActivated(false),
        m_queueSize(queueSize){
    }

    ~BoostQueue(){
    }

    /*! Pushes an element; blocks while the queue is full.
     *  \return false if the queue is (or becomes) inactive.
     */
    bool Push(const int& obj){
        boost::mutex::scoped_lock lock(m_mutex);

        while(m_queue.size() == m_queueSize && m_isActivated){
            // no slot available
            m_waitForSlot.wait(lock);
        }
        if(!m_isActivated){
            return false;
        }
        m_queue.push_back(obj);
        m_waitForObj.notify_one();
        return true;
    }

    /*! Pulls an element; blocks while the queue is empty.
     *  \return false if the queue is (or becomes) inactive.
     */
    bool Pull(int& obj){
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty() && m_isActivated){
            // no obj available
            m_waitForObj.wait(lock);
        }
        if(!m_isActivated){
            return false;
        }

        obj = m_queue.front();
        m_queue.pop_front();
        m_waitForSlot.notify_one();
        return true;
    }

    //! Enables Push/Pull.
    void Activate(){
        boost::mutex::scoped_lock lock(m_mutex);
        m_isActivated = true;
    }

    //! Disables the queue and releases every blocked Push/Pull.
    void Inactivate(){
        boost::mutex::scoped_lock lock(m_mutex);
        m_isActivated = false;
        // Without these notifications a thread already blocked in
        // wait() would never re-check the flag and hang forever.
        m_waitForSlot.notify_all();
        m_waitForObj.notify_all();
    }
private:
    mutable boost::mutex m_mutex;
    boost::condition m_waitForSlot;     // signaled when a slot frees up
    boost::condition m_waitForObj;      // signaled when an element arrives
    bool m_isActivated;                 // guarded by m_mutex
    const size_t m_queueSize;           // capacity upper bound
    std::deque<int> m_queue;            // the underlying FIFO storage
};

 

// Test program

#include "ioc_queue.hpp"
#include "boost_queue.hpp"

using namespace std;
using namespace boost;

const static size_t COUNT = 1024 * 1024;

/*! Producer routine: pushes the values 0 .. COUNT-1 into *queue in
 *  order, stopping early (with a diagnostic) if a push is rejected.
 */
template <class QueueType>
void QueuePush(QueueType* queue){
    size_t value = 0;
    while(value < COUNT){
        bool accepted = queue->Push(value);
        if(!accepted){
            std::cout << "failed to push" << std::endl;
            break;
        }
        ++value;
    }
}

template <class QueueType>
void QueueMain(){
    boost::timer tim;
    QueueType queue(16);
    queue.Activate();
    
    thread th(bind(QueuePush<QueueType>, &queue));
    for(size_t i = 0; i < COUNT; i++){
        int v;
        if(!queue.Pull(v)){
            cout << "failed to pull" << endl;
            break;
        }
        if(i != v){
            cout << "illegal value" << endl;
            break;
        }
    }
    queue.Inactivate();
    cout << "Elapsed Time " << tim.elapsed() << endl;
}

/*! Entry point: runs the benchmark once for each FIFO
 *  implementation and prints a labelled elapsed time for each.
 */
int main(){
    std::cout << "Boost monitor pattern based FIFO" << std::endl;
    QueueMain<BoostQueue>();
    std::cout << std::endl;

    std::cout << "IO Completion FIFO" << std::endl;
    QueueMain<IocQueue<int> >();
    std::cout << std::endl;

    return 0;
}

 

 

Advertisements

About Moto

Engineer who likes coding
This entry was posted in Optimization. Bookmark the permalink.

One Response to Use IO Completion port to implement thread safe FIFO

  1. “`
    if(wret == WAIT_OBJECT_0 + 1){
    // cancellation
    return false;
    }else if(wret != WAIT_OBJECT_0){
    // some error.
    return false;
    }
    “`
    …Saying “else if” is pretty redundant – you’re returning!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s