Comparison of SSIS-IR startup times

Running classic SSIS packages in the cloud has been possible for some time already. It is done utilizing Azure Data Factory (ADF), which contains a specific runtime engine called the SSIS Integration Runtime (SSIS-IR). Basically it's just a cluster of Windows servers managed automatically by Azure. Those virtual machines have the SSIS engine installed, which is used to run the SSIS packages.

To reduce running costs, it's a best practice to set up SSIS-IR to be on only when needed. In 2018 I was working on a project in which we utilized SSIS-IR. I became curious about how long it takes to start up SSIS-IR, so I set up a pipeline in ADF which just started and stopped SSIS-IR continuously. Based on these tests the average startup time was approx. 17.6 minutes.
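
In my case the start-stop loop was built as an ADF pipeline, but for reference the same measurement could also be scripted. Below is a minimal sketch using the Python management SDK; it assumes azure-identity and a recent azure-mgmt-datafactory (older SDK versions expose start/stop instead of begin_start/begin_stop), and all the resource names are placeholders.

import time
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

SUBSCRIPTION_ID = "<subscription id>"   # placeholders, not real values
RESOURCE_GROUP = "<resource group>"
FACTORY_NAME = "<data factory name>"
IR_NAME = "<ssis-ir name>"

client = DataFactoryManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)

for i in range(10):  # a handful of full start-stop cycles
    started = time.perf_counter()
    # begin_start returns a poller; result() blocks until the IR is running
    client.integration_runtimes.begin_start(RESOURCE_GROUP, FACTORY_NAME, IR_NAME).result()
    print(f"cycle {i}: startup took {time.perf_counter() - started:.0f} seconds")
    client.integration_runtimes.begin_stop(RESOURCE_GROUP, FACTORY_NAME, IR_NAME).result()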

Some time ago I bumped into a video in which a Microsoft representative announced that they had managed to speed up the starting process. Obviously it was time to rerun the tests. And indeed, it is now much faster than before: the average time was about 90 seconds, which is more than ten times faster than previously. Here are the results visualized:

Both test runs contained about 100 full start-stop cycles. The 2018 tests were run in the Azure North Europe region, the 2020 tests in West Europe. Node size was D2_v3 with one node and no custom configuration.

But there's a catch: the speedup only applies to SSIS-IRs outside of a VNet! Inside a VNet the startup times are the same as previously, about 20 minutes on average. More details here.

Connecting Azure Data Factory to a Storage account inside a VNET

Microsoft's cloud ETL service, Azure Data Factory (ADF), has been in general availability for a good while already. It's a serverless platform for transforming and moving data between different kinds of on-premises and cloud services. About a year ago I was working as an architect on a project where we utilized ADF to move data into a blob storage. We had our blob attached to a VNET, which means it was isolated from the public network, effectively behind a firewall. A VNET is a way to secure services so that they are not openly available to the whole world. The problem then was that connecting ADF to a blob in a VNET was not possible without provisioning a virtual machine inside the same VNET and installing a self-hosted integration runtime on it. We were building our solution in a serverless way, so installing one didn't sound like a great option.

Last fall things changed when Microsoft finally published a solution for connecting ADF to a storage account attached to a VNET. Now let's see how it works!

For this demo I provisioned a Data Factory and two storage accounts:

The scenario is to copy a file from accestestsrc to accesstestdest. The source blob is public but the destination is secured using a VNET. Here's how the destination storage has been attached to a VNET so it is not accessible from everywhere:

Now let's build a pipeline on the Data Factory side to copy data from source to destination. While doing this, things go south when configuring the destination sink:

Error 403 is thrown because ADF cannot connect to the storage account: the blob is attached to a VNET and ADF is not. To make the connection happen we have to configure three things:

  1. Add ADF's managed identity to the storage account with proper access rights
  2. Enable ‘Trusted Microsoft services’ setting on storage properties
  3. Change authentication method from account key to managed identity

The first one is done in the storage account settings. Let's select Access control (IAM) -> Add -> Add role assignment. For the Role we select 'Storage Blob Data Contributor', and then the name of our Data Factory (accessTestDataFactory) is written into the 'Select' textbox. A query is done and our ADF will be listed below. Then we just select it and hit the Save button.

Now our Data Factory is listed in the assigned roles section:

The second part is done in the Firewalls and virtual networks section. Select 'Allow trusted Microsoft services to access this storage account' and hit the Save button. Azure Data Factory nowadays belongs to this set of trusted Azure services, which it didn't before the announcement in fall 2019:

Lastly we have to change the authentication method on the ADF side: when we initially created the connection to the blob in ADF, it used an account key to authenticate. We must change it to use the managed identity to which we gave access rights to the storage in step 1. Let's edit the linked service settings:
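
For reference, with managed identity authentication the linked service definition ends up roughly like the JSON sketch below: the blob endpoint is given as serviceEndpoint and no account key is stored. The linked service name here is made up; the storage account name is the demo's accesstestdest.

{
  "name": "AccessTestDestStorage",
  "properties": {
    "type": "AzureBlobStorage",
    "typeProperties": {
      "serviceEndpoint": "https://accesstestdest.blob.core.windows.net/"
    }
  }
}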

Now we are ready to finish and test the copy pipeline again. It seems to complete without errors:

Let's verify that the file has actually been copied to the destination blob using Storage Explorer:

File somesuperimportantdatafile.txt is found in the destination as well, so everything worked as planned.

I really like the simplicity this new feature enables. No more extra VMs just to make the connection happen, and much cleaner architectures. Also cheaper ones, because of the absence of a VM. Go Serverless!

Connecting Sauna – How to make sauna call home

Last fall I got into playing with Ruuvitag sensors. Those are small, battery-powered devices which measure temperature, humidity, pressure and acceleration. I placed one sensor in our sauna with the idea of making the sauna call home: when a certain temperature is reached, I'll get a notification on my phone about the sauna being ready to be utilized.

Ruuvitags are Bluetooth beacons which broadcast measurements once a second. Raspberry Pi has a Bluetooth receiver built in, so it was a natural choice for listening to the data stream. There is also a great Python library for Ruuvitags available which makes reading the measurements easy.

For notifications I'm utilizing the Telegram messenger, which I installed on my mobile. It's an instant messaging app similar to WhatsApp or Signal. There's a nice feature called bots, which are basically third-party apps. The cool thing is that bots can be controlled using an API, and that's how I'm sending notifications to my phone. There's a Python library for Telegram as well. What needs to be done on the Telegram side is the following:

  1. Create a bot
  2. Acquire token
  3. Acquire chatid

There are multiple how-to documents available on how to do this, and here's one.
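
As a quick sketch, the chat id can also be dug out with the same Python library once you have sent at least one message to your bot. This assumes the synchronous, pre-v20 python-telegram-bot API that the script below uses as well; the token is a placeholder.

import telegram

bot = telegram.Bot(token='<bot token from BotFather>')
# Send any message to the bot first, then it shows up in the pending updates:
for update in bot.get_updates():
    if update.message:
        print(update.message.chat_id)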

I made a Python script which listens to the Ruuvitag data stream, parses the temperature out of it and, if a certain temperature is reached, sends a notification. The script stores the timestamp of the latest notification in a file; this timestamp, combined with a resend time limit parameter, prevents flooding the phone with notifications:

from ruuvitag_sensor.ruuvi import RuuviTagSensor
from datetime import datetime
import telegram
import csv
import sys

# Two command line arguments:
# 1. Temperature limit
# 2. Resend timelimit in minutes

macs = [''] # MAC address of a Ruuvitag
timeout_in_sec = 10
temp_limit = float(sys.argv[1]) # Temperature limit (Celsius)  which indicates sauna being ready to enter
temperature = 0
resend_timelimit = float(sys.argv[2]) # Resend timelimit in minutes
now = datetime.utcnow()
dt_string = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# Let's fetch the latest measurement (timestamp and temperature):
reader = csv.reader(
   open("sauna.csv"), delimiter=";")
for row in reader:
   latest_notification = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S.%f")
   prev_temp = row[1]

# Fetching data from Ruuvitag:
data = RuuviTagSensor.get_data_for_sensors(macs, timeout_in_sec)

for j in data:
     temperature = data[j]['temperature']
     print('Measured temperature: '+str(temperature))
     print('Temperature limit: ' + str(temp_limit))
     datediff_in_minutes = (now - latest_notification).total_seconds() / 60 # time difference in minutes between now and when latest notification was sent
     print('diff in minutes: '+str(datediff_in_minutes))
     print('resend timelimit: ' + str(resend_timelimit))
     # IF temperature is above limit AND time difference between now and when latest notification was sent is more than resend timelimit THEN let's send a notification:
     if temperature >= temp_limit and datediff_in_minutes > resend_timelimit:
          # Let's create a Telegram bot
          bot = telegram.Bot(token='') # Telegram bot token goes here
          chatid = '' # Telegram chat id goes here
          chat_text = 'Sauna valmis! Lämpötila: ' + str(temperature) # "Sauna ready! Temperature: ..."
          bot.send_message(chat_id=chatid, text=chat_text)
          # Let's write the latest notification time and temperature into the file:
          with open("sauna.csv", 'w') as f:
              writer = csv.writer(f, delimiter=";")
              writer.writerow([now, temperature])

The MAC address of the Ruuvitag goes into the macs list at the top of the script, and the Telegram token and chatid into the token and chatid variables near the end. Note that sauna.csv must already exist with one timestamp and temperature row before the first run, since the script reads the previous notification time from it.

The script is scheduled using cron. Below is a screenshot of an actual notification.

Sauna calls home

Going Serverless

The previous post was about utilizing cloud services to build processes. However, I didn't go all out on this: it was a Raspberry Pi located at my home which was utilized to run the Python code. And it is the weakest link from (at least) two points of view:

  • The Raspberry Pi's SD memory card will eventually corrupt.
  • Somebody has to maintain the Raspberry Pi by patching the operating system and runtimes, monitoring the disk so it doesn't fill up, etc. All the regular maintenance tasks administrators have to deal with. That somebody is me: a lazy data engineer.

I already had my first SD card corruption last fall: it took only two years from purchase to occur. The second bullet point is a much more general problem, and it covers all on-premises systems as well as the IaaS option in the cloud: maintenance overhead. One option to avoid SD corruption could be utilizing a VM running in Azure. But then again there's the problem with bullet point two: somebody has to maintain that VM. I wish there was a service into which I could just submit the code and not worry about this kind of thing…

And there is: Azure has services for building business processes in the cloud without having to worry about the server(s) running them. This concept is called serverless. It's a slightly misleading term since there actually are servers under the hood, but the main point is that developers don't have to worry about them. Traditionally difficult problems, like scaling, are taken care of by the platform so developers can focus on the main thing: building business processes. Serverless also means apps are billed only when they actually run, so no cost is generated when there is no usage. This is called a consumption-based plan. So it's like bring-your-own-code-and-don't-worry-about-the-platform®.

The serverless offering in Azure builds on top of Logic Apps and Azure Functions. The first one is a service for building workflows using a visual designer to automate business processes without coding. It has lots of built-in connectors available for both cloud and on-premises systems and services. It's based on triggers, which means a workflow is kick-started when something happens, e.g. a timer is hit, a certain event occurs (like a file landing in a blob container) or an HTTP request is received.

Azure Functions is a service for running code in a cloud environment. Language choices at the time of writing are C#, F#, Python, JavaScript, Java, PowerShell and TypeScript. Check the official documentation.

Usually Logic Apps and Functions go hand in hand so that the Logic App works as an orchestrator and calls different functions to control the flow of business processes.

The architecture I was aiming at was this: getting rid of the Raspberry Pi and setting up an Azure Function to run the existing Python code:

Serverless architecture

Python is one of the supported languages, so it was a natural choice since the original code was written in it. It turned out that the only changes needed were how to schedule the code and how to interact with the storage account.

Scheduling can be done using triggers. One type of trigger is a timer, and that's the one used here. There are also other types of triggers available, like event-based ones (e.g. a file landing in blob storage).

Connecting the function to the storage account is achieved using bindings, which are endpoints to different Azure services like blob storage, Event Hubs, Cosmos DB etc. In this case we are uploading a csv file to blob, so it's an output binding. To get data into a function, one can use input bindings. Check the documentation for details.

Triggers and bindings are configured in a file named function.json which in this case was like this:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 8 * * *"
    },
    {
      "type": "blob",
      "direction": "out",
      "name": "outputBlob",
      "path": "sahko/inputdata.csv",
      "connection": "AzureWebJobsStorage"
    }
  ]
}

Mytimer is the schedule for running the function, described as an NCRONTAB expression. It's like classic CRON except that it has an extra sixth field at the beginning which describes seconds (original CRON has time granularity at the minute level). The schedule 0 0 8 * * * means the function is run every morning at 08:00; as another example, 0 30 9 * * 1-5 would run it at 09:30 on weekdays. OutputBlob is a binding to the storage account container/filename (sahko/inputdata.csv).

Here’s the actual Python code:

import os
import requests
from lxml import html
from datetime import datetime, timezone
from azure.storage.blob import BlobServiceClient
import azure.functions as func

def main(mytimer: func.TimerRequest, outputBlob: func.Out[str]) -> None:
 
    LOGIN = os.environ["LOGIN"]
    PASSWORD = os.environ["PASSWORD"]
    LOGIN_URL = os.environ["LOGIN_URL"]
    URL = os.environ["URL"]

    session_requests = requests.session()

    # Get login csrf token
    result = session_requests.get(LOGIN_URL)
    tree = html.fromstring(result.text)
    authenticity_token = list(set(tree.xpath("//input[@name='__RequestVerificationToken']/@value")))[0]

    # Create payload
    payload = {
        "username": LOGIN,
        "password": PASSWORD,
        "__RequestVerificationToken": authenticity_token
    }

    # Perform login
    result = session_requests.post(LOGIN_URL, data = payload, headers = dict(referer = LOGIN_URL))


    # Scrape url
    result = session_requests.get(URL, headers = dict(referer = URL))

    formatted_output = result.text.replace('\\r\\n', '\n')

    for line in formatted_output.splitlines():
        if line.lstrip()[0:11] == 'var model =':

            jsonni = line.lstrip()[12:-1]
            start = jsonni.find('[[')
            end = jsonni.find(']]')
            jsonni = jsonni[start+1:end+1]
            jsonni = jsonni.replace('],[','\n')
            jsonni = jsonni.replace(']','')
            jsonni = jsonni.replace('[','')

            output = 'timestamp;consumption\n' # header for csv-file
            for line in jsonni.splitlines():
                start = line.find(',')
                epoc = line[0:start]
                measure = line[start+1:]
                timestamp = int(epoc)/1000
                timedate = datetime.fromtimestamp(timestamp, timezone.utc)
                timestamp_str = str(timedate)[:-6]
                output += timestamp_str + ';' + measure + '\n'

            outputBlob.set(output)


The main function signature shows how the timer trigger and output binding are utilized, and outputBlob.set(output) is what writes the csv content to blob. I also moved the hard-coded connection strings, URLs, user id and password out of the code file and into application settings; those values are fetched with os.environ.
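
When developing locally these settings live in local.settings.json, and in Azure they go under the Function App's Configuration (application settings). A sketch of what the local file could look like, with all values as placeholders; the AzureWebJobsStorage entry is the same connection the blob output binding above refers to:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<storage connection string>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "LOGIN": "<username>",
    "PASSWORD": "<password>",
    "LOGIN_URL": "<login page url>",
    "URL": "<consumption report url>"
  }
}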

All other sections are identical compared to the original code, so in that sense it can be said that it was pretty straightforward to migrate the existing application from on-premises to serverless.

Development was done using Visual Studio Code. One thing to note is that setting up the Azure Functions & Python environments in VS Code was a painful experience and could be a blog post on its own.

The solution has now been running serverless for two weeks without problems. Here's a screenshot from the Azure Portal showing the actual execution logs; it takes about 5 seconds for the code to run:

Execution statistics

Originally I planned that the Logic App would orchestrate all the components, meaning the Logic App would also call the Function based on a timer set up on the Logic App side. However, this didn't work as I initially planned, since at the moment Logic Apps only support calling Functions that use .NET or JavaScript as the runtime stack. Most likely this will change in the future so that Python (and other currently unsupported languages) functions can also be called from Logic Apps. And when that happens, there will be a blog post about it!

Getting data into the cloud

I wanted to monitor our electricity usage on a more granular timeframe than the monthly bills we are getting. My local provider Lahti Energia has usage statistics available on their website at an hourly granularity. It even has some built-in data visualizations, but those are kind of basic, so I figured I could build something myself.

It is possible to download the data in csv format, but that requires logging into the service and manually making some selections. As a lazy data engineer I want to automate things. Unfortunately there's no API of any kind, like REST, available, so I had to come up with something else: web scraping. It is a method where web browsing is done programmatically to get the data needed. It's far from ideal, because even a tiny change in the web page can break the procedure. But since there was no API available, scraping was the way forward.

I had a Raspberry Pi up and running, so I figured a Python script could do the heavy liftin'n'shiftin'. Python also has lots of good libraries for doing screen scraping, connecting to Azure etc.

I wanted to utilize cloud services as much as possible because I like the concept of someone else taking care of the platform. So I decided to place the data into a relational cloud database: Azure SQL DB. I already had one running, so it was only a matter of creating a couple of database objects. I also like to minimize the amount of code, and that's why utilizing an Azure Logic App to orchestrate the whole thing seemed to be the right choice.

The architecture I came up with is as follows:

  1. A Python script running on my Raspberry Pi scrapes the electricity usage data from my local provider's website.
  2. The script parses the data into tabular format (good old csv) and places the file into a specific Azure blob storage container. The script is scheduled using cron.
  3. An Azure Logic App is set up so that it polls the blob storage container for new files. When a new file is found, it kicks off a stored procedure in Azure SQL DB which imports the file into a staging table, and from there all new rows are inserted into the actual table.
Architecture diagram

For doing screen scraping against a website that requires login, I followed this handy tutorial: https://kazuar.github.io/scraping-tutorial/. Also a big shout-out to stackoverflow.com for providing lots of good code examples, e.g. on how to connect to Azure storage, convert UNIX epoch time to a timestamp etc.

Microsoft has lots of Python modules for managing Azure services, and I'm utilizing those to connect to the storage account. I already had a storage account, so it was just a matter of creating a new container in it and getting a connection string. The connection_string parameter is a pointer to a storage account, and it can be found from Azure Portal -> Storage account -> Access keys -> Connection string.

The actual usage data can be found in the web page source on a line which begins with the string 'var model ='. The value looks like a JSON object but is not actually a valid one. This is parsed into tabular format with two columns: timestamp and consumption.

Here’s the code I ended up using:

import requests
import csv
from lxml import html
from datetime import datetime, timezone
from azure.storage.blob import BlobServiceClient

connection_string = "DefaultEndpointsProtocol=https;AccountName=..." # truncated here; paste the full connection string from the Azure Portal

# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Create a blob client using the local file name as the name for the blob
blob_client = blob_service_client.get_blob_client(container='sahko', blob='inputdata.csv')

USERNAME = ""
PASSWORD = ""

LOGIN_URL = "https://online.lahtienergia.fi/eServices/Online/Login"
URL = "https://online.lahtienergia.fi/Reporting/CustomerConsumption?loadLastMonthData=true"

def main():
    session_requests = requests.session()

    # Get login csrf token
    result = session_requests.get(LOGIN_URL)
    tree = html.fromstring(result.text)
    authenticity_token = list(set(tree.xpath("//input[@name='__RequestVerificationToken']/@value")))[0]

    # Create payload
    payload = {
        "username": USERNAME,
        "password": PASSWORD,
        "__RequestVerificationToken": authenticity_token
    }

    # Perform login
    result = session_requests.post(LOGIN_URL, data = payload, headers = dict(referer = LOGIN_URL))
    #print(result.content)

    # Scrape url
    result = session_requests.get(URL, headers = dict(referer = URL))

    formatted_output = result.text.replace('\\r\\n', '\n')

    for line in formatted_output.splitlines():
        if line.lstrip()[0:11] == 'var model =':
            jsonni = line.lstrip()[12:-1]
            start = jsonni.find('[[')
            end = jsonni.find(']]')
            jsonni = jsonni[start+1:end+1]
            jsonni = jsonni.replace('],[','\n')
            jsonni = jsonni.replace(']','')
            jsonni = jsonni.replace('[','')

            with open('usagedata.csv', 'w', newline='') as file:
                writer = csv.writer(file, delimiter=';') # Let's also write data into file
                output = 'timestamp;consumption\n' # header for csv-file
                for line in jsonni.splitlines():
                    start = line.find(',')
                    epoc = line[0:start]
                    measure = line[start+1:]
                    timestamp = int(epoc)/1000
                    timedate = datetime.fromtimestamp(timestamp, timezone.utc)
                    timestamp_str = str(timedate)[:-6]
                    writer.writerow([timestamp_str, measure])
                    output += timestamp_str + ';' + measure + '\n'
            blob_client.upload_blob(output,overwrite=True)

if __name__ == '__main__':
    main()

The csv file is given the name inputdata.csv and is placed into a container named sahko. It has two columns separated by a semicolon: timestamp and consumption, which tells the electricity consumption during that hour:

timestamp;consumption
2019-12-04 00:00:00;0.3
2019-12-04 01:00:00;0.06
2019-12-04 02:00:00;0.29
2019-12-04 03:00:00;0.06
2019-12-04 04:00:00;0.24
2019-12-04 05:00:00;0.12
2019-12-04 06:00:00;0.23
2019-12-04 07:00:00;0.23
2019-12-04 08:00:00;0.21
2019-12-04 09:00:00;0.3
2019-12-04 10:00:00;0.07
2019-12-04 11:00:00;0.31
2019-12-04 12:00:00;0.06

Now we have the raw data stored in blob! To get the csv file from blob to SQL, these two services must be connected. First a shared access signature (SAS) to the blob needs to be created. After the SAS has been created, the following commands are run in Azure SQL DB:

CREATE MASTER KEY ENCRYPTION BY PASSWORD = '...';

CREATE DATABASE SCOPED CREDENTIAL MyCredentials WITH IDENTITY = 'SHARED ACCESS SIGNATURE',SECRET = 'st=...';

CREATE EXTERNAL DATA SOURCE MyAzureStorage WITH (
TYPE = BLOB_STORAGE,
LOCATION = 'https://<storage_account_name>.blob.core.windows.net',
CREDENTIAL = MyCredentials
);

The first command creates a master encryption key; choose a strong password. After this a database scoped credential can be made which utilizes the shared access signature created earlier. Lastly the actual external data source can be created, utilizing the stored credential to connect to the storage. To verify the connection, a test query can be run against the external data source.
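
For example, a quick sanity check along these lines should return the raw file contents if the connection works (a sketch; adjust the path if your container or file name differs):

SELECT BulkColumn
FROM OPENROWSET(
    BULK 'sahko/inputdata.csv',
    DATA_SOURCE = 'MyAzureStorage',
    SINGLE_CLOB) AS RawFile;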

Now the Azure SQL DB is successfully connected to the storage! Next, let's create a stored procedure dbo.sp_insert_data for loading the csv file into a database table:

CREATE PROCEDURE [dbo].[sp_insert_data]

AS
BEGIN

	TRUNCATE TABLE [dbo].[stg_sahko];

	BULK INSERT [dbo].[stg_sahko]
	FROM 'sahko/inputdata.csv'
	WITH (DATA_SOURCE = 'MyAzureStorage',
		  FIELDTERMINATOR = ';',
		  ROWTERMINATOR = '0x0a', --LF
		  FIRSTROW = 2);

	INSERT usage(pvm, usage)
	SELECT CAST(s.[pvm] AS DATETIME) AS pvm
          ,CAST(s.[usage] AS NUMERIC(10,2)) AS usage
	FROM stg_sahko s LEFT JOIN usage u ON s.pvm=u.pvm 
	WHERE u.pvm IS NULL;
    
END
GO

The logic goes so that first the staging table (dbo.stg_sahko) is emptied by running a truncate command. Then the raw data from the csv file is loaded into this table. We have to provide a little bit of metadata (field terminator character, row terminator and first row) so that the database engine knows how to load the file correctly. From the staging table only new rows are loaded into the actual destination table (dbo.usage), and the fields are also cast into proper datatypes (datetime & numeric).

Now we have methods both for generating a csv file and for loading it into the database. We just have to combine these two so that when a csv file is placed into blob, it gets loaded into the database. For this I'm utilizing an Azure Logic App, which is a service for automating and orchestrating data flows, processes etc.

The Azure Logic App acts like an orchestrator: it polls the specific container (sahko) in blob storage, and if a new file is added (or modified), it fires the stored procedure located in SQL to load the data into the table. Here's the workflow:

The Python script runs once a day at 10 am and pushes the csv file into blob. Even though the website data has a granularity of one hour, it is only updated once a day, so there's no point in running the script more than once a day.

Here's a screenshot of the Logic App logs which shows that the stored procedure was run a couple of minutes after the csv file landed in the blob container:

Here's a screenshot of the actual target table in SQL:

And that's it folks! I've shown how to automate data gathering utilizing modern cloud services. I didn't touch the area of data analysis at all, but let's see if we are able to do that in future posts.
