Thread-Safe Queue – Two Serious Errors

In my last post, “Monitor Object”, I implemented a thread-safe queue and made two serious errors. Sorry. Today, I will fix them.

First, here is the erroneous implementation from my last post again, to provide the context.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template <typename Predicate>
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
    
    T get(){ 
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}

The key idea of the example is that the Monitor Object is encapsulated in a class and can, therefore, be reused. The class Monitor uses a std::mutex as monitor lock and a std::condition_variable as monitor condition. The class Monitor provides the minimal interface that a Monitor Object should support.

ThreadSafeQueue in line (1) extends std::queue in line (3) with a thread-safe interface. ThreadSafeQueue derives from the class Monitor and uses its member functions to support the synchronized member functions add and get. Both use the monitor lock to protect the Monitor Object, particularly the non-thread-safe myQueue. add notifies a waiting thread when a new item has been added to myQueue. This notification is thread-safe.

The member function get deserves more attention. First, get calls wait (line (2)), which forwards to the wait member function of the underlying condition variable. This wait call needs an additional predicate to protect against spurious and lost wakeups (C++ Core Guidelines: Be Aware of the Traps of Condition Variables). The operations modifying myQueue (lines (4) and (5)) must also be protected because they can interleave with the call myQueue.push(val) (line (6)).

The Monitor Object safeQueue (line (7)) uses the lambda functions in lines (8) and (9) to add or remove a number from the synchronized safeQueue. ThreadSafeQueue itself is a class template and can hold values of an arbitrary type. One hundred clients add 100 random numbers between 1 and 6 to safeQueue, while one hundred clients concurrently remove these 100 numbers from safeQueue. The output of the program shows the numbers and the thread ids.

This program has two serious issues. Dietmar Kühl and Frank Birbacher wrote me an e-mail about it. These are their words, which I translated into English. My additions are in italics and bold.
 
  1. In ThreadSafeQueue::get() it is tested by Monitor::wait() whether myQueue contains an element or waits for an element to be contained. However, the lock is only held inside wait(), i.e. in get() you cannot be sure that the element is still in myQueue: another thread may get the lock and remove the element, resulting in undefined behavior on the call to myQueue.front().
  2. If the copy/move constructor of T throws an exception, the ThreadSafeQueue is in an inconsistent state: no member function is active, but the mutex is locked.

 

The fix is that Monitor::wait() can only be called if a unique_lock is held. This can be achieved, for example, by having Monitor provide an appropriate (protected?) function that returns a suitable object and requests a reference to it in wait():

struct Monitor {
   using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
   [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
   template <typename Predicate>
   void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
   // …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
   auto kerberos = receiveGuard();
   wait(kerberos, [this]{ return not myQueue.empty(); });
   T rc = std::move(myQueue.front());
   myQueue.pop();
   return rc;
}

This version corrects the exception problem for get(). For add() you can simply use the monitor object with a lock_guard:

template <typename T>
void ThreadSafeQueue<T>::add(T val) {
   {
      std::lock_guard<Monitor> kerberos(*this);   // Monitor provides lock() and unlock()
      myQueue.push(std::move(val));
   }
   notify_one();
}

I would probably wrap the notification in a “SendGuard” that contains a lock_guard and a reference to the condition_variable and sends the notification upon destruction:

class SendGuard {
    friend class Monitor;
    using deleter = decltype([](auto* cond){ cond->notify_one(); });
    std::unique_ptr<std::condition_variable, deleter> notifier;
    std::lock_guard<std::mutex> kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

The move constructor and destructor should still be public and represent the whole interface! This would also make it much easier to use in add():

 


    template <typename T>
    void ThreadSafeQueue<T>::add(T val) {
       auto kerberos = sendGuard();
       myQueue.push(val);
    }
    

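    Finally, here is Dietmar's full implementation. The numbers correspond to the numbers in my monitorObject.cpp example. The program requires C++20 because SendGuard uses a lambda inside decltype and a constructor with auto parameters.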

    // threadsafequeue1.cpp
    
    #include <condition_variable>
    #include <functional>
    #include <queue>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <random>
    #include <thread>
    #include <vector>
    
    class Monitor {
    public:
        using Lock = std::unique_lock<std::mutex>;
        [[nodiscard]] Lock receiveGuard() {
            return Lock(monitMutex);
        }
    
        template <typename Predicate>
        void wait(Lock& kerberos, Predicate pred) {
            monitCond.wait(kerberos, pred);
        }
    
        class SendGuard {
            friend class Monitor;
            using deleter = decltype([](auto* cond){ cond->notify_one(); });
            std::unique_ptr<std::condition_variable, deleter> notifier;
            std::lock_guard<std::mutex> kerberos;
            SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
        };
    
        SendGuard sendGuard() { return {monitMutex, monitCond}; }
        
    private:
        mutable std::mutex monitMutex;
        mutable std::condition_variable monitCond;
    };
    
    template <typename T>                                  // (1)
    class ThreadSafeQueue: public Monitor {
     public:
        void add(T val){ 
            auto kerberos = sendGuard();
            myQueue.push(val);                             // (6)
        }
        
        T get(){ 
            auto kerberos = receiveGuard();
            wait(kerberos, [this] { return ! myQueue.empty(); } );  // (2)
            auto val = myQueue.front();                    // (4)
            myQueue.pop();                                 // (5)
            return val;
        }
    
    private:
        std::queue<T> myQueue;                            // (3)
    };
    
    
    class Dice {
    public:
        int operator()(){ return rand(); }
    private:
        std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                              std::default_random_engine());
    };
    
    
    int main(){
        
        std::cout << '\n';
        
        constexpr auto NumberThreads = 100;
        
        ThreadSafeQueue<int> safeQueue;                      // (7)
    
        auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                                std::cout << val << " "
                                                << std::this_thread::get_id() << "; "; 
                                              }; 
        auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)
    
        std::vector<std::thread> addThreads(NumberThreads);
        Dice dice;
        for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
    
        std::vector<std::thread> getThreads(NumberThreads);
        for (auto& thr: getThreads) thr = std::thread(getLambda);
    
        for (auto& thr: addThreads) thr.join();
        for (auto& thr: getThreads) thr.join();
        
        std::cout << "\n\n";
         
    }
    

    As a result of the discussion above, Frank proposed the following version, which has a consistent and easy-to-use interface for Monitor.

    // threadSafeQueue.cpp
    
    #ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
    #define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
    
    #include <atomic>
    #include <algorithm>
    #include <condition_variable>
    #include <deque>
    #include <iterator>
    #include <mutex>
    #include <stdexcept>
    #include <thread>
    #include <vector>
    
    
    class Monitor {
    public:
        struct UnlockAndNotify {
            std::mutex d_mutex;
            std::condition_variable d_condition;
    
            void lock() { d_mutex.lock(); }
            void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
        };
    
    private:
        UnlockAndNotify d_combined;
    
    public:
        std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
            return std::unique_lock{d_combined};
        }
    
        template <typename PRED>
        std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
            std::unique_lock lock{d_combined.d_mutex};
            d_combined.d_condition.wait(lock, waitForCondition);
            return lock;
        }
    };
    
    class ThreadQueue {
        Monitor d_monitor;
        std::deque<int> d_numberQueue;
    
        auto makeLockWhenNotEmpty() {
            return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
        }
    
    public:
        void addNumber(int number) {
            const auto lock = d_monitor.makeLockWithNotify();
            d_numberQueue.push_back(number);
        }
    
        int removeNumber() {
            const auto lock = makeLockWhenNotEmpty();
            const auto number = d_numberQueue.front();
            d_numberQueue.pop_front();
            return number;
        }
    };
    
    #endif
    
    int main() {
       ThreadQueue queue;
       std::atomic<int> sharedSum{};
       std::atomic<int> sharedCounter{};
    
       std::vector<std::jthread> threads;
       threads.reserve(200);
       std::generate_n(std::back_inserter(threads), 100, [&] {
           return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
       });
       std::generate_n(std::back_inserter(threads), 100, [&] {
           return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
       });
    
       threads.clear(); // wait for all threads to finish
       if (sharedSum.load() != 5050) {
           throw std::logic_error("Wrong result for sum of 1..100");
       }
    }
    

    The implementation of the monitor pattern here is based on the flexibility of std::unique_lock through its template parameter.  All of the std RAII lock guards can be used with any class that has lock() and unlock() methods.  The UnlockAndNotify class implements this interface and notifies its condition variable from within the unlock() method.  On top of that, the Monitor class provides a reduced public interface to create two different kinds of locks, one with notification and one without, by creating a std::unique_lock on either the whole UnlockAndNotify instance or just the contained std::mutex.

    On the choice of std::unique_lock versus std::lock_guard, I (Frank) prefer the unique_lock in the interface. This choice gives the user of the Monitor class more flexibility. I value this flexibility more than a possible performance advantage of lock_guard, which would have to be measured first anyway. I acknowledge that the given examples don’t make use of this extra flexibility.

    Afterward, Dietmar further developed Frank’s idea: here, the protected data is kept in the Monitor, making it harder to access it unprotected.

    // threadsafequeue2.cpp
    
    #ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
    #define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
    
    #include <algorithm>
    #include <atomic>
    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <mutex>
    #include <random>
    #include <stdexcept>
    #include <thread>
    #include <tuple>
    #include <vector>
    
    namespace patterns::monitor3 {
    
    template <typename T>
    class Monitor {
    public:
       struct UnlockAndNotify {
           std::mutex d_mutex;
           std::condition_variable d_condition;
       
           void lock() { d_mutex.lock(); }
           void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
       };
    
    private:
       mutable UnlockAndNotify d_combined;
       mutable T               d_data;
    
    public:
       std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
           return { d_data, std::unique_lock{d_combined} };
       }
    
       template <typename PRED>
       std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
           std::unique_lock lock{d_combined.d_mutex};
           d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
           return { d_data, std::move(lock) };
       }
    };
    
    template <typename T>
    class ThreadQueue {
       Monitor<std::deque<T>> d_monitor;
    
    public:
       void add(T number) {
           auto[numberQueue, lock] = d_monitor.makeProducerLock();
           numberQueue.push_back(number);
       }
    
       T remove() {
           auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
           const auto number = numberQueue.front();
           numberQueue.pop_front();
           return number;
       }
    };
    }
    
    #endif
    
    class Dice {
    public:
        int operator()(){ return rand(); }
    private:
        std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                              std::default_random_engine());
    };
    
    int main(){
        
        std::cout << '\n';
        
        constexpr auto NumberThreads = 100;
        
        patterns::monitor3::ThreadQueue<int> safeQueue;                     
    
        auto addLambda = [&safeQueue](int val){ safeQueue.add(val);         
                                                std::cout << val << " "
                                                << std::this_thread::get_id() << "; "; 
                                              }; 
        auto getLambda = [&safeQueue]{ safeQueue.remove(); };  
    
        std::vector<std::thread> addThreads(NumberThreads);
        Dice dice;
        for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
    
        std::vector<std::thread> getThreads(NumberThreads);
        for (auto& thr: getThreads) thr = std::thread(getLambda);
    
        for (auto& thr: addThreads) thr.join();
        for (auto& thr: getThreads) thr.join();
        
        std::cout << "\n\n";
         
    }
    

    Once more, thanks a lot to Frank and Dietmar. I didn’t intend my erroneous implementation of a thread-safe queue in my previous post to prove that concurrency is hard to get right. I’m particularly annoyed that I didn’t put the mutex inside a lock (error 2). I teach this in my C++ classes: NNM (No Naked Mutex).

    What’s next?

    In my next post, I dive into the future of C++: C++23.



     

     

    3 replies
    1. Jens says:

      I think there is a design issue in the example in addition to the concurrency issues. Monitor is “encapsulated in a class and can, therefore, be reused”, but this is done using public inheritance. Public inheritance models an is-a-relationship, but here I think a uses-relationship is more appropriate. With public inheritance, I can write code like “q.lock(); q.add(42);” and this would compile. So I would propose to use private inheritance if you want to stick to inheritance, or preferably a Monitor member object as in the last examples.

      • Rainer Grimm says:

        Jens, you are right. The reason that I have a public inheritance in my example is historical. I started with CRTP. Afterward, I removed CRTP and ended with public inheritance (is-a relation.) As you also mentioned, I generally prefer composition (use-a relation).

    2. Cristian says:

      “The fix is that Monitor::wait() can only be called if a unique_lock is held. ”

      I feel that the wording here is a bit too relaxed as specific lifetime requirements are critical (but not clear from “a unique_lock”) although the example following the text clarified it.
      It is not just any unique_lock that must be held, but one that outlives the call to Monitor::wait().
      As shown in the example this must come from the user context (ThreadSafeQueue::get()) which fulfills the requirement that once the conditions declared by the predicate are held the lock on mutex is also held.

      In my opinion the lifetime of objects must be stressed.
      This is a fine point to reason about and one that creates issues when reasoning about locking.
      It’s critical when the code is not executed in the same function where the condition variable wait() is called, like in this case where the design strives to achieve this separation.
      This is not always obvious to someone used to the simpler use case where lifetime is not an issue, because the condition variable and the code relying on the conditions declared by the predicate share the same lifetime (e.g. the same function/method).
