-
Notifications
You must be signed in to change notification settings - Fork 92
sql query
Up until now we have focused on doing processing and setting up alerts without having to write any code. That is powerful and will satisfy a bunch of scenarios. However, when the no code experience isn't sufficient to meet the needs, you can use the power of Spark SQL for more complex processing, and do so without having to learn Scala. In this tutorial you will learn how to use SQL for processing data, chain queries, output data to sinks.
-
Open a Flow or create a new one using first tutorial to create a pipeline in 5 minutes
-
Switch to the Query tab. This is where you can write all your SQL code. Data Accelerator supports full Spark SQL, and enhances it with few more keywords that you will learn about in the next few tutorials. For now, write a statement such as:
--DataXQuery--
T1 = SELECT * FROM DataXProcessedInput;
-
Besides the schema defined in the Input tab, each message coming in from Eventhub and IoThub will also have two additional columns: _SystemProperties and _Properties.
-
Each query is separated by comment "--DataXQuery--" and ends with a semi-colon.
-
You can also chain queries.
--DataXQuery--
DeviceInputSummary = SELECT
deviceDetails.deviceId,
deviceDetails.deviceType,
eventTimeStamp,
deviceDetails.homeId,
deviceDetails.status
FROM T1
GROUP BY deviceId, deviceType, eventTimeStamp, homeId, status;
--DataXQuery--
DeviceInfo = SELECT
deviceId,
deviceType,
homeId,
MAX(eventTimeStamp) AS MaxEventTime,
MIN(status) AS MinReading,
MAX(status) AS MaxReading
FROM DeviceWindowedInputSummary
GROUP BY deviceId, deviceType, homeId;
OUTPUT DeviceInfo TO myCosmosDB;
- You can use the full power of SQL and do JOINS, UNIONS, GROUP BYs, etc. Full documentation of Spark SQL. Example of some of the things possible.
--Example use GROUP BY to divide query results into groups for aggregations on each group
--DataXQuery--
DeviceWindowedInput = SELECT
deviceDetails.deviceId,
deviceDetails.deviceType,
eventTimeStamp,
deviceDetails.homeId,
deviceDetails.status
FROM DataXProcessedInput
TIMEWINDOW('5 minutes')
GROUP BY deviceId, deviceType, eventTimeStamp, homeId, status;
--DataXQuery--
DeviceInfoTimeWindow = SELECT
deviceId,
deviceType,
homeId,
MAX(eventTimeStamp) AS MaxEventTime,
MIN(status) AS MinReading,
MAX(status) AS MaxReading
FROM DeviceWindowedInput
GROUP BY deviceId, deviceType, homeId;
--DataXQuery--
WindowLockStateFiltered = SELECT
eventTimeStamp,
deviceDetails.deviceId,
deviceDetails.deviceType,
deviceDetails.homeId,
deviceDetails.status
FROM DataXProcessedInput
WHERE deviceDetails.homeId = 150
AND deviceDetails.deviceType = 'WindowLock'
GROUP BY eventTimeStamp, deviceId, deviceType, homeId, status;
--Example INNER JOIN two tables
--DataXQuery--
WindowLockWindowedFiltered = SELECT *
FROM DeviceInfoTimeWindow
INNER JOIN WindowLockStateFiltered ON WindowLockStateFiltered.eventTimeStamp = DeviceInfoTimeWindow.MaxEventTime
WHERE DeviceInfoTimeWindow.homeId = 150
AND DeviceInfoTimeWindow.deviceType = 'WindowLock';
--Example UNION two data sets
HomeGrarageDoorLockInput = SELECT
deviceDetails.deviceId,
deviceDetails.deviceType,
eventTimeStamp,
deviceDetails.homeId,
deviceDetails.status
FROM DataXProcessedInput
WHERE deviceDetails.homeId = 150
AND deviceDetails.deviceType = 'GarageDoorLock'
UNION ALL
SELECT deviceDetails.deviceId,
deviceDetails.deviceType,
eventTimeStamp,
deviceDetails.homeId,
deviceDetails.status
FROM DataXProcessedInput
WHERE deviceDetails.homeId = 87
AND deviceDetails.deviceType = 'GarageDoorLock';
--Example CAST value to another type
CAST(Properties.`user.isoptedin` AS Boolean) AS IsOptedIn,
CAST(UNIX_TIMESTAMP(_SystemProperties.`x-opt-enqueued-time`, 'EEE MMM dd HH:mm:ss z yyyy') AS TIMESTAMP) AS EnqueuedTimeUtc,
--Example Formatting. In this example, phone number 4251234567 will be formatted to 425-123-4567.
--Note, you can specify any SQL statement
SUBSTRING(Properties.phoneNumber, 1, 3) + '-'
+ SUBSTRING(Properties.phoneNumber, 4, 3) + '-'
+ SUBSTRING(Properties.phoneNumber, 7, 4) AS PhoneNumber,
--Example Add a property to the bag
addProperty(Properties, 'datax.version', '1.2') AS Properties,
--Example Add a column to the output called AnswerToLife with value 42
42 AS AnswerToLife,
-- Add Tags as key-value pairs in a bag
MAP("WindowsName",
CASE
WHEN Properties.`os.version` LIKE '10%' THEN 'Windows 10'
WHEN Properties.`os.version` LIKE '8%' THEN 'Windows 8'
WHEN Properties.`os.version` LIKE '7%' THEN 'Windows 7'
ELSE 'Windows Unknown'
END,
"StartupPerf",
CASE
WHEN Measures.`startuptime` > 10 THEN 'SLOW'
ELSE 'FAST'
END)
AS Tags,
-- Add Tag as column
CASE
WHEN Properties.`os.version` LIKE '10%' THEN 'Windows 10'
WHEN Properties.`os.version` LIKE '8%' THEN 'Windows 8'
WHEN Properties.`os.version` LIKE '7%' THEN 'Windows 7'
ELSE 'Windows Unknown'
END AS WindowsVersion,
-
To assist you with programming and avoid typos, you will get intellisense for the columns of the table. Simply write the name of the table and dot and hit Ctrl+Space to invoke intellisense. For those who have worked with long SQL queries and tables, will appreciate that this will save hours of frustration with typos!
-
Go ahead! Try out some queries and send the data sets to different sinks. You are on your way to create some powerful stream processing using SQL skills!