Write your own general-purpose, message-oriented middleware MOM is misunderstood, and MOM gets no credit. You may have heard this one before, but in the distributed systems arena it’s actually true! This is because message-oriented middleware (MOM) has traditionally not enjoyed the same level of sophistication and support as other technologies used in distributed communications frameworks.But times are changing. With the introduction of sophisticated, robust vendor offerings, interest in MOM systems is growing rapidly. Good MOM implementations provide a high-level applications interface, quality of service guarantees, and a host of services such as security, message queueing, and directory support that are necessary for “industrial-strength” distributed communications.Distributed communications frameworksThe purpose of a distributed communications framework is to provide a good way for the parts of a distributed system to communicate. Object-oriented frameworks accomplish this task by providing distributed objects with a way to message each other. The distributed object-oriented frameworks that get the most attention are those that model messaging as method calls. CORBA and RMI are two excellent examples of this type of framework (see Resources). These systems are often called remote procedure call (RPC) systems. The magic of these systems is that they make remote procedure (or method) calls appear to be local procedure calls (LPCs).RPCs are architected on the client/server pattern. For example, CORBA objects that expose methods to be called by remote objects are called (and are) servers.Introducing MOMIn contrast to RPCs, MOMs don’t model messages as method calls; instead, they model them as events in an event delivery system. Clients send and receive events, or “messages,” via APIs that the MOM provides. The MOM may present directory services that let clients look up another application which is acting as a server, or it may present all-purpose “channels” that let a group of clients communicate as peers, or it may present both options. All applications communicate directly with each other using the MOM. Messages generated by applications are meaningful only to other clients because the MOM itself is only a message router (and in some cases a message queueing system as well).MOMs come in all shapes and sizes All MOMs share two fundamental characteristics: they enable message-passing and the message-passing is non-blocking. Beyond these basics, vendors can implement any number of different interfaces and services. Many MOMs provide a publish-and-subscribe interface to enable applications to publish and receive messages that they are interested in. This interface can take the form of a channels-based system or a more simple system in which a client registers the types of messages it is interested in receiving.Basic MOMs provide direct messaging only, no additional services. Advanced MOMs provide message queueing and guaranteed delivery, along with security, cross-platform data marshalling, scalability, and other benefits.MOMs at a glanceHere’s a quick reference to help you get a handle on what MOMs are all about. MOM advantagesSimple: Clients publish and subscribe publish-and-subscribe is a useful high-level abstraction for what apps need to do to communicate.Easy: No complicated setup required MOMs are easy to install and use, unlike complex RPC-based systems like CORBA.Generic: The same MOM can be used for multiple apps Because any given MOM system is essentially just a generic message transport, it can be reused in different applications without any additional work.Flexible: Any and every kind of message can be passed Any message can be passed by the MOM. Because the MOM doesn’t understand the messages, it doesn’t matter what they are.MOM disadvantagesGeneric: Applications have to understand messages Making applications use messages instead of method calls may be tricky, especially if the application relies on the fact that method calls block.Unfamiliar: Does not model method calls Developers unfamiliar with messages may have trouble figuring out how to use them effectively.Asynchronous: Messages are non-blocking Messages are naturally non-blocking. This makes it harder to write apps that need blocking calls.Too simple: No data marshalling Even simple RPC systems marshall data correctly. Simple MOMs may just send messages in which the bytes are out of order from the point of view of the receiver.Non-standard: Vendors are all over the board Vendor MOM implementations do everything … and nothing. Caveat Emptor is the phrase to keep in mind when reviewing the various vendor offerings.When are MOMs appropriate? When communicating apps need to use messagesWhen programming staff is not wedded to client/server and RPC systemsWhen CORBA/RMI and related systems are too complexWhen simple RPC systems are too rudimentaryDesign considerations for our MOMNow that the background is out of the way, let’s start putting together our MOM, the Message Bus. We’ll be using the MOM to enable communication between distributed whiteboard clients. (See Resources for links to information on the whiteboard application we’ve been working with in the past few installments.)The driving consideration for the Message Bus is that it provide a convenient high-level communications interface to the application objects that will use it.Because a channel makes sense as the central service that the Message Bus should provide, the interface to the Message Bus is the Channel class. The client uses the Channel class to access every high-level function of the Message Bus, from subscribing and publishing to listing available channels in the system. The Channel class exposes class methods that affect the Message Bus as a whole, or pertain to all channels. Each channel instance represents a single channel in the system and exposes channel-specific methods.Two interfaces, ChannelListener and ChannelsUpdateListener, are provided for the purposes of subscribing to receive messages on a channel and receiving notification of channel addition, respectively.The image below illustrates the Message Bus system architecture. Class Channel is the high-level interface to the client. Under the hood, different transport implementations can be used interchangeably.Under the hoodUnder the hood, the Message Bus application uses class methods and data structures of Channel to keep track of channels. Listeners to a channel implement the ChannelListener interface, and objects that want to receive updates about channel adds implement the ChannelsUpdateListener interface. Registered listener objects are called back by Channel whenever anything interesting happens. All communication with the outside world is done with a transport-specific implementation of the MessageBus interface, such as MessageBusSocketImpl .Each MessageBus implementation passes messages by talking to a corresponding message-passing server, called a broker, over a shared network transport such as sockets or URL/servlets. The broker routes messages among MessageBus instances, each of which corresponds to a Channel class.Because these transport-specific implementations all implement the MessageBus interface, they are interchangeable. For example, a servlet-based MessageBus and broker can be used by Channel in place of the sockets-based MessageBus and broker. Our Message Bus is a simple peer-to-peer system based on channels, making it suitable for use in a peer-to-peer application such as a collaborative system.Using the Message Bus in a client applicationThese steps allow a client to use the Message Bus: Set up an instance of MessageBus. Channel.setMessageBus (new MessageBusSocketImpl (BROKER_NAME, BROKER_PORT)); In this call, a new MessageBus implementation is created, with the broker identified by the arguments to the constructor call.Subscribe to a channel. Channel textChannel = Channel.subscribe ("text_channel", this); This call returns an instance of the channel corresponding to the channel name argument. If the channel does not exist, it is created in the system.Passing this as an argument means that that caller is itself a ChannelListener. The caller can subscribe not just itself but any ChannelListener to the channel, or any number of listeners to a single channel.Publish a message to the channel. textChannel.publish (new String (myID + " says Hello!")); Publishing a message is easy and entails nothing more than calling publish() on the chosen channel instance. Note that the message can be any type of object, as long as other clients on the channel can understand it, and the server has access to the message class file(s) (as detailed in the Using the Message Bus section)Additional optional steps include:Unsubscribe a listener from a channel. textChannel.unsubscribe (ChannelListener); This method unsubscribes the named ChannelListener from the channel, which means that the listener will receive no new messages. Listeners should be unsubscribed in this manner when they are no longer needed.Get a listing of channel names. Enumeration Channel.getChannelNames (); This method returns the names of all channels available on the Message Bus.Subscribe to receive newly added channels. Channel.subscribeChannelsUpdate (ChannelsUpdateListener); A ChannelsUpdateListener can subscribe to get updates when channels are added to the Message Bus.Stop receiving newly added channels. Channel.unsubscribeChannelsUpdate (ChannelsUpdateListener); A ChannelsUpdateListener can be unsubscribed from channel addition updates. Listeners should be unsubscribed in this manner when they are no longer needed.Add more listeners to a channel. textChannel.subscribe (ChannelListener); This method allows the caller to subscribe additional listeners to a channel. String textChannel.getName (); This method returns the name of this channel instance.Interface ChannelListenerThe ChannelListener interface must be implemented by any object that wants to be updated when a message comes in on a particular channel. public interface ChannelListener { public void messageReceived (Channel channel, Object message); } In most cases, a client that asks for a Channel instance will subscribe itself to the channel and implement this interface itself, but it isn’t necessary. In keeping with JDK 1.1 event adapters, a client can subscribe another object to a channel so that it will consume messages generated by the channel.In fact, a single listener object can subscribe to multiple channels, which will call the listener’s messageReceived() every time a message comes in on any of the channels. The messageReceived () method call provides access to the channel where the message appeared, allowing messageReceived () to separate messages by originating channel.Interface ChannelsUpdateListenerChannelsUpdateListener must be implemented by any object that wants to be updated when a channel is added.public interface ChannelsUpdateListener { public void channelAdded (String name); } Class ChannelThe Channel class serves two purposes:It provides a simple abstraction as an interface to the client using the Message BusIt maintains global state about available channels and passes messages from channels to the MessageBus implementation and receives updates from the MessageBus implementationChannel instances are created and stored by Channel‘s static code. References to them are passed out by Channel.subscribe () as requested by the client. Each Channel instance is unique within the JVM process.public class Channel { protected static boolean busSet = false; protected static MessageBus bus; protected static Hashtable channels = new Hashtable (); protected static Vector channelsUpdateListeners = new Vector (); public static synchronized void setMessageBus (MessageBus mb) throws IOException { if (!busSet) { bus = mb; bus.initBroker (); busSet = true; } else System.out.println ("Can't set MessageBus more than once per runtime!"); } public static String getBrokerName () { return bus.getBrokerName (); } public static Enumeration getChannelNames () { return channels.keys (); } These class methods allow the MessageBus instance to be set once for each runtime, and return information about the bus and channel names, respectively. public static synchronized Channel subscribe (String name, ChannelListener cl) throws IOException { Channel ch; if (channels.containsKey (name)) ch = (Channel) channels.get (name); else { bus.addChannel (name); ch = new Channel (name); channels.put (name, ch); } ch.subscribe (cl); return ch; } This class method returns the channel instance corresponding to the channel name. It creates the channel and calls MessageBus to add it to the system if it doesn’t already exist. As soon as the channel is created, its initial listener is registered with it. // called by clients to register ChannelsUpdateListener public static void subscribeChannelsUpdates (ChannelsUpdateListener cul) { channelsUpdateListeners.addElement (cul); } // called by clients to de-register ChannelsUpdateListener public static void unsubscribeChannelsUpdates (ChannelsUpdateListener cul) { channelsUpdateListeners.removeElement (cul); } These class methods allow listeners to register and de-register for channel addition updates. // called by MessageBus, broadcasts to ChannelsUpdateListeners protected static void channelAdded (String name) { if (!channels.containsKey (name)) { Vector l; synchronized (Channel.class) { channels.put (name, new Channel (name)); l = (Vector) channelsUpdateListeners.clone (); } Enumeration chs = l.elements (); while (chs.hasMoreElements ()) { ChannelsUpdateListener cul; cul = (ChannelsUpdateListener) chs.nextElement (); try { cul.channelAdded (name); } catch (Exception ex) { ex.printStackTrace (); } } } } // called my MessageBus protected static void channelMessageReceived (String name, Object message) { Channel target = (Channel) channels.get (name); if (target != null) target.messageReceived (message); } The underlying MessageBus implementation calls these class methods when a channel is added, or when a message is received on a channel. The methods redirect the call to the appropriate listeners.channelAdded () loops through a vector of ChannelListeners and calls the channelAdded () method of each one. If a listener’s method throws an exception, it is ignored and does not stop the remaining listeners from receiving the update.Due to the channelsUpdateListeners clone, a listener could possibly de-register itself and still receive an update. This situation could arise if the listener de-registers itself while the while loop is looping through the cloned listeners vector. // called by Channel instances protected static void publish (Channel channel, Object message) throws IOException { String name = channel.getName (); if (channels.containsKey (name)) bus.publishChannelMessage (name, message); } Channel instances call this method to publish messages. If the channel is in the system, the message is published by the underlying MessageBus implementation. /* * Channel instance members */ protected String name; protected Vector listeners = new Vector (); protected Vector sentMessages = new Vector (); protected Vector messageSenders = new Vector (); protected Channel (String name) { this.name = name; } public void subscribe (ChannelListener cl) { listeners.addElement (cl); } public String getName () { return name; } These class methods enable subscription from additional listeners and the return of the channel name. The constructor is protected because only Channel.subscribe () is allowed to create channels. public void publish (Object message) throws IOException { Channel.publish (this, message); } public void unsubscribe (ChannelListener cl) { listeners.removeElement (cl); } // called by Channel.channelMessageReceived () protected void messageReceived (Object message) { Vector l; synchronized (this) { l = (Vector) listeners.clone (); } Enumeration cls = l.elements (); while (cls.hasMoreElements ()) { ChannelListener cl = (ChannelListener) cls.nextElement (); try{ cl.messageReceived (this, message); } catch (Exception ex) { ex.printStackTrace (); } } } } The publish () method calls the class publish () method to call MessageBus to publish the message. messageReceived() is called by Channel.channelMessageReceived() to broadcast the message to all ChannelListeners on the channel.Interface MessageBusAll classes that want to act as a client message bus for Channel must implement the MessageBus interface. import java.io.*; public interface MessageBus { public void initBroker () throws IOException; public String getBrokerName (); public void addChannel (String name) throws IOException; public void publishChannelMessage (String name, Object message) throws IOException; } The MessageBus interface provides a minimum number of methods that a MessageBus implementation must provide in order to be usable by the Channel class.Message Bus messagesThe Message Bus uses a set of messages to encapsulate client messages between the MessageBus implementation and the MessageBusBroker implementation.interface BusMsg class InitMsg class ChannelAddedMsg class ChannelUpdateMsg class QuitMsg You can easily add new messages as needed by implementing the BusMsg interface and setting up the handlers on both sides to recognize them and behave accordingly.Class MessageBusSocketImplMessageBusSocketImpl is a sockets-based implementation of the MessageBus interface.public class MessageBusSocketImpl implements MessageBus, Runnable { protected String brokerName; protected Socket broker; protected int port; protected Thread processor; protected ObjectInputStream objectIn; protected ObjectOutputStream objectOut; public MessageBusSocketImpl (String name, int port) { this.brokerName = name; this.port = port; } The constructor does nothing more than set up some instance variables. public void initBroker () throws IOException { broker = new Socket (brokerName, port); try { objectOut = new ObjectOutputStream ( new BufferedOutputStream (broker.getOutputStream ())); objectOut.flush (); objectIn = new ObjectInputStream ( new BufferedInputStream (broker.getInputStream ())); InitMsg initMsg = (InitMsg) objectIn.readObject (); Vector channelNames = initMsg.getChannelNames (); Enumeration names = channelNames.elements (); while (names.hasMoreElements ()) { String name = (String) names.nextElement (); Channel.channelAdded (name); } processor = new Thread (this); processor.start (); } catch (IOException ex) { try { broker.close (); } catch (IOException ignored) { } throw ex; } catch (ClassNotFoundException ex) { ex.printStackTrace (); try { broker.close (); } catch (IOException ignored) { } } } The initBroker () method, which does the interesting setup work, is called by Channel once the MessageBus is set. In this implementation, an ObjectInputStream and ObjectOutputStream are instantiated, and an InitMsg is received on the ObjectInputStream over the socket. The InitMsg contains the channels currently on the system.A new Thread, processor, is started to continue listening to the ObjectInputStream for incoming messages. public String getBrokerName () { return brokerName; } public synchronized void addChannel (String name) throws IOException { objectOut.writeObject (new ChannelAddedMsg (name)); objectOut.flush (); } public synchronized void publishChannelMessage (String name, Object message) throws IOException { objectOut.writeObject (new ChannelUpdateMsg (name, message)); objectOut.flush (); } These methods are called by Channel to add a channel and publish a message on a channel, respectively. public synchronized void stop () { if (processor != null) { try { objectOut.writeObject (new QuitMsg ()); objectOut.flush (); } catch (IOException ignored) { } processor = null; } } public void run () { // does not try to reestablish socket if connection is lost try { while (Thread.currentThread () == processor) processMsgs (); } catch (Exception ex) { ex.printStackTrace (); } finally { try { broker.close (); } catch (IOException ignored) {} } processor = null; } The run () method loops until stopped. If an exception is generated, which will happen if the server hangs up on the client, the socket is closed and no attempt is made to set up another connection. void processMsgs () throws IOException, ClassNotFoundException { BusMsg msg = (BusMsg) objectIn.readObject (); if (msg instanceof ChannelAddedMsg) { String name = ((ChannelAddedMsg) msg).getName (); Channel.channelAdded (name); } else if (msg instanceof ChannelUpdateMsg) { String name = ((ChannelUpdateMsg) msg).getName (); Object message = ((ChannelUpdateMsg) msg).getMessage (); Channel.channelMessageReceived (name, message); } else { System.out.println ("Unknown message: " + msg); } } } The processMsgs() method blocks on the first statement until a new BusMsg has come across the wire from the broker. Once that happens, the type of the message is used by the handler to determine the proper action to take.The information contained in the BusMsg is retrieved and the correct class method of Channel is called to pass the information on to the relevant client listeners.Note: For brevity’s sake, the Message Bus broker, MessageBusBrokerSocketImpl, is not explained in detail here, but the source is available as part of the source download in Resources.Using the Message BusFirst download the complete source (in tar or zip format) from Resources.Using the Message Bus with a client is fairly straightforward. The most important step is defining a set of application-specific messages (MessageBusListAddElementMsg and MessageBusListReplaceElement in this case) that the bus will transport among clients.An important implementation note: Once the application-specific messages are defined, you must place copies of their class files where the broker can see them. In other words, you will need to place them in the proper directory structure below the directory containing the broker (MessageBusBrokerSocketImpl), which in this case is MessageBusWhiteBoard/shoffner/step/may.For example, the distributed whiteboard application (our demo app in this article) uses a subclass of ObservableList called MessageBusList, which allows the whiteboard to plug MessageBusList in as its ObservableList. MessageBusList is defined along with MessageBusList-specific versions of the whiteboard classes, MessageBusListWB and MessageBusListWBLauncherApplet, in MessageBusWhiteBoard/shoffner/step/may/wb.Class files for MessageBusList‘s messages, MessageBusListReplaceElementMsg and MessageBusListAddElementMsg, are located in the same directory.MessageBusBrokerSocketImpl is launched from the MessageBusWhiteBoard directory with the following command:java shoffner.step.may.MessageBusBrokerSocketImpl 5001 Note that you can substitute whatever TCP port you wish to use for 5001, but you have to change the BROKER_PORT member of wb/MessageBusListWB.java to whatever port you put on the server’s command line.When whiteboard applications launch, their respective MessageBusLists pass messages to each other to add and move elements in the shared whiteboard session. These messages are deserialized and reserialized at the server, and the server can see the classes it needs to do this because they are in the right place relative to where the server is launched.ConclusionThe Message Bus MOM we implemented in this article is a useful — albeit simple — system, and a good alternative to RPC-based systems like CORBA and RMI. MOM is especially well-suited for situations in which clients collaborate or otherwise act as peers.There are several optimizations you might consider implementing in your copious spare time, including message echo elimination, increased broker intelligence (so that the system doesn’t waste bandwidth broadcasting messages to instances of MessageBus that don’t have listeners on a given channel), and attempting reconnects to the broker if the connection is lost for some reason. Also, changing the implementation of the bus so that it doesn’t have to have access to the classes of the messages it transports would complete its generalization.Other useful MOM features include access control, client-specific routing in addition to channel-based routing, and secure message transport. In addition, you might take a shot at a completely new implementation that uses HTTP.You can certainly implement these features yourself, but you might want to investigate licensing from a vendor. Several MOM vendors are included in the Resources section.Michael Shoffner is president of PDC, Inc., a software development firm specializing in professional Java training and Internet/intranet solutions. He is co-author of Java Network Programming (Manning/Prentice Hall) and is lead author of JavaWorld’s Java Step by Step column. JavaTechnology Industry