Here's how to quickly turn your apps into a scalable, high-performance data-processing solution.

Reinventing the wheel over and over again can be fun, but you will probably end up with squeaky wheels that fall off the axle. When developers address the scalability and performance of their applications, they often reinvent a solution in which each request is split into batches that are processed concurrently and merged for delivery to the client. Then comes the glorious part (usually accomplished by marketing): naming the new contraption. As a result, plenty of terminology has emerged for our enjoyment: brokers, clusters, agents, runners, distributors, processors, workers, and so on.

After the initial celebration, harsh reality kicks in. Bug fixes related to deadlocks, blocking synchronization, transaction issues, and general thread-safety problems begin to consume months of effort. The open-source Offsprings project can help developers address this common and potentially difficult aspect of server-side software development. As the authors of the Offsprings project explain in this article, you can use this universal Spring-based solution to add concurrent processing to an application with just an hour of XML context configuration.

Parallelizing as a cross-cutting concern

The management of concurrent batch processing falls under the category of aspect-oriented programming (AOP), an approach that allows developers to concentrate on the primary object model for their application. Concurrent batch processing, logging, billing, and transaction integrity are all functions considered to be cross-cutting concerns, called aspects. They can be addressed outside of the primary object model without modifying or even recompiling the original code (see "Spring bare essentials"). Aspects intercept method invocations on the underlying object model at join points and provide additional behavior called advice.
The management of concurrent batch processing is considered an aspect because it occurs outside of an application's primary object model and may intersect with that model at arbitrary points. Offsprings' AOP solution is to wrap a universal advisor bean (the splitter) around the interface for each method (join point) that must be parallelized. This bean's advice divides the work into batches, which are processed in parallel and reassembled. Neither the sender nor the receiver of the original method invocation is aware of the splitter's work.

The implementation of this concept must meet two basic requirements. First, the output from the splitter bean must be identical to the result of the invocation without any AOP applied. Although the solutions for Map, List, and array results are trivial, streams may require additional processing. For example, the XML input stream handler requires additional configuration and accepts an optional XSLT template to merge meta information (addressed below).

Second, the splitter bean should work on a variety of interface signatures without needing meticulous configuration for each particular API. Therefore, the splitter analyzes the API signature through Java introspection and loads the appropriate implementation dynamically. The basic cases of Map, List, array, and XML InputStream are supported out of the box. Support for additional API signatures can be added without modifying the base Offsprings software.

Offsprings example

Let's consider the simple DelayedClient and DelayedService examples from the package net.sourceforge.offsprings.examples.
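The introspection-based dispatch just described can be sketched as a small type-to-handler registry. This is a hypothetical illustration, not the actual Offsprings code; the class and handler names here are invented, but the isAssignableFrom lookup shows how descendants of a supported class are matched automatically:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: resolve a merge strategy from the declared return
// type of an intercepted method, the way the splitter resolves an
// implementation via introspection of the API signature.
public class ResultHandlerRegistry {
    private final Map<Class<?>, String> handlers = new LinkedHashMap<>();

    public ResultHandlerRegistry() {
        // The out-of-the-box cases mentioned in the article.
        handlers.put(Map.class, "mapBarrier");
        handlers.put(List.class, "listBarrier");
        handlers.put(Object[].class, "arrayBarrier");
        handlers.put(java.io.InputStream.class, "xmlStreamBarrier");
    }

    // Descendants of a registered class match automatically,
    // because isAssignableFrom accepts subclasses and subinterfaces.
    public String handlerFor(Class<?> returnType) {
        for (Map.Entry<Class<?>, String> e : handlers.entrySet()) {
            if (e.getKey().isAssignableFrom(returnType)) {
                return e.getValue();
            }
        }
        throw new IllegalArgumentException("Unsupported return type: " + returnType);
    }
}
```

Because the lookup walks assignability rather than exact equality, an API returning, say, ArrayList or a custom Document[] needs no extra configuration.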
The DelayedService interface includes several API signatures intended to illustrate some of the basic argument/return type combinations that the splitter supports:

public List<Document> getDocListFromIdList(List<String> ids);
public Map<String,Document> getDocMapFromIdList(List<String> ids);
public Document[] getDocArrayFromIdArray(String[] ids);
public InputStream getInputStreamFromIdList(List<String> ids);
public InputStream getInputStreamFromIdArray(String[] ids);
public void processObjectsById(List<String> ids);

The DelayedServiceImpl implementation simulates work with a delay of 10 milliseconds per document. The getInputStreamFromId methods return XML that looks like this:

<?xml version='1.0' encoding='UTF-8'?>
<delayed_service>
  <docs>
    <doc source="delayedService">
      <id>0</id>
      <content>This is a content of the document: 0</content>
    </doc>
    <doc source="delayedService">
      <id>1</id>
      <content>This is a content of the document: 1</content>
    </doc>
  </docs>
  <meta_info>
    <document_count>2</document_count>
    <processing_time>2</processing_time>
  </meta_info>
</delayed_service>

Note that the XML document above has two distinct content parts: the document content (XPath: delayed_service/docs) and the meta-info content (XPath: delayed_service/meta_info). The final document content is simply the document content from each batch merged within a single <docs> element. The meta content, however, must be merged based on the meaning of its tags (e.g. the resulting document count is the sum of the counts from all batches).

DelayedServiceClient demonstrates usage of the DelayedService API with and without the splitter.
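The merge-by-meaning rule for meta information can be made concrete with a short sketch. This is a deliberate simplification (in Offsprings the meta merge is driven by an XSLT template, addressed below); here, each batch's meta info is modeled as a map from tag name to number, and the values are combined by summing:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of merging per-batch <meta_info> content: values
// such as document_count cannot simply be concatenated the way <docs>
// elements are; they must be combined by meaning (here, numeric sums).
public class MetaMerger {
    // Merge per-batch meta maps, e.g. {document_count=2, processing_time=2}.
    public static Map<String, Long> merge(Iterable<Map<String, Long>> batchMetas) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map<String, Long> meta : batchMetas) {
            // Map.merge sums an existing value with the new one per tag.
            meta.forEach((tag, value) -> merged.merge(tag, value, Long::sum));
        }
        return merged;
    }
}
```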
The conventional single-threaded execution without the splitter:

DelayedService service = new DelayedServiceImpl();
client.setService(service);
List docsList = client.processLists();
Map docsMap = client.processMap();
InputStream in = client.processInputStream();

To use the splitter, we instead access the DelayedService via the SplitterFactory in the Spring application context (configuration within the application context is covered below):

String[] contextFiles = {"classpath:exampleContext.xml"};
SplitterFactory factory = SplitterFactory.getInstance(contextFiles, "defaultSplitterFactory");
service = (DelayedService)factory.getBean("delayedService");
client.setService(service);

The output shows the performance boost from the 10-thread splitter; unsurprisingly, the results are almost 10 times better:

Process List. Time to process 1000 documents: 10015; per document: 10.015
Input Stream. Time to process 1000 documents: 10121; per document: 10.121
Splitter is engaged ...
Process List. Time to process 1000 documents: 1039; per document: 1.039
Input Stream. Time to process 1000 documents: 1255; per document: 1.255

DelayedClient example configuration

The example application context configuration file is exampleContext.xml. The configuration is consistent with the usual Spring AOP approach.

First, import defaultSplitterContext.xml:

<import resource="classpath:defaultSplitterContext.xml"/>

Second, declare the <code>delayedServiceImpl</code> bean, which contains the implementation we are trying to parallelize:

<bean id="delayedServiceImpl" class="net.sourceforge.offsprings.examples.DelayedServiceImpl"/>

Third, declare the AOP proxy bean <code>delayedService</code>, which acts as a foundation for the pointcut interceptor.
In summary, the <code>delayedService</code> proxy intercepts all calls to the <code>DelayedService</code> interface and redirects them through the splitter (via the bean <code>splitterAdvisorDelayedService</code>) while using the actual implementation in <code>delayedServiceImpl</code>:

- "target" is the delayedServiceImpl bean.
- "interceptorNames" lists the splitterAdvisorDelayedService bean.

<bean id="delayedService" class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="proxyInterfaces">
    <value>net.sourceforge.offsprings.examples.DelayedService</value>
  </property>
  <property name="interceptorNames">
    <list>
      <value>splitterAdvisorDelayedService</value>
    </list>
  </property>
  <property name="target">
    <ref bean="delayedServiceImpl"/>
  </property>
</bean>

Fourth, declare the <code>splitterAdvisorDelayedService</code> bean, which applies the pointcut for the splitter:

- Intercept all methods with names matching the "pattern" ".*".
- Apply the "advice" given by the splitterAdviceDelayedService bean.

<bean id="splitterAdvisorDelayedService" class="org.springframework.aop.support.RegexpMethodPointcutAdvisor">
  <property name="pattern">
    <value>.*</value>
  </property>
  <property name="advice">
    <ref bean="splitterAdviceDelayedService"/>
  </property>
</bean>

Fifth, declare the bean <code>splitterAdviceDelayedService</code>, which extends the default advice <code>defaultSplitterAdvice</code>. This bean provides the core functionality of the whole application; it is the splitter. We can reconfigure the splitter parameters as necessary:

- The parallelism property is the number of threads that will run in parallel.
- The batchsize property is the number of items that will be processed in each batch.

Some optional properties are relevant only for XML <code>InputStream</code> results. The docPath property is an XPath expression identifying the document element (e.g. delayed_service/docs/doc) within the XML.
If omitted, the children of each batch's root element are merged. The metaPath property is an XPath expression identifying the element that contains meta information for each batch; if omitted, the splitter assumes there is no meta information. The metaXSLTTemplate property provides the name of an XSLT template used to merge the content at the location designated by metaPath.

<bean id="splitterAdviceDelayedService" parent="defaultSplitterAdvice">
  <property name="parallelism" value="10"/>
  <property name="batchsize" value="10"/>
  <property name="properties">
    <map>
      <entry key="docPath" value="delayed_service/docs/doc"/>
      <entry key="metaPath" value="delayed_service/meta_info"/>
      <entry key="metaXSLTTemplate" value="meta.xsl"/>
    </map>
  </property>
</bean>

Take it apart, put it back together

Let's take a look at the implementation details. The SplitterAdvice class (let's call it the splitter) is the foundation of the whole application. It orchestrates splitting the work across parallel threads and assembling the result of each thread's execution. The splitter is an AOP advice; therefore, it implements the Spring MethodInterceptor interface, and all intercepted calls are redirected to the method invoke(MethodInvocation invocation). The splitter must know which argument of the invocation represents the collection of document identifiers so that it can split them into chunks. The idsArgumentPosition parameter can be set in the application context to specify which argument contains the identifiers; by default, the splitter uses the first List or array argument of the invocation. Parallelism and batch size are two other important configurable parameters, which define the number of parallel threads and the size of each chunk of the id collection. The IdsIterator interface is responsible for splitting the identifier collection into chunks. The BarrierResult interface then processes each chunk independently and combines the results of each execution into the final output.
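To make the splitter's control flow concrete, here is a minimal, self-contained sketch of the same split/process/merge idea using a plain JDK dynamic proxy and an ExecutorService instead of Spring AOP. The DocService interface and all names are invented for illustration; the real SplitterAdvice is far more general, resolving argument positions and result types by introspection:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified illustration of the splitter idea: intercept a call, split
// the first List argument into batches, invoke the target once per batch
// in parallel, and merge the per-batch List results in order.
public class SplitterSketch {
    public interface DocService { List<String> fetch(List<String> ids); }

    @SuppressWarnings("unchecked")
    public static DocService split(DocService target, int parallelism, int batchSize) {
        InvocationHandler handler = (proxy, method, args) -> {
            List<String> ids = (List<String>) args[0];          // ids to split
            ExecutorService pool = Executors.newFixedThreadPool(parallelism);
            try {
                List<Future<List<String>>> futures = new ArrayList<>();
                for (int i = 0; i < ids.size(); i += batchSize) { // consecutive batches
                    List<String> batch = ids.subList(i, Math.min(i + batchSize, ids.size()));
                    futures.add(pool.submit(() -> target.fetch(batch)));
                }
                List<String> merged = new ArrayList<>();          // barrier: merge results
                for (Future<List<String>> f : futures) merged.addAll(f.get());
                return merged;
            } finally {
                pool.shutdown();
            }
        };
        return (DocService) Proxy.newProxyInstance(
                DocService.class.getClassLoader(), new Class<?>[]{DocService.class}, handler);
    }
}
```

Because the futures are collected and merged in submission order, the merged list matches what a direct single-threaded call would return, which satisfies the splitter's first requirement: identical output with and without the aspect.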
Concrete implementations of IdsIterator and BarrierResult are loaded dynamically by the SplitterFactory. This dynamic resolution depends on the types of the identifier collection and the result. Currently, the splitter supports List and array classes for the identifiers, and Void, Map, List, and XML input stream for the execution results. Descendants of these classes are supported automatically. Any other identifier or return type can easily be configured by the user on a per-case basis.

Extending Splitter

The splitter understands and works with generic parameter and return types such as List, Map, String arrays, and Void. In real life, however, it must deal with the custom-made types of each specific application. The splitter is designed so that adding support for additional parameter or return types is easy.

Adding a new parameter type entails the following steps. First, write a class that implements the interface IdsIterator<IDS>; this will usually be a descendant of IdsIteratorBaseImpl<IDS>. This class implements the process of splitting the parameter objects into batches:

- The method nextBatch() must be synchronized. It should return a Batch object containing a subset of the items (identified by ids) to be processed. The first call to nextBatch() must start at the beginning of the original item set, and subsequent calls must return consecutive subsets.
- The method getApplicableClasses() should return an array containing the classes to which the new IdsIterator will apply (descendants of these classes work automatically and do not need to be listed separately).
- For descendants of IdsIteratorBaseImpl<IDS>, the method sizeForIds(Object ids) should return the number of ids provided in the ids object.

The second step is to add a bean declaration for this implementation class to the project's Spring context file.
For example:

<bean id="idsIteratorArray" class="net.sourceforge.offsprings.splitter.impl.IdsIteratorArrayImpl" singleton="false"/>

Third, add a value to the idsIterators property of the splitterFactory bean in the defaultSplitterContext.xml file, referencing the newly created bean. For example:

<value>idsIteratorArray</value>

Follow these steps to add a new return type. First, write a class that implements the interface BarrierResult (generally, this will be a descendant of BarrierBaseImpl). This class implements the process of merging the various batches:

- The method getApplicableClasses() should return an array containing the classes to which the new BarrierResult will apply (descendants of these classes work automatically and do not need to be listed separately).
- For descendants of BarrierBaseImpl, the method merge() should merge the results from each batch (stored in the protected attribute _results) into a single object of the appropriate type.

Second, add a bean declaration for this implementation class to the project's Spring context file. For example:

<bean id="barrierArray" class="net.sourceforge.offsprings.splitter.impl.BarrierArrayImpl" singleton="false"/>

Third, add a value to the barriers property of the splitterFactory bean in the defaultSplitterContext.xml file, referencing the newly created bean. For example:

<value>barrierArray</value>
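The two extension points can be sketched with simplified stand-ins (these are not the actual Offsprings interfaces; the class names and signatures here are invented for illustration): a batch iterator whose synchronized nextBatch() hands out consecutive subsets of an id array, and a barrier that merges per-batch results back together in order:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the splitter's two extension points.
public class ExtensionSketch {
    public static class ArrayIdsIterator {
        private final Object[] ids;
        private final int batchSize;
        private int cursor;                          // next unread position

        public ArrayIdsIterator(Object[] ids, int batchSize) {
            this.ids = ids;
            this.batchSize = batchSize;
        }

        // Must be synchronized: worker threads pull batches concurrently,
        // and each call has to return the next consecutive subset.
        public synchronized Object[] nextBatch() {
            if (cursor >= ids.length) return null;   // exhausted
            int end = Math.min(cursor + batchSize, ids.length);
            Object[] batch = Arrays.copyOfRange(ids, cursor, end);
            cursor = end;
            return batch;
        }
    }

    public static class ArrayBarrier {
        // Merge per-batch results into a single array, preserving batch
        // order so the caller sees the same output as a direct call.
        public static Object[] merge(List<Object[]> results) {
            int total = 0;
            for (Object[] r : results) total += r.length;
            Object[] merged = new Object[total];
            int pos = 0;
            for (Object[] r : results) {
                System.arraycopy(r, 0, merged, pos, r.length);
                pos += r.length;
            }
            return merged;
        }
    }
}
```

A real implementation would additionally report its applicable classes (as getApplicableClasses() does in Offsprings) so the factory can resolve it from the intercepted method's signature.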