My application is based on the asio chat example and consists of a client and a server:

- Client: connects to the server, receives requests and responds to them
- Server: has a Qt GUI (main thread) and a network service (separate thread) that listens for connections, sends requests to particular clients and interprets the responses in the GUI

I want to achieve this in an asynchronous way to avoid a separate thread for each client connection.

In my QT window, I have one io_service instance and one instance of my network service:

io_service_ = new asio::io_service();
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), 1234);  // the port is a number, not a string
service_ = new Service(*io_service_, endpoint, this);

asio::io_service* ioServicePointer = io_service_;
t = std::thread{ [ioServicePointer](){ ioServicePointer->run(); } };

I want to be able to send data to one client, like this:

service_->send_message(selectedClient.id, msg);

And I am receiving and handling the responses via the observer pattern (the window implements the IStreamListener interface)

Service.cpp:

#include "Service.h"
#include "Stream.h"

void Service::runAcceptor()
{
    acceptor_.async_accept(socket_,
        [this](asio::error_code ec)
    {
        if (!ec)
        {
            std::make_shared<Stream>(std::move(socket_), &streams_)->start();

        }

        runAcceptor();
    });
}

void Service::send_message(std::string streamID, chat_message& msg)
{

    io_service_.post(
        [this, msg, streamID]()
    {
        auto stream = streams_.getStreamByID(streamID);
        stream->deliver(msg);
    }); 

}

Stream.cpp:

#include "Stream.h"
#include <iostream>
#include "../chat_message.h"

Stream::Stream(asio::ip::tcp::socket socket, StreamCollection* streams)
    : socket_(std::move(socket))
{
    streams_ = streams;         // keep a reference to the streamCollection

    // retrieve endpoint ip
    asio::ip::tcp::endpoint remote_ep = socket_.remote_endpoint();
    asio::ip::address remote_ad = remote_ep.address();
    this->ip_ = remote_ad.to_string();      
}

void Stream::start()
{
    streams_->join(shared_from_this());
    readHeader();
}

void Stream::deliver(const chat_message& msg)
{
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}

std::string Stream::getName()
{
    return name_;
}

std::string Stream::getIP()
{
    return ip_;
}


void Stream::RegisterListener(IStreamListener *l)
{
    m_listeners.insert(l);
}

void Stream::UnregisterListener(IStreamListener *l)
{
    std::set<IStreamListener *>::const_iterator iter = m_listeners.find(l);
    if (iter != m_listeners.end())
    {
        m_listeners.erase(iter);
    }
    else {
        std::cerr << "Could not unregister the specified listener object as it is not registered." << std::endl;
    }
}

void Stream::readHeader()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec && read_msg_.decode_header())
        {
            readBody();
        }
        else if (ec == asio::error::eof || ec == asio::error::connection_reset)
        {
            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onStreamDisconnecting(this->id()); });
            streams_->die(shared_from_this());
        }
        else
        {
            std::cerr << "Exception: " << ec.message();
        }
    });
}

void Stream::readBody()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            // notify the listener (GUI) that a response has arrived and pass a reference to it

            auto msg = std::make_shared<chat_message>(std::move(read_msg_));

            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onMessageReceived(msg); });

            readHeader();
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

void Stream::write()
{
    auto self(shared_from_this());
    asio::async_write(socket_,
        asio::buffer(write_msgs_.front().data(),
        write_msgs_.front().length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            write_msgs_.pop_front();
            if (!write_msgs_.empty())
            {
                write();
            }
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

Interfaces

 class IStream
{
public: 
    /// Unique stream identifier
    typedef void* TId;
    virtual TId id() const
    {
        return (TId)(this);
    }

    virtual ~IStream() {}
    virtual void deliver(const chat_message& msg) = 0;

    virtual std::string getName() = 0;
    virtual std::string getIP() = 0;

    /// observer pattern    
    virtual void RegisterListener(IStreamListener *l) = 0;
    virtual void UnregisterListener(IStreamListener *l) = 0;
};

 class IStreamListener
{
public:
    virtual void onStreamDisconnecting(IStream::TId streamId) = 0;
    virtual void onMessageReceived(std::shared_ptr<chat_message> msg) = 0;

};

/*
    streamCollection / service delegates
*/
class IStreamCollectionListener
{
public:
    virtual void onStreamDied(IStream::TId streamId) = 0;
    virtual void onStreamCreated(std::shared_ptr<IStream> stream) = 0;

};

StreamCollection is basically a set of IStreams:

 class StreamCollection
{
public:
    void join(stream_ptr stream)
    {
        streams_.insert(stream);
        std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamCollectionListener *l) {l->onStreamCreated(stream); });


    }
    // more events and observer pattern implementation

First of all: The code works as intended so far.

My question: Is this the way ASIO is supposed to be used for asynchronous programming? I'm especially unsure about the Service::send_message method and the use of io_service.post. What is its purpose in my case? It also worked when I just called async_write, without wrapping it in the io_service.post call.

Am I running into problems with this approach?

1 Answer


Asio is designed to be a toolkit rather than a framework. As such, there are various ways to successfully use it. Separating the GUI and network threads, and using asynchronous I/O for scalability can be a good idea.

Delegating work to the io_service within a public API, such as Service::send_message(), has the following consequences:

  • It decouples the caller's thread from the thread(s) servicing the io_service. For example, if Stream::write() performs a time-consuming cryptographic function, the caller thread (GUI) would not be impacted.
  • It provides thread safety. The io_service is thread-safe; however, socket is not. Additionally, other objects may not be thread-safe, such as write_msgs_. Asio guarantees that handlers will only be invoked from within threads running the io_service. Consequently, if only one thread is running the io_service, then there is no possibility for concurrency and both socket_ and write_msgs_ will be accessed in a thread-safe manner. Asio refers to this as an implicit strand. If more than one thread is processing the io_service, then one may need to use an explicit strand to provide thread safety. See this answer for more details on strands.
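As a rough illustration of the second point, the sketch below uses a hypothetical `TaskQueue` class, built only on the standard library, as a stand-in for an io_service that is run by a single thread. It is not how Asio is implemented; it only shows that closures posted from several threads are executed one at a time on the worker thread, so a container without its own lock (here `write_msgs`) is never touched concurrently. The names `TaskQueue` and `post_from_many_threads` are assumptions made for this example.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service that is run by exactly one thread.
class TaskQueue {
public:
    TaskQueue() : worker_([this] { run(); }) {}

    ~TaskQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        worker_.join();  // the worker drains pending tasks before it exits
    }

    // Safe to call from any thread, in the spirit of io_service::post().
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // only reached when stopping
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();  // runs outside the lock, always on the worker thread
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so the other members exist first
};

// Posts `per_thread` messages from each of `callers` threads and returns how
// many ended up in the unsynchronized queue.
std::size_t post_from_many_threads(int callers, int per_thread) {
    std::deque<std::string> write_msgs;  // deliberately has no mutex of its own
    {
        TaskQueue queue;
        std::vector<std::thread> threads;
        for (int c = 0; c < callers; ++c) {
            threads.emplace_back([&queue, &write_msgs, per_thread] {
                for (int i = 0; i < per_thread; ++i)
                    queue.post([&write_msgs] { write_msgs.push_back("msg"); });
            });
        }
        for (auto& t : threads) t.join();
    }  // ~TaskQueue runs every task still pending, then joins the worker
    return write_msgs.size();
}
```

Calling `post_from_many_threads(4, 250)` queues 1000 pushes from four caller threads, yet only the worker thread ever modifies `write_msgs`. With several worker threads this guarantee would no longer hold, which is the situation where an explicit strand becomes necessary.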

Additional Asio considerations:

  • Observers are invoked within handlers, and handlers are running within the network thread. If any observer takes a long time to complete, such as having to synchronize with various shared objects touched by the GUI thread, then it could create poor responsiveness across other operations. Consider using a queue to broker events between the observer and subject components. For instance, one could use another io_service as a queue, run by its own thread, and post into it:

    auto msg = std::make_shared<chat_message>(std::move(read_msg_));
    for (auto l: m_listeners)
        dispatch_io_service.post([=](){ l->onMessageReceived(msg); });
    
  • Verify that the container type for write_msgs_ does not invalidate pointers and references to existing elements on push_back(), or to the remaining elements on pop_front(). The buffer passed to async_write() refers to write_msgs_.front(), so that element must stay in place until the write completes. For instance, using std::list or std::deque would be safe, but a std::vector may invalidate references to existing elements on push_back().

  • StreamCollection::die() may be called multiple times for a single Stream. This function should either be idempotent or handle the side effects appropriately.
  • On failure for a given Stream, its listeners are informed of a disconnect only in one path: failing to read a header with an error of asio::error::eof or asio::error::connection_reset. Other paths do not invoke IStreamListener.onStreamDisconnecting():
    • the header is read, but decoding failed. In this particular case, the entire read chain will stop without informing other components. The only indication that a problem has occurred is a print statement to std::cerr.
    • when there is a failure reading the body.
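One possible way to address the last two points is to route every error branch through a single helper that notifies listeners once and then removes the stream. The sketch below is a simplified, socket-free model under stated assumptions: `fail()`, the `on_disconnect` callback and `simulate_double_failure()` are hypothetical names introduced for illustration, and the two classes are reduced stand-ins for the question's `Stream` and `StreamCollection`, not their real definitions.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <set>
#include <utility>

class Stream;
using stream_ptr = std::shared_ptr<Stream>;

// Simplified stand-in for the question's StreamCollection.
class StreamCollection {
public:
    void join(const stream_ptr& s) { streams_.insert(s); }

    // Idempotent: erasing a stream that is already gone is a no-op.
    // Returns true only for the call that actually removed the stream.
    bool die(const stream_ptr& s) { return streams_.erase(s) > 0; }

    std::size_t size() const { return streams_.size(); }

private:
    std::set<stream_ptr> streams_;
};

// Simplified stand-in for the question's Stream, without any sockets.
class Stream : public std::enable_shared_from_this<Stream> {
public:
    Stream(StreamCollection& streams, std::function<void()> on_disconnect)
        : streams_(streams), on_disconnect_(std::move(on_disconnect)) {}

    void start() { streams_.join(shared_from_this()); }

    // Hypothetical helper: every error branch (header read, header decode,
    // body read, write) would call this instead of calling die() directly.
    void fail() {
        if (closed_) return;  // a second failing handler does nothing
        closed_ = true;
        on_disconnect_();     // stands in for onStreamDisconnecting()
        streams_.die(shared_from_this());
    }

private:
    StreamCollection& streams_;
    std::function<void()> on_disconnect_;
    bool closed_ = false;
};

// Simulates a read handler and a write handler both reporting an error.
// Returns how many times the disconnect callback fired.
int simulate_double_failure(StreamCollection& streams) {
    int notified = 0;
    auto stream = std::make_shared<Stream>(streams, [&notified] { ++notified; });
    stream->start();
    stream->fail();  // e.g. the pending async_read completes with an error
    stream->fail();  // e.g. the pending async_write completes with an error
    return notified;
}
```

In this model the listener hears about the disconnect exactly once regardless of which handler fails first, and the second `die()` attempt never happens. Because the handlers in the question all run on the single network thread, the plain `bool` flag needs no extra synchronization there; with multiple threads running the io_service it would need a strand or similar protection.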

2 Comments

Code-review feedback: consider using range-based for loops; be consistent with styling and naming (streams_ vs. m_listeners); the Service and Stream family types could benefit from more meaningful names (e.g. is IStream an input stream or a stream interface?).
Thank you very much for your feedback!
