BigQuery Twitter Schema

I was investigating the feasibility of putting natively formatted Twitter data into BigQuery, and got pretty far along before deciding to go in another direction. I found the schema in the twitter-for-bigquery project incomplete for my needs, so I made a new schema of my own. I’m making the schema available here in case it’s of use to anyone else.

Here are a few things it’s important to understand about the schema:

  • Even though the file has the extension .txt, it is actually a .json file. (I had to change the extension to satisfy WordPress.)
  • I changed the “native” coordinate format ([lat,lng]) to a simple object ({"lat": …, "lng": …}), because the native format of the place bounding_box field uses arrays of arrays, which BigQuery does not support. (A sketch of this conversion follows this list.)
  • All of the fields are labeled as NULLABLE, although I’m sure some could safely be made REQUIRED.
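
For illustration, here’s a minimal sketch of that coordinate conversion, assuming the [lat,lng] pair ordering described above (fix_bounding_box is a hypothetical helper; double-check the ordering against your own data, since GeoJSON-style fields order pairs as [longitude, latitude]):

def fix_bounding_box(place):
    # Hypothetical helper: rewrite the place's bounding_box coordinates so
    # each [lat, lng] pair becomes a {"lat", "lng"} object, since BigQuery
    # cannot store arrays of arrays.
    bbox = place.get("bounding_box")
    if not bbox or "coordinates" not in bbox:
        return place
    bbox["coordinates"] = [
        { "lat": pair[0], "lng": pair[1] }
        for ring in bbox["coordinates"]   # outer array of rings
        for pair in ring                  # inner array of pairs
    ]
    return place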

I was able to create tables using this schema in BigQuery, but never got to the point of actually inserting rows, so YMMV.
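
If you want to try the same thing, here’s a minimal sketch of table creation using the google-cloud-bigquery client library (the project, dataset, and file names are placeholders):

from google.cloud import bigquery

client = bigquery.Client()
# schema_from_json() parses a BigQuery JSON schema file into SchemaField objects.
schema = client.schema_from_json("twitter_schema.json")
table = bigquery.Table("my-project.my_dataset.tweets", schema=schema)
client.create_table(table)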

Also, I created a “helper script” to create a BigQuery schema from JSON data. Essentially, it walks a list of JSON Lines files and builds a BigQuery schema that will either (a) contain all the given records, if they have compatible schemas, or (b) fail. The code for that Python script is here:

import sys, json

filenames = sys.argv[1:]

# Keys and values used in BigQuery's JSON schema format; the NULL type and
# mode are internal placeholders for values that have only ever been null.
TYPE = "type"
TYPE_STRING = "STRING"
TYPE_INT = "INTEGER"
TYPE_FLOAT = "FLOAT"
TYPE_BOOL = "BOOLEAN"
TYPE_RECORD = "RECORD"
TYPE_NULL   = "NULL"

MODE = "mode"
MODE_NULLABLE = "NULLABLE"
MODE_REQUIRED = "REQUIRED"
MODE_REPEATED = "REPEATED"
MODE_NULL     = "NULL"

NAME   = "name"
SCHEMA = "fields"  # BigQuery's JSON schema format nests record subfields under "fields"

def classify(ns, v):
    # Classify a single JSON value v as a schema fragment; ns is the name
    # path to v, used only in warnings and error messages.
    result = None

    if v is None:
        result = { MODE: MODE_NULL, TYPE: TYPE_NULL }
    elif isinstance(v, str):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_STRING }
    elif isinstance(v, dict):
        schema = [ ]
        for key in v.keys():
            field = classify(ns + [ key ], v[key])
            field[NAME] = key
            schema.append(field)
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_RECORD, SCHEMA: schema}
    elif isinstance(v, list):
        if len(v) == 0:
            result = { MODE: MODE_REPEATED, TYPE: TYPE_NULL }
        else:
            # Classify every element and merge the results, so that lists
            # whose elements have differing fields are still fully covered.
            elements = classify(ns + [ "0" ], v[0])
            for i, item in enumerate(v[1:], start=1):
                elements = merge(ns + [ str(i) ], elements,
                                 classify(ns + [ str(i) ], item))
            if elements[MODE] == MODE_REPEATED:
                sys.stderr.write(f"WARNING: BigQuery cannot model nested REPEATED fields at {ns}. Flattening...\n")

            if elements[TYPE] == TYPE_RECORD:
                result = { MODE: MODE_REPEATED, TYPE: TYPE_RECORD, SCHEMA: elements[SCHEMA] }
            else:
                result = { MODE: MODE_REPEATED, TYPE: elements[TYPE] }
    elif isinstance(v, bool):
        # This test must precede the int test below, because bool is a
        # subclass of int in Python.
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_BOOL }
    elif isinstance(v, int):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_INT }
    elif isinstance(v, float):
        result = { MODE: MODE_NULLABLE, TYPE: TYPE_FLOAT }
    else:
        raise Exception("Value is not valid JSON: "+str(v))

    return result

def merge(ns, t1, t2):
    # Merge two schema fragments describing the same field, or raise if
    # they cannot be reconciled.
    result = None

    if t1 == t2:
        result = t1
    elif t1[MODE] == MODE_NULL:
        result = t2
    elif t2[MODE] == MODE_NULL:
        result = t1
    else:
        mode = None
        if t1[MODE] == t2[MODE]:
            mode = t1[MODE]
        elif t1[MODE] in (MODE_REQUIRED, MODE_NULLABLE) and t2[MODE] in (MODE_REQUIRED, MODE_NULLABLE):
            # One fragment is REQUIRED and the other NULLABLE; NULLABLE is
            # the mode that accommodates both.
            mode = MODE_NULLABLE
        else:
            raise Exception("Incompatible modes at "+str(ns)+": "+t1[MODE]+" and "+t2[MODE])

        type = None
        schema = None
        if t1[TYPE] == t2[TYPE]:
            type = t1[TYPE]
            if t1[TYPE] == TYPE_RECORD:
                fs1 = { f[NAME]: f for f in t1[SCHEMA] }
                fs2 = { f[NAME]: f for f in t2[SCHEMA] }

                schema = []

                for key in set(list(fs1.keys()) + list(fs2.keys())):
                    field = None
                    if key in fs1 and key not in fs2:
                        field = fs1[key]
                    elif key not in fs1 and key in fs2:
                        field = fs2[key]
                    else:
                        f1 = fs1[key]
                        f2 = fs2[key]
                        field = merge(ns + [ key ], f1, f2)

                    field[NAME] = key
                        
                    schema.append(field)
        elif t1[TYPE] == TYPE_NULL:
            type = t2[TYPE]
            if type == TYPE_RECORD:
                schema = t2[SCHEMA]
        elif t2[TYPE] == TYPE_NULL:
            type = t1[TYPE]
            if type == TYPE_RECORD:
                schema = t1[SCHEMA]
        else:
            raise Exception("Incompatible types at "+str(ns)+": "+t1[TYPE]+" and "+t2[TYPE])

        if schema is None:
            result = { MODE: mode, TYPE: type }
        else:
            result = { MODE: mode, TYPE: type, SCHEMA: schema }

    return result

schema = []
def walk(o):
    # Fold one record into the running top-level schema.
    global schema

    fs = { f[NAME]: f for f in schema }

    for key in o.keys():
        if key in fs:
            field = merge([ key ], fs[key], classify([ key ], o[key]))
        else:
            field = classify([ key ], o[key])

        field[NAME] = key
        fs[key] = field

    schema = list(fs.values())

for filename in filenames:
    with open(filename, "r") as f:
        for line in f:  # one JSON object per line (JSON Lines)
            walk(json.loads(line))

# Fields that were null in every record keep the NULL placeholder type and
# mode, and must be hand-edited before BigQuery will accept the schema.
print(json.dumps(schema))
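
To run the script, pass one or more JSON Lines files on the command line and capture stdout, e.g., python tweet_schema.py timeline1.jsonl timeline2.jsonl > twitter_schema.json (the file names there are placeholders).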

This script did the heavy lifting of generating the schema, since doing it entirely by hand would have been error-prone. Here are a couple of important things to understand about the script:

  • BigQuery supports more data types than JSON does, so the generated schema’s types may not match the semantics of a field. For example, the script does not attempt to parse string values to determine whether a string is semantically a date or timestamp.
  • The script flattens arrays of arrays and prints a warning, since BigQuery does not support storing them. (The snippet after this list demonstrates both caveats.)
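
Here’s a quick demonstration of both caveats, using the classify function from the script above (the sample values are made up):

sample = {
    "created_at": "Wed Aug 27 13:08:45 +0000 2008",         # semantically a timestamp
    "bounding_box": [ [ [-122.5, 37.7], [-122.4, 37.8] ] ]  # arrays of arrays
}

field = classify([], sample)
# created_at is classified as a plain STRING field, and the bounding_box
# branch prints flattening warnings to stderr before emitting a single
# REPEATED FLOAT field.
print(field)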

To generate the tweet schema, I pulled several hundred tweets from user timelines (including some hand-crafted tweets from a test account to capture specific data features) and ran them through the script to generate the “skeleton” of the schema. I then went back through and hand-edited the schema to make some changes (e.g., bounding_box above) and refinements (e.g., created_at from STRING to TIMESTAMP).
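
Refinements like that second kind can also be scripted. Here’s a minimal sketch of the created_at change, assuming the generated schema was saved as twitter_schema.json (a placeholder name):

import json

with open("twitter_schema.json") as f:
    schema = json.load(f)

for field in schema:
    # Promote the top-level created_at field from STRING to TIMESTAMP.
    if field["name"] == "created_at":
        field["type"] = "TIMESTAMP"

with open("twitter_schema.json", "w") as f:
    json.dump(schema, f)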

In case anyone needs a license, I formally make the above schema and script available under a CC BY-SA license. And while I wouldn’t be sharing this content if I didn’t think it would be useful, you use all of the above at your own risk.

I wouldn’t mind a shout-out in the comments if anyone does use this!