
Enabling Change Data Capture on Apache Beam with DebeziumIO

Decades ago, visionaries said: “The future is in the data.” Today we know just how right they were: the cost-effectiveness and power of cloud computing have made realizing the value of data a reality.

In more recent times, the industry has seen a boom of tools, products, and frameworks to make data movement easier. Some of these tools are oriented toward change data capture (CDC). There are a vast number of solutions available, from open source tools to commercial end-to-end platforms.

One of these tools is Debezium, an open-source platform for change data capture. It captures changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

On the other hand, Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. One of the best things about Beam is that you can write your pipeline in any supported language and run it on the runner of your choice, such as Apache Flink, Apache Spark, or Cloud Dataflow.

By converging both technologies, data engineers can expect tremendous benefits when building pipelines for replication, data enrichment, or more sophisticated scenarios where CDC is the best practice. Google invited Wizeline to collaborate on the Apache Beam project to expand its connectivity capabilities, adding Debezium connector support to Beam’s SDK. This article will demonstrate our solution and how easy and convenient it is to use Debezium in your Apache Beam pipelines.

Debezium + Beam: a perfect couple 

So, how do these two technologies come together? Well, I said that Debezium helps you capture changes in your source databases, but what I didn’t mention is that Debezium not only captures the data but also, by design, delivers those CDC events as SourceRecords to the corresponding Kafka topic. This is how Debezium connectors work in conjunction with Kafka.

Now the question is: Where does Beam fit in the architecture? You would usually place Beam at the right end of this architecture, reading from Kafka. If Beam can already read CDC events from Kafka, why do we need something new?

Before responding to that question, you need to understand the basics of Beam’s architecture. In short, Beam lets you create data pipelines by connecting what it calls “transforms.” Each transform delivers a “collection” of elements (let’s say “records”) to the next transform, which does the same, and so on. The following diagram shows this process:

[Diagram: a Beam pipeline as a chain of transforms, from a Read transform through intermediate transforms to a Write transform]

I invite you to take another look at the diagram above. What do you see? Two transforms stand out: the Read transform, which is responsible for getting the data into the pipeline, and the Write transform, which pushes the results out. Now, what if you could use the Debezium connector directly from your Beam pipeline, with no need to set up and deploy any Debezium or Kafka clusters? Well, that’s precisely what DebeziumIO does.
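To make this concrete, here is a minimal, generic Beam pipeline sketch. It uses TextIO purely for illustration (it is not part of the CDC example); the point is the shape of every pipeline: a Read transform, one or more middle transforms, and a Write transform chained together.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalPipeline {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
            // Read transform: brings data into the pipeline as a collection of strings
            .apply("Read", TextIO.read().from("input.txt"))
            // Middle transform: consumes one collection and produces another
            .apply("ToUpperCase",
                MapElements.into(TypeDescriptors.strings()).via((String line) -> line.toUpperCase()))
            // Write transform: pushes the resulting collection out of the pipeline
            .apply("Write", TextIO.write().to("output"));

        pipeline.run().waitUntilFinish();
    }
}

Every apply call hands a new collection (a PCollection) to the next transform, which is exactly the chain shown in the diagram.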

Look, DebeziumIO is nothing but a Read transform that “wraps” a given Debezium connector class. It also defines a KafkaSourceConsumerFn, which extends a regular DoFn<String,String>. This consumer class is responsible for instantiating the Debezium connector and starting the SourceTask that does the magic. If you want to see the exact details, take a look at the Beam and Debezium GitHub repositories.

How does DebeziumIO work?

Before starting the pipeline creation, you need a source database to read from. Fortunately, Debezium provides an example MySql database image that is ready to use. All the details for configuring your database correctly can be found on the Debezium website. For now, let’s run a Docker container on your localhost with the following command.

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.4

Be sure port 3306 is not being used by another process.
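If you want to confirm the database is reachable before writing any Beam code, a quick JDBC probe is enough. This is only a minimal sketch, and it assumes a MySql JDBC driver (for example, mysql-connector-java) is available on your classpath:

import java.sql.Connection;
import java.sql.DriverManager;

public class CheckMySqlConnection {
    public static void main(String[] args) throws Exception {
        // Host, port, database, and credentials match the debezium/example-mysql container above.
        String url = "jdbc:mysql://127.0.0.1:3306/inventory";
        try (Connection connection = DriverManager.getConnection(url, "mysqluser", "mysqlpw")) {
            System.out.println("Connected to database: " + connection.getCatalog());
        }
    }
}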

Now it’s time to create a Java Maven project and set up its dependencies. In your pom.xml file, make sure you include the Kafka and Debezium dependencies. Let’s say we want to replicate our on-premises MySql database to the cloud; for this article, the destination is BigQuery. Since the pom.xml can be quite long, here is a snippet with some dependencies you should not forget:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${debezium-core.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium-connector-mysql.version}</version>
</dependency>

Pay special attention to the debezium-connector-mysql dependency, since it will be different depending on which database you want to connect to; for a list of supported databases, visit the Debezium website. You will also need the Apache Beam dependencies themselves: the core SDK, the DebeziumIO module, the Google Cloud Platform IOs (for BigQueryIO), and a runner such as the Direct Runner. Check the Apache Beam documentation for the exact artifact coordinates for your Beam version.

Now let’s create our pipeline, starting with the shell, a public class with a static main function, just like the following example:

public class SampleCdcMySqlToBigQuery {

    private static final Logger LOG = LoggerFactory.getLogger(SampleCdcMySqlToBigQuery.class);

    public static void main(String[] args) {
        CdcPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(CdcPipelineOptions.class);
        run(options);
    }

    private static PipelineResult run(CdcPipelineOptions options) {
        // Create the pipeline with options
        Pipeline pipeline = Pipeline.create(options);
        // Pipeline transforms go here
        return pipeline.run();
    }
}

If you want to pass options to your pipeline from the arguments given at execution time, you create an interface that extends PipelineOptions. In the code above it is called CdcPipelineOptions, and within it you define the options you want to read from the program arguments. Take a look at the CdcPipelineOptions interface:

public interface CdcPipelineOptions extends PipelineOptions {

    @Description("The JDBC connection Hostname string.")
    ValueProvider<String> getHostname();
    void setHostname(ValueProvider<String> hostname);

    @Description("The JDBC connection Port string.")
    ValueProvider<String> getPort();
    void setPort(ValueProvider<String> port);

    @Description("JDBC connection user name.")
    ValueProvider<String> getUsername();
    void setUsername(ValueProvider<String> username);

    @Description("JDBC connection password.")
    ValueProvider<String> getPassword();
    void setPassword(ValueProvider<String> password);

    @Description("Output table to write to")
    ValueProvider<String> getOutputTable();
    void setOutputTable(ValueProvider<String> value);

    @Validation.Required
    @Description("Temporary directory for the BigQuery loading process")
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();
    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> directory);
}

Notice that this options interface extends PipelineOptions, which already provides a number of standard options. For now, we have defined the following:

  • Hostname: The host to connect to; in this case, your MySql instance running on localhost (127.0.0.1).
  • Port: The database port; in this case, 3306.
  • Username: mysqluser (you must use this same user).
  • Password: mysqlpw.
  • OutputTable: A valid, unused BigQuery table name in the format <<project_id>>:<<dataset>>.<<table_name>>.
  • BigQueryLoadingTemporaryDirectory: An existing, valid bucket path, such as gs://<<bucket_name>>/temp/.

Before continuing with the pipeline creation, let’s explore the pipeline design. The following diagram shows the steps that make up our pipeline:

[Diagram: DebeziumIO (Read) → Json to TableRow → BigQuery (Write)]

The pipeline has three steps:

  1. DebeziumIO (Read transform) reads/captures the CDC events. At the time of this article’s publication, DebeziumIO converts the Kafka SourceRecord CDC event into a JSON element.
  2. The Json to TableRow transform will take each CDC event as JSON and transform it into a BigQuery TableRow, producing a collection of TableRow elements.
  3. Finally, the Write transform called “BigQuery” will push out the resultant rows to Google BigQuery into the specified outputTable.

Once the pipeline has been designed, it’s time to code it. Let’s add content to this shell class. Inside the run(CdcPipelineOptions options) function, and just after the pipeline instantiation, paste the following code:

// Step 1: Configure Debezium connector
        pipeline
            .apply(
                "Read from DebeziumIO",
                DebeziumIO.<String>read()
                    .withConnectorConfiguration(
                        DebeziumIO.ConnectorConfiguration.create()
                            .withUsername(options.getUsername())
                            .withPassword(options.getPassword())
                            .withHostName(options.getHostname())
                            .withPort(options.getPort())
                            .withConnectorClass(MySqlConnector.class)
                            .withConnectionProperty("database.server.name", "dbserver1")
                            .withConnectionProperty("database.include.list", "inventory")
                            .withConnectionProperty("include.schema.changes", "false")
                            .withConnectionProperty("table.include.list", "inventory.customers")
                            .withConnectionProperty("connect.keep.alive", "false")
                            .withConnectionProperty("connect.keep.alive.interval.ms", "200")
                    )
                    .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
                    .withCoder(StringUtf8Coder.of())
            )
// Step 2: Get record from CDC Json event and create a TableRow per event
            .apply("Json to TableRow", ParDo.of(new DoFn<String, TableRow>() {
                @DoFn.ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    String json = c.element();
                    LOG.debug("PIPELINE STEP2: {}", json);
                    TableRow tr = new TableRow();
                    tr.set("json", json);
                    c.output(tr);
                }
            }))
// Step 3: Append TableRow to a given BigQuery table: outputTable argument.
            .apply(
                "Write to BigQuery",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                    .withSchema(new TableSchema().setFields(Arrays.asList(
                        new TableFieldSchema().setName("json").setType("STRING")
                    )))
                    .to(options.getOutputTable()));

The code above shows how easy it is to configure the DebeziumIO transform. Let’s walk through it. The first step, named “Read from DebeziumIO,” instantiates the read transform, which returns a collection of strings. Now let’s explore the connector configuration.

DebeziumIO.<String>read()
    .withConnectorConfiguration(
        DebeziumIO.ConnectorConfiguration.create()
            .withUsername(options.getUsername())
            .withPassword(options.getPassword())
            .withHostName(options.getHostname())
            .withPort(options.getPort())
            .withConnectorClass(MySqlConnector.class)
            .withConnectionProperty("database.server.name", "dbserver1")
            .withConnectionProperty("database.include.list", "inventory")
            ...
    )
    .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
    .withCoder(StringUtf8Coder.of())

DebeziumIO has a set of standard options to configure the underlying Debezium connector: HostName, Username, Password, and Port. By calling withConnectorClass, you specify the Debezium connector you want to use. Since the pipeline is connecting to a MySql database, the connector is MySqlConnector. Notice you are passing the class, and DebeziumIO’s Kafka consumer will instantiate the connector.

Because each connector has its own specific configuration options, DebeziumIO accepts a map of “connection properties,” which are passed through to the specified connector. To make things easier, DebeziumIO offers the withConnectionProperty(String, String) builder method. Lastly, the code above specifies a format function that transforms the SourceRecord instances into JSON-formatted strings.

The next step is the transformation from Json to TableRow. Nothing fancy here: since the purpose of this article is to show you how to use DebeziumIO, not how to build an elaborate pipeline, I’m keeping this step simple.

public void processElement(ProcessContext c) throws Exception {
    String json = c.element();
    LOG.debug("PIPELINE STEP2: {}", json);
    TableRow tr = new TableRow();
    tr.set("json", json);
    c.output(tr);
}

The function reads each element from the collection of Json strings returned in step 1 and outputs a corresponding TableRow, putting a collection of TableRow elements back into the pipeline.

Lastly, the third step takes the collection of TableRow elements and writes it to BigQuery. Here are the highlights of this code:

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.withSchema(new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("json").setType("STRING")
)))
.to(options.getOutputTable()));

Once more, you can see how the CdcPipelineOptions interface acts as a vehicle to deliver some BigQuery options to BigQueryIO, such as the temporary location used to upload data to BigQuery and the output table’s name. The CREATE_IF_NEEDED create disposition will create the table for you if it doesn’t exist.

Wonderful! Now that you have all the pieces together, it’s time to compile and run the pipeline.

Using DirectRunner

As you know, Beam lets you choose among a variety of supported runners. The easiest and fastest way to see our pipeline running is to use the DirectRunner. The snippet below shows you how to do it.

mvn compile -X exec:java \
    -Dexec.mainClass=<<your_package>>.SampleCdcMySqlToBigQuery \
    -Dexec.args="--runner=DirectRunner \
        --username=mysqluser \
        --password=mysqlpw \
        --hostname=127.0.0.1 \
        --port=3306 \
        --outputTable=<<google-cloud-project-id>>:<<dataset>>.<<table>> \
        --bigQueryLoadingTemporaryDirectory=gs://<<bucket-name>>/temp/"
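As a side note, the runner can also be set programmatically instead of with the --runner argument. Here is a minimal sketch of that alternative inside main(); DirectRunner comes from the beam-runners-direct-java artifact:

import org.apache.beam.runners.direct.DirectRunner;

// Inside main(), before calling run(options):
CdcPipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(CdcPipelineOptions.class);
options.setRunner(DirectRunner.class); // equivalent to passing --runner=DirectRunner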

Once your code is compiled, Beam will execute your pipeline using the DirectRunner. Remember LOG.debug("PIPELINE STEP2: {}", json); from step 2? Well, you should be seeing a bunch of them on your terminal, just like the following example:

PIPELINE STEP2:
{"metadata":{"connector":"mysql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"mysql-bin.000004","table":"online_orders"},"before":null,"after":{"fields":{"order_date":18941,"quantity":24,"purchaser":1002,"order_number":20101,"product_id":108,"billing_address":"4859 William Plains\nHenryview, MO 18809","order_time_cst":4568000000}}}

Here you can see the structure of the captured CDC event. The main structures that make up an event are metadata, before, and after. You might be wondering, “Why is before null?” This is very common when the event triggered on the source database is the insert of a new record, or when the event is the product of an initial snapshot.
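If you wanted individual columns instead of a single json column, step 2 could parse the event and unpack the after.fields object. The following is only a sketch of that idea: it assumes a JSON library such as Gson (2.8.6 or newer) is on your classpath, it maps every field as a string, and it skips events whose after is null (deletes).

import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

// A variation of the "Json to TableRow" step that outputs one column per captured field.
public class CdcJsonToColumnsFn extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        JsonObject event = JsonParser.parseString(c.element()).getAsJsonObject();
        // "after" is null for delete events; this sketch only maps inserts, updates, and snapshots.
        if (event.has("after") && !event.get("after").isJsonNull()) {
            JsonObject fields = event.getAsJsonObject("after").getAsJsonObject("fields");
            TableRow row = new TableRow();
            for (Map.Entry<String, JsonElement> field : fields.entrySet()) {
                row.set(field.getKey(), field.getValue().getAsString());
            }
            c.output(row);
        }
    }
}

You would plug it in with ParDo.of(new CdcJsonToColumnsFn()) in place of the anonymous DoFn, and the TableSchema in step 3 would then need one STRING field per column mapped here.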

Congratulations! With this very simple pipeline, you were able to move data from a source database to the cloud.

When to deploy this solution

Software is capturing, transforming, and delivering data all the time, but when the amount of data is massive, every decision you make is that much bigger. To that end, choosing the right CDC tool can make a huge difference.

Consider the amount of time and resources it took to set up and deploy a CDC solution before DebeziumIO: the Debezium connector, the Kafka cluster, and the Apache Beam pipeline on top really add up.

Of course, there will be situations where this Debezium-Kafka-Beam setup is the way to go, whether because you are already using Kafka as a message bus or because you need to broadcast the events to other services. But there are other situations where you don’t need all that complexity, and that is where DebeziumIO fits. Reducing your solution’s complexity by cutting out unnecessary elements reduces your operational costs and lets your engineering team focus on innovating your business’s value chain.

Conclusion

Wizeline is a global software development company that collaborates on open source technologies used by industry leaders like Google to solve leading companies’ greatest challenges. If you’re interested in learning more about how we can help, contact consulting@wizeline.com. If you’d like to join our worldwide team of experts, visit our careers website.

If you want to know more about the Apache Beam project, visit its official website. For more information about DebeziumIO releases and updates, please visit the repository. And to try connecting the two technologies yourself, I invite you to take a look at the code shown here in our repository.

Finally, I would like to thank the Wizeline team involved in this incredible new tool for Apache Beam. Special recognition goes to:

  • William Sykes, Solutions Architect
  • Juan Sandoval, Team Lead
  • Rodrigo Chaparro, Sr. Data Engineer
  • Juan Carlos Vera, Project Manager

By Hassan Reyes, Wizeline Solutions Architect

Posted by Aisha Owolabi on March 16, 2021