Parquet interoperability across language ecosystems

Parquet is available in many environments but you’ll need to keep some quirks in mind to realize the benefits of its ubiquity.

parquet
arrow
spark
pyspark
java
big data
Published

January 29, 2021

(This post is also available as an interactive notebook.)

Apache Parquet is a great default choice for a data serialization format in data processing and machine learning pipelines, but just because it’s available in many environments doesn’t mean it has the same behavior everywhere. In the remaining discussion, we’ll look at how to work around some potential interoperability headaches when using Parquet to transfer data from a data engineering pipeline running in the JVM ecosystem to a machine learning pipeline running in the Python data ecosystem.1

We’ll start by looking at a Parquet file generated by Apache Spark with the output of an ETL job.

from pyspark.sql import SparkSession

session = SparkSession.builder.getOrCreate()

We can look at the schema for this file and inspect a few rows:

spark_df = session.read.parquet("colors.parquet")
spark_df.printSchema()
root
 |-- rowID: string (nullable = true)
 |-- YesNo: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Categorical: string (nullable = true)
spark_df.limit(10).toPandas()
rowID YesNo Color Categorical
0 00000267 No red 62
1 000004c2 No red ba
2 00002dcf No blue 75
3 000035be No green 2f
4 00005f19 No green 0a
5 00007c1e No blue 79
6 0000be2c No green 38
7 0000d29d No green 60
8 0000d313 Yes blue f7
9 0000d66c No blue 94

The “file” we’re reading from (colors.parquet) is a partitioned Parquet file, so it’s really a directory. We can inspect the Parquet metadata for each column using the parquet-tools utility from our shell:

parquet-tools meta colors.parquet 2>& 1 | head -70 | grep SNAPPY
rowID:        BINARY SNAPPY DO:0 FPO:4 SZ:4931389/8438901/1.71 VC:703200 ENC:RLE,BIT_PACKED,PLAIN ST:[min: 00000267, max: ffffc225, num_nulls: 0]
YesNo:        BINARY SNAPPY DO:0 FPO:4931393 SZ:105082/108599/1.03 VC:703200 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: No, max: Yes, num_nulls: 0]
Color:        BINARY SNAPPY DO:0 FPO:5036475 SZ:177524/177487/1.00 VC:703200 ENC:BIT_PACKED,PLAIN_DICTIONARY ST:[min: blue, max: red, num_nulls: 0]
Categorical:  BINARY SNAPPY DO:0 FPO:5213999 SZ:705931/706389/1.00 VC:703200 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: 00, max: ff, num_nulls: 0]

This output shows that many of our columns are compressed (SNAPPY) Unicode strings (BINARY) and that many of these columns are dictionary-encoded (ENC:...,PLAIN_DICTIONARY), which means that each distinct string is stored as an index into a dictionary rather than as a literal value. By storing values that may be repeated many times in this way, we save space and compute time.2

So far, so good! But what happens when we read these data into pandas? We can load Parquet files into pandas if we have PyArrow installed; let’s try it out.

import pandas as pd
pandas_df = pd.read_parquet("colors.parquet/")
pandas_df
rowID YesNo Color Categorical
0 00000267 No red 62
1 000004c2 No red ba
2 00002dcf No blue 75
3 000035be No green 2f
4 00005f19 No green 0a
703195 ffff69a9 No green 25
703196 ffff8037 No green 34
703197 ffffa49f No red 3a
703198 ffffa6ae No green 89
703199 ffffc225 Yes blue 40

The data look about like we’d expect them to. However, when we look at how pandas is representing our data, we’re in for a surprise: pandas has taken our efficiently dictionary-encoded strings and represented them with arbitrary Python objects!

pandas_df.dtypes
rowID          object
YesNo          object
Color          object
Categorical    object
dtype: object

We could convert each column to strings and then to categoricals, but this would be tedious and inefficient. (Note that if we’d created a pandas data frame with string- or category-typed columns and saved that to Parquet, the types would survive a round-trip to disk because they’d be stored in pandas-specific Parquet metadata.)

In this case, pandas is using the PyArrow Parquet backend; interestingly, if we use PyArrow directly to read into a pyarrow.Table, the string types are preserved:

import pyarrow.parquet as pq
arrow_table = pq.read_table("colors.parquet/")

…but once we convert that table to pandas, we’ve lost the type information.

arrow_table.to_pandas().dtypes
rowID          object
YesNo          object
Color          object
Categorical    object
dtype: object

However, we can force PyArrow to preserve the dictionary encoding even through the pandas conversion if we specify the read_dictionary option with a list of appropriate columns:

dict_arrow_table = \
    pq.read_table("colors.parquet/", read_dictionary=['YesNo', 'Color', 'Categorical'])

dict_arrow_table
pyarrow.Table
rowID: string
YesNo: dictionary<values=string, indices=int32, ordered=0>
Color: dictionary<values=string, indices=int32, ordered=0> not null
Categorical: dictionary<values=string, indices=int32, ordered=0>
dict_arrow_table.to_pandas().dtypes
rowID            object
YesNo          category
Color          category
Categorical    category
dtype: object

If we don’t know a priori what columns are dictionary-encoded (and thus might hold categoricals), we can find out by programmatically inspecting the Parquet metadata:

dictionary_cols = set([])

# get metadata for each partition
for piece in pq.ParquetDataset("colors.parquet", use_legacy_dataset=False).pieces:
    meta = piece.metadata

    # get column names
    cols = enumerate(meta.schema.names)

    # get column metadata for each row group
    for i in range(meta.num_row_groups):
        rg = meta.row_group(i)
        for col, colname in cols:
            if "PLAIN_DICTIONARY" in rg.column(col).encodings:
                dictionary_cols.add(colname)

dictionary_cols
{'Categorical', 'Color', 'YesNo'}

Preserving column types when transferring data from a JVM-based ETL pipeline to a Python-based machine learning pipeline can save a lot of human effort and compute time – and eliminate an entire class of performance regressions and bugs as well. Fortunately, it just takes a little bit of care to ensure that our entire pipeline preserves the efficiency advantages of Parquet.

Footnotes

  1. There are certainly potential headaches going in the other direction as well (e.g., this and this, but it’s a less-common workflow to generate data in Python for further processing in Spark.↩︎

  2. Parquet defaults to dictionary-encoding small-cardinality string columns, and we can assume that many of these will be treated as categoricals later in a data pipeline.↩︎