Giới thiệu:
Blocking Queue là một Queue, nhưng có thể sử dụng bởi nhiều luồng (thread) khác nhau. Khi Blocking Queue là rỗng, một thread muốn lấy (pop) dữ liệu từ nó phải bị block cho đến có một thread khác đưa dữ liệu (push) vào. Khi Queue đầy, một thread muốn đưa dữ liệu vào cũng phải bị block cho đến khi một thread khác lấy dữ liệu ra khỏi nó. (Tham khảo : http://docs.oracle.com/javase/6/docs...kingQueue.html)

Blocking Queue là kiểu dữ liệu rất thích hợp để giải quyết bài toán kinh điển Producer-Consumer. Tuy nhiên ở C++11, queue có sẵn của nó không phải là blocking queue cho nên ta sẽ phải tự cài đặt. Ở đây ta sẽ cài đặt queue có sức chứa vô hạn (miễn là bộ nhớ còn đáp ứng) nên chỉ cần phải chờ đợi khi queue rỗng.

Prototype
class blocking_queue về cơ bản sẽ như sau:


Mã:
template <typename _Tp, typename _Sequence = std::deque<_Tp>>class blocking_queue {    typedef typename _Sequence::value_type value_type;     // Yêu cầi value_type phải cùng kiểu với _Tp     static_assert(std::is_same<_Sequence_value_type, _Tp>::value,             "require _Sequence::value_type is the same as _Tp");public:    typedef typename _Sequence::reference                 reference;    typedef typename _Sequence::const_reference           const_reference;    typedef typename _Sequence::size_type                 size_type;    typedef          _Sequence                            container_type; private:        _Sequence c; public:    blocking_queue() = default;    // phương thức khởi tạo mặc định    blocking_queue(const _Sequence& __c) = delete;        // Không sử dụng phương thức khởi tạo sao chép    blocking_queue& operator=(blocking_queue const&) = delete;  // không sử dụng toán tử gán     bool empty() const; // Kiểm tra queue rỗng hay không    size_type size() const; // Kích thước hiện tại     void push(const_reference __x);  // Thêm x vào cuối queue    void push(value_type&& __x);         // Thêm x vào cuối queue    template<typename... _Args> void emplace(_Args&&... __args);  // Thêm phần tử vào cuối queue, khởi tạo ngay tại chỗ     value_type take(reference __x); // Trả về giá trị phần tử đầu tiên đồng thời xoá nó ra khỏi queue     void clear(); // xoá hết tất cả các phần tử};
Về cơ bản thì class blocking_queue giống với std::queue, nhưng khác ở chỗ: thay vì có phương thức pop() (xoá phần tử đầu) và front() (lấy giá trị phần tử đầu) như ở queue thì ở blocking_queue ta dùng phương thức take() để làm cả hai việc trên.

Như vậy ở std::queue, để lấy một phần tử và xoá nó ra khỏi queue thì ta làm như sau:

Mã:
auto x = Queue.front();Queue.pop();
Còn ở blocking_queue:

Mã:
auto x = Queue.take();
Lý do của sự khác nhau này là:
- std::queue được thiết kể để sự dụng bởi 1 thread, nên sau khi 1 thread lấy giá trị các phần tử đầu/cuối, nếu thread đó không thay đổi queue thì các giá trị đầu/cuối queue vẫn chính là các giá trị mà ta đã lấy ra.
- với blocking_queue, nếu như ta lấy và xoá phần tử đầu theo cách của std::queue: gọi 2 phương thức front(), pop() riêng biệt ở thread A thì có thể xảy ra trường hợp: ở thread B ta cũng gọi front() vào pop(), trong đó front() của B xảy ra trước pop() của A. Khi đó, 2 thread A và B cùng lấy được 1 giá trị nhưng Queue thì bị xoá đi 2 phần tử.



ngoài ra còn có nhiều tình huống nữa có thể dẫn đến kết quá không như ý muốn.
Để giải quyết vấn đề trên, ta phải đưa hai thao tác này vào một phương thức và áp dụng các biện pháp điều độ vào phương thức này.

Cài đặt
Để đảm bảo chỉ có duy nhất 1 thread được truy cập vào blocking_queue trong 1 thời điểm, ta sử dụng mutex


Mã:
...class blocking_queue { private:    std::mutex _mutex;...};
- Các phương thức size(), empty(), clear():

Mã:
...classs blocking_queue {...private:    _Sequence c;    std::mutex _mutex;public:    size_type size() const {        std::unique_lock<std::mutex> lock(_mutex);        return c.size();    }     bool empty() const {        std::unique_lock<std::mutex> lock(_mutex);        return c.empty();    }     void clear() {        std::unique_lock<std::mutex> lock(_mutex);        c.clear();    }...};
std::unique_lock<std::mutex> là một wrapper của std::mutex, constructor nhận đối số là một đối tượng kiểu std::mutex và tự động lock đối tượng std::mutex đó bằng phương thức lock(). Destructor gọi đến phương thức unlock() của mutex. Nhờ đó, khi thoát khỏi các phương thức size(), empty(), _mutex sẽ được tự động mở khoá.

- phương thức push(), emplace(), take():

Mã:
class blocking_queue {...private:    std::condition_variable _cond;public:    void push(const value_type& __x) {         std::unique_lock<std::mutex> lock(_mutex);        c.push_back(__x);         lock.unlock();        _cond.notify_one();    }     void push(value_type&& __x) {         std::unique_lock<std::mutex> lock(_mutex);        c.push_back(std::move(__x));        lock.unlock();        _cond.notify_one();    }     template<typename... _Args>        void emplace(_Args&&... __args)        {             std::unique_lock<std::mutex> lock(_mutex);            c.emplace_back(std::forward<_Args>(__args)...);             _cond.notify_one();        }     value_type take() {        std::unique_lock<std::mutex> lock(_mutex);        _cond.wait(lock);        value_type x = std::move(c.front());        c.pop_front();        return x;    }};
Ở đây, ta sử dụng condition_variable (biến điều kiện) để cài đặt việc chờ đợi (ở phương thức take()) và thông báo (ở phương thức push(), emplace()).
- Ở phương thức take(), câu lệnh

Mã:
_cond.wait(lock);
mở khoá lock (tức là mở khoá _mutex) và chờ đợi cho đến khi một thread khác gọi notify_one() hoặc notify_all() đánh thức. Sau đó, lock lại bị khoá và thread thực hiện các câu lệnh tiếp theo

- Ở phương thức push(), emplace(), câu lệnh:

Mã:
_cond.notify_one();
được thực hiện sau khi đưa một phần tử vào cuối Queue (Queue lúc này chắc chắn không rỗng), có tác dụng thông báo (đánh thức) cho 1 trong các thread đang chờ (nếu có).

Mã hoàn chỉnh:

Mã:
// =====================================================================================// //       Filename:  blocking_queue.hh// //    Description:  // //        Version:  1.0//        Created:  06/19/2012 08:03:03 PM//       Revision:  none//       Compiler:  g++// //         Author:  BOSS14420, boss14420[at]gmail[dot]com//        Company:  // // ===================================================================================== #include <deque>#include <thread>#include <mutex>#include <condition_variable>#include <type_traits> template<typename _Tp, typename _Sequence = std::deque<_Tp> >class blocking_queue{    typedef typename _Sequence::value_type _Sequence_value_type;      static_assert(std::is_same<_Sequence_value_type, _Tp>::value,             "require _Sequence::value_type is the same as _Tp"); private:        _Sequence c;        std::mutex _mutex;        std::condition_variable _cond; public:    typedef typename _Sequence::value_type                value_type;    typedef typename _Sequence::reference                 reference;    typedef typename _Sequence::const_reference           const_reference;    typedef typename _Sequence::size_type                 size_type;    typedef          _Sequence                            container_type; public:    blocking_queue() = default;    blocking_queue(const _Sequence& __c) = delete;    blocking_queue& operator=(blocking_queue const&) = delete;     void push(const value_type& __x) {         std::unique_lock<std::mutex> lock(_mutex);        c.push_back(__x);         lock.unlock();        _cond.notify_one();    }     void push(value_type&& __x) {         std::unique_lock<std::mutex> lock(_mutex);        c.push_back(std::move(__x));        lock.unlock();        _cond.notify_one();    }     template<typename... _Args>        void emplace(_Args&&... __args)        {             std::unique_lock<std::mutex> lock(_mutex);            c.emplace_back(std::forward<_Args>(__args)...);             _cond.notify_one();        }     value_type take() {        std::unique_lock<std::mutex> lock(_mutex);        _cond.wait(lock);        value_type x = std::move(c.front());        c.pop_front();        return x;    }     void clear() {        std::unique_lock<std::mutex> lock(_mutex);        c.clear();    }};
Lưu ý:
  • Ở các phương thức push(), emplace(), thay vì chỉ đánh thức 1 thread, có thể đánh thức tất cả các thread đang chờ đợi bằng phương thức notify_all();

    Mã:
  • _cond.notify_all();