I have written this simple message bus in Java. I tried to keep it as simple as I could.
Code
com.github.coderodde.messagebus.AbstractMessageConsumer.java:
package com.github.coderodde.messagebus;
import java.util.Objects;
/**
 * This abstract class specifies a message consumer: an object that holds a
 * reference to a {@link MessageBus} and reacts to the text messages passed to
 * {@link #consumeMessage(String)}.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public abstract class AbstractMessageConsumer {

    /**
     * The message bus this consumer refers to. Both the constructor and
     * {@link #setMessageBus(MessageBus)} reject {@code null}.
     */
    protected MessageBus messageBus;

    /**
     * Constructs a consumer that refers to the given message bus.
     *
     * @param messageBus the message bus to refer to.
     * @throws NullPointerException if {@code messageBus} is {@code null}.
     */
    public AbstractMessageConsumer(MessageBus messageBus) {
        // Assign the field directly instead of going through the overridable
        // setMessageBus(...): an override would otherwise run before the
        // subclass part of this object has been initialized.
        this.messageBus =
                Objects.requireNonNull(
                        messageBus,
                        "The input message bus is null.");
    }

    /**
     * Returns the message bus this consumer currently refers to.
     *
     * @return the message bus.
     */
    public MessageBus getMessageBus() {
        return this.messageBus;
    }

    /**
     * Replaces the message bus this consumer refers to.
     *
     * @param messageBus the new message bus.
     * @throws NullPointerException if {@code messageBus} is {@code null}.
     */
    public void setMessageBus(MessageBus messageBus) {
        this.messageBus =
                Objects.requireNonNull(
                        messageBus,
                        "The input message bus is null.");
    }

    /**
     * Consumes a message. The method returns nothing; an implementation that
     * wants to reply does so by sending a new message through the bus.
     *
     * @param message the message text to consume.
     */
    public abstract void consumeMessage(String message);
}
com.github.coderodde.messagebus.MessageBus.java:
package com.github.coderodde.messagebus;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
/**
 * This class implements a message bus that keeps track of a set of message bus
 * subscribers. When one subscriber issues a message with a target subscriber
 * address, the message is propagated to all the other currently registered
 * subscribers; the bus itself does not interpret the address.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public final class MessageBus {

    /**
     * The registered subscribers, kept in registration order.
     */
    private final Set<MessageBusSubscriber> subscribers = new LinkedHashSet<>();

    /**
     * Registers a subscriber. Registering a subscriber that is already
     * present has no effect.
     *
     * @param messageBusSubscriber the subscriber to register.
     * @throws NullPointerException if the subscriber is {@code null}.
     */
    public void addMessageBusSubscriber(
            MessageBusSubscriber messageBusSubscriber) {
        subscribers.add(
                Objects.requireNonNull(
                        messageBusSubscriber,
                        "The input message bus subscriber is null."));
    }

    /**
     * Unregisters a subscriber. Removing a subscriber that is not registered
     * has no effect.
     *
     * @param messageBusSubscriber the subscriber to unregister.
     * @throws NullPointerException if the subscriber is {@code null}.
     */
    public void removeMessageBusSubscriber(
            MessageBusSubscriber messageBusSubscriber) {
        subscribers.remove(
                Objects.requireNonNull(
                        messageBusSubscriber,
                        "The input message subscriber is null."));
    }

    /**
     * Accepts a message from a subscriber and forwards it, together with the
     * target address, to every registered subscriber except the sender.
     *
     * @param messageBusSubscriber     the subscriber sending the message; it
     *                                 is skipped during delivery.
     * @param messageSubscriberAddress the address of the intended recipient.
     * @param message                  the message text; forwarded as is,
     *                                 without a {@code null} check.
     * @throws NullPointerException if the sender or the address is
     *                              {@code null}.
     */
    public void receiveMessage(MessageBusSubscriber messageBusSubscriber,
                               String messageSubscriberAddress,
                               String message) {
        Objects.requireNonNull(
                messageBusSubscriber,
                "The input message bus subscriber is null.");

        Objects.requireNonNull(
                messageSubscriberAddress,
                "The input message subscriber address is null.");

        // Deliver over a snapshot: a consumer may subscribe or unsubscribe
        // while handling a message, and iterating the live set would then
        // fail with a ConcurrentModificationException.
        MessageBusSubscriber[] snapshot =
                subscribers.toArray(new MessageBusSubscriber[0]);

        for (MessageBusSubscriber subscriber : snapshot) {
            if (!subscriber.equals(messageBusSubscriber)) {
                subscriber.receiveMessage(
                        messageSubscriberAddress,
                        message);
            }
        }
    }
}
com.github.coderodde.messagebus.MessageBusDemo.java:
package com.github.coderodde.messagebus;
/**
 * Demonstrates the message bus: three subscribers pass an increasing counter
 * around in a ring (Alice, Bob, Clarice, Alice, ...) until it reaches
 * {@link #TERMINAL_VALUE}.
 */
public class MessageBusDemo {

    /**
     * The counter value at which a consumer stops relaying.
     */
    private static final long TERMINAL_VALUE = 10L;

    public static void main(String[] args) {
        MessageBus messageBus = new MessageBus();

        MessageBusSubscriber subscriberAlice =
                new MessageBusSubscriber("Alice", messageBus);

        MessageBusSubscriber subscriberBob =
                new MessageBusSubscriber("Bob", messageBus);

        MessageBusSubscriber subscriberClarice =
                new MessageBusSubscriber("Clarice", messageBus);

        messageBus.addMessageBusSubscriber(subscriberAlice);
        messageBus.addMessageBusSubscriber(subscriberBob);
        messageBus.addMessageBusSubscriber(subscriberClarice);

        // Each consumer prints the value it receives and relays value + 1 to
        // the next subscriber in the ring.
        subscriberAlice.setMessageConsumer(
                createRelayConsumer(messageBus, subscriberAlice, "Alice", "Bob"));

        subscriberBob.setMessageConsumer(
                createRelayConsumer(messageBus, subscriberBob, "Bob", "Clarice"));

        subscriberClarice.setMessageConsumer(
                createRelayConsumer(messageBus, subscriberClarice, "Clarice", "Alice"));

        // Kick off the ring: Clarice sends the initial value to Alice.
        subscriberClarice.sendMessage("Alice", "0");
        System.out.println("Message passing done!");
    }

    /**
     * Creates a consumer that parses the incoming message as a {@code long},
     * prints it, and relays the value incremented by one to the next
     * subscriber. A message carrying {@link #TERMINAL_VALUE} is dropped
     * without printing or relaying, which ends the ring.
     *
     * @param bus         the message bus handed to the consumer.
     * @param owner       the subscriber on whose behalf the relay is sent.
     * @param ownerName   the name printed when a value is received.
     * @param nextAddress the address of the subscriber that gets the
     *                    incremented value.
     * @return the relay consumer.
     */
    private static AbstractMessageConsumer createRelayConsumer(
            MessageBus bus,
            MessageBusSubscriber owner,
            String ownerName,
            String nextAddress) {

        return new AbstractMessageConsumer(bus) {
            @Override
            public void consumeMessage(String message) {
                long value = Long.parseLong(message);

                if (value == TERMINAL_VALUE) {
                    return;
                }

                System.out.println(
                        ownerName + " received value " + value + "!");

                owner.sendMessage(
                        nextAddress,
                        String.format("%d", value + 1L));
            }
        };
    }
}
com.github.coderodde.messagebus.MessageBusSubscriber.java:
package com.github.coderodde.messagebus;
import java.util.Objects;
/**
 * This class implements a message bus subscriber. A subscriber has a name
 * that serves as its address, sends messages through its {@link MessageBus},
 * and hands messages addressed to it over to its
 * {@link AbstractMessageConsumer}. Two subscribers are equal if and only if
 * they carry the same internally assigned ID.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Dec 25, 2023)
 * @since 1.6 (Dec 25, 2023)
 */
public final class MessageBusSubscriber {

    /**
     * The next ID to hand out. Guarded by {@link #ID_COUNTER_LOCK}.
     */
    private static long idCounter = 0;
    private static final Object ID_COUNTER_LOCK = new Object();

    private final long id;
    private final MessageBus messageBus;
    private final String messageBusSubscriberName;

    /**
     * The consumer that handles incoming messages; {@code null} until
     * {@link #setMessageConsumer(AbstractMessageConsumer)} is called.
     */
    private AbstractMessageConsumer messageConsumer;

    /**
     * Constructs a subscriber with the given name that sends through the
     * given bus. The subscriber is not registered with the bus here.
     *
     * @param messageBusSubscriberName the name, used as the address.
     * @param messageBus               the bus used for sending.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public MessageBusSubscriber(
            String messageBusSubscriberName,
            MessageBus messageBus) {
        // Validate first, so that a rejected argument does not consume an ID.
        this.messageBusSubscriberName =
                Objects.requireNonNull(
                        messageBusSubscriberName,
                        "Subscriber name is null.");

        this.messageBus =
                Objects.requireNonNull(
                        messageBus,
                        "The input message bus is null.");

        this.id = getAndIncrementId();
    }

    /**
     * Returns the consumer handling this subscriber's incoming messages.
     *
     * @return the consumer, or {@code null} if none has been set yet.
     */
    public AbstractMessageConsumer getMessageConsumer() {
        return this.messageConsumer;
    }

    /**
     * Sets the consumer that handles this subscriber's incoming messages.
     *
     * @param consumer the consumer.
     * @throws NullPointerException if {@code consumer} is {@code null}.
     */
    public void setMessageConsumer(AbstractMessageConsumer consumer) {
        this.messageConsumer =
                Objects.requireNonNull(
                        consumer,
                        "The input consumer is null.");
    }

    /**
     * Sends a message through the bus on behalf of this subscriber.
     *
     * @param messageSubscriberAddress the address of the recipient.
     * @param message                  the message text.
     */
    public void sendMessage(String messageSubscriberAddress, String message) {
        messageBus.receiveMessage(this,
                                  messageSubscriberAddress,
                                  message);
    }

    /**
     * Receives a message from the bus. The message is passed to the consumer
     * only if the address equals this subscriber's name; otherwise it is
     * ignored.
     *
     * @param messageSubscriberAddress the address the message was sent to.
     * @param message                  the message text.
     * @throws IllegalStateException if the message is addressed to this
     *                               subscriber but no consumer has been set.
     */
    public void receiveMessage(String messageSubscriberAddress, String message) {
        if (!messageBusSubscriberName.equals(messageSubscriberAddress)) {
            return;
        }

        // Fail with a descriptive error instead of a bare
        // NullPointerException when the consumer was never configured.
        if (messageConsumer == null) {
            throw new IllegalStateException(
                    "No message consumer is set for subscriber \""
                            + messageBusSubscriberName
                            + "\".");
        }

        messageConsumer.consumeMessage(message);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        // instanceof is false for null, so no separate null check is needed.
        if (!(o instanceof MessageBusSubscriber)) {
            return false;
        }

        MessageBusSubscriber other = (MessageBusSubscriber) o;
        return this.id == other.id;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(id);
    }

    /**
     * Returns the next unused ID and advances the counter under the lock.
     *
     * @return a fresh subscriber ID.
     */
    private static long getAndIncrementId() {
        synchronized (ID_COUNTER_LOCK) {
            return idCounter++;
        }
    }
}
Demo output
Alice received value 0!
Bob received value 1!
Clarice received value 2!
Alice received value 3!
Bob received value 4!
Clarice received value 5!
Alice received value 6!
Bob received value 7!
Clarice received value 8!
Alice received value 9!
Message passing done!
Critique request
As always, I would like to receive any comments. In particular, I am wondering whether I should make it concurrent.