3/18/13

Pentaho Kettle ETL regression with Spring

The real-life scenario is a Java application with an ETL process to load some data - for example, updating the list of names in a phone directory. Even with a pretty naive approach, automated regression tests can be introduced quite easily. Here's a demo.

Overview

The application itself makes use of:
  • Spring Framework 3.0,
  • Hibernate 3.6,
  • Pentaho Kettle ETL 4.4.
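
For completeness, the Kettle engine itself can be pulled in through Maven. The coordinates below are what I would expect for the 4.4 line; treat the group/artifact ids and the version as assumptions to verify against the Pentaho artifact repository:

pom.xml (fragment, coordinates assumed)

<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>4.4.0-stable</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>4.4.0-stable</version>
</dependency>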

The model consists of:

  • two entities: Person and PhoneNumber (sketched just below),
  • two DAOs: PersonDao and PhoneNumberDao,
  • an ETL transformation that loads a CSV file and updates the PERSON table,
  • a wrapper around the Kettle Transformation/Job API.
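
For reference, a minimal sketch of the Person entity, reconstructed from the tests further down (field names and types are assumptions based on the getters and setters used there):

Person.java (sketch)

public class Person {

    private Integer id;                       // assigned from the PER_ID_SEQ sequence
    private String firstName;
    private String lastName;
    private Set<PhoneNumber> phoneNumbers = new HashSet<PhoneNumber>();

    // standard getters, setters and imports omitted
}

PhoneNumber looks much the same, with an id and a number field.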

Testing dependencies

  • Spring-test 3.0,
  • DbUnit 2.2,
  • JUnit 4.4,
  • HSQLDB 2.2,
  • Unitils 2.4.
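
The Maven coordinates, as far as I remember them (double-check group ids and versions against your repository):

pom.xml (fragment)

<dependency>
    <groupId>org.unitils</groupId>
    <artifactId>unitils</artifactId>
    <version>2.4</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.dbunit</groupId>
    <artifactId>dbunit</artifactId>
    <version>2.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.4</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>3.0.7.RELEASE</version>
    <scope>test</scope>
</dependency>

The HSQLDB dependency is shown separately below.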

General ideas

  • underlying database is HSQLDB instead of an actual Oracle instance,
    • for Kettle ETL tests, the database is temporarily switched to Oracle compatibility mode - we can use MySQL, PostgreSQL and other compatibility modes,
    • we can also test against a real Oracle DB - in this case, any Oracle-specific functionality can be used. I prefer a compromise with HSQLDB.
  • db schema is created with hbm2ddl,
  • db test data is loaded with DbUnit XML files (tricky!)
  • the actual testing happens inside a simple JUnit4 / Spring Integration test case:
    • first, we execute the ETL transformation,
    • next, we test the results with our Hibernate-based API.

The starting point

We already have a piece of code which allows us to execute the ETL process programmatically:

KettleAdapterImpl.java

public class KettleAdapterImpl implements KettleAdapter {

    private DataSource dataSource;
    private String repositoryPath;

    public KettleAdapterImpl() throws KettleException {
        KettleEnvironment.init(false);
    }

    @Override
    public int executeTransformation(String s, Map<String, String> parameters) {
        try {
            // Resolve every named data source used by the transformation to the DataSource injected into this adapter
            DataSourceProviderFactory.setDataSourceProviderInterface(new DataSourceProviderInterface() {
                @Override
                public DataSource getNamedDataSource(String s) throws DataSourceNamingException {
                    return dataSource;
                }
            });

            // Open the file-based repository and load the requested transformation from it
            KettleFileRepositoryMeta repositoryMeta = new KettleFileRepositoryMeta("id", "name", "repo", ResourceUtils.getFile(repositoryPath).getAbsolutePath());
            KettleFileRepository repository = new KettleFileRepository();
            repository.setRepositoryMeta(repositoryMeta);
            TransMeta transMeta = new TransMeta(ResourceUtils.getFile(repositoryPath + "/" + s).getAbsolutePath(), repository);

            // Pass the named parameters (e.g. input.file) down to the transformation
            for (Map.Entry<String, String> entry : parameters.entrySet()) {
                transMeta.setParameterValue(entry.getKey(), entry.getValue());
            }

            // Run synchronously and report the number of errors (0 means success)
            Trans trans = new Trans(transMeta);
            trans.execute(new String[]{});
            trans.waitUntilFinished();
            return trans.getErrors();
        } catch (KettleException e) {
            throw new RuntimeException(e);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void setRepositoryPath(String repositoryPath) {
        this.repositoryPath = repositoryPath;
    }
}
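
The KettleAdapter interface itself is a single method, and the adapter is wired like any other Spring bean. The bean names below match the @SpringBeanByName lookups used in the tests; the package name and repository path are assumptions for illustration:

KettleAdapter.java

public interface KettleAdapter {
    int executeTransformation(String transformationName, Map<String, String> parameters);
}

application-dao.xml (fragment, assumed wiring)

    <bean id="kettleAdapter" class="pl.erizo.random.kettle.regression.etl.KettleAdapterImpl">
        <property name="dataSource" ref="myDataSource"/>
        <property name="repositoryPath" value="classpath:etl"/>
    </bean>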

Test configuration, step by step

And now to the configuration details.

Spring integration tests

We'll use some neat features of Unitils, which will speed up test development and maintenance. First, a test for PersonDao:

PersonDaoImpl.java

public class PersonDaoImpl extends HibernateDaoSupport implements PersonDao {
    @Override
    public Integer save(Person person) {
        Serializable save = getHibernateTemplate().save(person);
        getHibernateTemplate().flush();
        return (Integer)save;
    }

    @Override
    public Person get(Integer id) {
        return getHibernateTemplate().get(Person.class, id);
    }

    @Override
    public List<Person> getAll() {
        return getHibernateTemplate().findByNamedQuery("person.loadAll");
    }
}
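
The person.loadAll named query and the PER_ID_SEQ sequence used later are declared in the Hibernate mapping. Roughly, Person.hbm.xml looks like this (column names come from the datasets and sequences.sql below; the collection mapping and the query body are assumptions):

Person.hbm.xml (sketch)

<hibernate-mapping package="pl.erizo.random.kettle.regression.model">
    <class name="Person" table="PERSON">
        <id name="id" column="PER_ID" type="integer">
            <generator class="sequence">
                <param name="sequence">PER_ID_SEQ</param>
            </generator>
        </id>
        <property name="firstName" column="PER_FIRST_NAME"/>
        <property name="lastName" column="PER_LAST_NAME"/>
        <!-- foreign key column name is an assumption -->
        <set name="phoneNumbers">
            <key column="PHN_PER_ID"/>
            <one-to-many class="PhoneNumber"/>
        </set>
    </class>

    <query name="person.loadAll">from Person p order by p.lastName</query>
</hibernate-mapping>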

PersonDaoImplTest.java

@SpringApplicationContext({"classpath:/application-dao.xml", "classpath:/application-test-datasource.xml"})
@DataSet("PersonDaoImplTest.xml")
public class PersonDaoImplTest extends UnitilsJUnit4 {

    @SpringBeanByName
    private PersonDao personDao;

    @SpringBeanByName
    private PhoneNumberDao phoneNumberDao;

    @Test
    public void testPersonDaoImpl() throws Exception {
        Person person = new Person();
        person.setFirstName("Steve");
        PhoneNumber phoneNumber = new PhoneNumber();
        phoneNumber.setNumber("01234");
        person.setPhoneNumbers(Sets.newHashSet(phoneNumber));
        phoneNumberDao.save(phoneNumber);
        personDao.save(person);

        Assert.assertTrue("new id taken from the sequence (>= 1000)", person.getId().intValue() >= 1000);
        Assert.assertEquals("all 3", 3, personDao.getAll().size());
    }

}
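
For the "all 3" assertion to hold after saving a third person, the PersonDaoImplTest.xml dataset has to contain two pre-existing persons - something along these lines (row values are assumptions):

PersonDaoImplTest.xml (assumed content)

<dataset>
    <PERSON PER_ID="1" PER_FIRST_NAME="John" PER_LAST_NAME="Smith" />
    <PERSON PER_ID="2" PER_FIRST_NAME="Bill" PER_LAST_NAME="Gates" />
</dataset>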

Testing with HSQLDB

I think it's easier to maintain tests with minimal external dependencies - hence replacing Oracle with an in-memory HSQLDB. At some point you may have to fall back to a real Oracle instance anyway, but until you do, HSQLDB is the easier option.

Maven

<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <version>2.2.9</version>
    <scope>test</scope>
</dependency>

Spring configuration

A separate file - test datasource configuration:

application-test-datasource.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
        <property name="url" value="jdbc:hsqldb:mem:aname;hsqldb.tx=mvcc"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>

</beans> 

Hack #1

The hsqldb.tx=mvcc parameter is required by Kettle - without it, the tests tend to deadlock.

Unitils

unitils.properties

database.driverClassName=org.hsqldb.jdbcDriver
database.url=jdbc:hsqldb:mem:aname;hsqldb.tx=mvcc
database.userName=sa
database.password=
database.dialect=hsqldb
database.schemaNames=PUBLIC
DbUnitModule.DataSet.loadStrategy.default=pl.erizo.random.kettle.regression.util.CleanAllInsertLoadStrategy 

DbUnit

DbUnit is necessary for pre-loading test data. Easy and fast, but not problem-free:

Hack #2

DbUnit 2.2 has issues with deleting data from multiple tables - by default, it cleans only the tables mentioned in the current dataset, so leftover rows in other tables can break subsequent tests. The workaround: always clear all tables:

CleanAllInsertLoadStrategy.java

public class CleanAllInsertLoadStrategy extends BaseDataSetLoadStrategy {

    @Override
    public void doExecute(DbUnitDatabaseConnection dbUnitDatabaseConnection, IDataSet dataSet) throws DatabaseUnitException, SQLException {
        try {
            // Clear every table listed in AllTables.xml, not just the ones present in the current dataset
            InputStream xmlStream = ResourceUtils.getURL("classpath:AllTables.xml").openStream();
            FlatXmlDataSet allTablesDataSet = new FlatXmlDataSet(xmlStream);
            DatabaseOperation.DELETE_ALL.execute(dbUnitDatabaseConnection, allTablesDataSet);
            // Then load the test-specific dataset and commit
            DatabaseOperation.INSERT.execute(dbUnitDatabaseConnection, dataSet);
            dbUnitDatabaseConnection.getConnection().commit();
        } catch (IOException e) {
            throw new DatabaseUnitException("Could not read AllTables.xml", e);
        }
    }
}
And the list of tables:

AllTables.xml

<dataset>
    <PERSON />
    <PHONE_NUMBER />
</dataset> 

One last thing - Hack #3

Mixing DbUnit datasets, Hibernate, ETL and DB sequences may cause trouble: DbUnit loads rows with explicit IDs but does not advance the sequences, so later inserts through Hibernate can collide with the pre-loaded data. This can be fixed with the Hibernate SessionFactory configuration:

application-dao.xml

    <bean id="mySessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
        <property name="dataSource" ref="myDataSource"/>
        <property name="mappingResources">
            <list>
                <value>pl/erizo/random/kettle/regression/model/Person.hbm.xml</value>
                <value>pl/erizo/random/kettle/regression/model/PhoneNumber.hbm.xml</value>
            </list>
        </property>
        <property name="hibernateProperties">
            <value>
                hibernate.dialect=org.hibernate.dialect.HSQLDialect
                hibernate.hbm2ddl.auto=create
                hibernate.show_sql=true
                hibernate.hbm2ddl.import_files=/sequences.sql
            </value>
        </property>
    </bean>
And, placed in src/test/resources:

sequences.sql

-- restart the sequence at 1000 so generated IDs never collide with the DbUnit-loaded rows
alter sequence PER_ID_SEQ restart with 1000;

Let's test

KettleAdapterTest.java

@SpringApplicationContext({"classpath:/application-dao.xml", "classpath:/application-test-datasource.xml"})
@DataSet("KettleAdapterTest.xml")
public class KettleAdapterTest extends UnitilsJUnit4 {

    @SpringBeanByName
    private KettleAdapter kettleAdapter;

    @SpringBeanByName
    private PersonDao personDao;

    @Before
    public void setUp() throws Exception {
        SQLUnitils.executeUpdate("SET DATABASE SQL SYNTAX ORA TRUE", DatabaseUnitils.getDataSource());
    }

    @Test
    public void testExecute() throws Exception {
        List<Person> everybody = personDao.getAll();
        Assert.assertEquals("2 results", 2, everybody.size());
        HashMap<String, String> map = Maps.newHashMap();
        map.put("input.file", ResourceUtils.getFile("classpath:pl/erizo/random/kettle/regression/phonenumbers.txt").getAbsolutePath());
        int errors = kettleAdapter.executeTransformation("LoadNumbers.ktr", map);
        Assert.assertEquals("no errors", 0, errors);
        everybody = personDao.getAll();
        ReflectionAssert.assertPropertyLenientEquals("all last names present", "lastName",
                Lists.newArrayList("Gates", "Newman", "Smith"), everybody);
        Assert.assertEquals("3 results", 3, everybody.size());
    }

    @After
    public void tearDown() throws Exception {
        SQLUnitils.executeUpdate("SET DATABASE SQL SYNTAX ORA FALSE", DatabaseUnitils.getDataSource());
    }
}

KettleAdapterTest.xml

<dataset>
    <PERSON PER_ID="1" PER_FIRST_NAME="John" PER_LAST_NAME="Smith" />
    <PERSON PER_ID="2" PER_FIRST_NAME="Bill" PER_LAST_NAME="Gates" />
</dataset>

phonenumbers.txt

John,Smith,001
Paul,Newman,002

Some more reading