Change Data Capture (CDC): MySQL to BigQuery using Binary Logs Replication

Abdulquadri Ayodeji Oshoare
14 min read · Aug 7, 2023
Architecture Diagram

Introduction

This article explains how Binary Log Replication can be used to perform CDC when moving data from MySQL to BigQuery.

The following tools will be used in this project:

Source: MySQL Database (Locally or on Cloud)
Destination: BigQuery
Orchestrator: Airflow
Containers: Docker and Docker Compose installed on your computer
Dataset: NYC yellow taxi trip data (yellow_tripdata_2021-01)

What is Change Data Capture (CDC)?

CDC is a method of capturing database change events (creates, inserts, updates, and deletes). It is an efficient way to replicate databases because only the changes are moved, which keeps the replica in near real-time sync with the source.

MySQL, like many other relational databases, has its own way of exposing these changes.
It records database events in the binary log (binlog), which can be read to replicate large volumes of data.
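To make this concrete, here is a minimal sketch (not part of the project code) of how binlog events can be consumed in Python with the mysql-replication package that we will use later in the incremental DAG. The connection settings below are placeholders.

# Minimal sketch: stream row-level events from the MySQL binlog.
# Assumes the mysql-replication package and a server with binlog-format=ROW;
# host/user/password values are placeholders.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

mysql_settings = {"host": "localhost", "port": 3306, "user": "root", "passwd": ""}

stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=200,  # any id that does not clash with the server's own server-id
    only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
)

for event in stream:
    for row in event.rows:
        # Write/Delete events carry row["values"]; Update events carry
        # row["before_values"] and row["after_values"].
        print(type(event).__name__, row)

stream.close()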

Getting Started

# Project Structure
root/
├── airflow_image/
│   ├── Dockerfile
│   └── requirements.txt
├── dags/
│   ├── packages/
│   │   └── config.py
│   ├── full_load.py
│   └── incremental_load_cdc.py
├── data/
│   └── yellow_tripdata_2021-01.csv.gz
├── logs/
├── plugins/
├── .env
├── load_data_to_db.py
├── load_incremental_to_db.py
├── my.cnf
└── docker-compose.yml

Create the files and folders following the project structure above.
The following files are important for binary log replication.

MySQL uses a configuration file called my.cnf that stores the settings and parameters controlling the behavior and performance of the database server.
Contents of my.cnf

[mysqld]
server-id = 1
log_bin = bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog-format = row #Very important if you want to receive write, update, and delete row events

Content of requirements.txt

google-auth
google-cloud-bigquery
google-cloud-bigquery-storage
pandas
pandas-gbq
mysql-replication
protobuf
pymysql
mysql-connector-python

Contents of .env

AIRFLOW_UID=1000
AIRFLOW_GID=0

Contents of Dockerfile

# Base image
FROM apache/airflow:2.6.3
# User airflow
USER airflow
# Upgrade pip
RUN pip install --upgrade pip
# Copy requirements.txt
COPY requirements.txt .
# Install python packages
RUN pip install -r requirements.txt

Contents of docker-compose.yml

version: '3'
x-airflow-common:
  &airflow-common
  build: airflow_image/
  environment:
    &airflow-common-env
    LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
      mysqldb:
        condition: service_healthy

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  mysqldb:
    image: mysql:5.7
    container_name: mysql_server
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
      MYSQL_DATABASE: trips_db
    volumes:
      - "./mysql_data:/var/lib/mysql:rw"
      - "./my.cnf:/etc/mysql/conf.d/my.cnf"
      - ./logs/mysql:/var/log/mysql
    ports:
      - 3306:3306
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
      interval: 10s
      timeout: 20s
      retries: 10
    restart: always

networks:
  default:
    name: airflow-network

volumes:
  postgres-db-volume:

In the root of the project, run docker compose up and wait a few minutes for the containers to be provisioned.

Run “docker container ps” to confirm that all containers are running and healthy.

docker container running

To log in to Airflow, go to localhost:8080 in your browser.

username: airflow
password: airflow

airflow login

Load Data into MySQL

To load data into the MySQL database, let’s create a Python script called load_data_to_db.py (as listed in the project structure) and paste the following code.

import pandas as pd
import pymysql
from sqlalchemy import create_engine
import MySQLdb

host = "localhost"
user = "root"
password = ""
db = "trips_db"

# Construct the connection URL with an empty password
connection_url = f"mysql+mysqldb://{user}:{password}@{host}/{db}"
# Create an SQLAlchemy engine and connect to the database
engine = create_engine(connection_url)
conn = engine.connect()

# Read the first 1,000 rows of the dataset and fix the datetime columns
df = pd.read_csv("./data/yellow_tripdata_2021-01.csv.gz", nrows=1000, compression='gzip')
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

# Load the dataframe into the yellow_tripdata table
df.to_sql(name="yellow_tripdata", con=conn, index=False)

The script connects to MySQL using create_engine and loads the first 1,000 rows into the yellow_tripdata table in the trips_db database.

To confirm that the data has been successfully loaded, enter the MySQL terminal on docker by running:

docker exec -it <mysql-container-id> /bin/bash

Log into the database by running “mysql -uroot”. After login, type “use trips_db” and run the SQL command below. You should get the result shown in the image below.

SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'trips_db' AND table_name = 'yellow_tripdata';
testing DB
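You can also run the same check from Python on your host machine. A minimal sketch, assuming the same connection settings as the load script:

# Hedged check: count the rows that were just loaded.
# Assumes MySQL is exposed on localhost with user root and an empty password.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:@localhost/trips_db")
print(pd.read_sql("SELECT COUNT(*) AS row_count FROM yellow_tripdata", engine))
# Expect row_count = 1000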

Turn Binary Log Replication ON

For binary log replication to work, MySQL must actually pick up the mounted my.cnf; MySQL ignores configuration files that are world-writable, which can happen with bind mounts on some host systems.

Enter the terminal of the MySQL container and run the following command to set the correct permissions on the configuration files.

chmod 644 /etc/mysql/*

Restart the MySQL container after running the command above. To confirm that Binary log replication has been enabled, enter the MySQL terminal again and run the commands below.

mysql -uroot
set @@global.show_compatibility_56=ON;
select variable_value as "BINARY LOGGING STATUS (log_bin) :: "
from information_schema.global_variables where variable_name='log_bin';

You should get the result below after running the last command.

Binary replication
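If you prefer to check from the host instead of the MySQL shell, the same verification can be done with a short Python snippet. A minimal sketch, assuming the root user with an empty password on localhost:3306, as configured in docker-compose:

# Hedged check: confirm that binary logging is on and row-based.
import pymysql

conn = pymysql.connect(host="localhost", port=3306, user="root", password="")
try:
    with conn.cursor() as cur:
        cur.execute("SHOW VARIABLES LIKE 'log_bin'")
        print(cur.fetchone())   # expect ('log_bin', 'ON')
        cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
        print(cur.fetchone())   # expect ('binlog_format', 'ROW')
finally:
    conn.close()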

Connect Airflow to MySQL

For Airflow to communicate with the MySQL database, we need to create a connection over the airflow-network that was created by the docker-compose file. Run the command below to get the network gateway.

docker network inspect airflow-network

Copy the network gateway as shown below. This will be used to create the MySQL connection.

docker network

Go to the airflow UI on localhost:8080 and navigate to Admin >> Connections.
Create a new MySQL connection with the connection ID mysql_conn_id (the ID referenced in the DAGs below), using the network gateway IP as the host, port 3306, schema trips_db, login root, and an empty password, as shown below.

airflow connection page

Make sure to test the connection and ensure that it connects successfully.
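You can also exercise the connection from code once it is saved. A minimal sketch, run from a Python shell inside one of the Airflow containers (it assumes the connection ID mysql_conn_id used by the DAGs below):

# Hedged check, e.g. via `docker exec -it <airflow-worker-container> python`.
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id="mysql_conn_id", schema="trips_db")
print(hook.get_records("SELECT COUNT(*) FROM yellow_tripdata"))  # expect [(1000,)]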

Set up BigQuery and Service account

In order to connect to BigQuery, we need to set up a GCP project, create a BigQuery dataset, and set up a service account.

If you don’t have a GCP account, you can create one using the link below and get $300 cloud credits that can be used for 90 days.

Cloud Computing Services | Google Cloud

In the Google Cloud console, search for BigQuery. Create the BigQuery dataset with the name “trips_db”.

To set up a service account, navigate to IAM & Admin > Service Accounts in the Google Cloud console.

Create the service account with the desired name and assign the BigQuery Admin role to the service account.

Download the <service-account-name>.json key, rename it to keys.json (the path referenced in the DAGs), and place it under the dags folder.

Note: Always add service account keys to .gitignore
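Before wiring up the DAGs, it is worth confirming that the service account key actually works. A minimal sketch, assuming the key was saved as dags/keys.json (the path the DAGs below point at):

# Hedged check: list the datasets visible to the service account.
import os
from google.cloud import bigquery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./dags/keys.json"  # assumed file name
client = bigquery.Client()
for dataset in client.list_datasets():
    print(dataset.dataset_id)  # expect to see trips_db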

Set up Airflow Dags

Now that we’ve successfully created airflow containers, it’s time to create our first dag and move data from the MySQL database to BigQuery.

Under the dags/packages folder, create a config.py file.
This will be used to store our variables.

class CONFIG():
    PROJECT_ID = "marine-fusion-394105"
    TABLE_NAME = "yellow_tripdata"
    INCREMENTAL_TABLE_NAME = "yellow_tripdata_incremental"
    DATASET_REF = "trips_db"
    DB = "trips_db"

Note: in a production environment, use Airflow Variables instead of hard-coded values.
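For illustration, a Variable-backed version of the same config might look like the sketch below; the variable names are assumptions and would need to be created under Admin >> Variables first:

# Hedged sketch: pull configuration from Airflow Variables instead of
# hard-coding it. The variable names here are assumptions.
from airflow.models import Variable

class CONFIG():
    PROJECT_ID = Variable.get("gcp_project_id", default_var="marine-fusion-394105")
    TABLE_NAME = Variable.get("table_name", default_var="yellow_tripdata")
    INCREMENTAL_TABLE_NAME = Variable.get("incremental_table_name",
                                          default_var="yellow_tripdata_incremental")
    DATASET_REF = Variable.get("dataset_ref", default_var="trips_db")
    DB = Variable.get("mysql_db", default_var="trips_db")

# Note: Variable.get at module level runs on every DAG parse; for heavy use,
# prefer Jinja templating or fetching the variables inside the task.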

Create a full_load.py file under the dags folder and copy the code below into it.

import time
from datetime import datetime, timedelta
import os

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from google.cloud import bigquery
from google.oauth2 import service_account

# Import other packages
from packages.config import CONFIG

# Set Google Cloud credentials file path
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/keys.json'

# Define default_args dictionary for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Instantiate the DAG
dag = DAG(
    'full_copy',
    default_args=default_args,
    description='DAG to extract data from MySQL and load it into Google BigQuery',
    schedule_interval="0 23 * * 0",
    tags=['load_gcp'],
    catchup=False,
)

# Define the SQL extract task
def check_table_exist():
    try:
        hook = MySqlHook(mysql_conn_id="mysql_conn_id", schema=CONFIG.DB)
        query = f"""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = '{CONFIG.DB}' AND table_name = '{CONFIG.TABLE_NAME}'
        """
        records = hook.get_records(sql=query)
        print(records)
        tbl_name = records[0]
        return tbl_name
    except Exception as e:
        print("Data extract error: " + str(e))

check_table_exist_task = PythonOperator(
    task_id='sql_extract',
    python_callable=check_table_exist,
    dag=dag,
)

# Check whether the BigQuery table exists and create it if it doesn't
def create_bigquery_table():
    try:
        client = bigquery.Client()
        dataset_ref = client.dataset(CONFIG.DATASET_REF)
        table_ref = dataset_ref.table(CONFIG.TABLE_NAME)
        schema = []  # Define your schema here

        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table, exists_ok=True)  # Create if not exists
        print(f"BigQuery table {CONFIG.TABLE_NAME} created or already exists.")
    except Exception as e:
        print("BigQuery table creation error: " + str(e))

create_bigquery_table_task = PythonOperator(
    task_id='create_bigquery_table',
    python_callable=create_bigquery_table,
    dag=dag,
)

# Define the GCP load task
def gcp_load(tbl_name):
    try:
        client = bigquery.Client()
        project_id = CONFIG.PROJECT_ID
        dataset_ref = CONFIG.DATASET_REF
        TABLE = CONFIG.TABLE_NAME
        table_id = f"{project_id}.{dataset_ref}.{TABLE}"
        rows_imported = 0
        job_config = bigquery.job.LoadJobConfig()
        # Set write_disposition to WRITE_TRUNCATE so the full copy replaces the table
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        hook = MySqlHook(mysql_conn_id="mysql_conn_id", schema=CONFIG.DB)
        for table_name in tbl_name:
            query = f'SELECT * FROM {table_name}'
            df = hook.get_pandas_df(query)
            print(f'Importing rows {rows_imported} to {rows_imported + len(df)}... for table {table_name}')

            # Then proceed with the data loading
            job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
            job.result()
            table = client.get_table(table_id)  # Make an API request.
            print(
                f"Loaded {table.num_rows} rows and {len(table.schema)} columns to {table_id}"
            )
    except Exception as e:
        print("Data load error: " + str(e))

gcp_load_task = PythonOperator(
    task_id='gcp_load',
    python_callable=gcp_load,
    op_args=[check_table_exist_task.output],
    dag=dag,
)

check_table_exist_task >> create_bigquery_table_task >> gcp_load_task

The first dag has three tasks. The first task checks the MySQL database to confirm that the table we want to pull from exists. The second task checks whether the BigQuery table exists and creates it if it doesn’t. The last task loads the data from MySQL into BigQuery.
Let’s see what this looks like in the airflow UI and run it:

full_copy_dag

Cool, looks like the first dag has loaded data into the BigQuery table.

BigQuery Full load
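If you want to confirm the load outside the console, here is a quick hedged check with the BigQuery client (replace the project ID with your own):

# Hedged check: count the rows in the destination table after the full load.
from google.cloud import bigquery

client = bigquery.Client()
query = "SELECT COUNT(*) AS row_count FROM `marine-fusion-394105.trips_db.yellow_tripdata`"
for row in client.query(query).result():
    print(row.row_count)  # expect 1000 after the first full load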

Incremental dag using CDC

Let’s create the second dag, which will load incremental data into BigQuery.
Create an incremental_load_cdc.py file under the dags folder and copy the content below.

import time
from datetime import datetime, timedelta, date
import pytz
import os
import json

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import mysql.connector
from mysql.connector import MySQLConnection
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent
)
from airflow.providers.mysql.hooks.mysql import MySqlHook
from google.cloud import bigquery
from google.oauth2 import service_account

# Import other packages
from packages.config import CONFIG

# Set Google Cloud credentials file path
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/keys.json'

# Define default_args dictionary for the incremental DAG
default_args_incremental = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 7, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Instantiate the incremental DAG
dag_incremental = DAG(
    'gcp_extract_and_load_incremental',
    default_args=default_args_incremental,
    description='Incremental DAG to extract data from MySQL and load it into Google BigQuery',
    schedule_interval="*/5 * * * *",  # Schedule it every 5 minutes
    catchup=False,
    tags=['load_gcp', 'incremental'],
)

# Define the SQL extract task
def check_table_exist():
    try:
        hook = MySqlHook(mysql_conn_id="mysql_conn_id", schema=CONFIG.DB)
        query = f"""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = '{CONFIG.DB}' AND table_name = '{CONFIG.TABLE_NAME}'
        """
        records = hook.get_records(sql=query)
        print(records)
        tbl_name = records[0][0]
        if tbl_name == CONFIG.TABLE_NAME:
            return True
        else:
            return False
    except Exception as e:
        print("Data extract error: " + str(e))

check_table_exist_task = PythonOperator(
    task_id='check_table_exist',
    python_callable=check_table_exist,
    dag=dag_incremental,
)

# Function to extract incremental data from MySQL using Binary Log Reader
def extract_incremental_data(**kwargs):
    task_instance = kwargs['task_instance']
    mysql_check_result = task_instance.xcom_pull(task_ids='check_table_exist')

    if mysql_check_result:
        try:
            # Connect to MySQL database using the Airflow MySQL Hook
            mysql_hook = MySqlHook(mysql_conn_id="mysql_conn_id")
            conn: MySQLConnection = mysql_hook.get_conn()
            cursor = conn.cursor()

            # Retrieve the connection details
            connection_details = mysql_hook.get_connection(conn_id="mysql_conn_id")
            # Access individual elements (host, port, user, passwd) from the connection
            host = connection_details.host
            port = connection_details.port
            user = connection_details.login
            passwd = connection_details.password

            # Initialize a list to store the modified rows
            modified_rows = []

            mysql_settings = {'host': host, 'port': port, 'user': user, 'passwd': ''}
            # Create the BinLogStreamReader
            stream = BinLogStreamReader(
                connection_settings=mysql_settings,
                server_id=100,
                only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
                #blocking=True,
                #resume_stream=True,
                #log_file='mysql-bin.000001',
                #log_pos=binlog_pos,
            )
            # Read the Binary Log and identify modified rows
            for binlogevent in stream:
                for row in binlogevent.rows:
                    if isinstance(binlogevent, DeleteRowsEvent):
                        modified_rows.append({key: str(value) if isinstance(value, date) else value for key, value in row["values"].items()})
                    elif isinstance(binlogevent, UpdateRowsEvent):
                        modified_rows.append({key: str(value) if isinstance(value, date) else value for key, value in row["after_values"].items()})
                    elif isinstance(binlogevent, WriteRowsEvent):
                        modified_rows.append({key: str(value) if isinstance(value, date) else value for key, value in row["values"].items()})

            # Close the stream and the connection
            stream.close()
            cursor.close()
            conn.close()
            return modified_rows
        except Exception as e:
            print("Data extract error: " + str(e))
            return []

extract_incremental_data_task = PythonOperator(
    task_id='extract_incremental_data',
    python_callable=extract_incremental_data,
    provide_context=True,
    dag=dag_incremental,
)

# Check whether the BigQuery table exists and create it if it doesn't
def create_bigquery_table():
    try:
        client = bigquery.Client()
        dataset_ref = client.dataset(CONFIG.DATASET_REF)
        table_ref = dataset_ref.table(CONFIG.INCREMENTAL_TABLE_NAME)

        table = bigquery.Table(table_ref)
        table = client.create_table(table, exists_ok=True)  # Create if not exists
        print(f"BigQuery table {CONFIG.INCREMENTAL_TABLE_NAME} created or already exists.")
    except Exception as e:
        print("BigQuery table creation error: " + str(e))

create_bigquery_table_task = PythonOperator(
    task_id='create_bigquery_table',
    python_callable=create_bigquery_table,
    dag=dag_incremental,
)

# GCP load task using WRITE_APPEND to add only the changes
def gcp_load_incremental(**kwargs):
    try:
        ti = kwargs['ti']
        data = ti.xcom_pull(task_ids='extract_incremental_data')
        if not data:
            print("No data changes found.")
            return
        client = bigquery.Client()
        project_id = CONFIG.PROJECT_ID
        dataset_ref = CONFIG.DATASET_REF
        TABLE = CONFIG.INCREMENTAL_TABLE_NAME
        table_id = f"{project_id}.{dataset_ref}.{TABLE}"
        job_config = bigquery.job.LoadJobConfig()
        # Set write_disposition to WRITE_APPEND so only the changes are appended
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        print(f"Importing {len(data)} rows to {table_id}...")
        # Proceed with the data loading using the modified data
        job = client.load_table_from_json(data, table_id, job_config=job_config)
        job.result()
        print(f"Loaded {len(data)} rows to {table_id}")
    except Exception as e:
        print("Data load error: " + str(e))

gcp_load_incremental_task = PythonOperator(
    task_id='gcp_load_incremental',
    python_callable=gcp_load_incremental,
    provide_context=True,
    dag=dag_incremental,
)

# Set task dependencies for the incremental DAG
check_table_exist_task >> extract_incremental_data_task >> create_bigquery_table_task >> gcp_load_incremental_task

The incremental dag has four tasks. The first task checks whether the table to extract data from exists. The second task extracts database changes from the binary log. The third task checks whether the BigQuery table exists and creates it if it doesn’t. The final task loads the events extracted from the log into BigQuery.

The events we are extracting from the logs are the DeleteRowsEvent,
the UpdateRowsEvent, and the WriteRowsEvent.
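For reference, each entry appended to modified_rows is just a plain dict of column values (an UpdateRowsEvent also carries the pre-update values, which this DAG discards). A hedged illustration of one such record, using made-up values:

# Shape of a single record returned by extract_incremental_data and later
# passed to client.load_table_from_json; the values below are invented.
sample_row = {
    "VendorID": 1,
    "tpep_pickup_datetime": "2021-01-01 00:30:10",
    "tpep_dropoff_datetime": "2021-01-01 00:36:12",
    "passenger_count": 1,
    "total_amount": 11.8,
}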

Before we run the incremental dag, let’s simulate events in the MySQL database.

I’ve created a load_incremental_to_db.py script that loads a new batch of data into the MySQL database every five minutes.

import pandas as pd
import pymysql
from sqlalchemy import create_engine
import MySQLdb
import time

host = "localhost"
user = "root"
password = ""
db = "trips_db"

# Construct the connection URL with an empty password
connection_url = f"mysql+mysqldb://{user}:{password}@{host}/{db}"
# Create an SQLAlchemy engine and connect to the database
engine = create_engine(connection_url)
conn = engine.connect()

chunk_size = 1000  # Number of rows to load per iteration
total_iterations = 100  # Number of iterations

for iteration in range(total_iterations):
    start_row = iteration * chunk_size
    df = pd.read_csv("./data/yellow_tripdata_2021-01.csv.gz",
                     skiprows=range(1, start_row + 1),  # Skip to the next batch of data
                     nrows=chunk_size,
                     compression='gzip')
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    # Load the batch into the database
    df.to_sql(name="yellow_tripdata", con=conn, index=False, if_exists='append')
    print(f"loading {iteration} batch")
    # Sleep for 5 minutes before the next batch
    time.sleep(300)  # 300 seconds = 5 minutes

Run load_incremental_to_db.py on your local computer and start the incremental dag.

Let’s see what the incremental dag looks like in the airflow UI and run it:

airflow incremental dags

Cool, it worked as we can see in the image below.

For the purpose of this tutorial, the full and incremental dags loaded data into separate tables. In a real-world scenario, the destination depends on your business requirements; often the change events would be merged back into the main table, as sketched below.
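For example, if the change events are staged next to the full table, they can be folded into it with a MERGE query. A minimal sketch of that pattern, assuming a hypothetical unique key column trip_id (the raw yellow taxi data has no such key, so treat this purely as a template):

# Hedged sketch: fold staged CDC rows into the main table.
# `trip_id` is a hypothetical key; adapt the ON clause and SET list to your schema.
from google.cloud import bigquery

client = bigquery.Client()
merge_sql = """
MERGE `marine-fusion-394105.trips_db.yellow_tripdata` AS target
USING `marine-fusion-394105.trips_db.yellow_tripdata_incremental` AS source
ON target.trip_id = source.trip_id
WHEN MATCHED THEN
  UPDATE SET target.total_amount = source.total_amount
WHEN NOT MATCHED THEN
  INSERT ROW
"""
client.query(merge_sql).result()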

Conclusion

That’s it guys, we have successfully used CDC to replicate data from MySQL to BigQuery.
You should have your full and incremental dags running by now.
I hope you found this tutorial interesting. Please share and remember to comment with your suggestions or feedback.

Don’t forget to clap and follow me on LinkedIn for posts on Data Engineering.

Cheers!!!
