ETL script - MySQL to BigQuery¶

  • The ETL script extracts, transforms, and loads data programmatically from the data source to the target location.
  • The script is deployed to a Function Compute service on Alibaba Cloud (ALI), with triggers set accordingly.
  • The script consists of extract, transform, load, and notification tasks.

Full load / Incremental load¶

  • For a fact or transaction (txn) table - run an initial full load first, then subsequent incremental loads. For an incremental load, define a txn start and end date in the query and append (WRITE_APPEND) the result to the main table; see the sketch below.
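
An incremental run differs from the full load in the sample below only in the query window and the write disposition. A minimal self-contained sketch (the helper name incremental_query and the fact table name are placeholders, not part of the deployed job):

In [ ]:
import datetime as dt
from google.cloud import bigquery

def incremental_query(table_name, date_column_name):
    # pull only yesterday's transactions; widen the window to match the trigger schedule
    txn_start = dt.date.today() - dt.timedelta(days=1)
    txn_end = dt.date.today()
    return f"""
    SELECT *
    FROM {table_name}
    WHERE {date_column_name} BETWEEN "{txn_start}" AND "{txn_end}"
    ;
    """

# append each day's window to the main table instead of truncating it
incremental_job_config = bigquery.LoadJobConfig(
    write_disposition = bigquery.WriteDisposition.WRITE_APPEND,
    autodetect = True,
    source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

query = incremental_query('fact_transaction', 'transaction_detail_created_at')  # placeholder table name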

Sample code

In [ ]:
import os
import logging
import json
import mysql.connector
import pandas as pd
import datetime as dt
from google.cloud import bigquery
from google.oauth2 import service_account
import slack

logger = logging.getLogger()

# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
#   logger = logging.getLogger()
#   logger.info('initializing')

json_gcp_creds = json.loads(os.environ['GCP_CRED'])
slack_channel = <slack_channel_id>

def sql_query (table_name, date_column_name):
    # define sql query here (keep the WHERE clause commented out for a full load; uncomment it for an incremental window)
    txn_start = dt.date.today() - dt.timedelta(days=1) 
    txn_end = dt.date.today()  

    query = f"""
    SELECT 
        * 
    FROM 
        {table_name} 
    -- WHERE
        -- {date_column_name} BETWEEN "{str(txn_start)}" AND "{str(txn_end)}"
    ;
    """
    return query

def extract (query):
    # create mysql connection and extract data
    cnx = mysql.connector.connect ( host  = os.environ['MYSQL_ENDPOINT'],
                                    port = int(os.environ['MYSQL_PORT']),
                                    user = os.environ['MYSQL_USER'],
                                    password = os.environ['MYSQL_PASSWORD'],
                                    database = os.environ['MYSQL_DBNAME']
                                    )
    
    logger.info('Connected to ALI DB !!!!')

    df = pd.read_sql(query, cnx)
    cnx.close()

    logger.info(f"Extraction DONE !!!!")
    return df              

def transform (df, transformation_required):
    # process data
    if transformation_required == 'yes':
        df['client_home_city'] = df['client_home_address'].apply(lambda x: json.loads(x)['city'].strip().upper() if pd.notna(x) else 'NA')
        df['client_home_state'] = df['client_home_address'].apply(lambda x: json.loads(x)['state'].strip().upper() if pd.notna(x) else 'NA')
        df = df.drop('client_home_address', axis = 1)

    # Convert dataframe to a list of JSON records for the BQ load job
    json_data = df.to_json(orient = 'records', date_format='iso', date_unit='s')
    json_object = json.loads(json_data)

    logger.info("Transformation DONE !!!!")
    return json_object

def load (json_object, project_id, dataset_id, table_id, write_disposition):    
    # connect to BQ
    credentials = service_account.Credentials.from_service_account_info(json_gcp_creds)
    client = bigquery.Client(credentials = credentials, project = project_id)

    logger.info('Connected to Google BQ !!!!')

    # load to BQ
    full_table_name = f"{project_id}.{dataset_id}.{table_id}"
    
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = write_disposition
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    #job_config.schema = format_schema(table_schema)
    job = client.load_table_from_json(json_object, full_table_name, job_config = job_config) # https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-json-autodetect
    result = job.result() # wait for the load job to complete

    logger.info(f"Load to BQ DONE !!!! {result}")
    return result

def slack_notification (slack_channel, msg):
    # Authenticate to the Slack API via the generated token
    token = os.environ['SLACK_CRED']
    client = slack.WebClient(token)
    channel_id = slack_channel
    client.chat_postMessage(channel = channel_id, 
                            text = f"{dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [From ALI]:    {msg}"
                            )

    logger.info(f"Slack notification DONE !!!!")

def handler(event, context):
    # evt = json.loads(event)

    list_jobs = [{"job_id": 1, "job_name":"kocek_fact_daily_12AM_ALI_BQ", "table_name": <table_name>, "date_col": "transaction_detail_created_at", "gcp_prj": <gcp_prj_id>, "gcp_dataset": <gcp_dataset_id>, "gcp_table": <gcp_table_name>, "write_disposition": "WRITE_TRUNCATE"}, 
                 {"job_id": 2, "job_name":"kocek_fact_daily_12AM_ALI_BQ", "table_name": <table_name>, "date_col": "transaction_detail_created_at", "gcp_prj": <gcp_prj_id>, "gcp_dataset": <gcp_dataset_id>, "gcp_table": <gcp_table_name>, "write_disposition": "WRITE_TRUNCATE"}]
    
    list_job_info = []

    for job in list_jobs:
        logger.info(f'Job_id = {job["job_id"]} started !!!!')
        
        json_job_info = {
                         "job_id": job["job_id"],
                         "job_status": None,
                         "gcp_table": job["gcp_table"],
                         "bq_job_result": None 
                         }

        try:
            query = sql_query (job["table_name"], job["date_col"])
            df = extract (query)
            json_object = transform (df, transformation_required = 'no')
            bq_job_result = load (json_object, job["gcp_prj"], job["gcp_dataset"], job["gcp_table"], job["write_disposition"])

            json_job_info["job_status"] = "OK"
            json_job_info["bq_job_result"] = bq_job_result
            list_job_info.append(json_job_info)
            logger.info(f'Job {job["job_id"]}, {job["job_name"]} completed !!!!')

        except Exception as e:
            logger.error(f'Job {job["job_id"]}, {job["job_name"]} {e}')  
            #slack_notification (slack_channel, f'ERROR: {e}')
            json_job_info["job_status"] = f'ERROR: {e}'
            list_job_info.append(json_job_info)
            continue

    slack_msg = f'{job["job_name"]}   {str(list_job_info)}'
    slack_notification (slack_channel, slack_msg)
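
For a quick local smoke test outside Function Compute, the handler can be invoked directly with dummy arguments, since the event payload is only parsed if the commented-out json.loads(event) line is enabled (a sketch, assuming the environment variables above are set locally):

In [ ]:
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)  # surface logger.info output locally
    handler(event=b'{}', context=None)       # dummy event/context for a manual run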

Change Data Capture¶

  • For a dim table - run an initial full load first, then subsequently apply Change Data Capture (CDC) from the MySQL binlog.

Sample code

In [ ]:
import os
import sys
import pandas as pd
import json
import datetime as dt
from google.cloud import bigquery
from google.oauth2 import service_account
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

json_gcp_creds = json.loads(os.environ['GCP_CRED'])
slack_channel = 'lalala'

json_db_creds = {"host" : os.environ['MYSQL_ENDPOINT'],
                 "port" : int(os.environ['MYSQL_PORT']),
                 "user" : os.environ['MYSQL_USER'],
                 "password" :os.environ['MYSQL_PASSWORD'],
                 "database" : os.environ['MYSQL_DBNAME'}

# create stream reading binlog files
stream = BinLogStreamReader(
                            connection_settings = json_db_creds,
                            server_id = 2216855073, # get server-id from the MySQL db
                            resume_stream = True,
                            log_file = 'mysql-bin.003618',
                            log_pos = 4,
                            only_events = [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
                            only_tables = ['dim_client'], # a list with tables to watch
                            #skip_to_timestamp = int(pd.to_datetime(dt.date.today()).timestamp()),
                            ignored_tables = None,
                            )

modified_event = []
for binlogevent in stream:
    for row in binlogevent.rows:
        if isinstance(binlogevent, UpdateRowsEvent):
            event = row["after_values"]
            modified_event.append(event)
        elif isinstance(binlogevent, WriteRowsEvent):
            event = row["values"]
            modified_event.append(event)
        elif isinstance(binlogevent, DeleteRowsEvent):
            event = row["values"]
            modified_event.append(event)
        sys.stdout.flush()
stream.close()
                                           
# there will be duplicate records each time a record is updated
# we want to remove duplicates and keep only the record with the latest 'client_updated_dt'
if modified_event != []:
    df = pd.DataFrame(modified_event)
    df = df.sort_values(by='client_updated_dt').drop_duplicates(subset='client_id', keep='last')
    json_data = df.to_json(orient = 'records', date_format='iso', date_unit='s')
    json_object = json.loads(json_data)
else:
    print(f"no event has taken place since {dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
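
The CDC sample ends with a deduplicated json_object but does not load it to BigQuery. One possible follow-up, not part of the script above (project, dataset, and staging table names are placeholders), is to load the events into a staging table and MERGE them into the main dim table keyed on client_id:

In [ ]:
# load the deduplicated change events into a staging table, then upsert into the main dim table
credentials = service_account.Credentials.from_service_account_info(json_gcp_creds)
client = bigquery.Client(credentials = credentials, project = '<gcp_prj_id>')

staging_table = '<gcp_prj_id>.<gcp_dataset_id>.dim_client_staging'  # placeholder
main_table = '<gcp_prj_id>.<gcp_dataset_id>.dim_client'             # placeholder

job_config = bigquery.LoadJobConfig(
    write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE,
    autodetect = True,
    source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
client.load_table_from_json(json_object, staging_table, job_config = job_config).result()

# upsert on client_id; update the remaining columns as needed
# delete events would need separate handling (e.g. a soft-delete flag)
merge_sql = f"""
MERGE `{main_table}` T
USING `{staging_table}` S
ON T.client_id = S.client_id
WHEN MATCHED THEN UPDATE SET T.client_updated_dt = S.client_updated_dt
WHEN NOT MATCHED THEN INSERT ROW
"""
client.query(merge_sql).result()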