Tuesday, 10 June 2014

Writing and testing software for data integrity

Last week I hosted a presentation under the auspices of JUG Łódź (of which I happen to be a founding member): "Writing and testing software for data integrity". Data integrity is a broad topic, so I touched on only a few chosen aspects. Things that made it into the presentation include:

  • Physical vs logical integrity
  • ACID (strong) consistency
  • BASE (weak) consistency
  • Synchronous vs asynchronous replication
  • Distributed systems limitations - CAP theorem
  • Examples of data consistency violations (like the Photo Privacy Violation or the Double Money Withdrawal described in the "Don't settle for eventual consistency" article)
  • Strong consistency comes with a performance penalty. Choosing performance and availability over consistency might be justified and lead to improved revenues (as is the case with Amazon), or it might lead to spectacular failures, as in the case of Flexcoin

Local vs distributed transactions

The second part of the presentation was slightly different, though. It included a live demonstration of a situation where local transactions are not sufficient to guarantee data consistency across multiple resources, and of how distributed transactions come to the rescue. The demonstration was based on the scenario below:

The application consumes messages from a JMS queue (TEST.QUEUE), stores message content in a database, does some processing inside VeryUsefulBean and finally sends a message to another JMS queue (OUT.QUEUE).
The application was a web application deployed on JBoss EAP 6.2. JMS broker functionality was provided by ActiveMQ and MySQL acted as a database. Web application logic was built with Spring and Apache Camel.
So let's assume that the processing inside VeryUsefulBean fails:
The exception was injected using a JBoss Byteman rule:

As expected, with local transactions the system state was inconsistent after the processing failure. The expected state would be:

  • Incoming message not lost
  • No data saved in database
  • Nothing sent to outbound queue (OUT.QUEUE).

Basically, one would expect the system state not to change because of a processing failure. However, the actual behaviour was:
  • Incoming message not lost
  • Data saved to DB (as many times as message was (re)delivered).
  • Nothing sent to outbound queue (OUT.QUEUE).
The reason for this behaviour was that the local transaction that saved data into the database was committed independently of the JMS message consumption transaction, leading to an inconsistent state.

Then the experiment was repeated with a JTA transaction manager and XA resources set up. The outcome was correct this time - no data was saved to the database. JMS message consumption and all processing, including database inserts, were handled as part of the same distributed transaction, and all changes were rolled back upon failure as expected.
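The difference between the two runs can be mimicked with a toy, in-memory simulation. This is only a sketch: it involves no JMS, JDBC, JTA or XA at all, and every class and method name in it is made up for illustration. It shows why an insert that commits on its own piles up with each redelivery, while an insert that belongs to the same unit of work as the failing processing never becomes visible.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalVsSharedTransaction {

    /** Stand-in for the database table; every committed insert ends up here. */
    static final List<String> database = new ArrayList<>();

    /** Local transactions: the insert commits on its own, before processing fails. */
    static void handleWithLocalTransactions(String message, Runnable processing) {
        database.add(message);   // the "DB transaction" commits here
        processing.run();        // fails afterwards, too late to undo the insert
    }

    /** One shared transaction: the insert is only staged and is dropped on failure. */
    static void handleWithSharedTransaction(String message, Runnable processing) {
        List<String> staged = new ArrayList<>();
        staged.add(message);     // not visible in the "database" yet
        processing.run();        // an exception here skips the commit below
        database.addAll(staged); // commit, reached only when processing succeeds
    }

    /** Delivers the same message several times and returns the number of stored rows. */
    static int rowsAfterDeliveries(boolean sharedTransaction, int deliveries) {
        database.clear();
        Runnable failingProcessing = () -> {
            throw new IllegalStateException("processing failed");
        };
        for (int attempt = 1; attempt <= deliveries; attempt++) {
            try {
                if (sharedTransaction) {
                    handleWithSharedTransaction("payload", failingProcessing);
                } else {
                    handleWithLocalTransactions("payload", failingProcessing);
                }
            } catch (IllegalStateException e) {
                // The message stays on the queue and is redelivered in the next iteration.
            }
        }
        return database.size();
    }

    public static void main(String[] args) {
        System.out.println("Local transactions: " + rowsAfterDeliveries(false, 3) + " rows");
        System.out.println("Shared transaction: " + rowsAfterDeliveries(true, 3) + " rows");
    }
}
```

With three deliveries the first variant leaves three rows behind and the second leaves none, which mirrors the two outcomes observed in the demonstration.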

Automated integration test

The test proved that the application worked correctly with a JTA transaction manager and XA resources (XA connection factory for JMS, XA data source for JDBC). However, the test was manual and time-consuming. Ideally this behaviour would be tested automatically, and this was the topic of the final part of the presentation. We walked through an integration test that verified transactional behaviour automatically.

First test cases were defined as JBehave scenarios:
JUnit was used to execute the scenarios:

The Spring context for the web application was split into two parts:
  • spring-jmsconsumer-core.xml - contained beans with application logic and definition of Camel context.
  • spring-jmsconsumer-infrastructure.xml - contained beans used to access external resources, like JMS connection factory or JDBC data source.
In order to execute the application logic in a fully controlled environment, the test had to be completely autonomous. This means that all external interfaces and infrastructure had to be recreated by the test harness:
  • ActiveMQ - replaced by embedded ActiveMQ.
  • Arjuna Transaction Manager provided by JBoss EAP - replaced by standalone Atomikos.
  • MySQL - replaced by embedded HSQLDB.
Both embedded ActiveMQ and HSQLDB support XA protocol, so they could be used to verify transactional behaviour.
While the core context could and had to be reused during test execution, the infrastructure context made sense only when the application was deployed on a real JEE server, as it retrieved necessary resources from JNDI - see below.

Therefore the infrastructure part of the context had to be rewritten for automated test execution:


Note the beans with IDs amq-broker and hsqldbServer - they are responsible for starting the embedded JMS broker and DB server needed during test execution.

Having the infrastructure in place, it is quite simple to write test steps defined in JBehave scenarios, e.g.:

There are obviously a few other details that need to be worked out, but since this post has already grown too long - have a look at the complete code on GitHub: https://github.com/mstrejczek/dataintegrity.

Monday, 9 June 2014

Review of Scalar - Scala Conference

This is a long overdue review of the Scalar conference that took place on 5th of April 2014 in Warsaw. It's been over 2 months since the event so I've forgotten a lot - this post is based mostly on the notes I took during the event. Unfortunately I lost most of the notes when transferring them between devices...

About the event

Scalar was a one-day conference dedicated to Scala and organized by SoftwareMill - a company whose people are well known in the Java/Scala community. It was a free event and the venue was the National Library - a lovely location next to the Pole Mokotowskie park. There was no breakfast served at all - not great, but acceptable given it was a free event. A warning on the website to have a proper breakfast in the morning would have been good, though. It was difficult to stay focused during the first presentation when in starvation mode. The situation was saved to some extent by cookies that were available in large quantities, and I appreciated 100% juice being available instead of sugar-enriched nectars or cola. There was one more issue with logistics - the lobby with cookies and drinks was too small and therefore incredibly crowded during the first breaks. Later the situation improved as more and more people started to go outside to enjoy the lovely weather.

Content

There was only one track, so I didn't have to make difficult choices. I missed a few sessions, though - due to personal reasons I could not attend all sessions or stay focused at all times. The first session was about "Database Access with Slick". Slick is a functional-relational mapping library offering direct mapping from SQL to Scala code with little impedance mismatch. In the SQL world, extensions like PL/SQL are proprietary and use an ugly imperative form. With Slick one can enrich queries using Java code (or was it Scala code?). There is free support for open source DBs. Commercial extensions are needed to use Slick with proprietary DBs like Oracle. I remember a complex type system and operators presented during the session - I didn't find that aspect of Slick too appealing. Also, support for the upsert operation is not available yet. Summing up - the presentation was really solid, but I was not impressed by the Slick library itself. One more note - one can try Slick out on Typesafe Activator.

The second session covered "ReactJS and Scala.js". ReactJS is a JavaScript library for building UIs that was developed by Facebook and is used by Instagram. Scala.js allows developing code in Scala and compiling it into JavaScript. During the presentation there was an extensive walk through the code of a chat application (Play on the backend, ReactJS + Scala.js on the frontend). What I noted was that Scala collections work nicely in a web browser. The problem is that the JavaScript implementation of the Scala standard library is heavy (20 MB), but it can be stripped down to include only the necessary bits, e.g. 200 kB in size. Another issue was that conversions from/to JavaScript types were a pain - especially if rapid prototyping is what you're after. Another conclusion: ReactJS works great with immutable data. Regarding the Scala-to-JavaScript compiler: normal compilation is fast, but the produced files are huge. Closure compilation is slow but produces small output files. From now on I'm writing from memory only, as my surviving notes covered only the first two sessions.

The next session was "Simple, Fast & Agile REST with Spray" by Adam Warski from SoftwareMill. The presentation was one of the better ones: well-paced, informative and with really smooth live coding. Live coding, if done well, makes presentations more interesting in my opinion. Not only was the presentation itself good, but so was the subject - you can really expose a RESTful interface (including starting up an HTTP server) with a few lines of code using Spray. Definitely recommended if you are looking for a Scala-based REST framework.

After that came "Doing Crazy Algebra With Scala Types". There were some interesting analogies shown between Scala types and mathematical formulae. That was probably the first time I saw a Taylor series since I left university... Mildly amusing - I found it a curiosity but could not identify practical uses.

The last session before lunch was "Scaling with Akka", which involved a demo of an Akka cluster running on a few Raspberry Pi nodes. I must admit I don't remember much from this session apart from the fact that Akka cluster worked indeed and Raspberry Pis were painfully slow.

The first session after lunch was devoted to Scala weaknesses and peculiarities: "The Dark Side of Scala" by Tomasz Nurkiewicz. It was a good and fast-paced presentation, and there were some interesting kinks shown that did not repeat those found in other well-known presentations (e.g. the famous "We're Doing It All Wrong" by Paul Phillips).

The next session was a solid introduction to event sourcing: "Event Sourcing with Akka-Persistence" by Konrad Malawski.

I couldn't focus fully during the next three presentations, so I won't write about them. The last one that I remember was "Lambda implementation in Scala 2.11 and Java 8". It included a comparison of the bytecode generated for a Java 8 lambda expression with the bytecode generated by Scala, plus an excellent explanation of how invokedynamic works in Java 8. Java 8 uses invokedynamic to call the lambda expression code, while Scala generates an additional anonymous class and invokes a virtual override method. The bytecode for Java 8 looks much more concise, although the performance is not necessarily better, as invokedynamic in Java 8 leads to generation of an anonymous class at runtime. So effectively an anonymous class is used anyway - with Scala it is generated at compile time, with Java 8 at runtime. So currently the main benefit is the smaller size of Java 8 jar files compared to Scala-generated ones. However, if in Java 9 or Java 10 the anonymous class generation gets optimized away entirely, then invokedynamic will clearly get a significant runtime performance boost - without any need to touch the source code! Scala is going to migrate to invokedynamic in the future.

There was one more session at the end but I missed it. Summing up - the conference was all right. The sessions were short, which made space for many different topics. The level of presentations was satisfactory on average - not all presentations were perfect and interesting, but I think for the first edition of a free conference - well done.

Sunday, 28 July 2013

Making recommendations with Apache Mahout

Recently I've started reading "Mahout in Action" - a book about the machine learning library Apache Mahout. A few years ago I was involved in chess programming and came across machine learning algorithms for tuning chess opening books. Therefore I thought it might be interesting to see what modern machine learning libraries have to offer.
The first part of the book is dedicated to making recommendations. I'm sure everyone has seen suggestions offered by online stores - "users who chose that product also liked...". Implementation of this feature can be made much easier with tools like Mahout. Let's see.
For our trivial proof of concept let's assume we have a travel site which collects ratings that users give to the countries they visited. Based on these ratings the site suggests other countries that the user might enjoy visiting. For the sake of simplicity we make a few assumptions:
  • We have only 6 users so far - 3 females and 3 males.
  • We have only 10 countries rated so far - only America and Europe.
  • It so happens that the women seem to have a strong preference for European countries (higher ratings) while the guys apparently enjoy American countries.
The assumptions above serve only one purpose - to make the problem and answers easy to comprehend for us at first glance. They are absolutely not required for Mahout to work properly.

User ID  User name  Country ID  Country name  Rating (1.0-5.0)
1        Albert     101         Albania       2.0
1        Albert     102         Costa Rica    4.0
1        Albert     105         France        2.5
1        Albert     106         Mexico        3.5
2        Caroline   102         Costa Rica    1.5
2        Caroline   103         Denmark       4.5
2        Caroline   105         France        4.0
2        Caroline   107         Poland        5.0
2        Caroline   108         USA           2.5
3        Jacob      101         Albania       2.5
3        Jacob      103         Denmark       2.0
3        Jacob      104         Guatemala     5.0
3        Jacob      106         Mexico        4.5
3        Jacob      108         USA           4.5
4        Joanne     101         Albania       5.0
4        Joanne     102         Costa Rica    2.0
4        Joanne     103         Denmark       4.5
4        Joanne     107         Poland        4.5
5        Paul       101         Albania       2.5
5        Paul       102         Costa Rica    4.0
5        Paul       105         France        3.0
5        Paul       106         Mexico        4.0
5        Paul       107         Poland        2.5
5        Paul       108         USA           5.0
6        Monica     101         Albania       3.5
6        Monica     104         Guatemala     1.0
6        Monica     105         France        4.0
6        Monica     107         Poland        4.0
6        Monica     108         USA           3.0

Given the data above let's try to generate recommendations for Caroline and Jacob. First we need the input data above represented in a format that Mahout understands. Our format of choice is <User ID>,<Item ID>,<Preference>:
1,101,2.0
1,102,4.0
1,105,2.5
1,106,3.5
2,102,1.5
2,103,4.5
2,105,4.0
2,107,5.0
2,108,2.5
3,101,2.5
3,103,2.0
3,104,5.0
3,106,4.5
3,108,4.5
4,101,5.0
4,102,2.0
4,103,4.5
4,107,4.5
5,101,2.5
5,102,4.0
5,105,3.0
5,106,4.0
5,107,2.5
5,108,5.0
6,101,3.5
6,104,1.0
6,105,4.0
6,107,4.0
6,108,3.0
A complete Java program using Mahout that would provide a recommendation for a given user looks as follows:
package pl.marekstrejczek.mahout;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class TravelSiteRecommender {
 public static void main(final String[] args) throws IOException, TasteException {
  DataModel model = new FileDataModel(new File("travel.dat"));
  
  UserSimilarity similarity = new EuclideanDistanceSimilarity(model);
  
  UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
  
  Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

  List<RecommendedItem> recommendationForCaroline = recommender.recommend(2, 1);
  List<RecommendedItem> recommendationForJacob = recommender.recommend(3, 1);
  
  System.out.println("Recommendation for Caroline: "+recommendationForCaroline);
  System.out.println("Recommendation for Jacob: "+recommendationForJacob);
 }
}
The output of this program is:
Recommendation for Caroline: [RecommendedItem[item:101, value:4.3082685]]
Recommendation for Jacob: [RecommendedItem[item:102, value:4.0]]
The simple program above recommends Albania to Caroline and Costa Rica to Jacob. Intuitively it makes sense:

  • Other users who like the same countries as Caroline (Joanne, Monica) like Albania.
  • Therefore there's a good chance that Caroline would also enjoy Albania.
Similar reasoning applies to the recommendation for Jacob.
The recommender expects that Caroline would rate Albania at 4.3 and Jacob would rate Costa Rica at 4.0.

The program is able to process input data and give a reasonable recommendation - quite an impressive outcome from just a few lines of code. Well done Mahout!
Let's see what the code actually does:

DataModel model = new FileDataModel(new File("travel.dat"));
This loads input data stored as User ID,Item ID,Preference triples into memory. It's possible to customize the way input data is loaded if our set of preferences is not represented in this format. Input data can also be taken from a database.

UserSimilarity similarity = new EuclideanDistanceSimilarity(model);
The recommender we use is a user-based one. It doesn't pay any attention to attributes of items - it tries to find out what a user might like based on her preferences so far and on what other users with similar preferences also liked. UserSimilarity is an abstraction for the concept of "similarity" - there are many possible metrics and Mahout comes with ready implementations. EuclideanDistanceSimilarity defines similarity between two users based on the Euclidean distance between their locations in n-dimensional space (where n is the number of items). The coordinates of each user's location are her preference values. Other UserSimilarity implementations include PearsonCorrelationSimilarity, SpearmanCorrelationSimilarity and TanimotoCoefficientSimilarity. All UserSimilarity implementations represent similarity as a number in the range [-1, 1] (from completely different to having identical preferences). Choosing the best implementation for the given problem is not easy and always involves some trial and error.
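To make the idea of a distance-based similarity more tangible, here is a small plain-Java sketch that does not use Mahout at all. The formula 1 / (1 + distance / sqrt(n)), computed only over the n countries both users rated, is an assumption about how such a score can be defined, not a statement about Mahout's internals; the class and method names are made up for illustration. The ratings are the ones from the table above.

```java
import java.util.HashMap;
import java.util.Map;

public class EuclideanSimilarityExample {

    /** Assumed formula: 1 / (1 + distance / sqrt(n)) over the n co-rated items. */
    static double similarity(Map<Integer, Double> a, Map<Integer, Double> b) {
        double sumSquaredDiff = 0.0;
        int common = 0;
        for (Map.Entry<Integer, Double> entry : a.entrySet()) {
            Double other = b.get(entry.getKey());
            if (other != null) {             // only items rated by both users count
                double diff = entry.getValue() - other;
                sumSquaredDiff += diff * diff;
                common++;
            }
        }
        if (common == 0) {
            return Double.NaN;               // nothing in common, similarity undefined
        }
        return 1.0 / (1.0 + Math.sqrt(sumSquaredDiff) / Math.sqrt(common));
    }

    /** Builds a country ID -> rating map from two parallel arrays. */
    static Map<Integer, Double> ratings(int[] items, double[] values) {
        Map<Integer, Double> result = new HashMap<>();
        for (int i = 0; i < items.length; i++) {
            result.put(items[i], values[i]);
        }
        return result;
    }

    static Map<Integer, Double> caroline() {
        return ratings(new int[] {102, 103, 105, 107, 108}, new double[] {1.5, 4.5, 4.0, 5.0, 2.5});
    }

    static Map<Integer, Double> joanne() {
        return ratings(new int[] {101, 102, 103, 107}, new double[] {5.0, 2.0, 4.5, 4.5});
    }

    static Map<Integer, Double> monica() {
        return ratings(new int[] {101, 104, 105, 107, 108}, new double[] {3.5, 1.0, 4.0, 4.0, 3.0});
    }

    public static void main(String[] args) {
        System.out.println("Caroline vs Joanne: " + similarity(caroline(), joanne())); // roughly 0.71
        System.out.println("Caroline vs Monica: " + similarity(caroline(), monica())); // roughly 0.61
    }
}
```

Under this assumed formula Joanne and Monica come out as the two users closest to Caroline, which is consistent with the intuition given earlier.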

UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
UserNeighborhood is another abstraction present in Mahout. It determines which users should be considered when providing a recommendation. NearestNUserNeighborhood is an implementation which takes the N most similar users (where similarity is determined by the UserSimilarity described above). We take the 2 most similar users in this example - this is one of the parameters that can be tuned for best accuracy. Another UserNeighborhood implementation is ThresholdUserNeighborhood - the number of similar users is not fixed in this case. The parameter for ThresholdUserNeighborhood tells how similar a user needs to be in order to still be included in the process of looking for a recommendation. Any number of users that are similar enough is used for arriving at the recommendation.
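The two selection strategies can be contrasted with a few lines of plain Java that do not touch Mahout. The similarity scores below are rounded values worked out by hand from the ratings table, assuming a Euclidean-style score of 1 / (1 + distance / sqrt(n)) over co-rated countries; both the scores and the class and method names are illustrative assumptions only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NeighborhoodExample {

    /** Picks the n users with the highest similarity, most similar first. */
    static List<String> nearestN(Map<String, Double> similarities, int n) {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(similarities.entrySet());
        entries.sort((x, y) -> Double.compare(y.getValue(), x.getValue()));
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Double> entry : entries.subList(0, Math.min(n, entries.size()))) {
            result.add(entry.getKey());
        }
        return result;
    }

    /** Picks every user whose similarity reaches the threshold, however many there are. */
    static List<String> atLeast(Map<String, Double> similarities, double threshold) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Double> entry : similarities.entrySet()) {
            if (entry.getValue() >= threshold) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Assumed, hand-computed similarity of each other user to Caroline. */
    static Map<String, Double> similaritiesToCaroline() {
        Map<String, Double> similarities = new LinkedHashMap<>();
        similarities.put("Albert", 0.33);
        similarities.put("Jacob", 0.31);
        similarities.put("Joanne", 0.71);
        similarities.put("Paul", 0.31);
        similarities.put("Monica", 0.61);
        return similarities;
    }

    public static void main(String[] args) {
        Map<String, Double> similarities = similaritiesToCaroline();
        System.out.println("Nearest 2:        " + nearestN(similarities, 2));
        System.out.println("Threshold of 0.5: " + atLeast(similarities, 0.5));
        System.out.println("Threshold of 0.3: " + atLeast(similarities, 0.3));
    }
}
```

A fixed N always yields the same neighbourhood size, while a threshold lets the size vary: with these numbers a threshold of 0.5 keeps two users and a threshold of 0.3 keeps all five.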

 
Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
Finally, a Recommender is the actual engine which provides the results. This Recommender implementation is user-based. Another approach is to use item-based recommenders, which consider how similar items are to each other rather than how users are similar to other users. There are at least a few other approaches, but this post is too short to even mention them.
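One simple way a user-based engine can turn neighbours into a predicted rating is a similarity-weighted average of the neighbours' ratings. The sketch below is plain Java and is not Mahout code; the two weights (about 0.71 for Joanne and 0.61 for Monica) are assumed values, derived by hand from the ratings table with a Euclidean-style score of 1 / (1 + distance / sqrt(n)), and the class name is hypothetical.

```java
public class PreferenceEstimateExample {

    /** Similarity-weighted average of the neighbours' ratings for a single item. */
    static double estimate(double[] similarities, double[] ratings) {
        double weightedSum = 0.0;
        double totalWeight = 0.0;
        for (int i = 0; i < similarities.length; i++) {
            weightedSum += similarities[i] * ratings[i];
            totalWeight += similarities[i];
        }
        return weightedSum / totalWeight;
    }

    public static void main(String[] args) {
        // Assumed similarities of Joanne and Monica to Caroline.
        double[] similarities = {0.7101, 0.6077};
        // Their ratings for Albania, taken from the table: 5.0 and 3.5.
        double[] albaniaRatings = {5.0, 3.5};
        System.out.println("Estimated rating: " + estimate(similarities, albaniaRatings)); // roughly 4.31
    }
}
```

The result of roughly 4.31 is in line with the value of 4.3 that the program printed for Caroline and Albania.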
List<RecommendedItem> recommendationForCaroline = recommender.recommend(2, 1);
Here we ask the Recommender for top 1 recommendation for user with ID 2 (Caroline). Let's see what changes when we use a different UserSimilarity and UserNeighborhood implementation:
package pl.marekstrejczek.mahout;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.ThresholdUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class TravelSiteRecommender {
 public static void main(final String[] args) throws IOException, TasteException {
  DataModel model = new FileDataModel(new File("travel.dat"));
  
  UserSimilarity similarity = new PearsonCorrelationSimilarity(model); // changed
  
  UserNeighborhood neighborhood = new ThresholdUserNeighborhood(0.7, similarity, model); // changed
  Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

  List<RecommendedItem> recommendationForCaroline = recommender.recommend(2, 1);
  List<RecommendedItem> recommendationForJacob = recommender.recommend(3, 1);
        
  System.out.println("Recommendation for Caroline: "+recommendationForCaroline);
  System.out.println("Recommendation for Jacob: "+recommendationForJacob);
 }
}
Output now is:
Recommendation for Caroline: [RecommendedItem[item:101, value:4.2789083]]
Recommendation for Jacob: [RecommendedItem[item:107, value:3.542936]]
So the recommendation for Caroline didn't change (even the expected preference value is almost the same), but for Jacob this time the suggestion is to visit Poland (although the expected rating is quite low - only 3.54 out of 5). This recommendation looks significantly worse than the previous one, which shows how important it is to choose appropriate implementations and tune parameters for a given problem.
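The number for Caroline can be cross-checked by hand. The sketch below computes a textbook Pearson correlation over the countries both users rated and then takes a similarity-weighted average of the neighbours' ratings for Albania. It is plain Java, not Mahout code, and may differ from Mahout's internals; the class and method names are made up for illustration, and the ratings come from the table above.

```java
import java.util.HashMap;
import java.util.Map;

public class PearsonExample {

    /** Textbook Pearson correlation, computed only over items rated by both users. */
    static double pearson(Map<Integer, Double> a, Map<Integer, Double> b) {
        int n = 0;
        double sumA = 0.0;
        double sumB = 0.0;
        for (Map.Entry<Integer, Double> entry : a.entrySet()) {
            Double other = b.get(entry.getKey());
            if (other != null) {
                n++;
                sumA += entry.getValue();
                sumB += other;
            }
        }
        if (n < 2) {
            return Double.NaN;               // correlation needs at least two common items
        }
        double meanA = sumA / n;
        double meanB = sumB / n;
        double covariance = 0.0;
        double varianceA = 0.0;
        double varianceB = 0.0;
        for (Map.Entry<Integer, Double> entry : a.entrySet()) {
            Double other = b.get(entry.getKey());
            if (other != null) {
                double da = entry.getValue() - meanA;
                double db = other - meanB;
                covariance += da * db;
                varianceA += da * da;
                varianceB += db * db;
            }
        }
        return covariance / Math.sqrt(varianceA * varianceB);
    }

    static Map<Integer, Double> ratings(int[] items, double[] values) {
        Map<Integer, Double> result = new HashMap<>();
        for (int i = 0; i < items.length; i++) {
            result.put(items[i], values[i]);
        }
        return result;
    }

    static Map<Integer, Double> caroline() {
        return ratings(new int[] {102, 103, 105, 107, 108}, new double[] {1.5, 4.5, 4.0, 5.0, 2.5});
    }

    static Map<Integer, Double> joanne() {
        return ratings(new int[] {101, 102, 103, 107}, new double[] {5.0, 2.0, 4.5, 4.5});
    }

    static Map<Integer, Double> monica() {
        return ratings(new int[] {101, 104, 105, 107, 108}, new double[] {3.5, 1.0, 4.0, 4.0, 3.0});
    }

    /** Weighted average of Joanne's and Monica's ratings for Albania (item 101). */
    static double estimateAlbaniaForCaroline() {
        double withJoanne = pearson(caroline(), joanne());
        double withMonica = pearson(caroline(), monica());
        return (withJoanne * 5.0 + withMonica * 3.5) / (withJoanne + withMonica);
    }

    public static void main(String[] args) {
        System.out.println("Caroline vs Joanne: " + pearson(caroline(), joanne())); // roughly 0.99
        System.out.println("Caroline vs Monica: " + pearson(caroline(), monica())); // roughly 0.92
        System.out.println("Estimate for Albania: " + estimateAlbaniaForCaroline()); // roughly 4.28
    }
}
```

Both correlations exceed the 0.7 threshold, so both users count as neighbours, and the weighted average of roughly 4.28 agrees with the value printed for Caroline.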

This was just a brief introduction to recommendation capabilities of Mahout. For real production solutions one would certainly need to consider at least:

  • which implementations and what parameter values to use for the most accurate results. Fortunately Mahout comes with facilities that help evaluate the quality of recommendations.
  • how to enrich standard algorithms with domain-specific knowledge for better accuracy.
  • performance and memory consumption of the solution.
For non-trivial data sets performance may become an issue - especially if recommendations are needed in real time, within a split second.
Even batch processing may turn out to be too slow - a solution can be to distribute computations among many nodes using Apache Hadoop. Mahout provides Job implementations that allow distributing computations using standard algorithms, like the ones described above. However, if data sets are too large to fit into the memory of a single machine then a different approach is needed - redesigning algorithms to fit the MapReduce concept to take full advantage of Hadoop facilities.

If you want to run the program then:

  • save the Maven pom.xml file included below.
  • save TravelSiteRecommender class in a folder under src/main/java.
  • save preference data (comma separated triplets) as travel.dat file.
  • use Maven to run the code: mvn clean package exec:java

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>pl.marekstrejczek</groupId>
 <artifactId>mahout-exercises</artifactId>
 <packaging>jar</packaging>
 <version>1.0-SNAPSHOT</version>
 <name>Mahout Exercises</name>
 <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>java</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <mainClass>pl.marekstrejczek.mahout.TravelSiteRecommender</mainClass>
        </configuration>
      </plugin>
    </plugins>  
 </build>
 <dependencies>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.7</version>
    </dependency>
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>    
   </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.0.11</version>       
    </dependency>
 </dependencies>
</project>

Wednesday, 3 July 2013

Trust, but verify - losing messages with ActiveMQ

I created this blog more than half a year ago, but didn't post anything until today. Finally two things coincided - I have some time and I have something I'd like to share. I hope it's not the last of my posts...

Anyway, today I'd like to show that even such a well-known and relatively mature product as ActiveMQ 5.8.0 can behave in a totally unexpected manner. It's not meant to be ActiveMQ bashing - it is a great product with a rich feature set, and you get it for free (or you can pay Red Hat for the commercially supported JBoss A-MQ). My goal is to show that if certain aspects of the system are of top importance then thorough testing of the whole system is key.

So how can I lose a message with ActiveMQ? Of course we use persistent messages, transactions and so on. One would expect that a queued message stays in the system until it is consumed successfully, expires or is deleted manually, right? No, not really. Try this scenario:

  • Deploy a consumer application that always throws RuntimeException in its onMessage method.

package pl.marekstrejczek.mdb;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

@MessageDriven(
    activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/queue/TestQueue")
    },
    mappedName = "TestQueue")
public class RejectAllMDB implements MessageListener {

    public void onMessage(final Message message) {
        // Always fail, so that every delivery attempt is rejected.
        throw new RuntimeException("!!! This is intentional !!!");
    }
}
  • Put a message onto the queue using the ActiveMQ web console. Of course we want persistent delivery enabled - we don't want to lose the message if anything goes wrong.


  • After our consumer application fails to process the message, it lands on the dead letter queue (default name: ActiveMQ.DLQ). The messaging broker is smart enough to recognize that the consumer failed to process the message and decides to put the message in a dedicated "quarantine area" - this is what a dead letter queue is all about.


  • At this stage in a real environment someone from operations would normally investigate why message processing failed. If the problem was transient and the system is ready to accept the message then the operator will probably want to replay the message to get it finally processed. But what if we move the message from the DLQ to the original queue and it fails to process again? Let's try - let's browse ActiveMQ.DLQ in the web console and try to move our test message from ActiveMQ.DLQ back to TestQueue.

  • What could we expect? The message is moved to TestQueue, sent to our test application (which hasn't changed and still throws RuntimeException from its onMessage method), and after the broker recognizes that the message still cannot be processed successfully it should move the message to the dead letter queue again. The message still exists in our system, we're fine. BUT... what really happens is that the message is sent to the application and after processing fails the message DISAPPEARS. It's no longer on TestQueue or ActiveMQ.DLQ - it's gone, lost. If that message contained confirmation of a financial trade worth $10 million then you'd better have good recovery procedures in place.



    For some systems losing a message might not be a big deal. If your system falls into this category - don't worry. But if you need very robust messaging and losing even a single message can have noticeable impact on the business then be careful. I discovered this problem during execution of robustness tests for a technology refresh project I'm currently involved in. We found at least two more middleware issues while executing our tests, so bugs in these products do exist. Third party apps need to be QAed as part of the solution similarly to any applications developed in-house. Unless of course you can afford to go live with a solution that can be buggy in the most sensitive areas.

    By the way - the problem described above will be solved in the upcoming 6.1 version of JBoss A-MQ - see https://fusesource.com/issues/browse/ENTMQ-341. I've also raised ActiveMQ ticket for this issue: https://issues.apache.org/jira/browse/AMQ-4616 (Update 2013-07-13: there has already been an open Active MQ ticket for this problem since 2011: https://issues.apache.org/jira/browse/AMQ-3405. Solved in the upcoming 5.9.0 release).