Increase MySQL output to 80K rows/second in Pentaho Data Integration


One of our clients has a MySQL table with around 40M records. Loading the table took around 2.5 hours. While watching the statistics of the transformation, I noticed that the bottleneck was the write to the database: it was stuck at around 2,000 rows/second. You can imagine that it takes a long time to write 40M records at that speed.
I looked into ways to improve the speed. There were a couple of options:
  1. Tune MySQL for better performance on Inserts
  2. Use the MySQL Bulk loader step in PDI
  3. Write SQL statements to a file with PDI and read them with the mysql binary
When I discussed this with one of my contacts at Basis06, he mentioned that they had faced a similar issue a while ago and that the speed can be boosted by a few simple JDBC connection settings:

useServerPrepStmts=false
rewriteBatchedStatements=true
useCompression=true

These options should be set on the database connection in PDI: double-click the connection, go to Options, and add these parameters with their values.
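For reference, here is a sketch of the equivalent JDBC URL with the same options appended as query parameters (the host, port, and database name are placeholders):

jdbc:mysql://localhost:3306/mydb?useServerPrepStmts=false&rewriteBatchedStatements=true&useCompression=true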

Used together, useServerPrepStmts=false and rewriteBatchedStatements=true will “fake” batch inserts on the client. Specifically, the insert statements:

INSERT INTO t (c1,c2) VALUES ('One',1);
INSERT INTO t (c1,c2) VALUES ('Two',2);
INSERT INTO t (c1,c2) VALUES ('Three',3);

will be rewritten into:

INSERT INTO t (c1,c2) VALUES ('One',1),('Two',2),('Three',3);

The third option, useCompression=true, compresses the traffic between the client and the MySQL server.

Finally, I increased the number of copies of the output step to 2, so that there are two threads inserting into the database.

All of this together increased the speed to around 84,000 rows per second! WOW!

Parallelizing jobs in Kettle – Pentaho Data Integration


Reblogged from http://spektom.blogspot.com.es/2014/02/parallelization-monster-framework-for.html

We always end up ROFL in our team when trying to find a name for strange-looking ETL process diagrams. This monster has no name yet:

Parallel kettle job

This is a parallelization framework for Pentaho Kettle 4.x. As you probably know, the upcoming version of Kettle (5.0) will have a native ability to launch job entries in parallel, but we haven’t got there yet.

In order to run a job in parallel, you have to call this abstract job, and provide it with 3 parameters:

  • Path to your job (which is supposed to run in parallel).
  • Number of threads (concurrency level).
  • Optional flag that says whether to wait for completion of all jobs or not.
Regarding the number of threads: as you can see, the framework supports up to 8 threads, but it can easily be extended. An illustrative launch command is shown below.
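For illustration only (the parameter names here are hypothetical; use the names defined in the attached abstract job), launching it from the command line with Kitchen might look like this:

kitchen.sh -file=parallel_job.kjb -param:JOB_PATH=/jobs/my_job.kjb -param:NUM_THREADS=4 -param:WAIT_FOR_COMPLETION=Y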
How this stuff works: the “Thread #N” transformations are executed in parallel on copies of all rows. The rows are then split and filtered in these transformations according to the given number of threads, so that only the relevant portion of rows is passed to the corresponding job (Job – Thread #N). For example, if the original row set was:
           [“Apple”, “Banana”, “Orange”, “Lemon”, “Cucumber”]
and the concurrency level was 2, then the first job (Job – Thread #1) will get [“Apple”, “Banana”, “Orange”] and the second job will get the rest: [“Lemon”, “Cucumber”]. All the other jobs will get an empty row set.
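For illustration only, a minimal JavaScript sketch of the chunked split described above (this is not the actual Kettle transformation logic, just the partitioning idea):

// Split a row set into up to `threads` consecutive chunks; threads beyond
// the available rows simply receive an empty chunk.
function splitRows(rows, threads) {
  var chunkSize = Math.ceil(rows.length / threads);
  var chunks = [];
  for (var i = 0; i < threads; i++) {
    chunks.push(rows.slice(i * chunkSize, (i + 1) * chunkSize));
  }
  return chunks;
}

// splitRows(["Apple", "Banana", "Orange", "Lemon", "Cucumber"], 2)
// => [["Apple", "Banana", "Orange"], ["Lemon", "Cucumber"]]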
Finally, there’s a flag which tells whether we should wait until all jobs are completed.
I hope you will find the attached transformations useful. And if not, at least help me find a name for the ETL diagram. Fish, maybe? 🙂

Book Review: Pentaho Data Integration Cookbook – Second Edition


Pentaho Data Integration Cookbook, Second Edition picks up where the first edition left off, by updating the recipes to the latest edition of PDI and diving into new topics such as working with Big Data and cloud sources, and more.

https://www.packtpub.com/pentaho-data-integration-cookbook-second-edition/book

Book review by: David Fombella Pombal (twitter: @pentaho_fan)

Book Title: Pentaho Data Integration Cookbook – Second Edition

Authors: Alex Meadows, Adrián Sergio Pulvirenti, María Carina Roldán

Paperback: 462 pages

I would like to recommend this useful book, since it shows us how to take advantage of all aspects of Kettle through a set of practical recipes organized to provide quick solutions to everyday needs. Although the book covers advanced topics, all recipes are explained step by step in order to help all types of readers.

Target Audience
If you are a software developer, data scientist, or anyone else looking for a tool that will help extract, transform, and load data as well as provide the tools to perform analytics and data cleansing, then this book is for you.

Rating: 9 out of 10

Chapter 1, Working with Databases – 15 recipes

This chapter shows us how to work with relational databases in Kettle. The recipes show us how to create and share database connections and perform typical database functions (select, insert, update, and delete), as well as more advanced tricks such as building and executing queries at ETL runtime. Remember that in Kettle you can connect to MySQL, Oracle, SQL Server, PostgreSQL, DB2, and nearly every other available database engine.

Chapter 1: Inserting new records when PK has to be generated based on previous values (transformation)

Chapter 2, Reading and Writing Files – 15 recipes

This topic not only shows us how to read and write files (CSV, TXT, Excel, and so on), but also how to work with semi-structured files and read data from Amazon Web Services S3 instances.

Chapter 2: Loading data into an AWS S3 instance (transformation)

Chapter 3, Working with Big Data and Cloud Sources – 8 recipes

This third chapter covers how to load and read data from some of the many different NoSQL data sources (MongoDB, HBase, Hadoop, and so on) as well as from Salesforce.com. I would like to highlight this part of the book, given the importance of Big Data techniques nowadays.

Chapter 3: Loading data into HBase (transformation)

Chapter 4, Manipulating XML Structures – 10 recipes

This topic shows us how to read, write, and validate XML files. Simple and complex XML structures are shown, as well as more specialized formats such as RSS feeds. Even an HTML page is generated using XML and XSL transformations. You should read this chapter carefully if you regularly load, read, update, or validate XML files.

Chapter 4: Generating an HTML page using XML and XSL sources (transformation)

Chapter 5, File Management – 9 recipes

This chapter demonstrates how to copy, move, transfer, and encrypt files and directories. Here you will learn how to get data from remote FTP servers, zip files, and encrypt files using the OpenPGP standard.

Chapter 5: Encrypting and decrypting files (transformation)

Chapter 6, Looking for Data – 8 recipes

This chapter shows you how to search for information through various methods via databases, web services, files, and more. It also shows you how to validate data with Kettle’s built-in validation steps. In addition, the last recipe teaches you how to validate data at runtime.

Chapter 6: Validating data at runtime (transformation)

Chapter 7, Understanding and Optimizing Data Flows – 12 recipes

This chapter details how Kettle moves data through jobs and transformations and how to optimize data flows (processing jobs in parallel, splitting a stream into two or more, comparing streams, and so on).

Chapter 7: Run transformations in parallel (job)

Chapter 8, Executing and Re-using Jobs and Transformations – 9 recipes

This chapter shows us how to launch jobs and transformations in various ways, through static or dynamic arguments and parameterization. Object-oriented transformations through subtransformations are also explained.

Chapter 8: Moving the reusable part of a transformation to a sub-transformation (Mapping)

Chapter 9, Integrating Kettle and the Pentaho Suite – 6 recipes

This chapter works with some of the other tools in the Pentaho suite (BI Server, Report Designer) to show how combining tools provides even more capabilities and functionality for reporting, dashboards, and more. In this part of the book you will create Pentaho reports from PDI, execute PDI transformations from the BI Server, and populate a dashboard with PDI.

Chapter 9: Creating a Pentaho report directly from PDI (transformation)

Chapter 10, Getting the Most Out of Kettle – 9 recipes

This part works with some of the commonly needed features (e-mail and logging) as well as building sample data sets, and using Kettle to read meta information on jobs and transformations via files or Kettle’s database repository.

Chapter 10: Programming custom functionality using Java code (transformation)

Chapter 11, Utilizing Visualization Tools in Kettle – 4 recipes

This chapter explains how to work with plugins and focuses on DataCleaner, AgileBI, and Instaview, an Enterprise feature that allows for fast analysis of data sources.

Chapter 11: PDI Marketplace (here you can install all available plugins)

Chapter 12, Data Analytics – 3 recipes

This part shows us how to work with the various analytical tools built into Kettle, focusing on statistics-gathering steps and on building datasets for Weka (the Pentaho data mining tool); you will also read data from a SAS data file.

Chapter 12: Reading data from a SAS file (transformation)

Appendix A, Data Structures, shows the different data structures used throughout the book.

Appendix A: Steelwheels database model structure

Appendix B, References, provides a list of books and other resources that will help you
connect with the rest of the Pentaho community and learn more about Kettle and the other
tools that are part of the Pentaho suite.

Book link:

https://www.packtpub.com/pentaho-data-integration-cookbook-second-edition/book

Community Data Validation (thanks to WebDetails & Pedro Alves for this job)


Why?

We need a way to do data validation.

Use cases

Below are some of the use cases we want to tackle. Emphasized are the ones we think the current spec satisfies:

  • Global
    • Is the server running?
    • Is the server running properly?
  • Connectivity
    • Do we have all the access we should? (network / database)
  • Query specific
    • Do we have up to date data?
    • Can we trust the data?
    • How long did the queries take to run?
    • Do we have wrong data? (duplicated users in community)
    • Do we have a big number of ‘unknowns’? (tk=1 in DW)
    • Do we have peaks or valleys in the data? (due to double process or no process)
    • Is the data stalled? (eg: number of twitter followers not updating)
    • Did the data format change?
    • We need a way to handle known effects (eg: Christmas dip)
    • We need to correlate independent datasources (eg: comparing AUS with Blocklist evolution)
    • We need to connect the long running queries from CDA to CDV
    • Be able to validate big chunks of reprocessing
    • Do we have clearly wrong lines in resultset? (eg: a line there)
  • Dashboards
    • Are the dashboards rendering properly?
      • Do we have all the components?
      • Any layout change?
      • Any js errors?
    • Are the dashboards performing properly?
    • Can CDF talk with CDV to report client-side errors?
    • Alternatively, can CDA talk with CDV to report query errors?
      • Who caused the error?
  • ETL
    • Did the etl run?
    • Are we processing the expected amount of data?
    • Is the etl taking the expected time to run?
    • Did the etl finish before X am?
    • Test etl against tracer bullets?

Work flow

We expect from this system:

  • A central dashboard that allows us to quickly glimpse the overall status of our system.
    • Did all tests pass?
    • Which one failed?
    • Why?
    • When was the last time the tests ran?
  • We need simple ways to define the tests (based on existing CDAs)
  • We need to know which queries failed and which queries took a long time to run
  • We need a push notification system via email
  • We need to make sure it can talk to Nagios
  • We need an outside test to check whether the server is up

Logging types

Every test will result in one of the following levels:

  • Critical
  • Error
  • Warn
  • Ok

Each specific test will be responsible for converting the output of that test (a validation function for CDA, tbd for Kettle) into that status. The object format is:

{
 level: "Critical",
 type: "Missing data",
 description: "Whatever string the user defined" 
}

On each test definition, we need to be able to optionally set a timing threshold for the queries; exceeding it will automatically generate a log entry with type ‘Duration’.
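As an illustration only (the exact fields are an assumption based on the object format above), such a log entry might look like:

{
 level: "Warn",
 type: "Duration",
 description: "Query took 8200 ms; the configured threshold was 5000 ms"
}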

Test types

There are 4 possible types of tests:

  • CDA based query validation
  • ETL monitoring
  • Datawarehouse validation (a specific subset of the CDA-based query validation)
  • Dashboard validation (we may opt to leave this one out for now as we’ll try to infer the errors from CDA’s 405)

CDA based query

Workflow

We want to select one or more CDA / dataAccessId pairs from our system, define the input parameters, and select the type of validation we need.
The shape of the function will be: f( [ query, [params] ], validationFunction )
The generic test will be the implementation of the validation function:

validationFunction = function ( [ {metadata: [] , resultset: [[]]} ] ) :  value

The returned value will then be freely mapped to the log outputs.
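For instance (an illustrative sketch only, not a prescribed preset), a validation that simply returns the number of rows of the first query could look like this:

function(results) {
  // results is an array of { metadata: [...], resultset: [[...]] } objects, one per query
  return results[0].resultset.length;
}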

ETL monitoring query

The workflow defined here has to match the previous section. We’ll build specific CDA queries that read the Kettle log files. From that point on, specific validations will have to be built for these logs.
In Pentaho, we’ll need to define which connection refers to the Kettle logging tables, either by defining a special JNDI or by specifying it in the settings.
We’ll need to test for:

  • Time
  • Start / end time
  • Amount of data processed

Datawarehouse schema validation

There are some specific tests we can do on the sanity of a datawarehouse.

  • Coherent amount of data on a daily / hourly basis
  • Test the same as before with specific breakdowns
  • Test for the amount of ‘unknowns’ on dimensions

Invocation and Scheduling

There are 2 ways to call the validations:

  • By url request
  • Scheduled calls

The URL will be based on the id / query name (tbd). The scheduled calls will be cron-based, with the following presets (illustrative cron expressions follow the list):

  • Every hour
  • Every day
  • Every week
  • Every month
  • Custom cron
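For illustration only, the presets might map to Quartz-style cron expressions such as the ones below (the same style used in the example definition further down; the exact expressions are an implementation detail):

   Every hour:   0 0 * * * ?
   Every day:    0 0 0 * * ?
   Every week:   0 0 0 ? * MON
   Every month:  0 0 0 1 * ?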

User interface

These are the features of the main user interface (this is the ultimate goal; the implementation may be broken into stages):

  • See existing validations
    • Allow to fire a specific validation
    • Get the url of a specific validation / all validations
  • Create / Edit validation
    • Define query name
    • Define queries and parameters
    • Define validation function
    • Choose log alerts (when to throw critical / error / warn / ok)
    • Choose duration thresholds
    • Define error message
    • Define cron
  • Validation status dashboard
  • CDA Query error dashboard (Should this belong to CDA instead?)
    • Query and parameters
    • Error
    • Incidents
  • Duration dashboard to identify slow points in the system
    • Query and parameters
    • Duration
    • Incidents

Technical approach

All the specific information will be stored in solution/cdv/queries/. The files will have the format _queryName.cdv and will internally be JSON files with the following structure:

{
  type: "query",
  name: "validationName",
  group: "MyGrouping" 
  validation: [ 
    { cdaFile: "/solution/cda/test.cda", dataAccessId: "1" , parameters: [...] },
    { cdaFile: "/solution/cda/test2.cda", dataAccessId: "2" , parameters: [...] }
  ],
  validationType: "custom",
  validationFunction: "function(resultArray,arguments){ return 123 }",
  alerts: {
     /* These functions will be executed from the bottom up. As long as the functions return true, the next one
        will be executed, and the last matching level will be thrown.
        The exception to this rule is the optional okAlert(v) function: if it returns true, no other calls will be made.
     */
     criticalAlert: "function(v){ return v > 10 }",
     errorAlert: undefined,
     warnAlert:  "function(v){ return v > 5 }",
     okAlert: "function(v){return v<3;}",
     alertType: "MissingData",
     alertMessage: "function(level,v){return 'My error message: ' + v}"  /* this can either be a function or a string */
  },
  executionTimeValidation: {
      expected: 5000,
      warnPercentage: 0.30,
      errorPercentage: 0.70,
      errorOnLow: true
  },

  cron: "0 2 * * ? *" 
}

Preset validations

We won’t need to manually define all kinds of validations. CDV will ship with a set of presets that can also be extended by adding definitions to solution/cdv/validationFunctions/. The template for one such JavaScript file looks like this:

wd.cdv.validation.register({
  name: "Existence",
  validationArguments: [
    {name: "testAll", type:"boolean", default: true}
  ],

  validationFunction: function(rs, conf) {
    // Check whether each resultset returned at least one row and combine the
    // per-query results with AND (testAll = true) or OR (testAll = false)
    return rs.map(function(r){return r.length > 0}).reduce(function(prev, curr){
      return conf.testAll ? (curr && prev) : (curr || prev);
    });
  },

  alertArguments: [
    {name: "failOnExistence", type: "boolean", default: true},
    {name: "failureLevel", type: "alarmLevel", default: "ERROR"},
    {name: "failureMessage", type: "String", default: "Failed Existence Test: ${result}"}
  ],

  alertMapper: function(result, conf) {
    var success = conf.failOnExistence && result,
        level = success ? "OK" : conf.failureLevel,
        message = success ? conf.successMessage : conf.failureMessage; 
    return Alarm(level, message, result);
  }
});

The wd.cdv.validation API is defined in the Validation Module.
There are four objects there that we need to analyze:

  • validationFunction(rs, conf) – This is the validation function that will be executed after the query runs
  • validationArguments – Definition of the arguments that will be used within the validation function
  • alertArguments – Definition of the arguments that will be sent to the alertMapper
  • alertMapper(result, conf) – Mapping between the validation result and the alerts

Preset validations or custom validations

When we define a query, we can choose which validation function to use and pass the parameters that the specific validation requires.
Alternatively, we can use a custom validation function. That validation function has the following format, where all we need is to return the alarm level (this is a spec and may change after implementation):

function(rs, conf) {

    var exists = rs.map(function(r){return r.length > 0}).reduce(function(prev, curr){
      return conf.testAll ? (curr && prev) : (curr || prev);
    });

    return exists ? Alarm.ERROR : Alarm.OK;
}

CDA integration

We need a tight integration between CDA and CDV to report:

  • Errors in CDA queries
  • Long running CDA queries
  • Queries with obvious errors in the structure (eg: missing lines)

It will obviously need to take into account the fact that CDV may not be installed, and it must not have a performance impact on CDA.

External interfaces

We can have several external interfaces supported:

  • Email
  • Http
  • Nagios integration
  • Server up check

The last one is a very specific check: all the other integrations will fail if the server suddenly hangs, and we must be notified of that. With the HTTP and Nagios integrations, we’ll be able to get reports on the individual tests and also on the test groups. This will not rerun the tests but will report the last known status of a test.
In the HTTP case, we can pass a flag to force a test to be rerun.
For Nagios, we can have an export of the test rules.

Settings

We’ll be able to define the group rules, mainly for connectivity reasons. The settings (which can later be converted to a UI) will look like this (an illustrative sketch follows the list):

  • Groups
    • Email
      • Threshold
      • Destinations
    • SMS
      • Threshold
      • Destinations
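A minimal sketch of how these settings might look, assuming a JSON format (the property names and structure here are illustrative assumptions, not the final format):

{
  groups: {
    email: {
      threshold: "Error",              /* minimum level that triggers an email */
      destinations: ["ops@example.com"]
    },
    sms: {
      threshold: "Critical",           /* minimum level that triggers an SMS */
      destinations: ["+15550100"]
    }
  }
}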

Improving Performance with Pentaho Data Integration (Kettle) Table Input Step and Oracle Databases


When using the Pentaho Data Integration (a.k.a. Kettle) Table Input step to connect to Oracle via a JDBC connection, there is a setting in your connection information that can dramatically improve performance when retrieving data. This property is defaultRowPrefetch. Oracle JDBC drivers allow you to set the number of rows to prefetch from the server while the result set is being populated during a query. Prefetching row data into the client reduces the number of round trips to the server. The default value for this property is 10.

In the Table Input step, edit your connection, click on the Options tab, and then enter your defaultRowPrefetch specification:
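For example (the best value depends on row size and available client memory; 200 here is only an illustration), add the property as an option on the connection:

Parameter: defaultRowPrefetch
Value: 200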