2016年2月16日

用Azure Data Factory在雲端與本地端資料庫搬移資料

Azure Data Factory可以在雲端之間或是雲端與本地端之間建立資料處理流程,讓資料從來源傳輸到目的端的過程中加工以符合應用程式的要求,或是作為分析展示之用。

在這篇文章中我會模擬IOT的情境,由虛擬裝置上發出資料到Event Hub,再由Data Factory取出這些資料並轉存到本地端的資料庫。

為了達到雲端與本地端之間hybrid的連線,我們會需要透過Data Management Gateway來建立Secure Channel;此Data Management Gateway可以安裝在本地端的專屬Gateway Server上而無需安裝在資料伺服器上(例如Database Server),但是Gateway Server與資料庫必須可以透過網路連接。

 

環境

我會建立以下的環境,由Stream Analytics將 Event Hub資料轉存到Storage,再透過Data Factory將檔案寫到本地端的SQL Server上

image

SQL資料表

我在Azure VM上安裝了SQL Server作為目的端資料庫,資料庫名稱為data,有如下的table

clip_image003

安裝Data Management Gateway

  • 首先從Azure Portal上建立一個新的Data Factory,建立過程沒有任何特殊的設定。
  • 在Local端的Gateway Server上安裝Data Management Gateway
  • 安裝完成後,開啟Gateway Server上的防火牆允許8050 Inbound

clip_image005

    • 如果不想更改防火牆設定,可以透過以下指令安裝Data Management Gateway

msiexec /q /i DataManagementGateway.msi NOFIREWALL=1

  • 回到Portal,新增Deploy

clip_image007

  • 新增Data Gateway

clip_image009

  • 複製此Key以供後續使用,然後按下完成開始建立Gateway

clip_image011

  • RDP到先前安裝Data Management Gateway的電腦
  • 開啟ConfigManager

clip_image013

  • 將剛剛複製的Key貼上然後按下Register註冊Gateway

clip_image015

    • 在這裡如果出現Register Fail的錯誤訊息;可以打開事件檢視器看詳細的錯誤。通常這邊比較可能的原因會與防火牆有關,在測試時可以先關閉防火牆確認是否問題與防火牆有關
    • 如果沒有使用Proxy,可以修改安裝路徑下的diahost.exe.config停用Proxy;修改完後記得重啟Data Management Gateway Service服務。

clip_image017

  • 另外,Data Management Gateway Host Service必須要能存取Certificate Store;如果看到類似”Unable to change certificate”的錯誤訊息,則多與Gateway無法存取Certificate有關;此時可以修改預設Gateway Service的Credential來確認。

打開Services.msc中可以看到預設的使用者為NT SERVICE\DIAHostService,這裡可以改為Local System Account。

clip_image019

  • 我在安裝Data Management Gateway時,花了許多時間解決這些錯誤;基本上的錯誤類型大致都跟網路連線(如防火牆、Proxy)或是權限相關(如Service Credential)。

另外要特別注意的是,建議的Gateway Server等級至少為4 Core、8 GB RAM,如果少於這個等級,在安裝註冊時會發現CPU常常滿載,我原本使用A1的機器測試,常常遇到Timeout錯誤,後來改為A3後就不再看到這類錯誤;因此,如果以上步驟都無法解決問題時,可以試著調高機器等級看看。

  • 如果一切正常,ConfigManager應該要顯示如下

clip_image021

  • 回到Portal上就可以看到Gateway Status是Online了

clip_image023

Event Hub與Stream Analytics環境設定

  • 首先透過Portal建立新的Event Hub,過程中沒有甚麼要特別注意的,可以參考這一篇文件的說明建立即可。
  • 建立完成後,設定read、write policy
  • 接著撰寫以下的程式模擬設備發送資料到Event Hub

class Program
{
    static string eventHubName = "michiadfdemo";
    static string connectionString = "Endpoint=sb://michiazurecontw.servicebus.windows.net/;SharedAccessKeyName=write;SharedAccessKey=eRTO0gkfuOLGZq2AbJAedzE1MOZAcZanVNlr+56FWWc=";
    static Random randome = new Random();
    static void SendingRandomMessages()
    {
       
        var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
        while (true)
        {
            try
            {
                //var message = System.IO.File.ReadAllText(@"c:\temp\mitac_data.csv");
                var body = "{" + string.Format("\"id\":\"{0}\", \"time\":\"{1}\", \"temperature\":{2},\"comment\":\"\"", Guid.NewGuid().ToString("N"), DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ"), randome.Next(1, 100)) + "}";

                Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, body);
                eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(body)));


            }
            catch (Exception exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                Console.ResetColor();
            }

            Thread.Sleep(200);
        }
    }
   
    static void Main(string[] args)
    {
        Console.WriteLine("Press Ctrl-C to stop the sender process");
        Console.WriteLine("Press Enter to start now");
        Console.ReadLine();
        SendingRandomMessages();
    }

  • 執行一下確認資料確實傳送到Event Hub端

clip_image025

  • 其中,我們的資料格式,為了方便,僅包含以下的欄位:
    • Id:GUID,為唯一識別碼
    • Time:代表資料產生時間(UTC)
    • Temperature:模擬裝置的溫度
    • Comment:預留欄位
  • 接著,開始設定Stream Analytics將資料由Event Hub讀出並寫入到Azure Storage Blob
  • 新增一個INPUT,指定來源為Event Hub

clip_image026

  • 指定資料內容為JSON格式

clip_image027

  • 新增一個Output,指定輸出目的為Azure Storage Blob

clip_image028

  • 指定輸出的路徑

clip_image029

  • 設定檔案格式如下

clip_image030

  • 接著,新增一個Query定義如下;這樣當讀進Json物件時,便會轉換為CSV格式。

SELECT

id, time, temperature, comment

INTO

[hybrid-output]

FROM

[hybrid-source]

  • 完成後試著執行一下看看結果,在指定的Storage中確實產生了檔案

clip_image031

  • 內容也如我們指定的CSV格式。

clip_image032

  • 至此,我們以經完成所有的準備工作了,接下來可以開始設定Data Factory。

建立SQL Server的Linked Service

  • 首先,我們要建立一個 Linked Service指向本地端的SQL Server;在我的環境中,此SQL Server名稱為dataserver,使用Windows與SQL Server混和驗證。
  • 回到Portal,打開Data Factory,建立一個新的Data Store

clip_image034

  • 設定Type為SQL Server

clip_image036

  • 使用現有的Data Management Gateway

clip_image038

  • 在這邊輸入資料庫Server的名稱以及要寫入的資料庫名稱,然後按下”Click Here to set credential”下載設定工具
    • 請注意,此工具只支援Internet Explorer或是其他支援ClickOnce的瀏覽器
    • 此工具最好在Gateway Server上執行,否則必須確定執行電腦可以連到Gateway Server

clip_image040

  • 執行後,輸入連線資料庫的帳號密碼
    • 請注意,這裡要確定資料庫伺服器允許Remote Connection,同時防火牆也都有開啟1433 port。

clip_image041

  • 成功設定的話,這邊會顯示Credential set Successfully

clip_image042

  • 完成後可以看到Data Store是Online狀態

clip_image043

建立Azure Storage的Linked Service

  • 在Portal建立一個新的Data Store

clip_image044

  • 輸入Storage Account Name與Account Key後按下OK

clip_image045

  • 接著,我們要新增兩個資料集(DateSet)分別代表Blob中的檔案以及SQL Server上的資料表
  • 首先建立Blob的DataSet

clip_image046

  • 設定它的內容如下,詳細的JSON物件格式可以參考這裡
  • 因為這裡Blob的檔案是由Stream Analytics所產生,其檔名為隨機產生,因此在下面的定義中,我們不會設定fileName,這樣在folderPath所指定的Folder中所有的檔案都會被處理。

{
  "name": "azure-blob-file",
  "properties": {
    "structure": [
      {
        "name": "id",
        "type": "String"
      },
      {
        "name": "time",
        "type": "Datetime"
      },
      {
        "name": "temperature",
        "type": "Int32"
      },
      {
        "name": "comment",
        "type": "String"
      }
    ],
    "published": false,
    "type": "AzureBlob",
    "linkedServiceName": "azure-blob-01",
    "typeProperties": {
      "folderPath": "adf/{Year}/{Month}/{Day}/{Hour}/",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ","
      },
      "partitionedBy": [
        {
          "name": "Year",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "yyyy"
          }
        },
        {
          "name": "Month",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "MM"
          }
        },
        {
          "name": "Day",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "dd"
          }
        },
        {
          "name": "Hour",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "HH"
          }
        }
      ]
    },
    "availability": {
      "frequency": "Minute",
      "interval": 15,
      "anchorDateTime": "2016-02-16T12:00:00Z"
    },
    "external": true,
    "policy": { }
  }
}

  • 按下Deploy發布定義
  • 重複上述步驟建立SQL Table的DataSet,其定義如下:

{
  "name": "onprem-sql",
  "properties": {
    "structure": [
      {
        "name": "id",
        "type": "String"
      },
      {
        "name": "time",
        "type": "Datetime"
      },
      {
        "name": "temperature",
        "type": "Int32"
      },
      {
        "name": "comment",
        "type": "String"
      }
    ],
    "published": false,
    "type": "SqlServerTable",
    "linkedServiceName": "sql-onprem-01",
    "typeProperties": {
      "tableName": "data"
    },
    "availability": {
      "frequency": "Minute",
      "interval": 15
    }
  }
}

  • 接著建立新的管線,定義如下:

{
  "name": "copy-blob-to-sql",
  "properties": {
    "description": "Copy blob file to SQL Server",
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource",
            "skipHeaderLineCount": 1
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 100,
            "writeBatchTimeout": "60.00:00:00"
          }
        },
        "inputs": [
          {
            "name": "azure-blob-file"
          }
        ],
        "outputs": [
          {
            "name": "onprem-sql"
          }
        ],
        "policy": {
          "timeout": "01:00:00",
          "concurrency": 10,
          "executionPriorityOrder": "NewestFirst",
          "retry": 3
        },
        "scheduler": {
          "frequency": "Minute",
          "interval": 15
        },
        "name": "CopyFromBlobToOnPremSQL"
      }
    ],
    "start": "2016-02-16T12:00:00Z",
    "end": "2016-02-16T18:00:00Z",
    "isPaused": false,
    "hubName": "michi-adf-01_hub",
    "pipelineMode": "Scheduled"
  }
}

    • 由於Stream Analytics產生的CSV檔案包含Header,因此要指定skipHeaderLineCount欄位為1
  • 佈署Pipeline後,會看到Data Factory開始根據我們設定的15分中間隔產生Slice並執行

clip_image048

· 到資料庫查詢,發現資料確實寫進去了

clip_image049

 

結論

以往在考慮雲端與地端之間搬移資料時,因為適用情況的不同我們可能會考慮透過客製化程式來處理;透過Data Factory除了可以透過雲端易於延展的特性以及Azure提供的HA機制大幅簡化管理以及佈署架構的複雜度,也可以減少開發人員的負擔。

 

參考資料

2016年2月15日

以ARM Template佈署Azure Point-to-Site VPN

Azure Resource Manager已經開始支援Point-to-Site VPN這裡有如何透過Powershell建置ARM模式下P2S VPN的方式。這裡會透過ARM Template來佈署P2S VPN
  • 首先是準備ARM Template,這個template只是非常單純的建立一個VNet、一個Public IP以及VPN Gateway;並將Public IP設定給VPN Gateway
  • 先定義Public IPVNet

   "apiVersion":"[variables('apiVersion')]",
   "type":"Microsoft.Network/virtualNetworks",
   "name":"[parameters('virtualNetworkName')]",
   "location":"[variables('location')]",
   "properties":{ 
      "addressSpace":{ 
         "addressPrefixes":[ 
            "[parameters('vnetAddressPrefix')]"
         ]
      },
      "subnets":[ 
         { 
            "name":"GatewaySubnet",
            "properties":{ 
               "addressPrefix":"[parameters('gatewaySubnetPrefix')]"
            }
         }
      ]
   }
},

   "apiVersion":"[variables('apiVersion')]",
   "type":"Microsoft.Network/publicIPAddresses",
   "name":"[parameters('gatewayPublicIPName')]",
   "location":"[variables('location')]",
   "properties":{ 
      "publicIPAllocationMethod":"Dynamic"
   }
},
  • 接著定義VNet Gateway

         "apiVersion":"[variables('apiVersion')]",
         "type":"Microsoft.Network/virtualNetworkGateways",
         "name":"[parameters('gatewayName')]",
         "location":"[variables('location')]",
         "dependsOn":[ 
            "[concat('Microsoft.Network/publicIPAddresses/', parameters('gatewayPublicIPName'))]",
            "[concat('Microsoft.Network/virtualNetworks/', parameters('virtualNetworkName'))]"
         ],
         "properties":{ 
            "ipConfigurations":[ 
               { 
                  "properties":{ 
                     "privateIPAllocationMethod":"Dynamic",
                     "subnet":{ 
                        "id":"[variables('gatewaySubnetRef')]"
                     },
                     "publicIPAddress":{ 
                        "id":"[resourceId('Microsoft.Network/publicIPAddresses',parameters('gatewayPublicIPName'))]"
                     }
                  },
                  "name":"vnetGatewayConfig"
               }
            ],
        "sku": {
          "name": “myVNetGateway”,
         "tier": "[parameters('gatewaySku')]"
        },           
            "gatewayType":"Vpn",
            "vpnType":"RouteBased",
            "enableBgp":"false",
            "vpnClientConfiguration":{ 
               "vpnClientAddressPool":{ 
                  "addressPrefixes":[ 
                     "[parameters('vpnClientAddressPoolPrefix')]"
                  ]
               },
               "vpnClientRootCertificates":[ 
                  { 
                     "name": "[parameters('clientRootCertName')]",
                     "properties":{
                        "PublicCertData": "[parameters('clientRootCertData')]"
                     }
                  }
               ],
               "vpnClientRevokedCertificates":[ 
                  { 
                     "name": "[parameters('revokedCertName')]",
                     "properties":{ 
                        "Thumbprint": "[parameters('revokedCertThumbprint')]"
                     }
                  }
               ]
            }
         }
      }
    • 其中tier可以是BasicStandard或是HighPerformance;各自對應到不同的VPN Gateway SKU
    • PublicCertData則是用來連線驗證的Certificate內容,下面會以self-signed certificate為例說明如何設定此內容
    • vpnClientRevokedCertificatesOptional;在這裡可以設定要拒絕連線的Client Certificate;由於Azure VPN是透過憑證來做身分驗證,因此被設定在Revoked的憑證便無法使用Point-to-Site連線。
  • 產生Self-Signed Certificate
    • 開啟Visual Studio Command Prompt或是安裝Windows SDK後,在以下位置找到makecert.exe
    • 執行以下指令產生Root certificate
      • makecert -sky exchange -r -n "CN=RootCertificateName" -pe -a sha1 -len 2048 -ss My "RootCertificateName.cer"
    • 執行以下指令產生Client certificate
      • makecert.exe -n "CN=ClientCertificateName" -pe -sky exchange -m 96 -ss My -in "RootCertificateName" -is my -a sha1
    • 開啟certmgr.msc
    • 找到Root憑證,Export
    • 不要匯出Private Key
    • 匯出BASE64編碼的檔案
    • Notepad打開該檔案,反白的部分就是ARM TemplatePublicCertData欄位的內容(需去掉換行)
    • 注意:vpnClientRevokedCertificates不可以與vpnClientRootCertificates相同,否則佈署時會出現錯誤。
      • 接著,執行Powershell佈署此ARM Template
      • 佈署完成後,我們還需要下載Client Package才能連線;打開Powershell透過下列指令取的下載位址
        • Get-AzureRmVpnClientPackage -ResourceGroupName $rgn -VirtualNetworkGatewayName $gwName -ProcessorArchitecture Amd64
        • 其中$rgn$gwName代表Resource Group NameVNet Gateway的名稱
      透過回傳的網址下載並安裝Client Package
  • 接著便可以以一般連線VPN的方式連線了


Blog Archive

About Me