Spark Streaming & Real Time Analytics on AWS
This tutorial describes a real-time analytics framework built with Spark Structured Streaming and window functions on top of Amazon Kinesis, AWS's real-time data streaming service.
Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more.
High-Level Design
Events are sent continuously to Kinesis; Spark subscribes to the stream, reads the events in real time, performs aggregations on the fly, and saves the results. This is a common scenario for data engineers nowadays.
Steps
- Define Kinesis Data Stream
- Write Streaming data to Kinesis
- Create Spark Streaming Cluster
- Apply analytics on Spark streaming data using window functions
Define Kinesis Data Stream
With just a few clicks we can create a data stream in the AWS console. I configured the stream with a shard count of 1.
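The same stream can also be created programmatically. The sketch below uses boto3's Kinesis `create_stream` call; the function name and defaults are my own, and credentials are assumed to come from the environment.

```python
def create_stream(name, shard_count=1, region='us-east-1'):
    """Create a Kinesis data stream (equivalent to the console steps)."""
    import boto3
    client = boto3.client('kinesis', region_name=region)
    client.create_stream(StreamName=name, ShardCount=shard_count)
    # Block until the stream is ACTIVE and ready to receive records
    client.get_waiter('stream_exists').wait(StreamName=name)
```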
Write Streaming Data into Kinesis
A simple Python script can act as a Kinesis producer application that writes a continuous stream of events to the data stream. The snippet below publishes a message to Kinesis every 3 seconds.
Python has excellent libraries for generating test data and automating testing; Faker and random are my preferred choices.
Code Snippet
import os
import boto3
from faker import Faker
import json
import random
import datetime
import time

# Get AWS access key details from environment variables
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Create a Kinesis client
kinesis_client = boto3.client('kinesis',
                              aws_access_key_id=aws_access_key,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name='us-east-1')

# Generate records with the Faker and random libraries
faker = Faker()
while True:
    record = {}
    record['first_name'] = faker.first_name()
    record['last_name'] = faker.last_name()
    record['personal_email'] = faker.email()
    record['ssn'] = faker.ssn()
    record['office'] = faker.city()
    record['title'] = faker.job()
    # Columns that do not use Faker
    record['gender'] = 'M' if random.randint(0, 1) == 0 else 'F'
    record['org'] = random.choice(['Engineer', 'Sales', 'Associate', 'Manager', 'VP'])
    record['accrued_holidays'] = random.randint(0, 20)
    record['salary'] = round(random.randint(50000, 100000) / 1000) * 1000
    record['bonus'] = round(random.randint(0, 5000) / 500) * 500
    record['event_time'] = datetime.datetime.now().isoformat()
    # Write the record to the Kinesis stream
    kinesis_client.put_record(StreamName='Kinesis-Spark-Streaming-Analytics',
                              Data=json.dumps(record),
                              PartitionKey='part_key')
    time.sleep(3)
Create Spark Streaming Application
So far we have created the stream and written data to it. Now it's Spark's turn.
We can launch an AWS EMR Spark cluster, a local Spark cluster, or any third-party offering such as Databricks or Cloudera. I will go with the Databricks Community Edition.
Kinesis delivers event data in JSON format, so we have to apply a schema to the event data in Spark. Below is an explicit schema definition for the employee record.
Schema definition
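A sketch of the schema, wrapped in a small helper and matching the fields emitted by the producer above (the helper name is my own; the types are standard PySpark):

```python
def employee_schema():
    """Explicit schema for the employee events produced by the Kinesis client."""
    from pyspark.sql.types import (StructType, StructField, StringType,
                                   IntegerType, TimestampType)
    return StructType([
        StructField('first_name', StringType()),
        StructField('last_name', StringType()),
        StructField('personal_email', StringType()),
        StructField('ssn', StringType()),
        StructField('office', StringType()),
        StructField('title', StringType()),
        StructField('gender', StringType()),
        StructField('org', StringType()),
        StructField('accrued_holidays', IntegerType()),
        StructField('salary', IntegerType()),
        StructField('bonus', IntegerType()),
        StructField('event_time', TimestampType()),
    ])
```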
Spark Read Kinesis Stream
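Databricks ships a `kinesis` source for Structured Streaming; the sketch below assumes that connector and its `streamName`, `region`, and `initialPosition` options, with credentials supplied via the cluster's instance profile (the function name is my own):

```python
def read_kinesis_stream(spark, stream_name='Kinesis-Spark-Streaming-Analytics'):
    """Subscribe to the Kinesis stream using Databricks' kinesis source."""
    return (spark.readStream
                 .format('kinesis')
                 .option('streamName', stream_name)
                 .option('region', 'us-east-1')
                 .option('initialPosition', 'latest')
                 .load())
```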
Extract Data and apply Schema
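The Kinesis source delivers the payload in a binary `data` column, so we cast it to a string and parse the JSON with the schema. A sketch, assuming the schema object from the previous step:

```python
def parse_events(kinesis_df, schema):
    """Cast the binary `data` column to a string and apply the JSON schema."""
    from pyspark.sql.functions import col, from_json
    return (kinesis_df
            .selectExpr('CAST(data AS STRING) AS json')
            .select(from_json(col('json'), schema).alias('event'))
            .select('event.*'))
```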
Define Window Function
Our aim is to calculate a moving count of employees by organization for every minute, so we group by org and apply a 1-minute window on the event time.
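The grouping described above can be sketched with PySpark's `window` function (the helper name is my own):

```python
def count_by_org(events_df):
    """Moving employee counts per org over 1-minute event-time windows."""
    from pyspark.sql.functions import window, col
    return (events_df
            .groupBy(window(col('event_time'), '1 minute'), col('org'))
            .count())
```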
Start Spark Stream
Trigger the Spark stream with writeStream. The results are saved in memory as a table named 'counts', which we can then query just like any other table.
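A sketch of that writeStream call, using Spark's standard in-memory sink (the helper name is my own; `complete` output mode is needed so each trigger rewrites the full aggregation result):

```python
def start_stream(counts_df):
    """Write the aggregated counts to an in-memory table named `counts`."""
    return (counts_df.writeStream
                     .format('memory')
                     .queryName('counts')
                     .outputMode('complete')
                     .start())
```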
Query Results
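Once the stream is running, the in-memory table can be queried with plain SQL. A sketch (the column names follow the aggregation above):

```python
def query_counts(spark):
    """Query the in-memory results table populated by the streaming query."""
    return spark.sql('SELECT window, org, count FROM counts ORDER BY window')
```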
Hope you enjoyed it!
Please let me know your feedback.
Thanks
References:
Structured Streaming — Databricks Documentation
Amazon Kinesis Data Streams — Data Streaming Service — Amazon Web Services