Azure Data Factory可以在雲端之間或是雲端與本地端之間建立資料處理流程,讓資料從來源傳輸到目的端的過程中加工以符合應用程式的要求,或是作為分析展示之用。
在這篇文章中我會模擬IOT的情境,由虛擬裝置上發出資料到Event Hub,再由Data Factory取出這些資料並轉存到本地端的資料庫。
為了達到雲端與本地端之間hybrid的連線,我們會需要透過Data Management Gateway來建立Secure Channel;此Data Management Gateway可以安裝在本地端的專屬Gateway Server上而無需安裝在資料伺服器上(例如Database Server),但是Gateway Server與資料庫必須可以透過網路連接。
環境
我會建立以下的環境,由Stream Analytics將 Event Hub資料轉存到Storage,再透過Data Factory將檔案寫到本地端的SQL Server上
SQL資料表
我在Azure VM上安裝了SQL Server作為目的端資料庫,資料庫名稱為data,有如下的table
安裝Data Management Gateway
- 首先從Azure Portal上建立一個新的Data Factory,建立過程沒有任何特殊的設定。
- 在Local端的Gateway Server上安裝Data Management Gateway
- 安裝完成後,開啟Gateway Server上的防火牆允許8050 Inbound
- 如果不想更改防火牆設定,可以透過以下指令安裝Data Management Gateway
msiexec /q /i DataManagementGateway.msi NOFIREWALL=1
- 回到Portal,新增Deploy
- 新增Data Gateway
- 複製此Key以供後續使用,然後按下完成開始建立Gateway
- RDP到先前安裝Data Management Gateway的電腦
- 開啟ConfigManager
- 將剛剛複製的Key貼上然後按下Register註冊Gateway
- 在這裡如果出現Register Fail的錯誤訊息;可以打開事件檢視器看詳細的錯誤。通常這邊比較可能的原因會與防火牆有關,在測試時可以先關閉防火牆確認是否問題與防火牆有關
- 如果沒有使用Proxy,可以修改安裝路徑下的diahost.exe.config停用Proxy;修改完後記得重啟Data Management Gateway Service服務。
- 另外,Data Management Gateway Host Service必須要能存取Certificate Store;如果看到類似”Unable to change certificate”的錯誤訊息,則多與Gateway無法存取Certificate有關;此時可以修改預設Gateway Service的Credential來確認。
打開Services.msc中可以看到預設的使用者為NT SERVICE\DIAHostService,這裡可以改為Local System Account。
- 我在安裝Data Management Gateway時,花了許多時間解決這些錯誤;基本上的錯誤類型大致都跟網路連線(如防火牆、Proxy)或是權限相關(如Service Credential)。
另外要特別注意的是,建議的Gateway Server等級至少為4 Core、8 GB RAM,如果少於這個等級,在安裝註冊時會發現CPU常常滿載,我原本使用A1的機器測試,常常遇到Timeout錯誤,後來改為A3後就不再看到這類錯誤;因此,如果以上步驟都無法解決問題時,可以試著調高機器等級看看。
- 如果一切正常,ConfigManager應該要顯示如下
- 回到Portal上就可以看到Gateway Status是Online了
Event Hub與Stream Analytics環境設定
- 首先透過Portal建立新的Event Hub,過程中沒有甚麼要特別注意的,可以參考這一篇文件的說明建立即可。
- 建立完成後,設定read、write policy
- 接著撰寫以下的程式模擬設備發送資料到Event Hub
class Program
{
static string eventHubName = "michiadfdemo";
static string connectionString = "Endpoint=sb://michiazurecontw.servicebus.windows.net/;SharedAccessKeyName=write;SharedAccessKey=eRTO0gkfuOLGZq2AbJAedzE1MOZAcZanVNlr+56FWWc=";
static Random randome = new Random();
static void SendingRandomMessages()
{
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
while (true)
{
try
{
//var message = System.IO.File.ReadAllText(@"c:\temp\mitac_data.csv");
var body = "{" + string.Format("\"id\":\"{0}\", \"time\":\"{1}\", \"temperature\":{2},\"comment\":\"\"", Guid.NewGuid().ToString("N"), DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), randome.Next(1, 100)) + "}";
Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, body);
eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(body)));
}
catch (Exception exception)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
Console.ResetColor();
}
Thread.Sleep(200);
}
}
static void Main(string[] args)
{
Console.WriteLine("Press Ctrl-C to stop the sender process");
Console.WriteLine("Press Enter to start now");
Console.ReadLine();
SendingRandomMessages();
}
- 執行一下確認資料確實傳送到Event Hub端
- 其中,我們的資料格式,為了方便,僅包含以下的欄位:
- Id:GUID,為唯一識別碼
- Time:代表資料產生時間(UTC)
- Temperature:模擬裝置的溫度
- Comment:預留欄位
- 接著,開始設定Stream Analytics將資料由Event Hub讀出並寫入到Azure Storage Blob
- 新增一個INPUT,指定來源為Event Hub
- 指定資料內容為JSON格式
- 新增一個Output,指定輸出目的為Azure Storage Blob
- 指定輸出的路徑
- 設定檔案格式如下
- 接著,新增一個Query定義如下;這樣當讀進Json物件時,便會轉換為CSV格式。
SELECT
id, time, temperature, comment
INTO
[hybrid-output]
FROM
[hybrid-source]
- 完成後試著執行一下看看結果,在指定的Storage中確實產生了檔案
- 內容也如我們指定的CSV格式。
- 至此,我們以經完成所有的準備工作了,接下來可以開始設定Data Factory。
建立SQL Server的Linked Service
- 首先,我們要建立一個 Linked Service指向本地端的SQL Server;在我的環境中,此SQL Server名稱為dataserver,使用Windows與SQL Server混和驗證。
- 回到Portal,打開Data Factory,建立一個新的Data Store
- 設定Type為SQL Server
- 使用現有的Data Management Gateway
- 在這邊輸入資料庫Server的名稱以及要寫入的資料庫名稱,然後按下”Click Here to set credential”下載設定工具
- 請注意,此工具只支援Internet Explorer或是其他支援ClickOnce的瀏覽器
- 此工具最好在Gateway Server上執行,否則必須確定執行電腦可以連到Gateway Server
- 執行後,輸入連線資料庫的帳號密碼
- 請注意,這裡要確定資料庫伺服器允許Remote Connection,同時防火牆也都有開啟1433 port。
- 成功設定的話,這邊會顯示Credential set Successfully
- 完成後可以看到Data Store是Online狀態
建立Azure Storage的Linked Service
- 在Portal建立一個新的Data Store
- 輸入Storage Account Name與Account Key後按下OK
- 接著,我們要新增兩個資料集(DateSet)分別代表Blob中的檔案以及SQL Server上的資料表
- 首先建立Blob的DataSet
- 設定它的內容如下,詳細的JSON物件格式可以參考這裡
- 因為這裡Blob的檔案是由Stream Analytics所產生,其檔名為隨機產生,因此在下面的定義中,我們不會設定fileName,這樣在folderPath所指定的Folder中所有的檔案都會被處理。
{
"name": "azure-blob-file",
"properties": {
"structure": [
{
"name": "id",
"type": "String"
},
{
"name": "time",
"type": "Datetime"
},
{
"name": "temperature",
"type": "Int32"
},
{
"name": "comment",
"type": "String"
}
],
"published": false,
"type": "AzureBlob",
"linkedServiceName": "azure-blob-01",
"typeProperties": {
"folderPath": "adf/{Year}/{Month}/{Day}/{Hour}/",
"format": {
"type": "TextFormat",
"columnDelimiter": ","
},
"partitionedBy": [
{
"name": "Year",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "yyyy"
}
},
{
"name": "Month",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "MM"
}
},
{
"name": "Day",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "dd"
}
},
{
"name": "Hour",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "HH"
}
}
]
},
"availability": {
"frequency": "Minute",
"interval": 15,
"anchorDateTime": "2016-02-16T12:00:00Z"
},
"external": true,
"policy": { }
}
}
- 按下Deploy發布定義
- 重複上述步驟建立SQL Table的DataSet,其定義如下:
{
"name": "onprem-sql",
"properties": {
"structure": [
{
"name": "id",
"type": "String"
},
{
"name": "time",
"type": "Datetime"
},
{
"name": "temperature",
"type": "Int32"
},
{
"name": "comment",
"type": "String"
}
],
"published": false,
"type": "SqlServerTable",
"linkedServiceName": "sql-onprem-01",
"typeProperties": {
"tableName": "data"
},
"availability": {
"frequency": "Minute",
"interval": 15
}
}
}
- 接著建立新的管線,定義如下:
{
"name": "copy-blob-to-sql",
"properties": {
"description": "Copy blob file to SQL Server",
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"skipHeaderLineCount": 1
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 100,
"writeBatchTimeout": "60.00:00:00"
}
},
"inputs": [
{
"name": "azure-blob-file"
}
],
"outputs": [
{
"name": "onprem-sql"
}
],
"policy": {
"timeout": "01:00:00",
"concurrency": 10,
"executionPriorityOrder": "NewestFirst",
"retry": 3
},
"scheduler": {
"frequency": "Minute",
"interval": 15
},
"name": "CopyFromBlobToOnPremSQL"
}
],
"start": "2016-02-16T12:00:00Z",
"end": "2016-02-16T18:00:00Z",
"isPaused": false,
"hubName": "michi-adf-01_hub",
"pipelineMode": "Scheduled"
}
}
- 由於Stream Analytics產生的CSV檔案包含Header,因此要指定skipHeaderLineCount欄位為1
- 佈署Pipeline後,會看到Data Factory開始根據我們設定的15分中間隔產生Slice並執行
· 到資料庫查詢,發現資料確實寫進去了
結論
以往在考慮雲端與地端之間搬移資料時,因為適用情況的不同我們可能會考慮透過客製化程式來處理;透過Data Factory除了可以透過雲端易於延展的特性以及Azure提供的HA機制大幅簡化管理以及佈署架構的複雜度,也可以減少開發人員的負擔。