wtorek, 10 czerwca 2014

Writing and testing software for data integrity

Last week I hosted a presentation under auspices of JUG Łódź (which I happen to be a founding member of): "Writing and testing software for data integrity". Data integrity is a broad topic, so I touched only a few chosen aspects. Things that made it into the presentation include:

  • Physical vs logical integrity
  • ACID (strong) consistency
  • BASE (weak) consistency
  • Synchronous vs asynchronous replication
  • Distributed systems limitations - CAP theorem
  • Examples of data consistency violation (like Photo Privacy Violation or Double Money Withdrawal described in "Don't settle for eventual consistency" article)
  • Strong consistency comes with performance penalty. Choosing performance and availability over consistency might be justified and lead to improved revenues (as is the case with Amazon) or lead to spectacular failures like in case of Flexcoin

Local vs distributed transactions

The second part of the presentation was slightly different, though. It included a live demonstration of a situation where local transactions are not sufficient to guarantee data consistency across multiple resources and how distributed transactions come to the rescue. The demonstration was done basing on the scenario below:

The application consumes messages from a JMS queue (TEST.QUEUE), stores message content in a database, does some processing inside VeryUsefulBean and finally sends a message to another JMS queue (OUT.QUEUE).
The application was a web application deployed on JBoss EAP 6.2. JMS broker functionality was provided by ActiveMQ and MySQL acted as a database. Web application logic was built with Spring and Apache Camel.
So let's assume that the processing inside VeryUsefulBean fails:
The exception was injected using JBoss Byteman rule:
RULE Simulate processing failure
CLASS pl.marekstrejczek.jmsconsumer.service.VeryUsefulBean
METHOD doHeavyProcessing
AT ENTRY
IF TRUE
DO throw new RuntimeException("Byteman trick!")
ENDRULE
view raw BytemanTrick hosted with ❤ by GitHub

As expected with local transactions system state was inconsistent after processing failure. The expected state would be:

  • Incoming message not lost
  • No data saved in database
  • Nothing sent to outbound queue (OUT.QUEUE).

Basically one would expect that system state would not change due to processing failure. However the actual behaviour was:
  • Incoming message not lost
  • Data saved to DB (as many times as message was (re)delivered).
  • Nothing sent to outbound queue (OUT.QUEUE).
The reason for that behaviour was that local transaction that saved data into database was committed independently from JMS message consumption transaction, leading to inconsistent state.

Then the experiment was repeated with JTA transaction manager and XA resources set up. The outcome was correct this time - no data was saved to database. JMS message consumption and all processing including database inserts was handled as part of the same distributed transaction and all changes were rolled back upon failure as expected.

Automated integration test

The test proved that application worked correctly with JTA transaction manager and XA resources (XA connection factory for JMS, XA data source for JDBC), however the test was manual and time consuming. Ideally this behaviour would be tested automatically, and this was the topic of the final part of the presentation. We did a walk through an integration test that verified transactional behaviour automatically.

First test cases were defined as JBehave scenarios:
JUnit was used to execute the scenarios:
@RunWith(SpringAnnotatedEmbedderRunner.class)
@Configure( pendingStepStrategy = FailingUponPendingStep.class, failureStrategy = RethrowingFailure.class, storyReporterBuilder = MyStoryReporterBuilder.class)
@UsingEmbedder(embedder = Embedder.class, verboseFailures = true, generateViewAfterStories = true, ignoreFailureInStories = false, ignoreFailureInView = true)
@UsingSpring(resources = { "spring-test-context.xml" }, ignoreContextFailure = false)
public class IntegrityTest extends InjectableEmbedder {
@Test
public void run() throws URISyntaxException {
injectedEmbedder().runStoriesAsPaths(storyPaths());
}
protected List<String> storyPaths() throws URISyntaxException {
URL storyLocation = this.getClass().getClassLoader().getResource("stories/integrity.story");
File storyFile = new File(storyLocation.toURI());
String storyDirectory = storyFile.getParentFile().getParent();
return new StoryFinder().findPaths(storyDirectory, asList("**/*.story"), null);
}
}
view raw IntegrityTest hosted with ❤ by GitHub

Spring context for the web application was splitted into two parts:
  • spring-jmsconsumer-core.xml - contained beans with application logic and definition of Camel context.
  • spring-jmsconsumer-infrastructure.xml - contained beans used to access external resources, like JMS connection factory or JDBC data source.
In order to execute the application logic in fully controlled environment the test had to be completely autonomous. It means that all external interfaces and infrastructure had to be recreated by the test harness:
  • ActiveMQ - replaced by embedded ActiveMQ.
  • Arjuna Transaction Manager provided by JBoss EAP - replaced by standalone Atomikos.
  • MySQL - replaced by embedded HSQLDB.
Both embedded ActiveMQ and HSQLDB support XA protocol, so they could be used to verify transactional behaviour.
While the core context could and had to be reused during test execution, the infrastructure context made sense only when the application was deployed on a real JEE server, as it retrieved necessary resources from JNDI - see below.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" />
<jee:jndi-lookup id="xaJmsConnectionFactory" jndi-name="java:/ActiveMQ/XAConnectionFactory" />
<jee:jndi-lookup id="xaDataSource" jndi-name="java:jboss/datasources/MysqlXADS" />
</beans>

Therefore the infrastructure part of the context had to be rewritten for automated test execution:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<!-- Set up XA-compliant JMS connection factories -->
<amq:broker id="amq-broker">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:62616?deleteAllMessagesOnStartup=true" />
</amq:transportConnectors>
</amq:broker>
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:62616" >
<amq:properties>
<amq:redeliveryPolicy maximumRedeliveries="1"/>
</amq:properties>
</amq:connectionFactory>
<amq:xaConnectionFactory id="amqXaJmsConnectionFactory" brokerURL="tcp://localhost:62616" >
<amq:properties>
<amq:redeliveryPolicy maximumRedeliveries="1"/>
</amq:properties>
</amq:xaConnectionFactory>
<bean id="xaJmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close" depends-on="amq-broker">
<property name="uniqueResourceName" value="XA-JMS-ATOMIKOS" />
<property name="localTransactionMode" value="false" />
<property name="poolSize" value="4" />
<property name="xaConnectionFactory" ref="amqXaJmsConnectionFactory" />
</bean>
<!-- Set up XA-compliant HSQLDB data source -->
<bean id="hsqldbServer" class="pl.marekstrejczek.jmsconsumer.hsqldb.MyHsqlServer" init-method="start" destroy-method="stop">
<constructor-arg index="0" value="test-db"/>
<constructor-arg index="1" value="mem:test-db"/>
<property name="noSystemExit" value="true"/>
</bean>
<bean id="hsqldbXADataSource" class="org.hsqldb.jdbc.pool.JDBCXADataSource"
lazy-init="true" depends-on="hsqldbServer">
<property name="user" value="sa" />
<property name="password" value="" />
<property name="url" value="jdbc:hsqldb:mem:test-db" />
</bean>
<bean id="dataSource" class="org.hsqldb.jdbc.JDBCDataSource"
lazy-init="true" depends-on="hsqldbServer">
<property name="user" value="sa" />
<property name="password" value="" />
<property name="url" value="jdbc:hsqldb:mem:test-db" />
</bean>
<bean id="xaDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="XA-DB-ATOMIKOS" />
<property name="maxPoolSize" value="3" />
<property name="minPoolSize" value="1" />
<property name="maxIdleTime" value="150" />
<property name="reapTimeout" value="5" />
<property name="testQuery" value="select 1 as x from information_schema.tables where 1=0" />
<property name="xaDataSource" ref="hsqldbXADataSource" />
</bean>
</beans>


Note beans with id amq-broker and hsqldbServer - they are responsible for starting embedded JMS broker and DB server needed during test execution.

Having the infrastructure in place, it is quite simple to write test steps defined in JBehave scenarios, e.g.:
@When("I put a message on the queue")
public void putMessageOnQueue() throws InterruptedException {
log.info(">>> When: I put a message on the queue");
jmsTemplate.send("TEST.QUEUE", new MessageCreator() {
@Override
public Message createMessage(final Session session) throws JMSException {
return session.createTextMessage("!!!test");
}
});
TimeUnit.SECONDS.sleep(3);
}
@When("processing is set to fail")
public void injectProcessingFailure() {
log.info(">>> When: processing is set to fail");
ThrowExceptionAspectBean exceptionInjector = (ThrowExceptionAspectBean)ac.getBean("throwExceptionAspectBean");
exceptionInjector.setExceptionFlag(true);
}
@Then("value is stored in database")
public void checkMessageStoredInDatabase() {
log.info(">>> Then: value is stored in database");
assertEquals(1, SimpleJdbcTestUtils.countRowsInTable(new SimpleJdbcTemplate(jdbcTemplate), "message"));
}

There are obviously a few other details that need to be worked out, but since this post has already grown too long - have a look at the complete code on GitHub: https://github.com/mstrejczek/dataintegrity.

Brak komentarzy:

Prześlij komentarz