Sample code
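The listing below is a single Python handler for Alibaba Cloud Function Compute: it reads database, Google Cloud, and Slack credentials from environment variables, extracts data from two MySQL databases, joins and cleans it with pandas, loads the result into a Google Sheet via gspread, and posts a job-status summary to a Slack channel.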
import os
import logging
import json
import mysql.connector
import pandas as pd
import datetime as dt
import gspread
import slack
logger = logging.getLogger()
# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html),
# implement an initializer function as below:
# def initializer(context):
#     logger = logging.getLogger()
#     logger.info('initializing')
json_gcp_creds = json.loads(os.environ['GCP_CRED'])
db1_creds = json.loads(os.environ['DB1_CRED'])
db2_creds = json.loads(os.environ['DB2_CRED'])
slack_channel = '<slack_channel_id>'  # replace with your Slack channel ID
gsheet_id = '<gsheet_id>'             # replace with your Google Sheet ID
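# Note: the *_CRED environment variables above are JSON strings. A hypothetical
# example of the shape extract() expects (placeholder values only):
#   DB1_CRED = '{"host": "...", "port": "3306", "user": "...", "password": "...", "database": "db1"}'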
def sql_query():
    # define the SQL queries here
    query_0 = """
        SELECT
            *
        FROM
            db1.dim_coin_deposit_merchant
        ;
    """
    query_1 = """
        SELECT
            fact_order.order_id,
            fact_order.order_created_on,
            fact_order.order_amount,
            fact_order.client_id,
            fact_order.merchant_id,
            fact_order.order_type,
            fact_order.order_status,
            cl.client_name,
            cl.client_email,
            cl.client_phone,
            cl.client_phone_prefix,
            cl.client_is_guest,
            payment.payment_service_charge_amount
        FROM
            db2.fact_order AS fact_order
        LEFT JOIN
            db2.dim_client AS cl
        ON
            fact_order.client_id = cl.client_id
        LEFT JOIN
            db2.dim_payment AS payment
        ON
            fact_order.payment_id = payment.payment_id
        ;
    """
    return query_0, query_1
def extract(query, db_name):
    # pick the credentials for the requested database, then connect and extract
    if db_name == 'db1':
        creds = db1_creds
    elif db_name == 'db2':
        creds = db2_creds
    else:
        raise ValueError(f'Unknown db_name: {db_name}')
    cnx = mysql.connector.connect(host=creds['host'],
                                  port=int(creds['port']),
                                  user=creds['user'],
                                  password=creds['password'],
                                  database=creds['database'])
    logger.info('Connected to ALI DB !!!!')
    # pd.read_sql accepts a DBAPI connection, though newer pandas versions
    # warn and prefer a SQLAlchemy connectable
    df = pd.read_sql(query, cnx)
    cnx.close()
    logger.info('Extraction DONE !!!!')
    return df
def transform(df1, df2):
    # attach the store name to each order by joining merchant_id to id;
    # cast both keys to str so the join works regardless of dtype
    stores = df2[['id', 'store_name']].drop_duplicates(subset='id').copy()
    stores['id'] = stores['id'].astype(str)
    df1['merchant_id'] = df1['merchant_id'].astype(str)
    df1 = df1.merge(stores, how='left', left_on='merchant_id', right_on='id')
    df1 = df1.drop(columns=['id'])
    # change data types so the frame can be parsed by the gspread lib
    df1 = df1.fillna('')
    df1['order_created_on'] = df1['order_created_on'].astype(str)
    logger.info('Transformation DONE !!!!')
    return df1
def load(df, gsheet_id):
    gc = gspread.service_account_from_dict(json_gcp_creds)
    sh = gc.open_by_key(gsheet_id)
    worksheet = sh.sheet1
    worksheet.update([df.columns.values.tolist()] + df.values.tolist(),
                     value_input_option='USER_ENTERED')
    logger.info('Load to GSHEET DONE !!!!')
def slack_notification(slack_channel, msg):
    # authenticate to the Slack API via the generated token
    token = os.environ['SLACK_CRED']
    client = slack.WebClient(token=token)
    client.chat_postMessage(channel=slack_channel,
                            text=f"{dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [From ALI]: {msg}")
    logger.info('Slack notification DONE !!!!')
def handler(event, context):
    # evt = json.loads(event)
    list_jobs = [{"job_id": 1, "job_name": "kocek_KO_txn_daily_12AM_ALI_GSHEET"}]
    list_job_info = []
    for job in list_jobs:
        logger.info(f'Job_id = {job["job_id"]} started !!!!')
        json_job_info = {
            "job_id": job["job_id"],
            "job_status": None,
        }
        try:
            query_store, query_ko = sql_query()
            df_store = extract(query_store, 'db1')
            df_ko = extract(query_ko, 'db2')
            df_ko = transform(df_ko, df_store)
            load(df_ko, gsheet_id)
            json_job_info["job_status"] = "OK"
            list_job_info.append(json_job_info)
            logger.info(f'Job {job["job_id"]}, {job["job_name"]} completed !!!!')
        except Exception as e:
            logger.error(f'Job {job["job_id"]}, {job["job_name"]} {e}')
            # slack_notification(slack_channel, f'ERROR: {e}')
            json_job_info["job_status"] = f'ERROR: {e}'
            list_job_info.append(json_job_info)
            continue
        slack_msg = f'{job["job_name"]} {str(list_job_info)}'
        slack_notification(slack_channel, slack_msg)
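For a quick smoke test outside Function Compute, the handler can be invoked directly. This is a sketch under the assumption that the GCP_CRED, DB1_CRED, DB2_CRED, and SLACK_CRED environment variables are set locally; the empty-bytes event and None context are stand-ins for the arguments Function Compute would pass.
if __name__ == '__main__':
    # local smoke test only; in production Function Compute calls handler(event, context) itself
    logging.basicConfig(level=logging.INFO)
    handler(b'{}', None)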