This article will be interesting to those who often have to deal with data integration.
Assume that there is a database where users always modify data (update or remove). Perhaps, this database is used by a large application that does not allow modifying the table structure. The task is to load data from this database to another database on a different server from time to time. The simplest way to tackle the problem is to load the new data from a source database to a target database with preliminary cleaning up the target database. You can use this method as long as the time to load data is acceptable and does not exceed preset deadlines. What if it takes several days to load data? In addition, unstable communication channels lead to the situation when data load stops and restarts. If you face these obstacles, I suggest considering one of the ‘data reloading’ algorithms. It means that only data modifications occurred since the latest load are loaded.
In SQL Server 2008, Microsoft introduced a data tracking mechanism called Change Data Capture (CDC). Broadly speaking, the aim of this mechanism is that enabling CDC for any database table will create a system table in the same database with a similar name as the original table has (the schema will be as follows: ‘cdc’ as a prefix plus the old schema name plus ”_” and the end “_CT”. For example, the original table is dbo.Example, then the system table will be called cdc.dbo_Example_CT). It will store all the data that has been modified.
Actually, to dig deeper in CDC, consider the example. But first, make sure that SQL Agent that uses CDC works on the SQL Server test instance.
In addition, we are going to consider a script that creates a database and test table, populates this table with data and enables CDC for this table.
To understand and simplify the task, we will use one SQL Server instance without distributing the source and target databases to different servers.
use master go -- create a source database if not exists (select * from sys.databases where name = 'db_src_cdc') create database db_src_cdc go use db_src_cdc go -- enable CDC if it is disabled if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_db go -- create a role for tables with CDC if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1) create role CDC_Reader go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int identity constraint PK_Example primary key, Title varchar(200) not null ) go -- populate the table insert dbo.Example (Title) values ('One'),('Two'),('Three'),('Four'),('Five'); go -- enable CDC for the table if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example') exec sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'Example', @role_name = 'CDC_Reader' go -- populate the table with some data. We will change or delete something update dbo.Example set Title = reverse(Title) where ID in (2,3,4); delete from dbo.Example where ID in (1,2); set identity_insert dbo.Example on; insert dbo.Example (ID, Title) values (1,'One'),(6,'Six'); set identity_insert dbo.Example off; go
Now, let’s look at what we have after executing this script in the dbo.Example and cdc.dbo_Example_CT tables (it should be noted that CDC is asynchronous. Data is populated into the tables where the change tracking is stored after a certain period of time).
select * from dbo.Example;
ID Title ---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title ------ ---------------------- ----------- ---------------------- ------------ ---------------- --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Four 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03
Consider in details the table structure in which change tracking is stored. The __ $start_lsn and __ $seqval fields are LSN (log sequence number in the database) and the transaction number within the transaction respectively. There is an important property in these fields, namely, we can be sure that the record with a higher LSN will be performed later. Due to this property, we can easily get the latest state of each record in the query, filtering our selection by the condition – where __ $ rn = 1.
The __$operation field contains the transaction code:
- 1 – the record is deleted
- 2 – the record is inserted
- 3, 4 – the record is updated. The old data before update is 3, the new data is 4.
In addition to service fields with prefix «__$», the fields of the original table are completely duplicated. This information is enough for us to proceed to the incremental load.
Setting up a database to data loading
Create a table in our test target database, into which data will be loaded, as well as an additional table to store data about the load log.
use master go -- create a target database if not exists (select * from sys.databases where name = 'db_dst_cdc') create database db_dst_cdc go use db_dst_cdc go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int constraint PK_Example primary key, Title varchar(200) not null ) go -- create a table to store the load log if object_id('dbo.log_cdc','U') is null create table dbo.log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), constraint pk_log_cdc primary key (table_name,dt desc) ) go
I would like to draw your attention to the fields of the LOG_CDC table:
- TABLE_NAME stores information about what table was loaded (it is possible to load several tables in the future, from different databases or even from different servers; the table format is ‘SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME’
- DT is a field of the loading date and time, which is optional for the incremental load. However, it will be useful for auditing loading.
- LSN – after a table is loaded, we need to store information about the place where to start the next load, if required. Accordingly, after each load, we add the latest (maximum) __ $ start_lsn into this column.
Algorithm for data loading
As described above, using the query, we can get the latest state of the table with the help of window functions. If we know LSN of the latest load, the next time we load we can filter from the source all the data, the changes of which are higher than the stored LSN, if there was at least one complete previous load:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) select * from incr_Example
Then, we can get all the records for the complete load, if the load LSN is not stored:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc.dbo.Example where @lsn is null ) select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example
Thus, depending on the @LSN value, this query will display either all the latest changes (bypassing the interim ones) with the status Removed or not, or all the data from the original table, adding status 2 (new record) – this field is used only for unifying two selections. With this query, we can easily implement either full load or reload using the MERGE command (starting with SQL 2008 version).
To avoid bottlenecks that can create alternative processes and to load matched data from different tables (in the future, we will load several tables and, possibly, there may be relational relations between them), I suggest using a DB snapshot on the source database (another SQL 2008 feature).
The full text of the load is as follows:Code
/* Algorithm of data loading */ -- create a database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss; declare @query nvarchar(max); select @query = N'create database db_src_cdc_ss on ( name = N'''+name+ ''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc' from db_src_cdc.sys.sysfiles where groupid = 1; exec ( @query ); -- read LSN from the previous load declare @lsn binary(10) = (select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name = 'localhost.db_src_cdc.dbo.Example'); -- clear a table before the complete load if @lsn is null truncate table db_dst_cdc.dbo.Example; -- load process with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc_ss.dbo.Example where @lsn is null ) , cte_Example as ( select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example ) merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID when matched and __$operation = 1 then delete when matched and __$operation <> 1 then update set trg.Title = src.Title when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title); -- mark the end of the load process and the latest LSN insert db_dst_cdc.dbo.log_cdc (table_name, lsn) values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0)) -- delete the database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss
Latest posts by Alexey Kurenkov (see all)
- Implementing Incremental Load using Change Data Capture in SQL Server - January 26, 2018
- Missing Indexes in MS SQL or Optimization in no Time - December 28, 2017