ETL script - MySQL to GSheet

  • The ETL script extracts, transforms, and loads data programmatically from the data source to the target location.
  • The script is deployed to an Alibaba Cloud (ALI) Function Compute service, and triggers are set accordingly; a sketch of the trigger event follows this list.
  • The script consists of four tasks: extract, transform, load, and notification.
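
The deployed handler is invoked by a time trigger, which passes a small JSON event. A minimal sketch of unpacking it is below; the field names (triggerTime, triggerName, payload) follow the documented Function Compute time-trigger format and should be verified against the current ALI docs, and the trigger name and timestamp are illustrative.

In [ ]:
import json

# hedged sketch of the event a Function Compute time trigger delivers;
# the values below are made up for illustration
sample_event = '{"triggerTime": "2023-01-01T16:00:00Z", "triggerName": "my_daily_trigger", "payload": ""}'
evt = json.loads(sample_event)
print(evt['triggerName'], evt['triggerTime'])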


Sample code

In [ ]:
import os
import logging
import json
import mysql.connector
import pandas as pd
import datetime as dt
import gspread
import slack

logger = logging.getLogger()

# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
#   logger = logging.getLogger()
#   logger.info('initializing')

json_gcp_creds = json.loads(os.environ['GCP_CRED'])
db1_creds = json.loads(os.environ['DB1_CRED'])
db2_creds = json.loads(os.environ['DB2_CRED'])
slack_channel = '<slack_channel_id>'  # placeholder: target Slack channel ID
gsheet_id = '<gsheet_id>'  # placeholder: target Google Sheet key
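
# The *_CRED variables are expected to hold JSON strings; an illustrative
# (made-up) example of the shape that extract() relies on:
#   DB1_CRED='{"host": "...", "port": "3306", "user": "...",
#              "password": "...", "database": "db1"}'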


def sql_query():
    # define the SQL queries here

    query_0 = """
        SELECT 
            *
        FROM 
            db1.dim_coin_deposit_merchant 
        ;
        """

    query_1 = """
        SELECT
            fact_order.order_id, 
            fact_order.order_created_on,
            fact_order.order_amount,
            fact_order.client_id,
            fact_order.merchant_id,
            fact_order.order_type,
            fact_order.order_status, 
            cl.client_name,
            cl.client_email,
            cl.client_phone,
            cl.client_phone_prefix,
            cl.client_is_guest,
            payment.payment_service_charge_amount
        FROM
            db2.fact_order as fact_order 
        LEFT JOIN
            db2.dim_client as cl
            ON
            fact_order.client_id = cl.client_id
        LEFT JOIN
            db2.dim_payment as payment
            ON
            fact_order.payment_id = payment.payment_id
        ;
        """

    return query_0, query_1

def extract(query, db_name):
    # create a MySQL connection and extract the query result into a DataFrame

    if db_name == 'db1':
        creds = db1_creds
    elif db_name == 'db2':
        creds = db2_creds
    else:
        raise ValueError(f'Unknown db_name: {db_name}')

    cnx = mysql.connector.connect(host=creds['host'],
                                  port=int(creds['port']),
                                  user=creds['user'],
                                  password=creds['password'],
                                  database=creds['database'])

    logger.info('Connected to ALI DB !!!!')

    df = pd.read_sql(query, cnx)
    cnx.close()

    logger.info('Extraction DONE !!!!')
    return df
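
# NOTE: pandas emits a UserWarning when read_sql is handed a raw DBAPI
# connection. A SQLAlchemy engine avoids it; a hedged sketch, assuming
# sqlalchemy and the mysql-connector driver are installed, reusing the
# creds dict from extract():
#
#   from sqlalchemy import create_engine
#   url = (f"mysql+mysqlconnector://{creds['user']}:{creds['password']}"
#          f"@{creds['host']}:{creds['port']}/{creds['database']}")
#   df = pd.read_sql(query, create_engine(url))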

def transform(df1, df2):
    # attach the store name to each order; both join keys are cast to str
    # so the merge does not break on mixed int/str id columns
    stores = df2[['id', 'store_name']].drop_duplicates(subset='id').copy()
    stores['id'] = stores['id'].astype(str)

    df1['merchant_key'] = df1['merchant_id'].astype(str)
    df1 = df1.merge(stores, how='left', left_on='merchant_key', right_on='id')
    df1 = df1.drop(columns=['merchant_key', 'id'])

    # change data types so they can be parsed by the gspread lib
    df1 = df1.fillna('')
    df1['order_created_on'] = df1['order_created_on'].astype(str)

    logger.info('Transformation DONE !!!!')
    return df1

def load(df, gsheet_id):
    gc = gspread.service_account_from_dict(json_gcp_creds)
    sh = gc.open_by_key(gsheet_id)
    worksheet = sh.sheet1
    worksheet.update([df.columns.values.tolist()] + df.values.tolist(), value_input_option='USER_ENTERED')

    logger.info('Load to GSHEET DONE !!!!')

def slack_notification(slack_channel, msg):
    # authenticate to the Slack API via the generated token
    token = os.environ['SLACK_CRED']
    client = slack.WebClient(token)
    client.chat_postMessage(channel=slack_channel,
                            text=f"{dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [From ALI]:    {msg}"
                            )

    logger.info('Slack notification DONE !!!!')

def handler(event, context):
    # evt = json.loads(event)

    list_jobs = [{"job_id": 1, "job_name": "kocek_KO_txn_daily_12AM_ALI_GSHEET"}]

    list_job_info = []

    for job in list_jobs:
        logger.info(f'Job_id = {job["job_id"]} started !!!!')

        json_job_info = {
                         "job_id": job["job_id"],
                         "job_name": job["job_name"],
                         "job_status": None,
                         }

        try:
            # build both queries in a single call instead of calling
            # sql_query() twice
            query_store, query_ko = sql_query()

            df_store = extract(query_store, 'db1')
            df_ko = extract(query_ko, 'db2')

            df_ko = transform(df_ko, df_store)

            load(df_ko, gsheet_id)

            json_job_info["job_status"] = "OK"
            list_job_info.append(json_job_info)

            logger.info(f'Job {job["job_id"]}, {job["job_name"]} completed !!!!')

        except Exception as e:
            logger.error(f'Job {job["job_id"]}, {job["job_name"]} {e}')
            # slack_notification(slack_channel, f'ERROR: {e}')
            json_job_info["job_status"] = f'ERROR: {e}'
            list_job_info.append(json_job_info)
            continue

    # summarise every job's outcome; the original message referenced the loop
    # variable `job`, which would raise NameError if list_jobs were empty
    slack_notification(slack_channel, str(list_job_info))
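
For local testing, the handler can be invoked directly with a dummy event, mimicking what Function Compute would pass in. A minimal sketch, assuming the GCP_CRED, DB1_CRED, DB2_CRED, and SLACK_CRED environment variables are already exported in the shell:

In [ ]:
# hypothetical local smoke test; not part of the deployed function
# (handler ignores the event body, so an empty JSON string is enough)
logging.basicConfig(level=logging.INFO)
handler(event='{}', context=None)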