- Physical vs logical integrity
- ACID (strong) consistency
- BASE (weak) consistency
- Synchronous vs asynchronous replication
- Distributed systems limitations - CAP theorem
- Examples of data consistency violation (like Photo Privacy Violation or Double Money Withdrawal described in "Don't settle for eventual consistency" article)
- Strong consistency comes with performance penalty. Choosing performance and availability over consistency might be justified and lead to improved revenues (as is the case with Amazon) or lead to spectacular failures like in case of Flexcoin
Local vs distributed transactions
The second part of the presentation was slightly different, though. It included a live demonstration of a situation where local transactions are not sufficient to guarantee data consistency across multiple resources and how distributed transactions come to the rescue. The demonstration was done basing on the scenario below:
The application consumes messages from a JMS queue (TEST.QUEUE), stores message content in a database, does some processing inside VeryUsefulBean and finally sends a message to another JMS queue (OUT.QUEUE).
The application was a web application deployed on JBoss EAP 6.2. JMS broker functionality was provided by ActiveMQ and MySQL acted as a database. Web application logic was built with Spring and Apache Camel.
So let's assume that the processing inside VeryUsefulBean fails:
The exception was injected using JBoss Byteman rule:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
RULE Simulate processing failure | |
CLASS pl.marekstrejczek.jmsconsumer.service.VeryUsefulBean | |
METHOD doHeavyProcessing | |
AT ENTRY | |
IF TRUE | |
DO throw new RuntimeException("Byteman trick!") | |
ENDRULE |
As expected with local transactions system state was inconsistent after processing failure. The expected state would be:
Basically one would expect that system state would not change due to processing failure. However the actual behaviour was:
Then the experiment was repeated with JTA transaction manager and XA resources set up. The outcome was correct this time - no data was saved to database. JMS message consumption and all processing including database inserts was handled as part of the same distributed transaction and all changes were rolled back upon failure as expected.
First test cases were defined as JBehave scenarios:
JUnit was used to execute the scenarios:
Spring context for the web application was splitted into two parts:
While the core context could and had to be reused during test execution, the infrastructure context made sense only when the application was deployed on a real JEE server, as it retrieved necessary resources from JNDI - see below.
Therefore the infrastructure part of the context had to be rewritten for automated test execution:
Note beans with id amq-broker and hsqldbServer - they are responsible for starting embedded JMS broker and DB server needed during test execution.
Having the infrastructure in place, it is quite simple to write test steps defined in JBehave scenarios, e.g.:
There are obviously a few other details that need to be worked out, but since this post has already grown too long - have a look at the complete code on GitHub: https://github.com/mstrejczek/dataintegrity.
- Incoming message not lost
- No data saved in database
- Nothing sent to outbound queue (OUT.QUEUE).
Basically one would expect that system state would not change due to processing failure. However the actual behaviour was:
- Incoming message not lost
- Data saved to DB (as many times as message was (re)delivered).
- Nothing sent to outbound queue (OUT.QUEUE).
Then the experiment was repeated with JTA transaction manager and XA resources set up. The outcome was correct this time - no data was saved to database. JMS message consumption and all processing including database inserts was handled as part of the same distributed transaction and all changes were rolled back upon failure as expected.
Automated integration test
The test proved that application worked correctly with JTA transaction manager and XA resources (XA connection factory for JMS, XA data source for JDBC), however the test was manual and time consuming. Ideally this behaviour would be tested automatically, and this was the topic of the final part of the presentation. We did a walk through an integration test that verified transactional behaviour automatically.First test cases were defined as JBehave scenarios:
JUnit was used to execute the scenarios:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@RunWith(SpringAnnotatedEmbedderRunner.class) | |
@Configure( pendingStepStrategy = FailingUponPendingStep.class, failureStrategy = RethrowingFailure.class, storyReporterBuilder = MyStoryReporterBuilder.class) | |
@UsingEmbedder(embedder = Embedder.class, verboseFailures = true, generateViewAfterStories = true, ignoreFailureInStories = false, ignoreFailureInView = true) | |
@UsingSpring(resources = { "spring-test-context.xml" }, ignoreContextFailure = false) | |
public class IntegrityTest extends InjectableEmbedder { | |
@Test | |
public void run() throws URISyntaxException { | |
injectedEmbedder().runStoriesAsPaths(storyPaths()); | |
} | |
protected List<String> storyPaths() throws URISyntaxException { | |
URL storyLocation = this.getClass().getClassLoader().getResource("stories/integrity.story"); | |
File storyFile = new File(storyLocation.toURI()); | |
String storyDirectory = storyFile.getParentFile().getParent(); | |
return new StoryFinder().findPaths(storyDirectory, asList("**/*.story"), null); | |
} | |
} |
Spring context for the web application was splitted into two parts:
- spring-jmsconsumer-core.xml - contained beans with application logic and definition of Camel context.
- spring-jmsconsumer-infrastructure.xml - contained beans used to access external resources, like JMS connection factory or JDBC data source.
- ActiveMQ - replaced by embedded ActiveMQ.
- Arjuna Transaction Manager provided by JBoss EAP - replaced by standalone Atomikos.
- MySQL - replaced by embedded HSQLDB.
While the core context could and had to be reused during test execution, the infrastructure context made sense only when the application was deployed on a real JEE server, as it retrieved necessary resources from JNDI - see below.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<beans xmlns="http://www.springframework.org/schema/beans" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns:amq="http://activemq.apache.org/schema/core" | |
xmlns:context="http://www.springframework.org/schema/context" | |
xmlns:jee="http://www.springframework.org/schema/jee" | |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd | |
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd | |
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd | |
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> | |
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" /> | |
<jee:jndi-lookup id="xaJmsConnectionFactory" jndi-name="java:/ActiveMQ/XAConnectionFactory" /> | |
<jee:jndi-lookup id="xaDataSource" jndi-name="java:jboss/datasources/MysqlXADS" /> | |
</beans> |
Therefore the infrastructure part of the context had to be rewritten for automated test execution:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<beans xmlns="http://www.springframework.org/schema/beans" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns:amq="http://activemq.apache.org/schema/core" | |
xmlns:context="http://www.springframework.org/schema/context" | |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd | |
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd | |
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> | |
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" | |
init-method="init" destroy-method="close"> | |
<property name="forceShutdown" value="false" /> | |
</bean> | |
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> | |
<property name="transactionTimeout" value="300" /> | |
</bean> | |
<bean id="jtaTransactionManager" | |
class="org.springframework.transaction.jta.JtaTransactionManager"> | |
<property name="transactionManager" ref="atomikosTransactionManager" /> | |
<property name="userTransaction" ref="atomikosUserTransaction" /> | |
</bean> | |
<!-- Set up XA-compliant JMS connection factories --> | |
<amq:broker id="amq-broker"> | |
<amq:transportConnectors> | |
<amq:transportConnector uri="tcp://localhost:62616?deleteAllMessagesOnStartup=true" /> | |
</amq:transportConnectors> | |
</amq:broker> | |
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:62616" > | |
<amq:properties> | |
<amq:redeliveryPolicy maximumRedeliveries="1"/> | |
</amq:properties> | |
</amq:connectionFactory> | |
<amq:xaConnectionFactory id="amqXaJmsConnectionFactory" brokerURL="tcp://localhost:62616" > | |
<amq:properties> | |
<amq:redeliveryPolicy maximumRedeliveries="1"/> | |
</amq:properties> | |
</amq:xaConnectionFactory> | |
<bean id="xaJmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" | |
init-method="init" destroy-method="close" depends-on="amq-broker"> | |
<property name="uniqueResourceName" value="XA-JMS-ATOMIKOS" /> | |
<property name="localTransactionMode" value="false" /> | |
<property name="poolSize" value="4" /> | |
<property name="xaConnectionFactory" ref="amqXaJmsConnectionFactory" /> | |
</bean> | |
<!-- Set up XA-compliant HSQLDB data source --> | |
<bean id="hsqldbServer" class="pl.marekstrejczek.jmsconsumer.hsqldb.MyHsqlServer" init-method="start" destroy-method="stop"> | |
<constructor-arg index="0" value="test-db"/> | |
<constructor-arg index="1" value="mem:test-db"/> | |
<property name="noSystemExit" value="true"/> | |
</bean> | |
<bean id="hsqldbXADataSource" class="org.hsqldb.jdbc.pool.JDBCXADataSource" | |
lazy-init="true" depends-on="hsqldbServer"> | |
<property name="user" value="sa" /> | |
<property name="password" value="" /> | |
<property name="url" value="jdbc:hsqldb:mem:test-db" /> | |
</bean> | |
<bean id="dataSource" class="org.hsqldb.jdbc.JDBCDataSource" | |
lazy-init="true" depends-on="hsqldbServer"> | |
<property name="user" value="sa" /> | |
<property name="password" value="" /> | |
<property name="url" value="jdbc:hsqldb:mem:test-db" /> | |
</bean> | |
<bean id="xaDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" | |
init-method="init" destroy-method="close"> | |
<property name="uniqueResourceName" value="XA-DB-ATOMIKOS" /> | |
<property name="maxPoolSize" value="3" /> | |
<property name="minPoolSize" value="1" /> | |
<property name="maxIdleTime" value="150" /> | |
<property name="reapTimeout" value="5" /> | |
<property name="testQuery" value="select 1 as x from information_schema.tables where 1=0" /> | |
<property name="xaDataSource" ref="hsqldbXADataSource" /> | |
</bean> | |
</beans> |
Note beans with id amq-broker and hsqldbServer - they are responsible for starting embedded JMS broker and DB server needed during test execution.
Having the infrastructure in place, it is quite simple to write test steps defined in JBehave scenarios, e.g.:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@When("I put a message on the queue") | |
public void putMessageOnQueue() throws InterruptedException { | |
log.info(">>> When: I put a message on the queue"); | |
jmsTemplate.send("TEST.QUEUE", new MessageCreator() { | |
@Override | |
public Message createMessage(final Session session) throws JMSException { | |
return session.createTextMessage("!!!test"); | |
} | |
}); | |
TimeUnit.SECONDS.sleep(3); | |
} | |
@When("processing is set to fail") | |
public void injectProcessingFailure() { | |
log.info(">>> When: processing is set to fail"); | |
ThrowExceptionAspectBean exceptionInjector = (ThrowExceptionAspectBean)ac.getBean("throwExceptionAspectBean"); | |
exceptionInjector.setExceptionFlag(true); | |
} | |
@Then("value is stored in database") | |
public void checkMessageStoredInDatabase() { | |
log.info(">>> Then: value is stored in database"); | |
assertEquals(1, SimpleJdbcTestUtils.countRowsInTable(new SimpleJdbcTemplate(jdbcTemplate), "message")); | |
} |
There are obviously a few other details that need to be worked out, but since this post has already grown too long - have a look at the complete code on GitHub: https://github.com/mstrejczek/dataintegrity.