Photo by Jezael Melgoza on Unsplash

In Part I of this series, we saw how to build a simple Apache Beam pipeline. In Part II we laid the groundwork for the pipeline we’re about to build here.

So, without further ado, here is our batch pipeline.

AlbumPipeline

Pipeline Breakdown

AppOptions

Before creating the pipeline instance, first, we register our custom pipeline options class:

Then we parse the command line arguments:

Pipeline

Now that the pipeline options are ready, we can create the pipeline instance, and then proceed with building the pipeline stages.

Reading from PostgreSQL

Before we start reading data from PostgreSQL, we first need to prepare an instance of PGSimpleDataSource which Apache Beam needs to be able to connect to the PostgreSQL server.

Now we can read or select all records under the Album table.

To make our life a bit easier, instead of dealing with the old and cumbersome java.sql.ResultSet API, we will map each row from the SQL ResultSet to a HashMap<String, Object>

HashMap to TableRow

Before we write to BigQuery, first we need to prepare the TableRow instances to be passed to the BigQuery writer.

We do this inside the HashMapToTableRowFn that creates a TableRow instance from a HashMap

Writing to BigQuery

We’re now ready to write to BigQuery.

To run this pipeline locally you’ll a Google cloud service account with access to both Cloud Storage and BigQuery.

When you create the service account, make sure to download the service account credentials file, then export the absolute path from a terminal like so:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/credentials.json

Assuming you’ve PostgreSQL running locally, and that you’ve imported the chinnok database from Part II of this series, you can run the pipeline using the command shown below:

You’ll need to replace the postgres username with the appropriate username on your end. You will also need to replace the GOOGLE_CLOUD_PROJECT_ID with the correct cloud project id you’re testing against.

One final thing, you will need to create a BigQuery dataset dwh under which the albums table will be created.

If all goes well, you should see the albums table under BigQuery.

--

--