Google Cloud Platform: Migrating Data to New Schemas on Big Query Using Dataflow


Migrating data on Google Cloud BigQuery may seem like a straightforward task, until you run into having to fit old data into tables with different schemas and data types. There are many approaches you can take to moving data, such as using SQL commands to transform the data into a shape compatible with the new schema.

However, SQL has limitations as a programming language: being a query-centric language for databases, it lacks the capabilities of general-purpose languages such as Python and Java. What if we want more programmatic control over data transformation? Let’s take a look at a simple approach using an Apache Beam pipeline written in Python and run on Google Cloud’s Dataflow service.

What are Apache Beam and Dataflow?

Apache Beam is an open-source, unified model for defining and executing data processing pipelines, for both batch and streaming data. To put it simply, a pipeline just reads data from a source, runs it through a series of transformations, and writes the result to an output. In our example, we read data from a BigQuery table, run each row through the pipeline to match our desired target schema, and write it to the target BigQuery table.
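
Before we get to the BigQuery pipeline, here is a minimal, self-contained sketch of that read-transform-write shape. It is purely illustrative and uses in-memory data, not the tables from this post:

import apache_beam as beam

# A tiny pipeline: read from an in-memory source, transform each element,
# and "write" it to stdout. The element values here are made up.
with beam.Pipeline() as p:
    (p | 'Read' >> beam.Create([{'name': 'Ada'}, {'name': 'Grace'}])
       | 'Transform' >> beam.Map(lambda row: {**row, 'greeting': 'hello'})
       | 'Write' >> beam.Map(print))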

Beam has SDKs available in Java, Python, and Go (with Scala support via Scio and a SQL dialect), and pipelines can run on a variety of runners such as Apache Spark, Apache Flink, Google Cloud Dataflow, and more. In this example we will focus on running a pipeline written with the Apache Beam Python SDK on Dataflow.

Dataflow is Google Cloud Platform’s managed service for executing data processing pipelines in a distributed fashion, and it comes with a nice graphical interface for observing the performance and execution stages of your pipeline in real time.

Evolving Schemas of BigQuery Tables

The approach for the pipeline will be fairly simple. We start with a source table in BigQuery with a certain schema and compare it to the desired schema for our target table. From that comparison we determine the changes needed to turn the old schema into the new one: the pipeline will handle adding new columns, removing columns, and modifying the data types of existing columns, as well as setting default values where appropriate. These changes are written to a configuration JSON that our pipeline reads and applies as transformations, and from there we write the transformed data out to the new table. If our transformations are correctly defined, we end up with the old data in the new table with all our defined modifications.

The configuration JSON for defining transformations is written in the format of a BigQuery JSON schema. Each column is a JSON object, and we define only the columns we want to change, add, or delete, with two additional fields: “mutation_type” and “default_value.”

  • “mutation_type” will be either “add,” “modify,” or “delete.”
  • “default_value” is the value set when adding a new field or modifying an old one; it is not needed for fields we want to delete.

We will store this configuration JSON in a Google Cloud Storage bucket from which our pipeline will read. Also be sure to create the source and target tables in BigQuery, and insert some test data into your source table to migrate.
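
As a rough sketch (the exact mechanics live in the source code linked below), the configuration could be loaded from GCS with the google-cloud-storage client before the pipeline is built. The helper name, bucket, and object names here are hypothetical placeholders:

import json
from google.cloud import storage

def load_update_config(gcs_path):
    # gcs_path looks like "gs://my-bucket/migrate_config.json" (hypothetical)
    bucket_name, blob_name = gcs_path[len("gs://"):].split("/", 1)
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    # Parse the JSON schema-style config into a list of dictionaries
    return json.loads(blob.download_as_text())

update_config = load_update_config("gs://my-bucket/migrate_config.json")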

For a more in-depth dive, the source code is located at https://github.com/tanwesley/Dataflow-BigQuery-Migration

Example:

For this demo, we will keep things simple. The following JSON will be our source schema:

Source BigQuery table schema

[
  {
    "name": "first_name",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "last_name",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "phone_number",
    "type": "INTEGER",
    "mode": "NULLABLE"
  }
]

And here is the schema we are aiming for:

Target BigQuery table schema

[
  {
    "name": "last_name",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "phone_number",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "address",
    "type": "STRING",
    "mode": "NULLABLE"
  }
]

Comparing them, we can see we need to:

  • Remove the field “first_name”
  • Change the datatype of “phone_number” from INTEGER to STRING
  • Add a new STRING field “address” which will be NULLABLE

Based on that, here is what our configuration will be:

[
  {
    "name": "first_name",
    "type": "STRING",
    "mode": "REQUIRED",
    "mutation_type": "delete"
  },
  {
    "name": "phone_number",
    "type": "STRING",
    "mode": "NULLABLE",
    "mutation_type": "modify",
    "default_value": null
  },
  {
    "name": "address",
    "type": "STRING",
    "mode": "NULLABLE",
    "mutation_type": "add",
    "default_value": null
  }
]

The following code snippet is our pipeline definition contained within the run() method, which lays out each stage of the transformation with comments:

p = beam.Pipeline(options=pipeline_options)
(p | 'Read old table' >> beam.io.ReadFromBigQuery(table=known_args.input,
                                                  gcs_location=temp_location)
   | 'Convert to new schema' >> beam.ParDo(OldToNewSchema(update_config))
   | 'Write to BigQuery' >> beam.io.WriteToBigQuery(table=known_args.output,
                                                    custom_gcs_temp_location=temp_location))

Explanation:

  • ‘Read old table’ >> We call Beam’s ReadFromBigQuery transform, passing in the BigQuery source table and a temporary GCS location where the exported table data is staged while it is read. This pulls the data from the table row by row.
  • ‘Convert to new schema’ >> We apply a DoFn we define as a class called OldToNewSchema, passing in the configuration we defined, which is read in from GCS. This is where the actual transformation of the data happens. Rows read from BigQuery arrive in JSON form and translate to Python dictionary objects, which are easy to work with.
  • ‘Write to BigQuery’ >> Here, we simply call Beam’s WriteToBigQuery transform, passing in our output table and a temporary GCS location where the data is staged before being loaded into BigQuery.

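For context, here is one way the surrounding run() method could wire these pieces together, parsing the custom flags and handing the standard ones (project, region, runner, temp location) to Dataflow. The exact argument handling in the linked repository may differ; load_update_config is the hypothetical helper sketched earlier:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)           # source table: project.dataset.table
    parser.add_argument('--output', required=True)          # target table: project.dataset.table
    parser.add_argument('--migrate_config', required=True)  # gs:// path to the config JSON
    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptions picks up --project, --region, --runner, --temp_location, etc.
    pipeline_options = PipelineOptions(pipeline_args)
    temp_location = pipeline_options.get_all_options().get('temp_location')
    update_config = load_update_config(known_args.migrate_config)

    p = beam.Pipeline(options=pipeline_options)
    (p | 'Read old table' >> beam.io.ReadFromBigQuery(table=known_args.input,
                                                      gcs_location=temp_location)
       | 'Convert to new schema' >> beam.ParDo(OldToNewSchema(update_config))
       | 'Write to BigQuery' >> beam.io.WriteToBigQuery(table=known_args.output,
                                                        custom_gcs_temp_location=temp_location))
    p.run().wait_until_finish()
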
Now let’s take a look at the DoFn OldToNewSchema which does all the actual transformation in the pipeline:

class OldToNewSchema(beam.DoFn):
    def __init__(self, config):
        # config: list of column definitions parsed from the migration JSON
        self.config = config

    def process(self, data, config=None):
        import logging
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        if config is None:
            config = self.config
        for c in config:
            name = c.get('name')
            if c.get('mode') == 'REPEATED':
                logger.info(f"DATA: {data}")
                logger.info(f"NESTED FIELDS: {data.get(name)}")
                nested_fields = data.get(name)
                # Apply the nested field configs to every repeated record and
                # keep the whole list (rather than overwriting it on each pass).
                updated = []
                for f in nested_fields:
                    updated.extend(self.process(data=f,
                                                config=c.get('fields')))
                    logger.info(f"UPDATED NESTED RECORD: {f}")
                data.update({name: updated})
            else:
                mutation_type = c.get('mutation_type')
                if mutation_type == 'add':
                    value = c.get('default_value')
                    data.update({name: value})
                elif mutation_type == 'modify':
                    # The conversion only needs the target type; the old type is informational.
                    value = self.data_conversion(data.get(name),
                                                 data.get('type'), c.get('type'))
                    data.update({name: value})
                elif mutation_type == 'delete':
                    data.pop(name)
        logger.info(f"FINAL UPDATED DATA: {data}\n")
        return [data]

    def data_conversion(self, data, old_type, new_type):
        if new_type == 'STRING':
            converted = str(data)
        elif new_type == 'BOOLEAN':
            if data in ('true', 'True'):
                converted = True
            elif data in ('false', 'False'):
                converted = False
            else:
                converted = None
        elif new_type == 'INTEGER':
            try:
                converted = int(data)
            except (TypeError, ValueError):
                converted = None
        elif new_type == 'FLOAT':
            try:
                converted = float(data)
            except (TypeError, ValueError):
                converted = None
        else:
            converted = None
        return converted

Explanation:

In Apache Beam, a DoFn works like a function applied to each element that passes through the pipeline, except we define it as a Python class. In this case our class is OldToNewSchema, which takes in an update configuration in the form of a list of dictionaries parsed from our configuration JSON in GCS. We must implement the process() method of the DoFn class to define the logic of our transformations; this is where all the heavy lifting is done.
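
Because process() is just a Python method, it can also be sanity-checked locally without launching a Dataflow job. This quick check is not part of the original pipeline code; the sample row and config simply mirror the example schemas from earlier:

# Config mirroring the example migration configuration above.
sample_config = [
    {"name": "first_name", "type": "STRING", "mode": "REQUIRED", "mutation_type": "delete"},
    {"name": "phone_number", "type": "STRING", "mode": "NULLABLE", "mutation_type": "modify", "default_value": None},
    {"name": "address", "type": "STRING", "mode": "NULLABLE", "mutation_type": "add", "default_value": None},
]
# A row shaped like the source table.
row = {"first_name": "Ada", "last_name": "Lovelace", "phone_number": 5551234}

fn = OldToNewSchema(sample_config)
print(fn.process(dict(row)))
# Expected: [{'last_name': 'Lovelace', 'phone_number': '5551234', 'address': None}]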

Now that we know the details of the pipeline’s workflow, all that is left is to submit a job to Dataflow.

First we must define the arguments we need to submit a job. Open Cloud Shell in the Google Cloud Console and set the following environment variables:

export PROJECT="The Google Cloud project ID you are working in"
export INPUT="The BigQuery table you wish to migrate from. Format: {project_id}.{dataset_name}.{table_name}"
export OUTPUT="The BigQuery table you wish to migrate data to. Format: {project_id}.{dataset_name}.{table_name}"
export MIGRATE_CONFIG="The GCS path to the migration config JSON that maps the old data to the new schema"
export TEMP_LOCATION="A GCS path where data will be temporarily staged while writing to BigQuery"
export REGION="Any available Google Cloud region of your choice"

Now simply execute the Python file with all the required flags we defined:

python data_migration_test.py --project=$PROJECT \
    --input=$INPUT \
    --migrate_config=$MIGRATE_CONFIG \
    --output=$OUTPUT \
    --temp_location=$TEMP_LOCATION \
    --runner=DataflowRunner \
    --region=$REGION

Navigate to Dataflow in the Google Cloud Console; the job we just submitted should appear in the Jobs tab. Give it some time to run, and click on the job to observe the execution of the pipeline in real time.

If all goes well, the Dataflow UI will show each stage of the pipeline completing and the job will be marked as succeeded. Check your target BigQuery table to confirm that your old data has successfully migrated to the new schema.
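
For a quick spot check from Python, something like the following query against the target table could be used; the fully qualified table name is a hypothetical placeholder:

from google.cloud import bigquery

client = bigquery.Client()
# Replace the table reference with your own target table.
query = """
    SELECT last_name, phone_number, address
    FROM `my-gcp-project.my_dataset.customers_new`
    LIMIT 10
"""
for row in client.query(query).result():
    print(dict(row))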

Screenshots: the source BigQuery table, and the old data migrated to the new BigQuery table.
