Skip to main content
Tweeted twitter.com/StackCodeReview/status/1347468109997891586
fix syntax highlighting, fix typo/spelling
Source Link
Stephen Rauch
  • 4.3k
  • 12
  • 24
  • 36

I think I can summarize the idea to the Producer-Consumer problem, with some modifications. And I think iI misused the term "Producer" (it depends from which point of view :))

That's it! I

I was wondering if the code is o.kOK, ESPECIALLY about thread-safety, and also about copy optimizations, C++ errors and so on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
```

Thanks in advance.

I think I can summarize the idea to the Producer-Consumer problem, with some modifications. And I think i misused the term "Producer" (it depends from which point of view :))

That's it! I was wondering if the code is o.k, ESPECIALLY about thread-safety, and also about copy optimizations, C++ errors and so on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors

Thanks in advance.

I think I can summarize the idea to the Producer-Consumer problem, with some modifications. And I think I misused the term "Producer" (it depends from which point of view :))

That's it!

I was wondering if the code is OK, ESPECIALLY about thread-safety, and also about copy optimizations, C++ errors and so on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
```

That's it! I was wondering if the code is o.k, SPECIALLYESPECIALLY about thread-safety, and secondarlyalso about copy optimizations, C++ errors and so-on on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors

That's it! I was wondering if the code is o.k, SPECIALLY about thread-safety, and secondarly about copy optimizations, C++ errors and so-on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors

That's it! I was wondering if the code is o.k, ESPECIALLY about thread-safety, and also about copy optimizations, C++ errors and so on.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
added 22 characters in body
Source Link
rafoo
  • 335
  • 2
  • 7
#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
```

Thanks in advance.

#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors
```
#pragma once

#include <mutex>
#include <thread>
#include <condition_variable>

/**
 * Thread that infinitely make a task consuming each time a resource
 * When there is no more resource to consume, the thread exit.
 * When the thread is working, it cannot be canceled and wait the end of current operation to
 * ask if there is a pending request and see that there is no more pending request and also can end.
 */
template<typename Input, typename Output>
class ThreadConsumer
{
public:
    /**
     * Ensure cleanup before destruction
     */
    virtual ~ThreadConsumer()
    { stop(); }

    /**
     * Notify the consumer to shutdown and that no new input will be done.
     * If a task is currently running, wait the running task to finish before returns.
     * Used to join if a task is running before exiting, or free some output generated data.
     */
    void stop()
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) { // if zero tasks were accomplished, do not join the empty constructed default thread.
            m_thread.join(); // should returns immediately. Required & cleanup
        }
    }

    /**
     * @return true if the worker is waiting for an input resource to be processed.
     */
    bool ready() const
    {
        std::lock_guard lock(m_mutex);
        return m_waiting;
    }

    /**
     * Give a resource to the Thread. There is no process queue, the thread calling this function will wait
     * until the worker take the input. If the worker is waiting (that is ready() returned true in the current thread),
     * for an incoming resource, returns immediately.
     */
    void give(Input&& resource)
    {
        std::unique_lock lock(m_mutex);

        while(!m_waiting) {
            m_condition.wait(lock);
        }

        if(m_done) {
            m_thread.join(); // should return immediately. Required & cleanup
        }

        m_waiting = false;
        m_done = false;

        std::thread thread([&] {
            m_output = start(std::move(resource));

            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
            m_waiting = true;

            m_condition.notify_one();
        });

        m_thread = std::move(thread);
    }

    /**
     * @return true if the worker has finished a task and can provide an output result.
     * Not synonym for ready(): the only difference is just after construction of the consumer: at this time,
     * ready() returns true and done() returns false. In others cases, the two functions returns the same value.
     */
    bool done() const
    {
        std::lock_guard lock(m_mutex);
        return m_done;
    }

    /**
     * @return the output of the latest task. Do not check if the object is the one default-constructed with this
     * object. After at least one task finished, the output is always the result of a preceding task (unless moved from
     * caller).
     */
    Output& output()
    { return m_output; }

    const Output& output() const
    { return m_output; }

protected:
    virtual Output start(Input &&input) = 0;

private:
    /**
     * Result of last computation. Default-constructed if the consumer has not be launched one time.
     */
    Output m_output;

    /**
     * Protect all this class private fields except m_output that should be accessed only after a task finished,
     * also without concurrency.
     */
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;

    /**
     * Represents current operation thread (if any)
     */
    std::thread m_thread;

    bool m_waiting = true;
    bool m_done = false;
};

template class ThreadConsumer<int, int>; // To debug syntax errors

Thanks in advance.

Source Link
rafoo
  • 335
  • 2
  • 7
Loading