steven_nunez
Contributing Editor

Review: Storm’s real-time processing comes at a price

reviews
Apr 15, 201511 mins

The open source stream processing solution is proven reliable at scale, but difficult to learn and use

business storm 157689723
Credit: Thinkstock

Storm, a top-level Apache project, is a Java framework designed to help programmers write real-time applications that run on Hadoop clusters. Designed at Twitter, Storm excels at processing high-volume message streams to collect metrics, detect patterns, or take actions when certain conditions in the stream are detected. Typically Storm scenarios are at the intersection of real time and high volume, such as analyzing financial transactions for fraud or monitoring cell-tower traffic to maintain service level agreements.

Traditionally these sorts of systems have been constructed using a network of computers connected by a message bus (such as JMS). What makes Storm different is that it combines the message passing and processing infrastructure into a single conceptual unit known as a “topology” and runs them on a Hadoop cluster. This means that Storm clusters can take advantage of the linear scalability and fault tolerance of Hadoop, without the need to reconfigure the messaging bus when increasing capacity.

When working with teams new to Storm, I have found it helpful to approach system design from three dimensions: operations, topology, and data. These roughly map onto their corresponding dimensions in traditional enterprise applications, but translated into the Hadoop world. A Storm topology is a processing workflow analogous to a set of steps in a processing pipeline that would be managed by Oozie in a multipurpose Hadoop cluster.

The topology is the fundamental unit of deployment in Storm. It consists of two types of objects: spouts (message sources) and bolts (message processors). Spouts are available for many common data sources such as JMS, Kafka, and HBase.

Deploying a Storm cluster

Storm clusters typically run multiple Storm applications (topologies) simultaneously. In this sense it is analogous to a Java application server. A developer bundles up a JAR file containing the Storm topology and all of its dependencies, then deploys it to the cluster where it runs until terminated. Storm application developers do not need to be aware of the specific configuration of the cluster their application runs on, so they can focus on the specifics of their application.

Installing a Storm cluster these days is straightforward using Ambari. I was able to install Storm and its dependencies in less than 10 minutes using Hortonworks Data Platform. AWS users who want to spin up Storm clusters on Amazon should check out the storm-deploy project. HDInsights users can provision a Storm cluster on Azure with only a few clicks from their Azure portal or from PowerShell. Manual installation of Storm is straightforward but tedious.

Typically you will be asked where you would like the major components of Storm installed during this process. The three master services — Nimbus, the UI server, and a distributed remote procedure call — are usually located together on a single node. Then there are the worker nodes, confusingly called “supervisors.” The more of these you can deploy, the more processing power your Storm cluster will have. Unlike Hadoop services, Storm services are fault tolerant, so the decision of which services to place on which nodes is less critical.

Storm Cluster architecture

Under the hood, Storm has a lot of moving parts, but because they use Zookeeper to maintain state, they are fault tolerant and can be restarted, even on different nodes, without affecting the running application. Although Zookeeper is a critical component of a Storm topology, all you need to know at this point is that it’s a centralized service for maintaining configuration information and coordinating distributed applications. Storm stores all of its important information in Zookeeper, so if a component should fail, it can be restarted and will pick up where it left off by reading its state from Zookeeper.

The master service, known as Nimbus, is responsible for resource allocation in the Storm cluster. This daemon, implemented as a Thrift service, accepts incoming topologies, distributes code around the cluster, assigns tasks to supervisors, and handles failed nodes. Although failure of the Nimbus daemon will not cause running jobs to fail, its importance in managing failure of worker nodes means that it should be run under some kind of monitoring program to restart it if it fails.

Finally, the worker nodes, or supervisors, are responsible for running spouts and bolts in a pool of JVMs and threads (“workers” and “tasks” in Storm-speak). A bolt or spout executes as one or more threads running in parallel across the cluster. This gives Storm its scalability. Further, Storm worker nodes can be shared with other Hadoop services like Spark or MapReduce.

Managing a Storm cluster

Storm comes with a simple GUI that displays metrics collected by Nimbus. The metrics are useful for knowing when it is time to add nodes to a cluster and for tuning a topology. Although useful, the Storm GUI is not a complete solution, and most production clusters use additional monitoring tools such as JMX, Graphite, or Metrics (Yammer). These tools requires advanced configuration and tuning knowledge, and when the need for troubleshooting arises, be prepared to spend a good deal of time grepping through the logs.

Storm UI

Storm comes with a simple GUI that provides essential metrics, but a complete monitoring solution will require additional tools.  

Developing a Storm application

Storm is written in a combination of Java and Clojure, though spouts and bolts can be written in any language that supports Thrift, such as LISP, Python, or JavaScript. Developers work in local mode when writing and debugging their topologies. In local mode, threads are used to simulate worker nodes, and they allow the developer to set breakpoints, halt execution, inspect variables, and profile their topology before deploying it to a distributed cluster where all of this is much more difficult.

A topology is implemented as Thrift data structures. In a typical workflow, developers write spouts and bolts that operate over streams of tuples (ordered lists of values of any type). These components are assembled into a topology specific to the problem they are trying to solve. When testing is complete, the bundled JAR is submitted to Nimbus to be executed on the cluster.

Designing a topology is similar in many ways to designing a solution architecture. Each bolt in the processing pipeline modifies the tuples in the stream. The Storm architect’s job is to connect multiple bolts together in a topology to implement a complex set of real-time transformations or analysis. Given a sufficiently rich library of bolts, the process of developing a new application becomes one of simply combining the parts into the right topology and modifying any required parameters.

Data transformation

The fundamental data abstraction in Storm is a stream, or an unbounded sequence of tuples. Each stream in a topology is uniquely identified, as are the tuples within it. Creating a topology is, conceptually, specifying the “flow” of the streams through the bolts. Streams are produced by spouts that act as adapters between external data sources and a Storm cluster.

A bolt’s job is to process tuples as they are delivered by the stream. Bolts do all the heavy lifting in a Storm application. They are used to filter, aggregate, or join tuples together. Bolts can also send tuples to external message queues, databases, or HDFS. Bolts can process any number of input streams and produce any number of output streams. Like spouts, bolts are available for common data sources, including MongoDB, Cassandra, HBase, HDFS, and many relational database management systems.

Reliable message processing

Spouts can be configured in two modes: reliable and unreliable. Unreliable operation is appropriate for applications that tolerate a small amount of data loss. For example, losing two or three tweets from the firehose won’t make much of a difference in most applications. Nevertheless, the majority of Storm applications are reliable. In a reliable configuration, Storm guarantees that every tuple in a stream will be fully processed. To be considered fully processed, each tuple — along with every tuple it generates as it traverses bolts in the topology, a collection known as the “tuple tree” — must be processed before a timeout interval, configured for 30 seconds by default. If it is not or if a fail method is called on the tuple, the spout will replay it.

Reliable operation of a topology must be built by the developer. Every tuple must be acknowledged by calling methods on a special “acker” task, which Storm uses to track tuple processing. Failure to ack tuples will cause out-of-memory errors in the acker task, and this problem rarely shows up in local mode. Once the spout that emitted the tuple receives an ack, it can clean up and send an acknowledgement to the external data source. Storm includes some base classes and interfaces that help with all of the acking.

In addition, the tuple tree must be constructed by a mechanism called “anchoring.” This makes Storm aware of each tuple tree branch and its associated spout-to-bolt or bolt-to-bolt connections. Reliable tuples are emitted from spouts with a 64-bit message ID that must be re-emitted by all downstream bolts after processing.

Trident topologies

Trident is a high-level abstraction for Storm that addresses two of the most common requirements in real-time processing: maintaining stream state and ensuring exactly once processing semantics. Trident supersedes the “transactional topologies” described in the documentation, which hasn’t quite caught up with the released source code. In many ways you can think of Trident as an analogy to Pig in map-reduce. Most of the lower-level details described above are abstracted away, allowing the developer to focus on the problem at hand rather than the nuts and bolts of a topology (pun noted, but not intended).

Trident topologies are compiled into Storm topologies for execution on a cluster. This compilation is automatic and generally produces more efficient topologies by minimizing network data transfers whenever possible.

In a Trident topology, the developer works with streams and operators on those streams. One of the main differences in stream implementation is that Trident streams consist of batched sequences of tuples, rather than individual tuples. This is a more efficient way of processing, especially when updates to external systems are involved because the entire batch may be committed as an atomic transaction. This slightly lowers latency but increases overall throughput.

Spouts in Trident are conceptually similar to Storm spouts, but implemented differently. In a compiled Trident topology, a Trident spout will consist of a Storm spout and one or more bolts. These extra components give Trident spouts their additional functionality to handle batch and stateful operations. The spouts are instances of SpoutCoordinator and the bolts MasterBatchCoordinator. A MasterBatchCoordinator uses Zookeeper to track where in the stream the batch came from so that the correct range of source data may be replayed in the event of a failure. The coordinators have their own interfaces for programmers to use when creating topologies.

In contrast to Storm, developers do not define bolts in Trident. They define a sequence of operations on a data flow. I find this a much more intuitive way to work with a real-time data stream. Trident takes care of compiling the stream operations into a series of bolts that implement the logic. Trident operates five general types of operators: filters, functions, aggregations, joins, and merges. Developers can define their own customer operators if required, such as for streaming analytics.

Finally, Trident allows developers to manage state without worrying about the details of persistence. Each type of state is associated with a type of Trident spout, and the developer selects the type based on the level of fault tolerance required. The transactional state enables exactly once message semantics, the most commonly encountered requirement. The opaque state is also transactional, but subtly different in that a replayed batch might not contain the same tuples. The nontransactional spout is at-most-once or at-least-once. Like Storm, Trident has a number of ready-made bolts that implement these processing semantics.

For developers only

Storm may be the only real-time processing framework that has been proven to process millions of messages per second. It is not without a price, though. Compared to Storm’s nearest competitor, Spark streaming, the Java API is awkward and difficult to understand (though the Clojure one is better). The lack of GUI for management makes operating a cluster harder than it should be, and many problems can only be diagnosed by looking at the logs. The most significant feature that Storm lacks is a GUI for building topologies. This means that only developers with an understanding of Java or Clojure are able to build applications from reusable components. A graphical topology builder would open up Storm application development to a wider audience, foster the spread of reusable bolts, and generally speed adoption.

Options are few in the world of real-time message processing frameworks at massive scale, and Storm is the only one battle tested by the largest organizations in the world. Despite its current shortcomings, if you really have a “big streaming” problem, Storm appears to be the best choice.

steven_nunez
Contributing Editor

Steve Nuñez is technologist-turned-executive currently working as a management consultant helping senior executives apply artificial intelligence in a practical, cost effective manner. He takes an incremental approach to AI adoption, emphasizing the organizational change required for analytics and A.I. to become part of the company DNA.

Before moving to consulting Steve led the professional services and technical pre-sales organizations in Asia Pacific for MapR, a “big data unicorn” acquired by HP Enterprise. While leading the field organization, Steve served clients including Toyota, Bank of China, Philips, Samsung, and the government of India in their bio ID program.

Steve has been a contributing editor and reviewer for InfoWorld since 1999.

More from this author