3
\$\begingroup\$

Now I have this simple message bus in Java. This one is as simplistic as I could get.

Code

com.github.coderodde.messagebus.AbstractMessageConsumer.java:

package com.github.coderodde.messagebus;

import java.util.Objects;

/**
 * This abstract class specifies a message consumer.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public abstract class AbstractMessageConsumer {
    
    protected MessageBus messageBus;
    
    public AbstractMessageConsumer(MessageBus messageBus) {
        setMessageBus(messageBus);
    }
    
    public MessageBus getMessageBus() {
        return this.messageBus;
    }
    
    public void setMessageBus(MessageBus messageBus) {
        this.messageBus = 
                Objects.requireNonNull(
                        messageBus, 
                        "The input message bus is null.");
    }
    
    /**
     * Consumes a message and returns a possible output of type {@code O}.
     * 
     * @param message the message text to consume.
     */
    public abstract void consumeMessage(String message);
}

com.github.coderodde.messagebus.MessageBus.java:

package com.github.coderodde.messagebus;

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/**
 * This class implements a message bus that keeps track of a set of message bus
 * subscribers. When one subscriber issues a message with a target message 
 * subscriber, the message is propagates to all the currently listening message
 * subscribers.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public final class MessageBus {

    private final Set<MessageBusSubscriber> subscribers = new LinkedHashSet<>();
    
    public void addMessageBusSubscriber(
            MessageBusSubscriber messageBusSubscriber) {
        
        subscribers.add(
                Objects.requireNonNull(
                        messageBusSubscriber, 
                        "The input message bus subscriber is null."));
    }
    
    public void removeMessageBusSubscriber(
            MessageBusSubscriber messageBusSubscriber) {
        
        subscribers.remove(
                Objects.requireNonNull(
                        messageBusSubscriber, 
                        "The input message subscriber is null."));
    }
    
    public void receiveMessage(MessageBusSubscriber messageBusSubscriber,
                               String messageSubscriberAddress, 
                               String message) {
        
        Objects.requireNonNull(
                messageBusSubscriber,
                "The input message bus subscriber is null.");
        
        Objects.requireNonNull(
                messageSubscriberAddress, 
                "The input message subscriber address is null.");
        
        subscribers.forEach((subscriber) -> {
            
            if (!subscriber.equals(messageBusSubscriber)) {
                subscriber.receiveMessage(
                        messageSubscriberAddress, 
                        message);
            }
        });
    }
}

com.github.coderodde.messagebus.MessageBusDemo.java:

package com.github.coderodde.messagebus;


public class MessageBusDemo {

    public static void main(String[] args) {
        MessageBus messageBus = new MessageBus();
        
        MessageBusSubscriber subscriberAlice =
                new MessageBusSubscriber(
                        "Alice", 
                        messageBus);
        
        MessageBusSubscriber subscriberBob =
                new MessageBusSubscriber(
                        "Bob", 
                        messageBus);
        
        MessageBusSubscriber subscriberClarice =
                new MessageBusSubscriber(
                        "Clarice", 
                        messageBus);
        
        messageBus.addMessageBusSubscriber(subscriberAlice);
        messageBus.addMessageBusSubscriber(subscriberBob);
        messageBus.addMessageBusSubscriber(subscriberClarice);
        
        AbstractMessageConsumer subscriberConsumerAlice =
                new AbstractMessageConsumer(messageBus) {
                    
            @Override
            public void consumeMessage(String message) {
                long value = Long.parseLong(message);
                
                if (value == 10L) {
                    return;
                }
                
                System.out.println("Alice received value " + value + "!");
                
                messageBus.receiveMessage(
                        subscriberAlice, 
                        "Bob", 
                        String.format("%d", value + 1L));
            }
        };
        
        AbstractMessageConsumer subscriberConsumerBob = 
                new AbstractMessageConsumer(messageBus) {
                    
            @Override
            public void consumeMessage(String message) {
                long value = Long.parseLong(message);
                
                if (value == 10L) {
                    return;
                }
                
                System.out.println("Bob received value " + value + "!");
                
                messageBus.receiveMessage(
                        subscriberBob, 
                        "Clarice", 
                        String.format("%d", value + 1L));
            }
        };
        
        AbstractMessageConsumer subscriberConsumerClarice = 
                new AbstractMessageConsumer(messageBus) {
                    
            @Override
            public void consumeMessage(String message) {
                long value = Long.parseLong(message);
                
                if (value == 10L) {
                    return;
                }
                
                System.out.println("Clarice received value " + value + "!");
                
                messageBus.receiveMessage(
                        subscriberClarice, 
                        "Alice", 
                        String.format("%d", value + 1L));
            }
        };
        
        subscriberAlice   .setMessageConsumer(subscriberConsumerAlice);
        subscriberBob     .setMessageConsumer(subscriberConsumerBob);
        subscriberClarice .setMessageConsumer(subscriberConsumerClarice);
        
        subscriberClarice.sendMessage("Alice", "0");
        
        System.out.println("Message passing done!");
    }
}

com.github.coderodde.messagebus.MessageBusSubscriber.java:

package com.github.coderodde.messagebus;

import java.util.Objects;

/**
 * This class implements a message bus subscriber.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public final class MessageBusSubscriber {
    
    private static long idCounter = 0;
    private static final Object ID_COUNTER_LOCK = new Object();
    
    private final long id;
    private final MessageBus messageBus;
    private final String messageBusSubscriberName;
    private AbstractMessageConsumer messageConsumer;
    
    public MessageBusSubscriber(
            String messageBusSubscriberName,
            MessageBus messageBus) { 
        
        this.id = getAndIncrementId();
        
        this.messageBusSubscriberName = 
                Objects.requireNonNull(
                        messageBusSubscriberName, 
                        "Subscriber name is null.");
        
        this.messageBus = Objects.requireNonNull(messageBus);
    }
    
    public AbstractMessageConsumer getMessageConsumer() {
        return this.messageConsumer;
    }
    
    public void setMessageConsumer(AbstractMessageConsumer consumer) {
        this.messageConsumer =
                Objects.requireNonNull(
                        consumer, 
                        "The input consumer is null.");
    }
    
    public void sendMessage(String messageSubscriberAddress, String message) {
        messageBus.receiveMessage(this, 
                                  messageSubscriberAddress, 
                                  message);
    }
    
    public void receiveMessage(String messageSubscriberAddress, String message) {
        if (!messageBusSubscriberName.equals(messageSubscriberAddress)) {
            return;
        }
        
        messageConsumer.consumeMessage(message);
    }
    
    @Override
    public boolean equals(Object o) {
        if (o == null) {
            return false;
        }
        
        if (!(o instanceof MessageBusSubscriber)) {
            return false;
        }
        
        MessageBusSubscriber other = (MessageBusSubscriber) o;
        return this.id == other.id;
    }
    
    @Override
    public int hashCode() {
        return Long.hashCode(id);
    }
    
    private static long getAndIncrementId() {
        synchronized (ID_COUNTER_LOCK) {
            return idCounter++;
        }
    }
}

Demo output

Alice received value 0!
Bob received value 1!
Clarice received value 2!
Alice received value 3!
Bob received value 4!
Clarice received value 5!
Alice received value 6!
Bob received value 7!
Clarice received value 8!
Alice received value 9!
Message passing done!

Critique request

As always, I would like to receive any comments. Especially, I am concerned whether I should make it concurrent.

\$\endgroup\$

3 Answers 3

4
\$\begingroup\$

Seems minor, but blocks like

/**
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)

are doomed to become wrong. These coordinates are entirely captured in greater detail and accuracy via source control, and do not belong in code.

AbstractMessageConsumer.messageBus seems like something that should be final, and its set() removed.

Especially since you're (maybe) interested in concurrency, rewrite MessageBus.receiveMessage to look like

        subscribers.stream()
            .filter(s -> !messageBusSubscriber.equals(s))
            .forEach(subscriber ->
                subscriber.receiveMessage(
                    messageSubscriberAddress,
                    message
                )
            );

Concurrency can then be trivially introduced by replacing stream() with parallelStream().

Replace

System.out.println("Alice received value " + value + "!");

with

System.out.printf("Alice received value %d!%n", value);

and so on for Bob, etc.

String is not the first type I would have chosen for message passing - but this depends on usage and medium, and you haven't really specified either. All Strings are in UTF-16 (big asterisk), which is a very inefficient encoding if the message-passing is space- or bandwidth-sensitive. Lacking basically all useful context information, I would instead choose that all messages are children of Serializable, however that happens.

Asterisk: effective String packing strategy in memory is somewhat complicated.

\$\endgroup\$
1
\$\begingroup\$

Document the purpose of the subscriber and the consumer. Based on the names only they seem like they do the same thing, which is confusing.

All components are tightly coupled. A subscriber and the consumer have to know what the bus is. This seems unnecessary. As long as they receive the packages, it shouldn't matter whether it's the "UPS, FedEx or USPS" that delivers them. Only when a component is interested in sending something should the bus become relevant to them. Unless there is a reason for the coupling... in which case it should be documented.

\$\endgroup\$
1
\$\begingroup\$

I would definitely add asynchronous decoupling of sending and receiving messages, otherwise the code is "just" implementing the publish-subscribe-pattern, but not worth to be called a "message bus" (last part is just my personal opinion, don't take it too serious please ;-)).

Also I would be interested, why you put sendMessage and receiveMessage both into the subscriber class - by the naming I would expect a "subscriber" just to receive messages. Maybe you can split them into "producers" and "consumers", I think that these naming would be more appropriate for a message bus.

Both producers and consumers could share an interface or logic to register themselves with the bus.

Finally I think that your AbstractMessageConsumer does not provide real benefit, a simple interface with the consumeMessage method would have been sufficient and also what I would have expected from a lib to provide.

\$\endgroup\$

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.