Written by 13:05 Database administration, Work with data

Implementing Incremental Load using Change Data Capture in SQL Server

This article will be interesting to those who often have to deal with data integration.

Introduction

Assume that there is a database where users always modify data (update or remove). Perhaps, this database is used by a large application that does not allow modifying the table structure. The task is to load data from this database to another database on a different server from time to time. The simplest way to tackle the problem is to load the new data from a source database to a target database with preliminary cleaning up the target database. You can use this method as long as the time to load data is acceptable and does not exceed preset deadlines. What if it takes several days to load data? In addition, unstable communication channels lead to the situation when data load stops and restarts. If you face these obstacles, I suggest considering one of the ‘data reloading’ algorithms. It means that only data modifications occurred since the latest load are loaded.

CDC

In SQL Server 2008, Microsoft introduced a data tracking mechanism called Change Data Capture (CDC). Broadly speaking, the aim of this mechanism is that enabling CDC for any database table will create a system table in the same database with a similar name as the original table has (the schema will be as follows: ‘cdc’ as a prefix plus the old schema name plus ”_” and the end “_CT”. For example, the original table is dbo.Example, then the system table will be called cdc.dbo_Example_CT). It will store all the data that has been modified.

Actually, to dig deeper in CDC, consider the example. But first, make sure that SQL Agent that uses CDC works on the SQL Server test instance.

In addition, we are going to consider a script that creates a database and test table, populates this table with data and enables CDC for this table.

To understand and simplify the task, we will use one SQL Server instance without distributing the source and target databases to different servers.

use master
go
-- create a source database
if not exists (select * from sys.databases where name = 'db_src_cdc')
	create database db_src_cdc
go
use db_src_cdc
go
-- enable CDC if it is disabled
if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
	exec sys.sp_cdc_enable_db
go
-- create a role for tables with CDC
if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
	create role CDC_Reader
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int identity constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- populate the table
insert dbo.Example (Title) values
('One'),('Two'),('Three'),('Four'),('Five');
go
-- enable CDC for the table
if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
	exec sys.sp_cdc_enable_table
		@source_schema = 'dbo',
		@source_name = 'Example',
		@role_name = 'CDC_Reader'
go
-- populate the table with some data. We will change or delete something
update dbo.Example
set Title = reverse(Title)
where ID in (2,3,4);

delete from dbo.Example where ID in (1,2);

set identity_insert dbo.Example on;
insert dbo.Example (ID, Title) values
(1,'One'),(6,'Six');
set identity_insert dbo.Example off;
go

Now, let’s look at what we have after executing this script in the dbo.Example and cdc.dbo_Example_CT tables (it should be noted that CDC is asynchronous. Data is populated into the tables where the change tracking is stored after a certain period of time).

select * from dbo.Example;
ID Title
---- ----------------------
   1 One
   3 eerhT 
   4 ruoF 
   5 Five 
   6 Six
select
	row_number() over
		(
			partition by ID
			order by
				__$start_lsn desc,
				__$seqval desc
		) as __$rn,
	*
from cdc.dbo_Example_CT;
__$rn __$start_lsn           __$end_lsn  __$seqval              __$operation __$update_mask    ID Title
------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
     1 0x0000003A000000580005 NULL        0x0000003A000000580003            2 0x03               1 One
     2 0x0000003A000000560006 NULL        0x0000003A000000560002            1 0x03               1 One
     1 0x0000003A000000560006 NULL        0x0000003A000000560005            1 0x03               2 owT
     2 0x0000003A000000540005 NULL        0x0000003A000000540002            3 0x02               2 Two
     3 0x0000003A000000540005 NULL        0x0000003A000000540002            4 0x02               2 owT
     1 0x0000003A000000540005 NULL        0x0000003A000000540003            3 0x02               3 Three
     2 0x0000003A000000540005 NULL        0x0000003A000000540003            4 0x02               3 eerhT
     1 0x0000003A000000540005 NULL        0x0000003A000000540004            3 0x02               4 Four
     2 0x0000003A000000540005 NULL        0x0000003A000000540004            4 0x02               4 ruoF
     1 0x0000003A000000580005 NULL        0x0000003A000000580004            2 0x03

Consider in details the table structure in which change tracking is stored. The __ $start_lsn and __ $seqval fields are LSN (log sequence number in the database) and the transaction number within the transaction respectively. There is an important property in these fields, namely, we can be sure that the record with a higher LSN will be performed later. Due to this property, we can easily get the latest state of each record in the query, filtering our selection by the condition – where __ $ rn = 1.

The __$operation field contains the transaction code:

  • 1 – the record is deleted
  • 2 – the record is inserted
  • 3, 4 – the record is updated. The old data before update is 3, the new data is 4.

In addition to service fields with prefix «__$», the fields of the original table are completely duplicated. This information is enough for us to proceed to the incremental load.

Setting up a database to data loading

Create a table in our test target database, into which data will be loaded, as well as an additional table to store data about the load log.

use master
go
-- create a target database 
if not exists (select * from sys.databases where name = 'db_dst_cdc')
	create database db_dst_cdc
go
use db_dst_cdc
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- create a table to store the load log
if object_id('dbo.log_cdc','U') is null
	create table dbo.log_cdc
	(
		table_name nvarchar(512) not null,
		dt datetime not null default getdate(),
		lsn binary(10) not null default(0x0),
		constraint pk_log_cdc primary key (table_name,dt desc)
	)
go

I would like to draw your attention to the fields of the LOG_CDC table:

  • TABLE_NAME stores information about what table was loaded (it is possible to load several tables in the future, from different databases or even from different servers; the table format is ‘SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME’
  • DT is a field of the loading date and time, which is optional for the incremental load. However, it will be useful for auditing loading.
  • LSN – after a table is loaded, we need to store information about the place where to start the next load, if required. Accordingly, after each load, we add the latest (maximum) __ $ start_lsn into this column.

Algorithm for data loading

As described above, using the query, we can get the latest state of the table with the help of window functions. If we know LSN of the latest load, the next time we load we can filter from the source all the data, the changes of which are higher than the stored LSN, if there was at least one complete previous load:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
select * from incr_Example

Then, we can get all the records for the complete load, if the load LSN is not stored:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc.dbo.Example
	where @lsn is null
)
select
	ID, Title, __$operation
from incr_Example
where __$rn = 1
union all
select
	ID, Title, 2 as __$operation
from full_Example

Thus, depending on the @LSN value, this query will display either all the latest changes (bypassing the interim ones) with the status Removed or not, or all the data from the original table, adding status 2 (new record) – this field is used only for unifying two selections. With this query, we can easily implement either full load or reload using the MERGE command (starting with SQL 2008 version).

To avoid bottlenecks that can create alternative processes and to load matched data from different tables (in the future, we will load several tables and, possibly, there may be relational relations between them), I suggest using a DB snapshot on the source database (another SQL 2008 feature).

The full text of the load is as follows:

[expand title=”Code”]

/*
	Algorithm of data loading
*/
-- create a database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss;

declare
	@query nvarchar(max);

select
	@query = N'create database db_src_cdc_ss on ( name = N'''+name+
		''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
from db_src_cdc.sys.sysfiles where groupid = 1;

exec ( @query );

-- read LSN from the previous load
declare @lsn binary(10) =
	(select max(lsn) from db_dst_cdc.dbo.log_cdc
	where table_name = 'localhost.db_src_cdc.dbo.Example');

-- clear a table before the complete load
if @lsn is null truncate table db_dst_cdc.dbo.Example;

-- load process
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc_ss.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc_ss.dbo.Example
	where @lsn is null
)
, cte_Example as
(
	select
		ID, Title, __$operation
	from incr_Example
	where __$rn = 1
	union all
	select
		ID, Title, 2 as __$operation
	from full_Example
)
merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
when matched and __$operation = 1 then delete
when matched and __$operation <> 1 then update set trg.Title = src.Title
when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);

-- mark the end of the load process and the latest LSN
insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))

-- delete the database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss

[/expand]

Tags: , Last modified: September 22, 2021
Close