2013年12月19日 星期四

Lab4:資料流程設計

在建立倉儲資料庫時,常會需要使用到 Stage 資料表,所以底下的 Lab 一開始會使用三個簡單的方法,來練習(1)如何在 Data Flow 將來源資料 ETL 到 Stage 資料表。 (2)再使用較進階的方法將 Stage 資料匯整成倉儲資料庫所要的資料。 (3)最後再練習如何在 Data Flow 中,判斷一筆資料是要新增或更新到倉儲資料庫。

Staging

在建立倉儲資料庫時,常會需要使用到 Stage 資料表,所以底下這個練習,我們先由三個簡單的 Data Flows 方法,來幫助我們建立 Stage 資料表。 這個小節將同時練如何讀取不同類型的資料來源。

關於 AdventureWork2012

以下練習會使用到 AdventureWork2012 中幾個資料表,簡單敍述一下:

  • 「銷售員」和「客戶」資料都是以「Person」為基底資料表。
  • 每筆「交易記錄」都包含著「銷售員」和「客戶」和「銷售地區」。
  • 「交易記錄」、「銷售員」和「客戶」都有「銷售地區」欄位。

另外再準備一個客戶其他資訊的檔案(.csv),練習檔案資料來源的讀取。(下載 CustomerInformation.txt
完成後,我們將得到下圖結構的 Stage 資料表以及客戶資料的維度資料表:

Stage the Person.Person Table ( Using OLE DB Source )

這個範例,練習使用 OLE DB Connection 將 AdventureWorks2012 中的 Person 資料 ETL 到 MyDW.stg.Person 。

1. 準備工作

通常 Stage Table 是不會對外公開的,所以你可以另外使用一個 schema 來區分 stage 和 dw 資料表的權限。

-- 建立DW資料庫
USE master;
GO
IF DB_ID('MyDW') IS NOT NULL
	DROP DATABASE MyDW;
GO

CREATE DATABASE MyDW
ON PRIMARY
(NAME = N'MyDW', FILENAME = N'D:\Database\testdb\MyDW.mdf',
SIZE = 307200KB , FILEGROWTH = 10240KB )
LOG ON
(NAME = N'MyDW_log', FILENAME = N'D:\Database\testdb\MyDW_log.ldf',
SIZE = 51200KB , FILEGROWTH = 10%);
GO
ALTER DATABASE MyDW SET RECOVERY SIMPLE WITH NO_WAIT;
GO

-- 建立SCHEMA
USE MyDW;
Go
CREATE SCHEMA stg AUTHORIZATION dbo;
Go

-- 建立stage資料表
CREATE TABLE stg.Person
(
	BusinessEntityID INT NULL,
	PersonType NCHAR(2) NULL,
	Title NVARCHAR(8) NULL,
	FirstName NVARCHAR(50) NULL,
	MiddleName NVARCHAR(50) NULL,
	LastName NVARCHAR(50) NULL,
	Suffix NVARCHAR(10) NULL,
	ModifiedDate DATETIME NULL
);

2. 建立 Connection

在連接管理員中,加入二個 OLE DB Connection ,一個連上 AdventureWorks2012 ,一個連上倉儲資料庫 MyDW。

  • DB1.AdventureWorks2012 : connect to AdventureWorks2012
  • DB1.MyDW : connect to MyDW

3. 建構 Control Flow

Drag two Tasks

4. 編輯 Data Flow

Drag two Data Adaptor Components:

Setting:

Stage the Sales.Customer Table ( Using ODBC Source )

這個範例,練習使用 ODBC Connection 將 AdventureWorks2012 中的 Customer 資料 ETL 到 MyDW.stg.Customer 。

方法大至同上,改成使用 ODBC Connection 即可。

CREATE TABLE stg.Customer
(
	CustomerID INT NULL,
	PersonID INT NULL,
	StoreID INT NULL,
	TerritoryID INT NULL,
	AccountNumber NVARCHAR(20) NULL,
	ModifiedDate DATETIME NULL,
);

Stage the CustomerInformation Table ( Using Flat Files )

這個範例,練習使用 Flat Data Connection 將檔案 CustomerInformation.txt 中的資料 ETL 到 MyDW.stg.CustomerInformation 。

1. 準備工作

CREATE TABLE stg.CustomerInformation
(
	PersonID INT NULL,
	EnglishEducation NVARCHAR(30) NULL,
	EnglishOccupation NVARCHAR(50) NULL,
	BirthDate DATE NULL,
	Gender NCHAR(3) NULL,
	MaritalStatus NCHAR(3) NULL,
	EmailAddress NVARCHAR(50) NULL
);

2. 建立 Flat Data Connection

2.1 在連接管理員中,加入一個「一般檔案連接(Flat File Connection)」,連上 CustomerInformation.txt。

2.2 編輯該連線的設定:

2.3 切換到資料行頁簽,檢視資料行資訊:

2.4 設定採樣資料數:

在執行這個步驟前,你可以切換到[進階頁簽]中,檢查目前系統付與各個欄位的欄位型別。 它很可能全部都是字串型別,這是由於 Flat File 這種資料來源本身不帶資料型別,所以這種連線會預先讀取些許的資料來猜測資料的型別。 你可以在進階頁簽中的[建議類型]功能中,增加採樣的資料列數目,讓一般檔案資料來源,可以更精準的預測型別。 當然你也可以自行將 DataType 屬性,修改成你要的型別。

3. 建構 Control Flow

Drag two Tasks

  • Execute SQL Task : truncate table MyDW.str.CustomerInformation (using DB1.MyDW connection)
  • Data Flow Task : move data from File to stg.CustomerInformation

4. 編輯 Data Flow

Drag two Data Flow Components:

Setting Flat File Source:

Setting OLE DB Destination:

5. 執行封裝

如果沒有錯誤,你將可以看到以下執行結果畫面。

ETL to Dimention Table

前一個練習,我們使用三個簡單的轉換方法,建立了Stage資料表。 接下來,我們將利用較進階的轉換方法,將Stage資料匯整出倉儲資料庫要用的客戶資料。 這個轉換,將會同時讀取Stage資料和線上交易資料庫中的資料,以便匯整出完整的客戶資料維度。

Prepare Customers Dimension Table

1. 準備工作

1.1 create DimCustomer table in MyDW database

USE MyDW;
GO

------------------------------------------------------------
-- 建一個 SEQUENCE ,用以產生 DimCustomer 的 surrogate key 
------------------------------------------------------------
IF OBJECT_ID('dbo.SeqCustomerDwKey','SO') IS NOT NULL
	DROP SEQUENCE dbo.SeqCustomerDwKey;
GO
CREATE SEQUENCE dbo.SeqCustomerDwKey AS INT
	START WITH 1
	INCREMENT BY 1;
GO

------------------------------------------------------------
-- 建立 DimCustomers
------------------------------------------------------------
CREATE TABLE dbo.DimCustomers
(
	CustomerDwKey INT NOT NULL,
	CustomerKey INT NOT NULL,
	FullName NVARCHAR(150) NULL,
	EmailAddress NVARCHAR(50) NULL,
	BirthDate DATE NULL,
	MaritalStatus NVARCHAR(3) NULL,
	Gender NVARCHAR(3) NULL,
	Education NVARCHAR(40) NULL,
	Occupation NVARCHAR(100) NULL,
	City NVARCHAR(30) NULL,
	StateProvince NVARCHAR(50) NULL,
	CountryRegion NVARCHAR(50) NULL,
	Age AS
	CASE
		WHEN DATEDIFF(yy, BirthDate, CURRENT_TIMESTAMP) <= 40 THEN 'Younger'
		WHEN DATEDIFF(yy, BirthDate, CURRENT_TIMESTAMP) > 50 THEN 'Older'
		ELSE 'Middle Age'
	END,
	CurrentFlag BIT NOT NULL DEFAULT 1,
	CONSTRAINT PK_DimCustomers PRIMARY KEY (CustomerDwKey)
);

------------------------------------------------------------
-- 設定 DimCustomers.CustomerDwKey 來自於 SeqCustomerDwKey SEQUENCE 
------------------------------------------------------------
ALTER TABLE dbo.DimCustomers
	ADD CONSTRAINT DFT_CustomerDwKey 
	DEFAULT (NEXT VALUE FOR dbo.SeqCustomerDwKey)
FOR CustomerDwKey;

1.2 create a new package

1.3 將前一個 package 中的二個 OLE DB Connection 轉換成「專案連接」,以便在這個 package 中可以共同使用。

2. 建構 Control Flow

這個 package 的 Control Flow 只要放入一個 Data Flow Task 即可。

3. 編輯 Data Flow (Transformation)

依下圖建立資料流程:

上圖說明如下:

3.1 加入三個 data source

  • stgCustomer:using MyDW_OLEDB connection and select stg.Customer table
  • strPerson:using MyDW_OLEDB connection and select stg.Person table
  • Sales Territory:using AdventureWorks2012_OLEDB connection and select Sales.SalesTerritory table

3.2 排序前二個資料來源

使用 Sort Transformation 將來源資料排序:

排序1:指定 BusinessEntityID 當做排序欄位。

排序2:指定 PersonID 當做排序欄位。

3.3 加入合併聯結轉換(Merge Join Transformation):

在這裡使用「合併聯結轉換(Merge Join Transformation)」來合併上面二個排序後的資料來源。 此處的聯結類型,我們設定使用 Inner Join(內部聯結); 同時勾選二個表的關聯鍵,並勾選要取得的欄位。

3.4 合併排序結果(排序3)

3.5 排序第三個資料來源(排序4)

3.6 合併排序結果

在這裡使用「左方外部聯結(Left Join)」的聯結類型來合併上面二個排序後的資料來源。

Loading to Customers Dimension Table

4. 編輯 Data Flow (Loading)

4.1 Lookup CustomerInformation

經過上面的操作,我們會得到一個結果集,接下來針對這個結果集,我們要透過 Lookup Transformation 去查詢 CustomerInformation 中是否有相關資料。

4.1.1 將查閱不到的客戶,導向「無相符結果輸出」
當使用 Lookup Transformation 查閱資料時,若該查詢沒有相符的結果,你可以有4種輸出方式。 在此我們選擇「導向無相符結果輸出」,接著對這個輸出給定預設的 CustomerInformation 資訊值。

4.1.2 在 Lookup Transformation 中設定查閱的資料表。

4.1.3 設定查閱資料表的關聯欄位;並勾選要回傳的欄位。

4.2 設定預設值。

上一步的查閱轉換將原本的資料集分成了二個結果,一個是「查閱比對輸出」,另一個是「查閱無相符結果輸出」。 接下來透過一個 Derived Column Transformation 來設定沒有相符查閱結果的預設值

加入6個衍生欄位以便設定預設值

4.3 聯集全部

上一步驟得到二個輸出結果集,一個是有比對相符,一個是查無相符的結果,現在我們透過 Union All Transformation 把二個合併在一起。

4.4 Lookup DimCustomer

將上一步驟得到的結果,使用 Lookup Transformation 對 DimCustomers 進行查閱。

  • 4.4.1 「無相符項目的資料列」設定成「忽略失敗」 這個lab只要將不存在(新)的資料插入到 DimCustomers ,其實在這邊可以直接處理「無相符項目的資料列」即可。 但是為了多練習底下的條件式分割元件,所以這裡就使用「忽略失敗」,忽略失敗會塞NULL值給「無相符項目的資料列」的欄位。

  • 4.4.2 這邊練習如何使用 SQL 查詢

  • 4.4.3 設定Lookup資料表的關連欄位;並勾選要回傳的欄位。

4.5 加入一個欄位

因為 DimCustomers 需要用到一個 FullName 欄位,所以這裡加入一個 Derived Column Transformation 來取得這個欄位。

4.6 利用條件分割,判斷資料是否已存在

由於我們只需要將不存在的資料加入到 DimCustomers 維度表,所以可以利用 Conditional Split Transformation 來分割結果集。

我們在「條件分割轉換」的編輯視窗中加入一個分割條件:只要符合 IsNull(CustomerDwKey) 條件的資料列,就會被輸出到 New record 這個結果集。

4.7 輸出到目的地

最後將結果集輸出到 OLE DB Destination 元件,並設定輸出的對應欄位。

設定要輸出的欄位,不需要的欄位則設定忽略。

Execute Package

執行封裝後,你可以由工作視窗看到轉換的資料數,如下圖:

如果你再執行一次,則沒有任何資料會被轉換,除非有新增的客戶資料,如下圖:

Implementing Update

前面完成的作業只會執行新增功能,接下來我們將在 Data Flow 中加入更新功能。

加入 OLE DB Command 元件

5.1 加入 OLE DB Command Transformation 元件

「New Record」是我們在前一步驟中自行加進去的一個分割條件,若不符合該條件的資料,則會由流向「預設分割輸出」。 在此,我們使用一個「OLE DB 命令轉換(OLE DB Command Transformation)」元件在處理這些不符 New Record 條件的資料。

「OLE DB 命令轉換」元件會針對資料流程中的每個資料列執行 SQL 陳述式。 使用「OLE DB 命令轉換」元件時,有幾個可用的設定如下:

  • SQL Statement:Provide the SQL statement that the transformation runs for each row.
  • Time Out:Specify the number of seconds before the SQL statement times out.
  • Code Page:Specify the default code page

設定 SQL 陳述式

OLE DB 命令轉換的 SQL 陳述式通常包含參數。參數值由轉換輸入提供。

設定參數:

OLE DB 命令轉換提供了參數名稱,您無法對它們進行修改。 這些參數名稱為 Param_0、Param_1 等。

執行封裝

如果我們先執行以下 SQL ,加入了二筆客戶資料,再執行封裝,就會看到以下結果

insert stg.Person(BusinessEntityID, PersonType, FirstName, LastName, ModifiedDate) values ('20778','AD','Vito','Shao',GETDATE())
insert stg.Person(BusinessEntityID, PersonType, FirstName, LastName, ModifiedDate) values ('20779','AD','Peter','Yun',GETDATE())
insert stg.Customer(CustomerID, PersonID, ModifiedDate) Values ('30119','20778',GETDATE())
insert stg.Customer(CustomerID, PersonID, ModifiedDate) Values ('30120','20779',GETDATE())

二筆新增,其餘的會做更新。

沒有留言:

張貼留言