Sample code
import os
import logging
import json
import mysql.connector
import pandas as pd
import datetime as dt
from google.cloud import bigquery
from google.oauth2 import service_account
import slack
logger = logging.getLogger()
# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# implement the initializer function as below:
# def initializer(context):
#     logger = logging.getLogger()
#     logger.info('initializing')
json_gcp_creds = json.loads(os.environ['GCP_CRED'])
slack_channel = '<slack_channel_id>'  # placeholder: target Slack channel ID
def sql_query(table_name, date_column_name):
    # Build the extraction query; uncomment the WHERE clause to restrict
    # the pull to yesterday's transactions only
    txn_start = dt.date.today() - dt.timedelta(days=1)
    txn_end = dt.date.today()
    query = f"""
    SELECT
        *
    FROM
        {table_name}
    -- WHERE
    --     {date_column_name} BETWEEN "{txn_start}" AND "{txn_end}"
    ;
    """
    return query
def extract(query):
    # Create the MySQL connection and pull the query result into a dataframe
    cnx = mysql.connector.connect(
        host=os.environ['MYSQL_ENDPOINT'],
        port=int(os.environ['MYSQL_PORT']),
        user=os.environ['MYSQL_USER'],
        password=os.environ['MYSQL_PASSWORD'],
        database=os.environ['MYSQL_DBNAME'],
    )
    logger.info('Connected to ALI DB !!!!')
    df = pd.read_sql(query, cnx)
    cnx.close()
    logger.info('Extraction DONE !!!!')
    return df
def transform(df, transformation_required):
    # Optionally flatten the JSON address column into city/state columns
    if transformation_required == 'yes':
        df['client_home_city'] = df['client_home_address'].apply(
            lambda x: json.loads(x)['city'].strip().upper() if pd.notna(x) else 'NA')
        df['client_home_state'] = df['client_home_address'].apply(
            lambda x: json.loads(x)['state'].strip().upper() if pd.notna(x) else 'NA')
        df = df.drop('client_home_address', axis=1)
    # Convert the dataframe to a list of JSON records; this must run on both
    # branches, otherwise the 'yes' path would leave json_object undefined
    json_data = df.to_json(orient='records', date_format='iso', date_unit='s')
    json_object = json.loads(json_data)
    logger.info("Transformation DONE !!!!")
    return json_object
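# For illustration, the 'yes' branch assumes client_home_address stores a JSON
# string with 'city' and 'state' keys. A hypothetical payload (not from the
# source data) and what the lambdas above would extract from it:
#   sample = '{"city": " springfield ", "state": " il "}'
#   json.loads(sample)['city'].strip().upper()   # -> 'SPRINGFIELD'
#   json.loads(sample)['state'].strip().upper()  # -> 'IL'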
def load(json_object, project_id, dataset_id, table_id, write_disposition):
    # Connect to BigQuery
    credentials = service_account.Credentials.from_service_account_info(json_gcp_creds)
    client = bigquery.Client(credentials=credentials, project=project_id)
    logger.info('Connected to Google BQ !!!!')
    # Load to BigQuery
    # https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-json-autodetect
    full_table_name = f"{project_id}.{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = write_disposition
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    # job_config.schema = format_schema(table_schema)
    job = client.load_table_from_json(json_object, full_table_name, job_config=job_config)
    result = job.result()  # block until the load job finishes
    logger.info(f"Load to BQ DONE !!!! {result}")
    return result
def slack_notification(slack_channel, msg):
    # Authenticate to the Slack API via the generated token
    token = os.environ['SLACK_CRED']
    client = slack.WebClient(token=token)
    client.chat_postMessage(
        channel=slack_channel,
        text=f"{dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [From ALI]: {msg}",
    )
    logger.info("Slack notification DONE !!!!")
def handler(event, context):
# evt = json.loads(event)
list_jobs = [{"job_id": 1, "job_name":"kocek_fact_daily_12AM_ALI_BQ", "table_name": <table_name>, "date_col": "transaction_detail_created_at", "gcp_prj": <gcp_prj_id>, "gcp_dataset": <gcp_dataset_id>, "gcp_table": <gcp_table_name>, "write_disposition": "WRITE_TRUNCATE"},
{"job_id": 2, "job_name":"kocek_fact_daily_12AM_ALI_BQ", "table_name": <table_name>, "date_col": "transaction_detail_created_at", "gcp_prj": <gcp_prj_id>, "gcp_dataset": <gcp_dataset_id>, "gcp_table": <gcp_table_name>, "write_disposition": "WRITE_TRUNCATE"}]
list_job_info = []
for job in list_jobs:
logger.info(f'Job_id = {job["job_id"]} started !!!!')
json_job_info = {
"job_id": job["job_id"],
"job_status": None,
"gcp_table": job["gcp_table"],
"bq_job_result": None
}
try:
query = sql_query (job["table_name"], job["date_col"])
df = extract (query)
json_object = transform (df, transformation_required = 'no')
bq_job_result = load (json_object, job["gcp_prj"], job["gcp_dataset"], job["gcp_table"], job["write_disposition"])
json_job_info["job_status"] = "OK"
json_job_info["bq_job_result"] = bq_job_result
list_job_info.append(json_job_info)
logger.info(f'Job {job["job_id"]}, {job["job_name"]} completed !!!!')
except Exception as e:
logger.error(f'Job {job["job_id"]}, {job["job_name"]} {e}')
#slack_notification (slack_channel, f'ERROR: {e}')
json_job_info["job_status"] = f'ERROR: {e}'
list_job_info.append(json_job_info)
continue
slack_msg = f'{job["job_name"]} {str(list_job_info)}'
slack_notification (slack_channel, slack_msg)
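# For a quick smoke test outside Function Compute, the handler can be invoked
# directly. A minimal sketch (hypothetical driver, not part of the runtime),
# assuming the MYSQL_*, GCP_CRED and SLACK_CRED environment variables are
# exported; the event payload is unused above.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    handler(event=None, context=None)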
# Second sample: stream row changes from the MySQL binlog (change data capture)
import os
import sys
import json
import datetime as dt
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)
json_gcp_creds = json.loads(os.environ['GCP_CRED'])
slack_channel = 'lalala'
json_db_creds = {
    "host": os.environ['MYSQL_ENDPOINT'],
    "port": int(os.environ['MYSQL_PORT']),
    "user": os.environ['MYSQL_USER'],
    "password": os.environ['MYSQL_PASSWORD'],
    "database": os.environ['MYSQL_DBNAME'],
}
# Create a stream reading the binlog files
stream = BinLogStreamReader(
    connection_settings=json_db_creds,
    server_id=2216855073,  # get the server_id from MySQL (SHOW VARIABLES LIKE 'server_id')
    resume_stream=True,
    log_file='mysql-bin.003618',
    log_pos=4,
    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
    only_tables=['dim_client'],  # a list of tables to watch
    # skip_to_timestamp=int(pd.to_datetime(dt.date.today()).timestamp()),
    ignored_tables=None,
)
modified_event = []
for binlogevent in stream:
    for row in binlogevent.rows:
        if isinstance(binlogevent, UpdateRowsEvent):
            event = row["after_values"]  # row state after the update
            modified_event.append(event)
        elif isinstance(binlogevent, WriteRowsEvent):
            event = row["values"]
            modified_event.append(event)
        elif isinstance(binlogevent, DeleteRowsEvent):
            event = row["values"]
            modified_event.append(event)
sys.stdout.flush()
stream.close()
# There will be a duplicate record each time a row is updated;
# remove duplicates and keep only the record with the latest 'client_updated_dt'
if modified_event != []:
    df = pd.DataFrame(modified_event)
    df = df.sort_values(by='client_updated_dt').drop_duplicates(subset='client_id', keep='last')
    json_data = df.to_json(orient='records', date_format='iso', date_unit='s')
    json_object = json.loads(json_data)
else:
    print(f"no event taken place since {dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    json_object = []  # nothing to load downstream
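# From here the deduplicated records can be pushed to BigQuery just like the
# batch sample above. A minimal sketch, assuming the same GCP_CRED service
# account; the <gcp_prj_id>.<gcp_dataset_id>.<gcp_table_name> target is a
# placeholder, not from the source. WRITE_APPEND is used so CDC rows accumulate
# instead of overwriting the table.
if json_object:
    credentials = service_account.Credentials.from_service_account_info(json_gcp_creds)
    client = bigquery.Client(credentials=credentials, project='<gcp_prj_id>')
    job_config = bigquery.LoadJobConfig(
        write_disposition='WRITE_APPEND',
        autodetect=True,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    job = client.load_table_from_json(
        json_object,
        '<gcp_prj_id>.<gcp_dataset_id>.<gcp_table_name>',
        job_config=job_config,
    )
    job.result()  # block until the load job completes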