by Michael Shoffner

Write your own MOM!

how-to
May 1, 199818 mins

Write your own general-purpose, message-oriented middleware

MOM is misunderstood, and MOM gets no credit. You may have heard this one before, but in the distributed systems arena it’s actually true! This is because message-oriented middleware (MOM) has traditionally not enjoyed the same level of sophistication and support as other technologies used in distributed communications frameworks.

But times are changing. With the introduction of sophisticated, robust vendor offerings, interest in MOM systems is growing rapidly. Good MOM implementations provide a high-level applications interface, quality of service guarantees, and a host of services such as security, message queueing, and directory support that are necessary for “industrial-strength” distributed communications.

Distributed communications frameworks

The purpose of a distributed communications framework is to provide a good way for the parts of a distributed system to communicate. Object-oriented frameworks accomplish this task by providing distributed objects with a way to message each other.

The distributed object-oriented frameworks that get the most attention are those that model messaging as method calls. CORBA and RMI are two excellent examples of this type of framework (see Resources). These systems are often called remote procedure call (RPC) systems. The magic of these systems is that they make remote procedure (or method) calls appear to be local procedure calls (LPCs).

RPCs are architected on the client/server pattern. For example, CORBA objects that expose methods to be called by remote objects are called (and are) servers.

Introducing MOM

In contrast to RPCs, MOMs don’t model messages as method calls; instead, they model them as events in an event delivery system. Clients send and receive events, or “messages,” via APIs that the MOM provides. The MOM may present directory services that let clients look up another application which is acting as a server, or it may present all-purpose “channels” that let a group of clients communicate as peers, or it may present both options.

All applications communicate directly with each other using the MOM. Messages generated by applications are meaningful only to other clients because the MOM itself is only a message router (and in some cases a message queueing system as well).

MOMs come in all shapes and sizes

All MOMs share two fundamental characteristics: they enable message-passing and the message-passing is non-blocking. Beyond these basics, vendors can implement any number of different interfaces and services.

Many MOMs provide a publish-and-subscribe interface to enable applications to publish and receive messages that they are interested in. This interface can take the form of a channels-based system or a more simple system in which a client registers the types of messages it is interested in receiving.

Basic MOMs provide direct messaging only, no additional services. Advanced MOMs provide message queueing and guaranteed delivery, along with security, cross-platform data marshalling, scalability, and other benefits.

MOMs at a glance

Here’s a quick reference to help you get a handle on what MOMs are all about.

MOM advantages

  • Simple: Clients publish and subscribe

    publish-and-subscribe is a useful high-level abstraction for what apps need to do to communicate.

  • Easy: No complicated setup required

    MOMs are easy to install and use, unlike complex RPC-based systems like CORBA.

  • Generic: The same MOM can be used for multiple apps

    Because any given MOM system is essentially just a generic message transport, it can be reused in different applications without any additional work.

  • Flexible: Any and every kind of message can be passed

    Any message can be passed by the MOM. Because the MOM doesn’t understand the messages, it doesn’t matter what they are.

MOM disadvantages

  • Generic: Applications have to understand messages

    Making applications use messages instead of method calls may be tricky, especially if the application relies on the fact that method calls block.

  • Unfamiliar: Does not model method calls

    Developers unfamiliar with messages may have trouble figuring out how to use them effectively.

  • Asynchronous: Messages are non-blocking

    Messages are naturally non-blocking. This makes it harder to write apps that need blocking calls.

  • Too simple: No data marshalling

    Even simple RPC systems marshall data correctly. Simple MOMs may just send messages in which the bytes are out of order from the point of view of the receiver.

  • Non-standard: Vendors are all over the board

    Vendor MOM implementations do everything … and nothing.

    Caveat Emptor

    is the phrase to keep in mind when reviewing the various vendor offerings.

When are MOMs appropriate?

  • When communicating apps need to use messages
  • When programming staff is not wedded to client/server and RPC systems
  • When CORBA/RMI and related systems are too complex
  • When simple RPC systems are too rudimentary

Design considerations for our MOM

Now that the background is out of the way, let’s start putting together our MOM, the Message Bus. We’ll be using the MOM to enable communication between distributed whiteboard clients. (See Resources for links to information on the whiteboard application we’ve been working with in the past few installments.)

The driving consideration for the Message Bus is that it provide a convenient high-level communications interface to the application objects that will use it.

Because a channel makes sense as the central service that the Message Bus should provide, the interface to the Message Bus is the Channel class. The client uses the Channel class to access every high-level function of the Message Bus, from subscribing and publishing to listing available channels in the system.

The Channel class exposes class methods that affect the Message Bus as a whole, or pertain to all channels. Each channel instance represents a single channel in the system and exposes channel-specific methods.

Two interfaces, ChannelListener and ChannelsUpdateListener, are provided for the purposes of subscribing to receive messages on a channel and receiving notification of channel addition, respectively.

The image below illustrates the Message Bus system architecture.

Class Channel is the high-level interface to the client. Under the hood, different transport implementations can be used interchangeably.

Under the hood

Under the hood, the Message Bus application uses class methods and data structures of

Channel

to keep track of channels. Listeners to a channel implement the

ChannelListener

interface, and objects that want to receive updates about channel adds implement the

ChannelsUpdateListener

interface. Registered listener objects are called back by

Channel

whenever anything interesting happens. All communication with the outside world is done with a transport-specific implementation of the

MessageBus

interface, such as

MessageBusSocketImpl

.

Each MessageBus implementation passes messages by talking to a corresponding message-passing server, called a broker, over a shared network transport such as sockets or URL/servlets. The broker routes messages among MessageBus instances, each of which corresponds to a Channel class.

Because these transport-specific implementations all implement the MessageBus interface, they are interchangeable. For example, a servlet-based MessageBus and broker can be used by Channel in place of the sockets-based MessageBus and broker.

Our Message Bus is a simple peer-to-peer system based on channels, making it suitable for use in a peer-to-peer application such as a collaborative system.

Using the Message Bus in a client application

These steps allow a client to use the Message Bus:

  1. Set up an instance of MessageBus.

      Channel.setMessageBus (new MessageBusSocketImpl (BROKER_NAME, BROKER_PORT));
    

    In this call, a new MessageBus implementation is created, with the broker identified by the arguments to the constructor call.

  2. Subscribe to a channel.

      Channel textChannel = Channel.subscribe ("text_channel", this);
    

    This call returns an instance of the channel corresponding to the channel name argument. If the channel does not exist, it is created in the system.

    Passing this as an argument means that that caller is itself a ChannelListener. The caller can subscribe not just itself but any ChannelListener to the channel, or any number of listeners to a single channel.

  3. Publish a message to the channel.

      textChannel.publish (new String (myID + " says Hello!"));
    

    Publishing a message is easy and entails nothing more than calling publish() on the chosen channel instance. Note that the message can be any type of object, as long as other clients on the channel can understand it, and the server has access to the message class file(s) (as detailed in the Using the Message Bus section)

Additional optional steps include:

  • Unsubscribe a listener from a channel.

      textChannel.unsubscribe (ChannelListener);
    

    This method unsubscribes the named ChannelListener from the channel, which means that the listener will receive no new messages. Listeners should be unsubscribed in this manner when they are no longer needed.

  • Get a listing of channel names.

      Enumeration Channel.getChannelNames ();
    

    This method returns the names of all channels available on the Message Bus.

  • Subscribe to receive newly added channels.

      Channel.subscribeChannelsUpdate (ChannelsUpdateListener);
    

    A ChannelsUpdateListener can subscribe to get updates when channels are added to the Message Bus.

  • Stop receiving newly added channels.

      Channel.unsubscribeChannelsUpdate (ChannelsUpdateListener);
    

    A ChannelsUpdateListener can be unsubscribed from channel addition updates. Listeners should be unsubscribed in this manner when they are no longer needed.

  • Add more listeners to a channel.

      textChannel.subscribe (ChannelListener);
    

    This method allows the caller to subscribe additional listeners to a channel.

      String textChannel.getName ();
    

    This method returns the name of this channel instance.

Interface ChannelListener

The ChannelListener interface must be implemented by any object that wants to be updated when a message comes in on a particular channel.

public interface ChannelListener {
  public void messageReceived (Channel channel, Object message);
}

In most cases, a client that asks for a Channel instance will subscribe itself to the channel and implement this interface itself, but it isn’t necessary. In keeping with JDK 1.1 event adapters, a client can subscribe another object to a channel so that it will consume messages generated by the channel.

In fact, a single listener object can subscribe to multiple channels, which will call the listener’s messageReceived() every time a message comes in on any of the channels. The messageReceived () method call provides access to the channel where the message appeared, allowing messageReceived () to separate messages by originating channel.

Interface ChannelsUpdateListener

ChannelsUpdateListener must be implemented by any object that wants to be updated when a channel is added.

public interface ChannelsUpdateListener {
  public void channelAdded (String name);
}

Class Channel

The Channel class serves two purposes:

  • It provides a simple abstraction as an interface to the client using the Message Bus
  • It maintains global state about available channels and passes messages from channels to the MessageBus implementation and receives updates from the MessageBus implementation

Channel instances are created and stored by Channel‘s static code. References to them are passed out by Channel.subscribe () as requested by the client. Each Channel instance is unique within the JVM process.

public class Channel {

protected static boolean busSet = false; protected static MessageBus bus; protected static Hashtable channels = new Hashtable (); protected static Vector channelsUpdateListeners = new Vector ();

public static synchronized void setMessageBus (MessageBus mb) throws IOException { if (!busSet) { bus = mb; bus.initBroker (); busSet = true; } else System.out.println ("Can't set MessageBus more than once per runtime!"); }

public static String getBrokerName () { return bus.getBrokerName (); }

public static Enumeration getChannelNames () { return channels.keys (); }

These class methods allow the MessageBus instance to be set once for each runtime, and return information about the bus and channel names, respectively.

  public static synchronized Channel subscribe (String name, ChannelListener cl) throws IOException {
    Channel ch;
    if (channels.containsKey (name))
      ch = (Channel) channels.get (name);
    else {
      bus.addChannel (name);
      ch = new Channel (name);
      channels.put (name, ch);
    }
    ch.subscribe (cl);
    return ch;
  }

This class method returns the channel instance corresponding to the channel name. It creates the channel and calls MessageBus to add it to the system if it doesn’t already exist. As soon as the channel is created, its initial listener is registered with it.

// called by clients to register ChannelsUpdateListener public static void subscribeChannelsUpdates (ChannelsUpdateListener cul) { channelsUpdateListeners.addElement (cul); }

// called by clients to de-register ChannelsUpdateListener public static void unsubscribeChannelsUpdates (ChannelsUpdateListener cul) { channelsUpdateListeners.removeElement (cul); }

These class methods allow listeners to register and de-register for channel addition updates.

// called by MessageBus, broadcasts to ChannelsUpdateListeners protected static void channelAdded (String name) { if (!channels.containsKey (name)) { Vector l; synchronized (Channel.class) { channels.put (name, new Channel (name)); l = (Vector) channelsUpdateListeners.clone (); } Enumeration chs = l.elements (); while (chs.hasMoreElements ()) { ChannelsUpdateListener cul; cul = (ChannelsUpdateListener) chs.nextElement (); try { cul.channelAdded (name); } catch (Exception ex) { ex.printStackTrace (); } } } }

// called my MessageBus protected static void channelMessageReceived (String name, Object message) { Channel target = (Channel) channels.get (name); if (target != null) target.messageReceived (message); }

The underlying MessageBus implementation calls these class methods when a channel is added, or when a message is received on a channel. The methods redirect the call to the appropriate listeners.

channelAdded () loops through a vector of ChannelListeners and calls the channelAdded () method of each one. If a listener’s method throws an exception, it is ignored and does not stop the remaining listeners from receiving the update.

Due to the channelsUpdateListeners clone, a listener could possibly de-register itself and still receive an update. This situation could arise if the listener de-registers itself while the while loop is looping through the cloned listeners vector.

  // called by Channel instances
  protected static void publish (Channel channel, Object message) throws IOException {
    String name = channel.getName ();
    if (channels.containsKey (name))
      bus.publishChannelMessage (name, message);
  }

Channel instances call this method to publish messages. If the channel is in the system, the message is published by the underlying MessageBus implementation.

/* * Channel instance members */

protected String name; protected Vector listeners = new Vector (); protected Vector sentMessages = new Vector (); protected Vector messageSenders = new Vector ();

protected Channel (String name) { this.name = name; }

public void subscribe (ChannelListener cl) { listeners.addElement (cl); }

public String getName () { return name; }

These class methods enable subscription from additional listeners and the return of the channel name. The constructor is protected because only Channel.subscribe () is allowed to create channels.

public void publish (Object message) throws IOException { Channel.publish (this, message); }

public void unsubscribe (ChannelListener cl) { listeners.removeElement (cl); }

// called by Channel.channelMessageReceived () protected void messageReceived (Object message) { Vector l; synchronized (this) { l = (Vector) listeners.clone (); } Enumeration cls = l.elements (); while (cls.hasMoreElements ()) { ChannelListener cl = (ChannelListener) cls.nextElement (); try{ cl.messageReceived (this, message); } catch (Exception ex) { ex.printStackTrace (); } } } }

The publish () method calls the class publish () method to call MessageBus to publish the message. messageReceived() is called by Channel.channelMessageReceived() to broadcast the message to all ChannelListeners on the channel.

Interface MessageBus

All classes that want to act as a client message bus for Channel must implement the MessageBus interface.

import java.io.*;

public interface MessageBus { public void initBroker () throws IOException; public String getBrokerName (); public void addChannel (String name) throws IOException; public void publishChannelMessage (String name, Object message) throws IOException; }

The MessageBus interface provides a minimum number of methods that a MessageBus implementation must provide in order to be usable by the Channel class.

Message Bus messages

The Message Bus uses a set of messages to encapsulate client messages between the MessageBus implementation and the MessageBusBroker implementation.

interface BusMsg
class InitMsg
class ChannelAddedMsg
class ChannelUpdateMsg
class QuitMsg

You can easily add new messages as needed by implementing the BusMsg interface and setting up the handlers on both sides to recognize them and behave accordingly.

Class MessageBusSocketImpl

MessageBusSocketImpl is a sockets-based implementation of the MessageBus interface.

public class MessageBusSocketImpl implements MessageBus, Runnable {

protected String brokerName; protected Socket broker;

protected int port; protected Thread processor; protected ObjectInputStream objectIn; protected ObjectOutputStream objectOut;

public MessageBusSocketImpl (String name, int port) { this.brokerName = name; this.port = port; }

The constructor does nothing more than set up some instance variables.

  public void initBroker () throws IOException {
    broker = new Socket (brokerName, port);
    try {
      objectOut = new ObjectOutputStream (
        new BufferedOutputStream (broker.getOutputStream ()));
      objectOut.flush ();
      objectIn = new ObjectInputStream (
        new BufferedInputStream (broker.getInputStream ()));
      InitMsg initMsg = (InitMsg) objectIn.readObject ();
      Vector channelNames = initMsg.getChannelNames ();
      Enumeration names = channelNames.elements ();
      while (names.hasMoreElements ()) {
        String name = (String) names.nextElement ();
        Channel.channelAdded (name);
      }
      processor = new Thread (this);
      processor.start ();
    } catch (IOException ex) {
      try {
        broker.close ();
      } catch (IOException ignored) {
      }
      throw ex;
    } catch (ClassNotFoundException ex) {
      ex.printStackTrace ();
      try {
        broker.close ();
      } catch (IOException ignored) {
      }
    }
  }

The initBroker () method, which does the interesting setup work, is called by Channel once the MessageBus is set. In this implementation, an ObjectInputStream and ObjectOutputStream are instantiated, and an InitMsg is received on the ObjectInputStream over the socket. The InitMsg contains the channels currently on the system.

A new Thread, processor, is started to continue listening to the ObjectInputStream for incoming messages.

public String getBrokerName () { return brokerName; }

public synchronized void addChannel (String name) throws IOException { objectOut.writeObject (new ChannelAddedMsg (name)); objectOut.flush (); }

public synchronized void publishChannelMessage (String name, Object message) throws IOException { objectOut.writeObject (new ChannelUpdateMsg (name, message)); objectOut.flush (); }

These methods are called by Channel to add a channel and publish a message on a channel, respectively.

public synchronized void stop () { if (processor != null) { try { objectOut.writeObject (new QuitMsg ()); objectOut.flush (); } catch (IOException ignored) { } processor = null; } }

public void run () { // does not try to reestablish socket if connection is lost try { while (Thread.currentThread () == processor) processMsgs (); } catch (Exception ex) { ex.printStackTrace (); } finally { try { broker.close (); } catch (IOException ignored) {} } processor = null; }

The run () method loops until stopped. If an exception is generated, which will happen if the server hangs up on the client, the socket is closed and no attempt is made to set up another connection.

  void processMsgs () throws IOException, ClassNotFoundException {
    BusMsg msg = (BusMsg) objectIn.readObject ();
    if (msg instanceof ChannelAddedMsg) {
      String name = ((ChannelAddedMsg) msg).getName ();
      Channel.channelAdded (name);
    } else if (msg instanceof ChannelUpdateMsg) {
      String name = ((ChannelUpdateMsg) msg).getName ();
      Object message = ((ChannelUpdateMsg) msg).getMessage ();
      Channel.channelMessageReceived (name, message);
    } else {
      System.out.println ("Unknown message: " + msg);
    }
  }
}

The processMsgs() method blocks on the first statement until a new BusMsg has come across the wire from the broker. Once that happens, the type of the message is used by the handler to determine the proper action to take.

The information contained in the BusMsg is retrieved and the correct class method of Channel is called to pass the information on to the relevant client listeners.

Note: For brevity’s sake, the Message Bus broker, MessageBusBrokerSocketImpl, is not explained in detail here, but the source is available as part of the source download in Resources.

Using the Message Bus

First download the complete source (in tar or zip format) from Resources.

Using the Message Bus with a client is fairly straightforward. The most important step is defining a set of application-specific messages (MessageBusListAddElementMsg and MessageBusListReplaceElement in this case) that the bus will transport among clients.

An important implementation note: Once the application-specific messages are defined, you must place copies of their class files where the broker can see them. In other words, you will need to place them in the proper directory structure below the directory containing the broker (MessageBusBrokerSocketImpl), which in this case is MessageBusWhiteBoard/shoffner/step/may.

For example, the distributed whiteboard application (our demo app in this article) uses a subclass of ObservableList called MessageBusList, which allows the whiteboard to plug MessageBusList in as its ObservableList. MessageBusList is defined along with MessageBusList-specific versions of the whiteboard classes, MessageBusListWB and MessageBusListWBLauncherApplet, in MessageBusWhiteBoard/shoffner/step/may/wb.

Class files for MessageBusList‘s messages, MessageBusListReplaceElementMsg and MessageBusListAddElementMsg, are located in the same directory.

MessageBusBrokerSocketImpl is launched from the MessageBusWhiteBoard directory with the following command:

java shoffner.step.may.MessageBusBrokerSocketImpl 5001

Note that you can substitute whatever TCP port you wish to use for 5001, but you have to change the BROKER_PORT member of wb/MessageBusListWB.java to whatever port you put on the server’s command line.

When whiteboard applications launch, their respective MessageBusLists pass messages to each other to add and move elements in the shared whiteboard session. These messages are deserialized and reserialized at the server, and the server can see the classes it needs to do this because they are in the right place relative to where the server is launched.

Conclusion

The Message Bus MOM we implemented in this article is a useful — albeit simple — system, and a good alternative to RPC-based systems like CORBA and RMI. MOM is especially well-suited for situations in which clients collaborate or otherwise act as peers.

There are several optimizations you might consider implementing in your copious spare time, including message echo elimination, increased broker intelligence (so that the system doesn’t waste bandwidth broadcasting messages to instances of MessageBus that don’t have listeners on a given channel), and attempting reconnects to the broker if the connection is lost for some reason. Also, changing the implementation of the bus so that it doesn’t have to have access to the classes of the messages it transports would complete its generalization.

Other useful MOM features include access control, client-specific routing in addition to channel-based routing, and secure message transport. In addition, you might take a shot at a completely new implementation that uses HTTP.

You can certainly implement these features yourself, but you might want to investigate licensing from a vendor. Several MOM vendors are included in the Resources section.

Michael Shoffner is president of PDC, Inc., a software development firm specializing in professional Java training and Internet/intranet solutions. He is co-author of Java Network Programming (Manning/Prentice Hall) and is lead author of JavaWorld’s Java Step by Step column.