Building a data lake on AWS using Redshift Spectrum

In one of our earlier posts, we talked about setting up a data lake using AWS Lake Formation. Once the data lake is set up, we can use Amazon Athena to query data. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and there is no need for complex ETL jobs to prepare data for analysis. Today, we will explore querying the data from a data lake in S3 using Redshift Spectrum. This use case makes sense for organizations that already use Redshift as their primary data warehouse.

Amazon Redshift Spectrum

Amazon Redshift Spectrum is used to efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Amazon Redshift Spectrum resides on dedicated Amazon Redshift servers that are independent of your cluster. Many compute-intensive tasks, such as predicate filtering and aggregation, are pushed down to the Redshift Spectrum layer, so queries use much less of your cluster's processing capacity.

How is Amazon Athena different from Amazon Redshift Spectrum?

  1. Redshift Spectrum needs an Amazon Redshift cluster and a SQL client connected to the cluster to execute SQL commands, whereas Athena is serverless.
  2. In Redshift Spectrum, external tables are read-only; it does not support INSERT queries. Athena supports INSERT queries, which write records to S3.

Amazon Redshift cluster

To use Redshift Spectrum, you need an Amazon Redshift cluster and a SQL client that’s connected to your cluster so that you can execute SQL commands. The cluster and the data files in Amazon S3 must be in the same AWS Region.

The Redshift cluster needs authorization to access the external data catalog in AWS Glue or Amazon Athena and the data files in Amazon S3. Let’s kick off the steps required to get the Redshift cluster going.

Create an IAM Role for Amazon Redshift

  1. Open the IAM console and choose Roles.
  2. Then choose Create role.
  3. Choose AWS service, and then select Redshift.
  4. Under Select your use case, select Redshift – Customizable and then choose Next: Permissions.
  5. On the Attach permissions policies page, attach the following policies: AmazonS3FullAccess, AWSGlueConsoleFullAccess and AmazonAthenaFullAccess.
  6. For Role name, enter a name for your role, in this case redshift-spectrum-role.
  7. Choose Create role.

Create a Sample Amazon Redshift Cluster

  • Open the Amazon Redshift console.
  • Choose the AWS Region. The cluster and the data files in Amazon S3 must be in the same AWS Region.
  • Select CLUSTERS and choose Create cluster.
    Cluster Configuration:
    • Based on the size and type of data (compressed/uncompressed), select the node type and count.
    • Amazon Redshift also provides an option to calculate the best configuration of a cluster based on your requirements: choose Calculate the best configuration for your needs.
    • In this case, use dc2.large with 2 nodes.
  • Specify Cluster details.
    • Cluster identifier: Name-of-the-cluster.
    • Database port: 5439, which is the default.
    • Master user name: Master user of the DB instance.
    • Master user password: Specify the password.
  • In the Cluster permissions section, select Available IAM roles and choose the IAM role that was created earlier, redshift-spectrum-role. Then choose Add IAM role.
  • Choose Create cluster and wait until the status is Available.
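The console steps above can also be scripted. The following is a minimal boto3 sketch, assuming a region, cluster identifier, and master user of your choosing; the only value taken from the walkthrough is the redshift-spectrum-role ARN placeholder.

# Illustrative sketch only: create a 2-node dc2.large cluster with the Spectrum role attached.
import boto3

redshift = boto3.client('redshift', region_name='us-west-2')  # assumed region

response = redshift.create_cluster(
    ClusterIdentifier='spectrum-demo-cluster',   # assumed cluster name
    NodeType='dc2.large',
    NumberOfNodes=2,
    DBName='dev',
    Port=5439,
    MasterUsername='awsuser',                    # assumed master user
    MasterUserPassword='REPLACE_WITH_A_STRONG_PASSWORD',
    IamRoles=['arn:aws:iam::xxxxxxxxxxxx:role/redshift-spectrum-role']
)
print(response['Cluster']['ClusterStatus'])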

Connect to Database

  1. Open the Amazon Redshift console and choose EDITOR.
  2. Database name is dev.

Create an External Schema and an External Table

External tables must be created in an external schema.

  • To create an external schema, run the following command. Replace the iam_role value with the ARN of the role created earlier.
create external schema spectrum
from data catalog
database 'spectrumdb'
iam_role 'arn:aws:iam::xxxxxxxxxxxx:role/redshift-spectrum-role'
create external database if not exists;
  • Copy the sample data provided by AWS into your own bucket using the following command. Configure the AWS CLI on your machine and run:
aws s3 cp s3://awssampledbuswest2/tickit/spectrum/sales/ s3://bucket-name/data/source/ --recursive
  • To create an external table, run the following command. The table is created in the spectrum schema.
create external table spectrum.table_name(
salesid integer,
listid integer,
sellerid integer,
buyerid integer,
eventid integer,
dateid smallint,
qtysold smallint,
saletime timestamp)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://bucket-name/copied-prefix/';

Now the table is available in Redshift Spectrum. We can analyze the data using SQL queries like so:

SELECT
 *
FROM
spectrum.rs_table
LIMIT 10;
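If you would rather verify the setup from code than from the query editor, a hedged sketch using the Amazon Redshift Data API is shown below; the cluster identifier and database user are placeholders and must match your cluster.

# Illustrative sketch only: run the Spectrum query through the Redshift Data API.
import time
import boto3

rsd = boto3.client('redshift-data')

stmt = rsd.execute_statement(
    ClusterIdentifier='spectrum-demo-cluster',   # assumed cluster identifier
    Database='dev',
    DbUser='awsuser',                            # assumed master user
    Sql='SELECT * FROM spectrum.rs_table LIMIT 10;'
)

# Poll until the statement finishes, then fetch the rows.
while rsd.describe_statement(Id=stmt['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)

if rsd.describe_statement(Id=stmt['Id'])['Status'] == 'FINISHED':
    print(rsd.get_statement_result(Id=stmt['Id'])['Records'])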

Create a Table in Athena using Glue Crawler

In case you are just starting out on the AWS Glue crawler, I have explained how to create one from scratch in one of my earlier articles. In this case, I created the rs_table table in the spectrumdb database.

Comparison between Amazon Redshift Spectrum and Amazon Athena

I ran some basic queries in both Athena and Redshift Spectrum. In terms of elapsed time, the same query took about 3 seconds on Athena compared to about 16 seconds on Redshift Spectrum.

The idea behind this post was to get you up and running with a basic data lake on S3 that is queryable on Redshift Spectrum. I hope it was useful.

This story is authored by PV Subbareddy. He is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark ecosystem.

Federated Querying across Relational, Non-relational, Object, and Custom Data Sources using Amazon Athena

Querying Data from DynamoDB in Amazon Athena

Amazon Athena now enables users to run SQL queries across data stored in relational, non-relational, object, and custom data sources. With federated querying, customers can submit a single SQL query that scans data from multiple sources running on-premises or hosted in the cloud.

Athena executes federated queries using Athena Data Source Connectors that run on AWS Lambda. Athena federated query is available in Preview in the us-east-1 (N. Virginia) region.

Preparing to create federated queries is a two-part process:

  1. Deploying a Lambda function data source connector.
  2. Connecting the Lambda function to a data source. 

I assume that you have at least one DynamoDB table in us-east-1 region.

Deploy a Data Source Connector

  • Open the Amazon Athena console and choose Connect data source. This feature is available in the us-east-1 region only.
  • On the Connect data source page, choose Query a data source, and choose Amazon DynamoDB as the data source.
  • Choose Next
  • For Lambda function, choose Configure new function. The Lambda console opens in a new tab with information about the connector.
  • Under ApplicationSettings, provide the required information.
    1. AthenaCatalogName – A name for the Lambda function.
    2. SpillBucket – An Amazon S3 bucket in your account to store data that exceeds Lambda function response size limits.
    3. SpillPrefix – The prefix under SpillBucket where data that exceeds the Lambda function response size limits is stored.
  • Choose I acknowledge that this app creates custom IAM roles and choose Deploy.

Connect to a data source using the connector deployed in the earlier step

  • Open the Amazon Athena console and choose Connect data source. This feature is available in the us-east-1 region only.
  • On the Connect data source page, choose Query a data source, choose Amazon DynamoDB as the data source, and choose Next.
  • For Lambda function, choose the name of the function that you created in the earlier step.
  • For Catalog name, enter a unique name to use for the data source in your SQL queries, such as dynamo_athena.
  • Choose Connect. Now the data source is available under the Data Sources section in Amazon Athena.

Querying Data using Federated Queries

To use this feature in preview, you must create an Athena workgroup named AmazonAthenaPreviewFunctionality and join that workgroup.

Create an Athena workgroup

  • Open the Amazon Athena console, choose Workgroup, and then choose Create workgroup. Name it AmazonAthenaPreviewFunctionality as mentioned above.
  • After creating the workgroup, select it under the Workgroup section and choose Switch workgroup.
  • Select the data source that was created in the earlier step. After choosing the data source, the DynamoDB tables are available in Athena under the default database.

Querying Data in Athena using SQL Queries

The following query is used to retrieve data from DynamoDB in Athena.

SELECT * FROM "data_source_connector"."database_name"."table_name";
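The same federated query can be submitted programmatically. Below is a minimal boto3 sketch; it assumes the AmazonAthenaPreviewFunctionality workgroup described above, and the catalog, database, table, and results bucket names are placeholders.

# Illustrative sketch only: submit a federated query from Python.
import boto3

athena = boto3.client('athena', region_name='us-east-1')  # preview is us-east-1 only

execution = athena.start_query_execution(
    QueryString='SELECT * FROM "dynamo_athena"."default"."table_name" LIMIT 10',  # placeholder names
    WorkGroup='AmazonAthenaPreviewFunctionality',
    ResultConfiguration={'OutputLocation': 's3://your-athena-results-bucket/'}     # assumed bucket
)
print(execution['QueryExecutionId'])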

Creating Athena table using CTAS with results of querying DynamoDB

The CTAS query looks like the following. Using CTAS, the data can be written out in the required format, be it Parquet, JSON, CSV, etc.

CREATE TABLE database.table_name
WITH (
      external_location = 's3://bucket-name/data/',
      format = 'parquet')
AS 
SELECT * FROM "data_source_connector"."database_name"."table_name";

I hope this was helpful and look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark ecosystem.

How to Customize QuickSight Dashboards for User Specific Data

We have been getting a lot of queries on how to customize a single QuickSight dashboard for user specific data. We can accomplish this by filtering the dashboard data by login username using AWS QuickSight’s Row-Level Security. To further explain this use case, let’s consider the sales department in a company. Every day your team of sales agents contacts a list of potential customers. Now you need a single dashboard that is accessed by all the agents but only displays the prospects each agent is assigned to.

Note: This is completely different from filter/controls on QuickSight dashboards. If you have filters/controls/parameters set up with dynamic values being picked up from the dataset, then even that data is filtered with Row-Level security, as the underlying dataset itself is filtered with the login username.

Let’s get on with the show! I have created a hypothetical dataset. This dataset has a column named agent_assigned which will be used for filtering.

Using this dataset, I have created a dashboard that looks like below.

This dashboard is shared with two other IAM users (sales agents).

As we haven’t set up any rules yet, both of them can access the whole data.

As you can see, ziva could also access the whole data, and we don’t want that!

Our requirement:

User Name | Agent Name   | Permissions
nick      | Nick Howe    | Can access only his prospects
ziva      | Ziva Medalle | Can access only her prospects
manager   | NA           | Super user, can access all prospects

Creating Data Set Rules for Row-Level Security:

Create a file or a query that contains the data set rules (permissions).

It doesn’t matter what order the fields are in. However, all the fields are case-sensitive. They must exactly match the field names and values.

The structure should look similar to one of the following. You must have at least one field that identifies either users or groups. You can include both, but only one is required, and only one is used at a time. If you are specifying groups, use only Amazon QuickSight groups or Microsoft AD groups.

The following example shows a table with user names.

UserName | agent_assigned
nick     | Nick Howe
ziva     | Ziva Medalle
manager  | Nick Howe,Ziva Medalle

For SQL:

/* for users*/
select User as UserName, Agent as agent_assigned
from permissions_table;

Or if you prefer to use a .csv file:

UserName,agent_assigned
"nick","Nick Howe"
"ziva","Ziva Medalle"
"manager","Nick Howe,Ziva Medalle"

Here agent_assigned is a column in the dataset, and UserName is the same as QuickSight login name.

What we are essentially doing is mapping UserName to the agent_assigned column. Suppose ziva has logged in: only those records where agent_assigned = Ziva Medalle are picked up. The same applies to nick.

But in the case of the manager, we want him to be a superuser, so we added all the agent names (all values of agent_assigned column).

Note: If you are using an Athena or an RDS or a Redshift or an S3 CSV file-based dataset, just make sure the output format/structure of those sources matches the above-mentioned formats.

Create Permissions Data Set:

Create a QuickSight dataset with the above data set rules. Go to Manage data, choose New data set, choose source and create accordingly. As mine is a CSV, I have just uploaded it. To make sure that you can easily find it, give it a meaningful name, for example in my case Permissions-prospects-list.

After finishing, refresh the page, as the new dataset might not otherwise appear in the list when you apply it to the dashboard dataset.

Creating Row-Level Security: 

Choose Permissions. From the list, choose the permissions dataset that you created earlier.

Choose Apply data set.

Once applied, you should see a new lock symbol on the dataset marking it as restricted.

That’s it. Now the data is filtered/secured based on username.

Manager’s Account:

Ziva’s Account:

Nick’s Account:

You could also add Users to Groups and have permissions set at the group level. More information here.

I hope it was helpful, any queries drop them in the comments section.

Thanks for the read!

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Machine Learning based Fuzzy Matching using AWS Glue ML Transforms

Machine Learning Transforms in AWS Glue

AWS Glue provides machine learning capabilities to create custom transforms that do machine learning based fuzzy matching to deduplicate and cleanse your data. For this we are going to use a transform named FindMatches. The FindMatches transform enables you to identify duplicate or matching records in your dataset, even when the records do not have a common unique identifier and no fields match exactly. This does not require writing any code or knowing how machine learning works. For more details about ML Transforms, please go through the docs.

Creating a Machine Learning Transform with AWS Glue

This article walks you through the actions to create and manage a machine learning (ML) transform using AWS Glue. I assume that you are familiar with using the AWS Glue console to add crawlers and jobs and edit scripts. You should also be familiar with finding and downloading files on the Amazon Simple Storage Service (Amazon S3) console.

In case you are just starting out on AWS Glue, I have explained how to create an AWS Glue Crawler and Glue Job from scratch in one of my earlier articles.
The source data used in this blog is a hypothetical file named customers_data.csv. A second file, label_file.csv, is an example of a labeling file that contains both matching and nonmatching records used to teach the transform.

Step 1: Crawl the Data using AWS Glue Crawler

At the outset, crawl the source data from the CSV file in S3 to create a metadata table in the AWS Glue Data Catalog. I created a crawler pointing to the source location (s3://bucketname/data/ml-transform/customers/).

In case you are just starting out on the AWS Glue crawler, I have explained how to create one from scratch in one of my earlier articles. If you run this crawler, it creates a customers table in the specified database (ml_transforms).

Step 2: Add a Machine Learning Transform

Next, add a machine learning transform that is based on the schema of your data source table created by the above crawler.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms, Add transform.
    1. For Transform name, enter ml-transform. This is the name of the transform that is used to find matches in the source data.
    2. Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations, and choose Worker type and Maximum capacity as per your requirements.
    3. For Data source, choose the table that was created in the earlier step; in this case, the table named customers in the database ml_transforms.
    4. For Primary key, choose the primary key column for the table, email.
  • Choose Finish.

Step 3: How to Teach Your Machine Learning Transform

Next, teach the machine learning transform using the sample labeling file.
You can’t use a machine learning transform in an extract, transform, and load (ETL) job until its status is Ready for use. To get your transform ready, you must teach it how to identify matching and non-matching records by providing examples of each. To teach your transform, you Generate a label file, add labels, and then Upload the label file.

For this article, the label file I have used is label_file.csv

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and then choose Action, Teach.
  • If you don’t have a label file, choose I do not have labels. You can then generate a label file, add labels, and upload the label file.
  • If you have a label file, choose I have labels, then choose Upload labelling file from S3.
  • Choose an Amazon S3 path to the sample labeling file in the current AWS Region (s3://bucketname/data/ml-transform/labels/label_file.csv), with the option to overwrite existing labels. The labeling file must be located in S3 in the same Region as the AWS Glue console.

When you upload a labeling file, a task is started in AWS Glue to add or overwrite the labels used to teach the transform how to process the data source.

  • Choose Finish, and return to the ML transforms list.

Step 4: Estimate the Quality of ML Transform

What is Labeling?

The act of labeling is creating a labeling file (such as in a spreadsheet) and adding identifiers, or labels, into the label column that identifies matching and non-matching records. It is important to have a clear and consistent definition of a match in your source data. AWS Glue learns from which records you designate as matches (or not) and uses your decisions to learn how to find duplicate records.

Next, you can estimate the quality of your machine learning transform. The quality depends on how much labeling you have done.

  • On the AWS Glue console, in the navigation pane, choose ML Transforms.
  • Choose the earlier created transform, and choose the Estimate quality tab. This tab displays the current quality estimates, if available, for the transform.
  • Choose Estimate quality to start a task to estimate the quality of the transform. The accuracy of the quality estimate is based on the labeling of the source data.
  • Navigate to the History tab. In this pane, task runs are listed for the transform, including the Estimating quality task. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.
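If you prefer to drive this from code, the Glue ML transform APIs expose the same Estimate quality action. The sketch below is illustrative; the transform ID is a placeholder that you can look up in the console or via the get_ml_transforms API.

# Illustrative sketch only: start an Estimate quality task run and poll it.
import time
import boto3

glue = boto3.client('glue')

transform_id = 'tfm-xxxxxxxxxxxxxxxx'  # placeholder; look it up in the console or via glue.get_ml_transforms()

task = glue.start_ml_evaluation_task_run(TransformId=transform_id)

while True:
    run = glue.get_ml_task_run(TransformId=transform_id, TaskRunId=task['TaskRunId'])
    if run['Status'] in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'):
        print('Estimate quality task finished with status:', run['Status'])
        break
    time.sleep(30)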

Step 5: Create and Run a Job with ML Transform

In this step, we use your machine learning transform to add and run a job in AWS Glue. When the transform is Ready for use, we can use it in an ETL job.

On the AWS Glue console, in the navigation pane, choose Jobs.

Choose Add job.

In case you are just starting out on AWS Glue ETL Job, I have explained how to create one from scratch in one of my earlier articles.

  • For Name, enter the name of the job used in this tutorial, ml-transform.
  • Choose an IAM role that has permission to access Amazon S3 and AWS Glue API operations.
  • For ETL language, choose Spark 2.2, Python 2. Machine learning transforms are currently not supported for Spark 2.4.
  • For Data source, choose the table created in Step 1. The data source you choose must match the machine learning transform data source schema.
  • For Transform type, choose Find matching records to create a job using a machine learning transform.
  • For Transform, choose the transform created in Step 2; this is the machine learning transform used by the job.
  • For Create tables in your data target, choose to create tables with the following properties.
    • Data store type — Amazon S3
    • Format — CSV
    • Compression type — None
    • Target path — The Amazon S3 path where the output of the job is written (in the current console AWS Region)

Choose Save job and edit script to display the script editor page. The script looks like the following. After you edit the script, choose Save.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglueml.transforms import FindMatches

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "ml_transforms", table_name = "customers", transformation_ctx = "datasource0")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1"]
## @return: resolvechoice1
## @inputs: [frame = datasource0]
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "MATCH_CATALOG", database = "ml_transforms", table_name = "customers", transformation_ctx = "resolvechoice1")
## @type: FindMatches
## @args: [transformId = "eacb9a1ffbc686f61387f63", emitFusion = false, survivorComparisonField = "<primary_id>", transformation_ctx = "findmatches2"]
## @return: findmatches2
## @inputs: [frame = resolvechoice1]
findmatches2 = FindMatches.apply(frame = resolvechoice1, transformId = "eacb9a1ffbc686f61387f63", transformation_ctx = "findmatches2")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket-name/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3"]
## @return: datasink3
## @inputs: [frame = findmatches2]
datasink3 = glueContext.write_dynamic_frame.from_options(frame = findmatches2, connection_type = "s3", connection_options = {"path": "s3://bucket-name/data/ml-transforms/output/"}, format = "csv", transformation_ctx = "datasink3")
job.commit()

Choose Run job to start the job run. Check the status of the job in the jobs list. When the job finishes, in the ML transform, History tab, there is a new Run ID row added of type ETL job. 

Navigate to the Jobs, History tab. In this pane, job runs are listed. For more details about the run, choose Logs. Check that the run status is Succeeded when it finishes.

Step 6: Verify Output Data from Amazon S3 in Amazon Athena

In this step, check the output of the job run in the Amazon S3 bucket that you chose when you added the job. You can create a table in the Glue Data catalog pointing to the output location, just like the way we crawled the source data in Step 1. You can then query the data in Athena.

However, the Find matches transform adds another column named match_id to identify matching records in the output. Rows with the same match_id are considered matching records.
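To get a quick feel for these match groups without waiting for a crawler, you can also pull the job's CSV output down and group it locally. This is only an illustrative sketch; it assumes pandas and s3fs are installed and that the file path is adjusted to the actual part file your job wrote.

# Illustrative sketch only: group the FindMatches output by match_id to inspect duplicate clusters.
import pandas as pd  # assumes pandas and s3fs are installed

# Placeholder path; point this at one of the part files the job wrote to the output prefix.
df = pd.read_csv('s3://bucket-name/data/ml-transforms/output/run-xxxx-part-00000.csv')

# Rows sharing a match_id were judged by the transform to refer to the same customer.
duplicates = df.groupby('match_id').filter(lambda group: len(group) > 1)

print('matching groups found:', duplicates['match_id'].nunique())
print(duplicates.sort_values('match_id').head(20))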

If you don’t find any matches, you can continue to teach the transform by adding more labels.

Thanks for the read, and I look forward to your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark ecosystem.

Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS

In this blog post, we’ll be looking at how one could leverage AWS Simple Queue Service (Standard queue) to achieve highly concurrent processing with no duplicates. We also compare it with other AWS services like DynamoDB, the SQS FIFO queue, and Kinesis in terms of cost and performance.

A simple use case for the below architecture could be building an end-to-end messaging service, or sending out transactional emails. In both use cases, highly concurrent processing with no duplicates is needed.

Using AWS SQS with Lambda to process Big data concurrently with no duplicates

We have a Lambda function that writes messages to the Standard queue in batches. This writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches, and invokes the Processor function synchronously with an event that contains the queue messages. The processor function is invoked once for each batch. When the function successfully processes a batch, Lambda deletes its messages from the queue. If the function fails to process a batch (raises an error), the batch is put back in the queue. The Standard queue is configured with a redrive policy that moves messages to a Dead Letter Queue (DLQ) when the receive count reaches the Maximum receive count (MRC). We set the MRC to 1 to ensure deduplication.

Setting up Standard Queue with Dead Letter Queue

We need two queues: one for processing, and a second for moving failed messages into. First create the failed_messages queue, as it is needed while creating the message processing queue: create a new queue, give it a name (failed_messages), select the type as Standard, and choose Configure Queue.

Set the queue attributes like visibility timeout, message retention period, etc. according to your needs.

For processing messages, create a new queue, give it a name, select the type as Standard, and choose Configure Queue.

Set the Default Visibility Timeout to 5 min and, under Dead Letter Queue Settings, set up the redrive policy to move failed messages into the failed_messages queue created earlier, with Maximum Receives set to 1.

From the SQS homepage, select the processing queue and check Redrive Policy; if set up correctly, you should see the ARN of the failed_messages queue there.
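If you prefer to provision the queues from code instead of the console, a boto3 sketch could look like the following; the queue names and visibility timeout mirror the console settings above, and maxReceiveCount is 1 to match the deduplication behaviour described earlier.

# Illustrative sketch only: create the DLQ and the processing queue with a redrive policy (MRC = 1).
import json
import boto3

sqs = boto3.client('sqs')

# Dead letter queue for failed messages.
dlq_url = sqs.create_queue(QueueName='failed_messages')['QueueUrl']
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Processing queue that moves a message to the DLQ after a single failed receive.
sqs.create_queue(
    QueueName='ToBeProcessed',
    Attributes={
        'VisibilityTimeout': '300',  # 5 minutes, as set in the console above
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '1'
        })
    }
)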

Creating the Writer and Processor lambda functions:

Writer.py

# Write batch messages to queue
import csv
import json
import boto3

s3 = boto3.resource('s3')
sqs = boto3.client('sqs')

# Update this dummy URL
processing_queue_url = "https://sqs.us-west-2.amazonaws.com/85XXXXXXX205/ToBeProcessed"

def lambda_handler(event, context):
    try:
        if 'Records' in event:
            bucket_name = event['Records'][0]['s3']['bucket']['name']        
            key = event['Records'][0]['s3']['object']['key']
            bucket = s3.Bucket(bucket_name)
            obj = bucket.Object(key=key)

            # get the object
            response = obj.get()['Body'].read().decode('utf-8').split('\n')
            resp = list(response)
            if resp[-1] == '':
                #removing header metadata and extra newline
                total_records = len(resp) - 2 
            else:
                #removing header metadata
                total_records = len(resp) - 1 
            print("total record count is :", total_records)

            batch_size = 0
            record_count = 0
            messages = []

            # Write to SQS
            for row in csv.DictReader(response):
                record_count += 1
                record = {}
                for k,v in row.items():
                    record[k] = v

                # Replace below with appropriate column with all values as unique
                unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']
                
                batch_size += 1
                messages.append(
                {
                    'Id': unique_id,
                    'MessageBody': json.dumps(record)
                })
                   
                if (batch_size == 10):
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed_count:', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []
                
                # Handling last batch
                if(record_count == total_records):
                    print("batch size is :", batch_size)
                    batch_size = 0
                    try:
                        response = sqs.send_message_batch(
                            QueueUrl = processing_queue_url,
                            Entries = messages
                        )
                        print("response:", response)
                        if 'Failed' in response:
                            print('failed count is :', len(response['Failed']))
                    except Exception as e:
                        print("error:",e)
                    messages = []    
        
        print('record count is :', record_count)

    except Exception as e:
        return e

Processor.py

# Process queue messages
import json

def handler(event, context):
    if 'Records' in event:
        try:
            messages = event['Records']
            for message in messages:
                print("message to be processed :", message)
                
                result = message['body']
                result = json.loads(result)

                print("result:",result)
            return {
                'statusCode': 200,
                'body': 'All messages processed successfully'
            }

        except Exception as e:
            print(e)
            return str(e)

Setting up S3 as trigger to Writer lambda

Setting up SQS trigger to processor Lambda

If set up properly, you should be able to view it in Lambda Triggers section from the SQS homepage like this.
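The SQS trigger can also be wired up programmatically. The following sketch is illustrative; the queue ARN and function name are placeholders.

# Illustrative sketch only: connect the processing queue to the Processor Lambda.
import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-west-2:85XXXXXXX205:ToBeProcessed',  # placeholder queue ARN
    FunctionName='processor',                                           # placeholder function name
    BatchSize=10,       # Lambda receives up to 10 queue messages per invocation
    Enabled=True
)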

The setup is done. To test this, upload a .csv file to the S3 location.

SQS Standard Queue in comparison with FIFO queue

FIFO queue in SQS supports deduplication in two ways:

  1. Content based deduplication while writing to SQS.
  2. Processing one record/batch at a time. 

Unlike the Standard queue, FIFO does not support high concurrency or Lambda invocation. On top of that, there is a limit to how many messages you can write to a FIFO queue per second. FIFO queues are better suited when the order of processing is important.

Cost analysis:
First 1 million Amazon SQS requests are free each month.

Type           | Cost per 1 million requests
Standard Queue | $0.40
FIFO Queue     | $0.50

More on SQS pricing here.

SQS Standard Queue in comparison with DynamoDB

DynamoDB streams are slow when compared to SQS, and DynamoDB costs accrue on various aspects like:

  1. Data Storage
  2. Writes
  3. Reads
  4. Provisioned throughput
  5. Reserved capacity
  6. Indexed data storage
  7. Streams and many more.

In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.

SQS Standard Queue in comparison with Kinesis

Kinesis' primary use case is collecting, storing, and processing real-time continuous data streams. It is designed for large scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.

A message queue, on the other hand, makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Using a queue, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, serverless applications are built using microservices, and a message queue serves as reliable plumbing.

Drawbacks of Kinesis:

  1. Shard management
  2. Limited Read Throughput

For a much detailed comparison of SQS and Kinesis visit here.

Thanks for the read, I hope it was helpful.

This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.

Real Time Streaming Data Analytics using Amazon Kinesis Family

Amazon Kinesis Data Analytics

Amazon Kinesis Data Analytics (KDA) is the easiest way to analyze streaming data, gain actionable insights, and respond to your business and customer needs in real time. KDA reduces the complexity of building, managing, and integrating streaming applications with other AWS services. SQL users can easily query streaming data or build entire streaming applications using templates and an interactive SQL editor. Java developers can quickly build sophisticated streaming applications using open source Java libraries and AWS integrations to transform and analyze data in real-time.

For deep dive into Amazon Kinesis Data Analytics, please go through the official docs.

Amazon Kinesis Data Streams

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS can continuously capture gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more.

For more details about Amazon Kinesis Data Streams, please go through the official docs.

Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you’re already using today. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security.

For more details about Amazon Kinesis Data Firehose, please go through the official docs.

To Create an Amazon Kinesis Data Stream using Console

  • Open the Kinesis console at https://console.aws.amazon.com/kinesis.
  • In the navigation bar, expand the Region selector and choose a Region.
  • Choose Create data stream.
  • On the Create Kinesis stream page, enter a name for your stream and the number of shards you need, and then click Create Kinesis stream.
    On the Kinesis streams page, your stream’s Status is shown as Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.
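The same stream can be created from code. A minimal boto3 sketch, with an assumed stream name and shard count, looks like this:

# Illustrative sketch only: create a Kinesis data stream and wait for it to become ACTIVE.
import boto3

kinesis = boto3.client('kinesis')

kinesis.create_stream(StreamName='sensor-stream', ShardCount=1)  # assumed name and shard count

kinesis.get_waiter('stream_exists').wait(StreamName='sensor-stream')
print(kinesis.describe_stream(StreamName='sensor-stream')['StreamDescription']['StreamStatus'])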

Amazon Kinesis Data Generator

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose.

To set up the KDG, follow this link and then:

  • Choose Create a Cognito User with CloudFormation.
  • After choosing the above option, the console redirects to the CloudFormation stack creation page. The console looks like the following.
  • Click Next, provide the CloudFormation stack name, and provide the username and password details for creating the Cognito user for the Kinesis Data Generator.
  • Click Next and choose Create Stack.
  • After the stack status changes to CREATE_COMPLETE, click the Outputs tab and open the link under the Outputs section.
  • After opening the above link, log in with the username and password created in the earlier steps.
  • Select the Region and the stream/delivery name as created.
    The record template is:
{
    "sensor_id": {{random.number(50)}},
    "current_temperature": {{random.number(
        {
            "min":0,
            "max":150
        }
    )}},
    "location": "{{random.arrayElement(
        ["AUS","USA","UK"]
    )}}"
}
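If you cannot use the KDG, an equivalent producer can be scripted with boto3. The sketch below mimics the record template above; the stream name is an assumption and should match the stream created earlier.

# Illustrative sketch only: push KDG-style sensor records into the stream with boto3.
# Runs until interrupted (Ctrl+C).
import json
import random
import time
import boto3

kinesis = boto3.client('kinesis')

while True:
    record = {
        'sensor_id': random.randint(0, 50),
        'current_temperature': random.randint(0, 150),
        'location': random.choice(['AUS', 'USA', 'UK'])
    }
    kinesis.put_record(
        StreamName='sensor-stream',                  # assumed stream name
        Data=json.dumps(record).encode('utf-8'),
        PartitionKey=str(record['sensor_id'])
    )
    time.sleep(0.1)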

To Create the Kinesis Data Analytics Application

  • Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
  • Choose Create application.
  • On the Create application page, type an application name, type a description, choose SQL for the application’s Runtime setting, and then choose Create application.

Doing this creates a Kinesis data analytics application with a status of READY. The console shows the application hub where you can configure input and output.

In the next step, you configure input for the application. In the input configuration, you add a streaming data source to the application and discover a schema for an in-application input stream by sampling data on the streaming source.

Configure Streaming Source as Input to Kinesis Data Analytics Application

  • On the Kinesis Analytics applications page in the console, choose Connect streaming data.
  • Source section, where you specify a streaming source for your application. You can select an existing stream source or create one. By default the console names the in-application input stream that is created SOURCE_SQL_STREAM_001. For this exercise, keep this name as it appears.
    Stream reference name – This option shows the name of the in-application input stream that is created, SOURCE_SQL_STREAM_001. You can change the name of the stream.
  • Choose Discover Schema, which automatically discovers the schema of input stream.
  • Choose Save and continue.
    Now, we have an application with input configuration added to it. In the next step, we will add SQL code to perform some analytics on the data in-application input stream.

 Real-Time Analytics on Input Stream Data

  • On the Kinesis Analytics applications page in the console, choose Go to SQL editor.
  • In the Would you like to start running “ApplicationName”? dialog box, choose Yes, start application.
  • The console opens the SQL editor page. Review the page, including the buttons (Add SQL from templates, Save and run SQL) and various tabs.
  • Run Analytics on the input stream data using the following sample query. This Query detects an anomaly in the input stream and sends the anomaly data to anomaly_data_stream and normal data to output_data_stream. Load the following query in SQL editor and choose Save and run SQL.
CREATE OR REPLACE STREAM "anomaly_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_ANOMALY" AS INSERT INTO "anomaly_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" > 100;

CREATE OR REPLACE STREAM "output_data_stream" (
	"sensor_id" INTEGER,
	"current_temperature" INTEGER, 
	"location" VARCHAR(16));

CREATE OR REPLACE  PUMP "STREAM_PUMP_OUTPUT" AS INSERT INTO "output_data_stream"
SELECT STREAM "sensor_id",
				"current_temperature",
				"location"
FROM "SOURCE_SQL_STREAM_001" WHERE "current_temperature" < 100;

It creates the in-application streams output_data_stream and anomaly_data_stream.
It also creates the pumps STREAM_PUMP_OUTPUT and STREAM_PUMP_ANOMALY, and uses them to select rows from SOURCE_SQL_STREAM_001 and insert them into output_data_stream and anomaly_data_stream respectively. You can see the results in the Real-time analytics tab.

  • The SQL editor has the following tabs:

    The Source data tab shows the in-application input stream that is mapped to the streaming source. Choose the in-application stream, and you can see data coming in. ROWTIME – Each row in an in-application stream has a special column called ROWTIME. This column is the timestamp when Amazon Kinesis Data Analytics inserted the row in the first in-application stream (the in-application input stream that is mapped to the streaming source).

    The Real-time Analytics tab shows all the other in-application streams created by your application code. It also includes the error stream. Choose anomaly_data_stream or output_data_stream to view the rows your application code inserted.

    The Destination tab shows the external destination where Kinesis Data Analytics writes the query results. We haven’t configured any external destination for our application output yet. 

To create a delivery stream from Kinesis Data Firehose to Amazon S3

  • Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/.
  • Choose Create Delivery Stream. In this case, the name of the stream is anomaly-delivery-stream.
  • On the Destination page, choose the following options.
    • Destination – Choose Amazon S3.
    • Delivery stream name – Type a name for the delivery stream
    • S3 bucket – Choose an existing bucket, or choose New S3 Bucket. If you create a new bucket, type a name for the bucket and choose the region your console is currently using.
    • S3 prefix – The stream stores data under the provided prefix. For anomaly data, the prefix becomes
      data/anomaly/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
    • S3 error prefix – Records that fail to be delivered to S3 are stored under this error prefix.
  • Choose Next.
  • On the Configuration page, leave the fields at the default settings. The only required step is to select an IAM role that enables Kinesis Data Firehose to access your resources, as follows:
    1. For IAM Role, choose Select an IAM role.
    2. In the drop-down menu, under Create/Update existing IAM role, choose Firehose delivery IAM role, leave the fields at their default settings, and then choose Allow.
  • Choose Next.
  • Review your settings, and then choose Create Delivery Stream.

The anomaly-delivery-stream is created successfully. In the same way, create another Firehose stream named output-delivery-stream.

Configuring Application Output to Amazon Kinesis Data Firehose

We can optionally add an output configuration to the application, to persist everything written from an in-application stream to an external destination such as an Amazon Kinesis data stream, a Kinesis Data Firehose delivery stream, or an AWS Lambda function.

In this application, we are connecting the in-application stream to a Kinesis Data Firehose delivery stream.

In the Destination tab, choose the in-application stream anomaly_data_stream, choose the Firehose stream anomaly-delivery-stream, and select the output format as JSON. Configure output_data_stream with output-delivery-stream in the same way.
You can see the following after configuring:

Data is written into S3 by the Kinesis Firehose delivery streams. Now we can query the data in Athena by running a crawler once on that path.

Thanks for the read. Hope it was helpful.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing in Cloud Big Data Services and the Apache Spark ecosystem.

AWS Machine Learning Data Engineering Pipeline for Batch Data

This post walks you through all the steps required to build a data engineering pipeline for batch data using AWS Step Functions. The sequence of steps works like so: the ingested data arrives as a CSV file in an S3 based data lake, in the landing zone, which automatically triggers a Lambda function to invoke the Step Function. I have assumed that data is being ingested daily in a .csv file with a filename_date.csv naming convention, for example customers_20190821.csv. The Step Function, as the first step, starts a landing to raw zone file transfer operation via a Lambda function. Then an AWS Glue crawler crawls the raw data into an Athena table, which is used as a source for an AWS Glue based PySpark transformation script. The transformed data is written to the refined zone in the Parquet format. Again an AWS Glue crawler runs to “reflect” this refined data into another Athena table. Finally, the data science team can consume this refined data, available in the Athena table, using an AWS SageMaker based Jupyter notebook instance. It is to be noted that the data science team does not need to do any data pull manually, as the data engineering pipeline automatically pulls in the delta data, as per the data refresh schedule that writes new data in the landing zone.

Let’s go through the steps

How to make daily data available to Amazon SageMaker?

What is Amazon SageMaker?

Amazon SageMaker is an end-to-end machine learning (ML) platform that can be leveraged to build, train, and deploy machine learning models in AWS. Using the Amazon SageMaker Notebook module improves the efficiency of interacting with the data without the latency of bringing it locally.
For deep dive into Amazon SageMaker, please go through the official docs.

In this blog post, I will be using dummy customers data. The customers data consists of retailer information and units purchased.

Updating Table Definitions with AWS Glue

The data catalog feature of AWS Glue and its inbuilt integration with Amazon S3 simplify the process of identifying data and deriving the schema definition out of the source data. Glue crawlers within the Data Catalog are used to build out the metadata tables of data stored in Amazon S3.

I created a crawler named raw for the data in the raw zone (s3://bucketname/data/raw/customers/). In case you are just starting out on the AWS Glue crawler, I have explained how to create one from scratch in one of my earlier articles. If you run this crawler, it creates a customers table in the specified database (raw).

Create an invocation Lambda Function

In case you are just starting out on Lambda functions, I have explained how to create one from scratch with an IAM role to access the StepFunctions, Amazon S3, Lambda and CloudWatch in my earlier article.

Add a trigger to the created Lambda function named invoke-step-functions. Configure the Bucket, Prefix and Suffix accordingly.

Once a file arrives in the landing zone, it triggers the invoke Lambda function, which extracts year, month, and day from the file name that comes in the event. It passes year, month, and day, along with two characters from a uuid, as input to the AWS StepFunction. Please replace the following code in the invoke-step-functions Lambda.

import json
import uuid
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

stm_arn = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    
    # Extract bucket name and file path from event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    path = event['Records'][0]['s3']['object']['key']
    
    file_name_date = path.split('/')[2]
    processing_date_str = file_name_date.split('_')[1].replace('.csv', '')
    processing_date = datetime.strptime(processing_date_str, '%Y%m%d')
    
    # Extract year, month, day from date
    year = processing_date.strftime('%Y')
    month = processing_date.strftime('%m')
    day = processing_date.strftime('%d')
    
    uuid_temp = uuid.uuid4().hex[:2]
    execution_name = '{processing_date_str}-{uuid_temp}'.format(processing_date_str=processing_date_str, uuid_temp=uuid_temp)
    
    # Starts the execution of AWS StepFunctions
    response = sfn_client.start_execution(
          stateMachineArn = stm_arn,
          name= str(execution_name),
          input= json.dumps({"year": year, "month": month, "day": day})
      )
    
    return {"year": year, "month": month, "day": day}

Create a Generic FileTransfer Lambda

Create a Lambda function named generic-file-transfer as we created one earlier in this article. The file transfer Lambda function transfers files from the landing zone to the raw zone, or from the landing zone to the archive zone, based on the event coming from the StepFunction.

  1. If step is landing-to-raw-file-transfer, the Lambda function copies files from landing to raw zone.
  2. If step is landing-to-archive-file-transfer, the Lambda function copies files from landing to archive zone and deletes files from landing zone.

Please replace the following code in generic-file-transfer Lambda.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    for objects in bucket.objects.filter(Prefix = source_prefix):
        file_path = objects.key
        
        if ('.csv' in file_path) and (step == 'landing-to-raw-file-transfer'):
            
            # Extract filename from file_path
            file_name_date = file_path.split('/')[2]
            file_name = file_name_date.split('_')[0]
            
            # Add filename to the destination prefix; use a separate variable so the
            # base destination_prefix is not mutated across loop iterations
            raw_destination_prefix = '{destination_prefix}{file_name}/year={year}/month={month}/day={day}/'.format(destination_prefix=destination_prefix, file_name=file_name, year=year, month=month, day=day)
            print(raw_destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, raw_destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
         
        if ('.csv' in file_path) and (step == 'landing-to-archive-file-transfer'):
            
            # Add the date to the destination prefix; again use a separate variable
            archive_destination_prefix = '{destination_prefix}{year}-{month}-{day}/'.format(destination_prefix=destination_prefix, year=year, month=month, day=day)
            print(archive_destination_prefix)
            
            source_object = {'Bucket': bucket_name, "Key": file_path}
            
            # Replace source prefix with destination prefix
            new_path = file_path.replace(source_prefix, archive_destination_prefix)
            
            # Copies file
            new_object = bucket.Object(new_path)
            new_object.copy(source_object)
            
            # Deletes copied file
            bucket.objects.filter(Prefix = file_path).delete()
            
    return {"year": year, "month": month, "day": day}

Generic FileTransfer Lambda function setup is now complete. We need to check that all files are copied successfully from one zone to another. If you have large files that need to be copied, you could check out our Lightning fast distributed file transfer architecture.

Create Generic FileTransfer Status Check Lambda Function

Create a Lambda function named generic-file-transfer-status. If the step is landing to raw file transfer, the Lambda function checks whether all files are copied from the landing to the raw zone by comparing the number of objects in the landing and raw zones. If the count doesn’t match, it raises an exception; that exception is handled in AWS StepFunctions, which retries after some backoff. If the count matches, all files were copied successfully. If the step is landing to archive file transfer, the Lambda function checks whether any files are left in the landing zone. Please replace the following code in the generic-file-transfer-status Lambda function.

import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    step = event['step']
    year = event['year']
    month = event['month']
    day = event['day']
    
    bucket_name = event['bucket_name']
    source_prefix = event['source_prefix']
    destination_prefix = event['destination_prefix']
    
    bucket = s3.Bucket(bucket_name)
    
    class LandingToRawFileTransferIncompleteException(Exception):
        pass

    class LandingToArchiveFileTransferIncompleteException(Exception):
        pass
    
    if (step == 'landing-to-raw-file-transfer'):
        if file_transfer_status(bucket, source_prefix, destination_prefix):
            print('File Transfer from Landing to Raw Completed Successfully')
        else:
            raise LandingToRawFileTransferIncompleteException('File Transfer from Landing to Raw not completed')
    
    if (step == 'landing-to-archive-file-transfer'):
        if is_empty(bucket, source_prefix):
            print('File Transfer from Landing to Archive Completed Successfully')
        else:
            raise LandingToArchiveFileTransferIncompleteException('File Transfer from Landing to Archive not completed.')
    
    return {"year": year, "month": month, "day": day}

def file_transfer_status(bucket, source_prefix, destination_prefix):
    
    try:
        
        # Checks number of objects at the source prefix (count of objects at source i.e., landing zone)
        source_object_count = 0
        for obj in bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            if (".csv" in path):
                source_object_count = source_object_count + 1
        print(source_object_count)
        
        # Checks number of objects at the destination prefix (count of objects at destination i.e., raw zone)
        destination_object_count = 0
        for obj in bucket.objects.filter(Prefix = destination_prefix):
            path = obj.key
            
            if (".csv" in path):
                destination_object_count = destination_object_count + 1
        
        print(destination_object_count)
        return (source_object_count == destination_object_count)

    except Exception as e:
        print(e)
        raise e

def is_empty(bucket, prefix):
    
    try:
        # Checks if any files left in the prefix (i.e., files in landing zone)
        object_count = 0
        for obj in bucket.objects.filter(Prefix = prefix):
            path = obj.key

            if ('.csv' in path):
                object_count = object_count + 1
                    
        print(object_count)
        return (object_count == 0)
        
    except Exception as e:
        print(e)
        raise e

Create a Generic Crawler Invocation Lambda

Create a Lambda function named generic-crawler-invoke. The Lambda function invokes a crawler. The crawler name is passed as an argument from AWS StepFunctions through the event object. Please replace the following code in the generic-crawler-invoke Lambda function.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    try:
        response = glue_client.start_crawler(Name = crawler_name)
    except Exception as e:
        print('Crawler in progress', e)
        raise e
    
    return {"year": year, "month": month, "day": day}

Create a Generic Crawler Status Lambda

Create a Lambda function named generic-crawler-status. The Lambda function checks whether the crawler ran successfully or not. If the crawler is in a running state, the Lambda function raises an exception; the exception is handled in the Step Function, which retries after a certain backoff. Please replace the following code in the generic-crawler-status Lambda.

import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    
    class CrawlerInProgressException(Exception):
        pass
    
    # Extract Parameters from Event (invoked by StepFunctions)
    year = event['year']
    month = event['month']
    day = event['day']
    
    crawler_name = event['crawler_name']
    
    response = glue_client.get_crawler_metrics(CrawlerNameList =[crawler_name])
    print(response['CrawlerMetricsList'][0]['CrawlerName']) 
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds']) 
    print(response['CrawlerMetricsList'][0]['StillEstimating']) 
    
    if (response['CrawlerMetricsList'][0]['StillEstimating']):
        raise CrawlerInProgressException('Crawler In Progress!')
    elif (response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0):
        raise CrawlerInProgressException('Crawler In Progress!')
    
    return {"year": year, "month": month, "day": day}

Create an AWS Glue Job

AWS Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. For deep dive into AWS Glue, please go through the official docs.

Create an AWS Glue Job named raw-refined. In case you are just starting out on AWS Glue Jobs, I have explained how to create one from scratch in my earlier article. This Glue job converts the file format from CSV to Parquet and stores it in the refined zone. The push down predicate is used as a filter condition to read only the processing date's data using the partitions.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month', 'day'])

year = args['year']
month = args['month']
day = args['day']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw", table_name = "customers", push_down_predicate ="((year == " + year + ") and (month == " + month + ") and (day == " + day + "))", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "string"), ("sale_id", "string", "sale_id", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname/data/refined/customers/", "partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

job.commit()

Create a Refined Crawler just as we created the Raw Crawler earlier in this article. Point the crawler path to the refined zone (s3://bucketname/data/refined/customers/) and set the database to refined. There is no need to create separate Lambda functions for refined crawler invocation and status, as we pass the crawler names from the StepFunction. A minimal boto3 sketch for creating such a crawler is shown below.
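
This is only a sketch, assuming a pre-existing Glue service role; the role name and bucket name are placeholders for your own values.

import boto3

glue_client = boto3.client('glue')

# Create the refined crawler pointing at the refined zone
glue_client.create_crawler(
    Name='refined',
    Role='AWSGlueServiceRoleDefault',     # placeholder: your Glue service role
    DatabaseName='refined',
    Targets={'S3Targets': [{'Path': 's3://bucketname/data/refined/customers/'}]}
)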

All the resources required to create the StepFunction have now been created.

Creating the AWS StepFunction

A StepFunction is where we create and orchestrate the steps to process data according to our workflow. Create an AWS StepFunction named Datapipeline-for-SageMaker. In case you are just starting out on AWS StepFunctions, I have explained how to create one from scratch here.

Data is ingested into the landing zone. This triggers a Lambda function which in turn starts the execution of the StepFunction (a minimal sketch of such a trigger Lambda is shown after the list below). The steps in the StepFunction are as follows:

  1. Transfers files from the landing zone to the raw zone.
  2. Checks whether all files were copied to the raw zone successfully.
  3. Invokes the raw crawler, which crawls data in the raw zone and creates/updates the table definition in the specified database.
  4. Checks whether the crawler completed successfully.
  5. Invokes the Glue job and waits for it to complete.
  6. Invokes the refined crawler, which crawls data in the refined zone and creates/updates the table definition in the specified database.
  7. Checks whether the crawler completed successfully.
  8. Transfers files from the landing zone to the archive zone and deletes the files from the landing zone.
  9. Checks whether all files were copied to the archive zone and deleted from the landing zone successfully.
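
As mentioned above, the S3 event on the landing zone can be wired to a small trigger Lambda that starts the StepFunction execution. The following is only a sketch: the state machine ARN is a placeholder, and the date values here are simply derived from the current date (you could instead parse them from the S3 object key in the event).

import json
import boto3
from datetime import datetime

sfn_client = boto3.client('stepfunctions')

# Placeholder ARN: replace with the ARN of the Datapipeline-for-SageMaker state machine
STATE_MACHINE_ARN = 'arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:Datapipeline-for-SageMaker'

def lambda_handler(event, context):
    # Derive the processing date for the pipeline input
    now = datetime.now()
    execution_input = {
        "year": str(now.year),
        "month": str(now.month),
        "day": str(now.day)
    }
    response = sfn_client.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps(execution_input)
    )
    return response['executionArn']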

Please update the StepFunctions definition with the following code.

{
  "Comment": "Datapipeline For MachineLearning in AWS Sagemaker",
  "StartAt": "LandingToRawFileTransfer",
  "States": {
    "LandingToRawFileTransfer": {
      "Comment": "Transfers files from landing zone to Raw zone.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferFailed"
        }
      ],
      "Next": "LandingToRawFileTransferPassed"
    },
    "LandingToRawFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToRawFileTransferStatus"
    },
    "LandingToRawFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to raw zone successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-raw-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/raw/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToRawFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToRawFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToRawFileTransferStatusPassed"
    },
    "LandingToRawFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "Landing To Raw File Transfer failed"
    },
    "LandingToRawFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "StartRawCrawler"
    },
    "StartRawCrawler": {
      "Comment": "Crawls data from raw zone and adds table definition to the specified Database. IF table definition exists updates the definition.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRawCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRawCrawlerFailed"
        }
      ],
      "Next": "StartRawCrawlerPassed"
    },
    "StartRawCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRawCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RawCrawlerStatus"
    },
    "RawCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "raw",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RawCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RawCrawlerStatusFailed"
        }
      ],
      "Next": "RawCrawlerStatusPassed"
    },
    "RawCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RawCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "GlueJob"
    },
    "GlueJob": {
      "Comment": "Invokes Glue job and waits for Glue job to complete.",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "retail-raw-refined",
        "Arguments": {
          "--refined_prefix": "data/refined",
          "--year.$": "$.year",
          "--month.$": "$.month",
          "--day.$": "$.day"
        }
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "GlueJobFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "GlueJobFailed"
        }
      ],
      "Next": "GlueJobPassed"
    },
    "GlueJobFailed": {
      "Type": "Fail",
      "Cause": "Glue job failed"
    },
    "GlueJobPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.Arguments.--year",
        "month.$": "$.Arguments.--month",
        "day.$": "$.Arguments.--day"
      },
      "Next": "StartRefinedCrawler"
    },
    "StartRefinedCrawler": {
      "Comment": "Crawls data from refined zone and adds table definition to the specified Database.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-invoke",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "StartRefinedCrawlerFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "StartRefinedCrawlerFailed"
        }
      ],
      "Next": "StartRefinedCrawlerPassed"
    },
    "StartRefinedCrawlerFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "StartRefinedCrawlerPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "RefinedCrawlerStatus"
    },
    "RefinedCrawlerStatus": {
      "Comment": "Checks whether crawler is successfully completed.",
      "Type": "Task",
      "Parameters": {
        "crawler_name": "refined",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-crawler-status",
      "Retry": [
        {
          "ErrorEquals": [
            "CrawlerInProgressException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "RefinedCrawlerStatusFailed"
        }
      ],
      "Next": "RefinedCrawlerStatusPassed"
    },
    "RefinedCrawlerStatusFailed": {
      "Type": "Fail",
      "Cause": "Crawler invocation failed"
    },
    "RefinedCrawlerStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransfer"
    },
    "LandingToArchiveFileTransfer": {
      "Comment": "Transfers files from landing zone to archived zone",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/archive/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer",
      "TimeoutSeconds": 4500,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferPassed"
    },
    "LandingToArchiveFileTransferFailed": {
      "Type": "Fail",
      "Cause": "Landing To Archive File Transfer failed"
    },
    "LandingToArchiveFileTransferPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Next": "LandingToArchiveFileTransferStatus"
    },
    "LandingToArchiveFileTransferStatus": {
      "Comment": "Checks whether all files are copied from landing to archived successfully.",
      "Type": "Task",
      "Parameters": {
        "step": "landing-to-archive-file-transfer",
        "bucket_name": "bucketname",
        "source_prefix": "data/landing/",
        "destination_prefix": "data/archive/",
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:generic-file-transfer-status",
      "Retry": [
        {
          "ErrorEquals": [
            "LandingToArchiveFileTransferInCompleteException"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 30,
          "BackoffRate": 2,
          "MaxAttempts": 5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        },
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "LandingToArchiveFileTransferStatusFailed"
        }
      ],
      "Next": "LandingToArchiveFileTransferStatusPassed"
    },
    "LandingToArchiveFileTransferStatusFailed": {
      "Type": "Fail",
      "Cause": "LandingToArchiveFileTransfer invocation failed"
    },
    "LandingToArchiveFileTransferStatusPassed": {
      "Type": "Pass",
      "ResultPath": "$",
      "Parameters": {
        "year.$": "$.year",
        "month.$": "$.month",
        "day.$": "$.day"
      },
      "End": true
    }
  }
}

After updating the AWS StepFunctions definition, the visual workflow looks like the following.

Now upload a file into the data/landing/ zone of the bucket where the trigger has been configured with the Lambda; a minimal sketch of such an upload is shown below. The execution of the StepFunction then starts and the visual workflow looks like the following.
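
A minimal boto3 sketch of such an upload (the bucket name, file name and key layout are placeholders and should match your own landing zone structure):

import boto3

s3_client = boto3.client('s3')

# Placeholder bucket, file and key; adjust them to match your landing zone layout
s3_client.upload_file(
    Filename='customers_2019-08-16.csv',
    Bucket='bucketname',
    Key='data/landing/customers/year=2019/month=8/day=16/customers_2019-08-16.csv'
)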

In the RawCrawlerStatus step, if the Lambda keeps failing, we retry for a while (per the configured backoff) and then mark the StepFunction as failed. If the StepFunction ran successfully, the visual workflow of the StepFunction looks like the following.

Machine Learning workflow using Amazon SageMaker

The final step in this data pipeline is to make the processed data available in a Jupyter notebook instance of Amazon SageMaker. Jupyter notebooks are popular among data scientists for exploratory data analysis and for building and training machine learning models.

Create Notebook Instance in Amazon SageMaker

Step1: In the Amazon SageMaker console choose Create notebook instance.

Step2: In the Notebook instance settings, populate the notebook instance name, choose an instance type depending on your data size, and select a role that allows the notebook instance in Amazon SageMaker to interact with Amazon S3. The SageMaker execution role needs the required permissions to Athena, the S3 buckets where the data resides, and KMS if the data is encrypted; a minimal sketch of granting such permissions is shown below.
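
As one possible way to grant those permissions, the sketch below attaches the AWS managed Athena and S3 read-only policies to the execution role using boto3. The role name is a placeholder, and in production you may prefer a narrower custom policy scoped to your buckets and KMS keys.

import boto3

iam_client = boto3.client('iam')

# Placeholder role name: the SageMaker execution role chosen for the notebook instance
ROLE_NAME = 'AmazonSageMaker-ExecutionRole-Example'

# Attach AWS managed policies for Athena and read-only S3 access
for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonAthenaFullAccess',
    'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
]:
    iam_client.attach_role_policy(RoleName=ROLE_NAME, PolicyArn=policy_arn)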

Step3: Wait for the notebook instance to be created and the status to change to InService.

Step4: Choose Open Jupyter, which opens the notebook interface in a new browser tab.

Click New to create a new notebook in Jupyter. Amazon SageMaker provides several kernels for Jupyter, including support for Python 2 and 3, MXNet, TensorFlow, and PySpark. Choose a Python kernel for this exercise as it comes with the Pandas library built in.

Step5: Within the notebook, execute the following commands to install PyAthena. PyAthena is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

import sys
!{sys.executable} -m pip install PyAthena

Step6: After PyAthena is installed, you can connect to Athena and populate Pandas data frames. For data scientists, working with data is typically divided into multiple stages: munging and cleaning data, analyzing/modeling it, and then organizing the results of the analysis into a form suitable for plotting or tabular display. Pandas is an ideal tool for all of these tasks.

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='REGION, for example, us-east-1')

df = pd.read_sql("SELECT * FROM <DATABASE>.<TABLENAME> limit 10;", conn)
df

As shown above, the dataframe always stays consistent with the latest incoming data because of the data engineering pipeline setup earlier in the ML workflow. This dataframe can be used for downstream ad-hoc model building purposes or for exploratory data analysis.
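
For example, a sketch of pulling only the latest processed partition into a data frame could look like the following; the database and table names are assumed to match what the refined crawler created, and the date values are placeholders for the partition being processed.

from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
               region_name='us-west-2')

# Placeholder database/table and partition values; adjust to your catalog and processing date
query = """
    SELECT *
    FROM refined.customers
    WHERE year = '2019' AND month = '8' AND day = '16'
    LIMIT 100
"""
latest_df = pd.read_sql(query, conn)
print(latest_df.head())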

That’s it folks. Thanks for the read.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Advanced Analytics – Presto Functions and Operators Quick Review

This post is a lot different from our earlier entries. Think of it as a quick reference for people interested in looking up advanced analytics functions and operators used in modern data lake operations based on Presto. You could, of course, use it with Presto installations, but also with commercial products such as AWS Athena, which is widely used these days to facilitate analytic operations on enterprise data lakes built on top of Amazon S3.

Without further ado, let’s dive straight into the nuts and bolts of these queries for advanced analytics:

JSON Functions

is_json_scalar(json) → boolean

  • Determines if json is a scalar (i.e. a JSON number, a JSON string, true, false or null).
  • Example:
SELECT is_json_scalar('1') → true
SELECT is_json_scalar('[1, 2, 3]') → false

json_array_contains(json, value) → boolean

  • Determines if value exists in json (a string containing a JSON array)
  • Example:
SELECT json_array_contains('[1, 2, 3]', 2) → true

json_array_get(json_array, index) → json

  • Returns the element at the specified index from json_array (the index is zero-based; negative indices count from the end).
  • Example:
SELECT json_array_get('["a", [3, 9], "c"]', 0) → a
SELECT json_array_get('["a", [3, 9], "c"]', 10) → null
SELECT json_array_get('["c", [3, 9], "a"]', -2) → JSON '[3,9]'

json_array_length(json) → bigint

  • Returns the array length of json (a string containing a JSON array)
  • Example:
SELECT json_array_length('[1, 2, 3]') → 3

json_extract(json, json_path) → json

  • Evaluates the JSONPath-like expression json_path on json (a string containing JSON) and returns the result as a JSON string
  • Example:
SELECT json_extract(json_parse('{"email": {"abcd": "xyz@yahoo.com"}, "phone_numbers": [5678908, 587578575, 668798]}'), '$.email.abcd') → "xyz@yahoo.com"

json_extract_scalar(json, json_path) → varchar

  • Just like json_extract(), but returns the result value as a string (as opposed to being encoded as JSON). The value referenced by json_path must be a scalar (boolean, number or string)
  • Example:
SELECT json_extract_scalar(json_parse('{"email": {"abcd": "xyz@yahoo.com"}, "phone_numbers": [5678908, 587578575, 9999999]}'), '$.phone_numbers[0]') → 5678908
SELECT json_extract_scalar(json_parse('{"email": {"abcd": "xyz@yahoo.com"}, "phone_numbers": [{"home": 5678908}, {"mob": 587578575}, {"cell": 9999999}]}'), '$.phone_numbers[1].mob') → 587578575
SELECT json_extract_scalar(json_parse('{"email": "xyz@yahoo.com", "phone_numbers": [{"home": 5678908}, {"mob": 587578575}, {"cell": 9999999}]}'), '$.email') → xyz@yahoo.com

json_parse(string) → json

  • Returns the JSON value deserialized from the input JSON text. This is an inverse function to json_format()
  • Example:
SELECT json_parse('{"email": {"abcd": "xyz@yahoo.com"}, "phone_numbers": [{"home": 5678908}, {"mob": 587578575}, {"cell": 9999999}]}') → {"email": {"abcd": "xyz@yahoo.com"}, "phone_numbers": [{"home": 5678908}, {"mob": 587578575}, {"cell": 9999999}]}

json_size(json, json_path) → bigint

  • Just like json_extract(), but returns the size of the value. For objects or arrays, the size is the number of members, and the size of a scalar value is zero.
  • Example:
SELECT json_size(json_parse('{"email": "xyz@yahoo.com", "phone_numbers": [{"home": 5678908}, {"mob": 587578575}, {"cell": 9999999}]}'), '$.phone_numbers') → 3
SELECT json_size('{"x": {"a": 1, "b": 2}}', '$.x.a') → 0

Date and Time Functions and Operators

current_date → date

  • Returns the current date as of the start of the query.

current_time → time with time zone

  • Returns the current time as of the start of the query. (with UTC)

current_timestamp → timestamp with time zone

  • Returns the current timestamp as of the start of the query.

current_timezone() → varchar

  • Returns the current time zone in the format defined by IANA (e.g., America/Los_Angeles) or as fixed offset from UTC (Example: +08:35)

date(x) → date

  • This is an alias for CAST(x AS date).
  • Example:
SELECT DATE('2019-08-07') → equivalent to SELECT CAST('2019-08-07' AS DATE)

now() → timestamp with time zone

  • This is an alias for current_timestamp.
  • SELECT now();

date_trunc(unit, x) → [same as input]

  • Returns x truncated to unit.
  • Example:
SELECT date_trunc('second', current_timestamp) → 2019-08-16 06:51:29.000 UTC (truncated to the given unit)
SELECT date_trunc('minute', current_timestamp) → 2019-08-16 06:54:00.000 UTC

Interval Functions

Unit         Description
millisecond  Milliseconds
second       Seconds
minute       Minutes
hour         Hours
day          Days
week         Weeks
month        Months
quarter      Quarters of a year
year         Years

date_add(unit, value, timestamp) → [same as input]

  • Adds an interval value of type unit to timestamp. Subtraction can be performed by using a negative value.
  • Example:
SELECT date_add('month', 1, current_timestamp) → 2019-09-16 06:59:55.425 UTC
SELECT date_add('day', 1, current_timestamp) → 2019-08-17 07:01:26.834 UTC

date_diff(unit, timestamp1, timestamp2) → bigint

  • Returns timestamp2 – timestamp1 expressed in terms of unit.
  • Example:
SELECT date_diff('day', current_timestamp, date_add('day', 10, current_timestamp)) → 10

parse_duration(string) → interval

  • Parses string of format value unit into an interval, where value is a fractional number of unit values.
  • Example:
SELECT parse_duration('42.8ms') → 0 00:00:00.043
SELECT parse_duration('3.81 d') → 3 19:26:24.000
SELECT parse_duration('5m') → 0 00:05:00.000

date_format(timestamp, format) → varchar

  • Formats timestamp as a string using format (converts timestamp to string)
  • Example:
SELECT date_format(current_timestamp, '%Y-%m-%d') → 2019-08-16

date_parse(string, format) → timestamp

  • Parses string into a timestamp using format. (converts string to timestamp)
  • Example:
SELECT date_parse('2019-08-16', '%Y-%m-%d') → 2019-08-16 00:00:00.000

day(x) → bigint

  • Returns the day of the month from x.
  • Example:
SELECT day(date_parse('2019-08-16', '%Y-%m-%d')) → 16

day_of_month(x) → bigint

  • This is an alias for day().

day_of_week(x) → bigint

  • Returns the ISO day of the week from x. The value ranges from 1 (Monday) to 7 (Sunday).
  • Example:
SELECT day_of_week(date_parse('2019-08-16', '%Y-%m-%d')) → 5

year(x) → bigint

  • Returns the year from x.

Aggregate Functions

array_agg(x) → array<[same as input]>

  • Returns an array created from the input x elements
  • The array_agg() function is an aggregate function that accepts a set of values and returns an array in which each value in the input set is assigned to an element of the array.
  • Syntax:
    array_agg(expression [ORDER BY [sort_expression {ASC | DESC}], […])
  • Example:
SELECT title, array_agg(first_name || ' ' || last_name) AS actors FROM film GROUP BY title
SELECT title, array_agg(first_name || ' ' || last_name ORDER BY first_name) AS actors FROM film GROUP BY title

avg(x) → double

  • Returns the average (arithmetic mean) of all input values

bool_and(boolean) → boolean

  • Returns TRUE if every input value is TRUE, otherwise FALSE.

bool_or(boolean) → boolean

  • Returns TRUE if any input value is TRUE, otherwise FALSE.

count(*) → bigint

  • Returns the number of input rows.

count(x) → bigint

  • Returns the number of non-null input values.

count_if(x) → bigint

  • Returns the number of TRUE input values. This function is equivalent to count(CASE WHEN x THEN 1 END).

arbitrary(x) → [same as input]

  • Returns an arbitrary non-null value of x, if one exists.
  • Arbitrary chooses one value out of a set of values. arbitrary is useful for silencing warnings about values neither grouped by nor aggregated over.

max_by(x, y) → [same as x]

  • Returns the value of x associated with the maximum value of y over all input values.
  • The max_by takes two arguments and returns the value of the first argument for which the value of the second argument is maximized.
  • If multiple rows maximize the result of the second value, an arbitrary first value is chosen from among them. max_by can be used with both numeric and non-numeric data.
  • Example:
SELECT max_by(close_date, close_value) AS date_of_max_sale FROM sales_pipeline → returns the close_date of the row where close_value is maximum

max_by(x, y, n) → array<[same as x]>

  • Returns n values of x associated with the n largest of all input values of y in descending order of y.

min_by(x, y) → [same as x]

  • Returns the value of x associated with the minimum value of y over all input values.

min_by(x, y, n) → array<[same as x]>

  • Returns n values of x associated with the n smallest of all input values of y in ascending order of y.

max(x, n) → array<[same as x]>

  • Returns n largest values of all input values of x.

min(x) → [same as input]

  • Returns the minimum value of all input values.

min(x, n) → array<[same as x]>

  • Returns n smallest values of all input values of x.

reduce_agg(inputValue T, initialState S, inputFunction(S, T, S), combineFunction(S, S, S)) → S

  • Reduces all input values into a single value. inputFunction will be invoked for each input value. In addition to taking the input value, inputFunction takes the current state, initially initialState, and returns the new state. combineFunction will be invoked to combine two states into a new state. The final state is returned
  • Example:
SELECT id, reduce_agg(value, 0, (a, b) -> a + b, (a, b) -> a + b) FROM (VALUES (1, 2), (1, 3), (1, 4), (2, 20), (2, 30), (2, 40)) AS t(id, value) GROUP BY id → (1, 9), (2, 90)

Array Functions and Operators

Subscript Operator: [ ]

  • The [ ] operator is used to access an element of an array and is indexed starting from one:
  • If a column has values like [1, 3], [45, 46]
  • Example:
SELECT column[1] → 1, 45

Concatenation Operator: ||

  • The || operator is used to concatenate an array with an array or an element of the same type.
  • Example:
SELECT ARRAY[1] || ARRAY[2] → [1, 2]
SELECT ARRAY[1] || 2 → [1, 2]
SELECT 2 || ARRAY[1] → [2, 1]

array_distinct(x) → array

  • Remove duplicate values from the array x.
  • Example:
SELECT array_distinct(ARRAY[1, 1, 1, 2]) → [1, 2]

array_intersect(x, y) → array

  • Returns an array of the elements in the intersection of x and y, without duplicates.
  • Common elements in two arrays without duplicates.
  • Example:
SELECT array_intersect(ARRAY[1, 1, 1, 2], ARRAY[10, 15, 1, 100]) → [1]

array_union(x, y) → array

  • Returns an array of the elements in the union of x and y, without duplicates.
  • Example:
SELECT array_union(ARRAY[1, 1, 1, 2], ARRAY[10, 15, 1, 100]) → [1, 2, 10, 15, 100]

array_except(x, y) → array

  • Returns an array of elements in x but not in y, without duplicates.
  • Example:
SELECT array_except(ARRAY[1, 1, 1, 2], ARRAY[10, 15, 1, 100]) → [2]

array_join(x, delimiter, null_replacement) → varchar

  • Concatenates the elements of the given array using the delimiter and an optional string to replace nulls.
  • Example:
SELECT array_join(ARRAY[1, 71, 81, 92, null], '/', 'abcd') → 1/71/81/92/abcd

array_max(x) → x

  • Returns the maximum value of input array.
  • Example:
SELECT array_max(ARRAY[1, 71, 81, 92, 100]) → 100
SELECT array_max(ARRAY[1, 71, 81, 92, null]) → null

array_min(x) → x

  • Returns the minimum value of input array.
  • Example:
SELECT array_min(ARRAY[1, 71, 81, 92, 100]) → 1

array_position(x, element) → bigint

  • Returns the position of the first occurrence of the element in array x (or 0 if not found).
  • Example:
SELECT array_position(ARRAY[1, 71, 81, 92, 100], 100) → 5

array_remove(x, element) → array

  • Remove all elements that equal element from array x.
  • Example:
SELECT array_remove(ARRAY[1, 71, 81, 92, 100, 1], 1) → [71, 81, 92, 100]

array_sort(x) → array

  • Sorts and returns the array x. The elements of x must be orderable. Null elements will be placed at the end of the returned array.
  • Example:
SELECT array_sort(ARRAY[1, null, 8, 9, 71, 81, 92, 100, 12, 51, 10, 7, 1, null]) → [1, 1, 7, 8, 9, 10, 12, 51, 71, 81, 92, 100, null, null]

array_sort(array(T), function(T, T, int)) → array(T)

  • Sorts and returns the array based on the given comparator function. The comparator will take two nullable arguments representing two nullable elements of the array. It returns -1, 0, or 1 as the first nullable element is less than, equal to, or greater than the second nullable element. 
  • If the comparator function returns other values (including NULL), the query will fail and raise an error.
  • Example:
SELECT array_sort(ARRAY[3, 2, 5, 1, 2], (x, y) -> IF(x < y, 1, IF(x = y, 0, -1))) → [5, 3, 2, 2, 1]

cardinality(x) → bigint

  • Returns the cardinality (size) of the array x.
  • Example:
SELECT cardinality(ARRAY[1, 81, 92, 100, 12, 51, 10]) → 7

arrays_overlap(x, y) → boolean

  • Tests if arrays x and y have any non-null elements in common. Returns null if there are no non-null elements in common but either array contains null.
  • Example:
SELECT arrays_overlap(ARRAY[1, 81, 92, 100, 12, 51, 10], ARRAY[101]) → false
SELECT arrays_overlap(ARRAY[1, 81, 92, 100, 12, 51, 10], ARRAY[101, 51]) → true

concat(array1, array2, …, arrayN) → array

  • Concatenates the arrays array1, array2, …, arrayN. This function provides the same functionality as the SQL-standard concatenation operator (||).

contains(x, element) → boolean

  • Returns true if the array x contains the element.
  • Example:
SELECT contains(ARRAY[1, 81, 92, 100, 12, 51, 10], 1) → true
SELECT contains(ARRAY['abcd', 'test', 'xyz'], 'xyz') → true

element_at(array(E), index) → E

  • SQL array indices start at 1
  • Returns element of array at given index. If index > 0, this function provides the same functionality as the SQL-standard subscript operator ([]). If index < 0, element_at accesses elements from the last to the first.
  • Example:
SELECT element_at(ARRAY[1, 81, 92, 100, 12, 51, 10], 2) → 81

filter(array(T), function(T, boolean)) → array(T)

  • Constructs an array from those elements of array for which function returns true.
  • Example:
SELECT filter(ARRAY[5, -6, NULL, 7], x -> x > 0) → [5, 7]

flatten(x) → array

  • Flattens an array(array(T)) to an array(T) by concatenating the contained arrays.

reduce(array(T), initialState S, inputFunction(S, T, S), outputFunction(S, R)) → R

  • Returns a single value reduced from array. inputFunction will be invoked for each element in array in order. 
  • In addition to taking the element, inputFunction takes the current state, initially initialState, and returns the new state. outputFunction will be invoked to turn the final state into the result value.
  • It may be the identity function (i -> i).
  • Example:
SELECT reduce(ARRAY[5, 20, NULL, 50], 0, (s, x) -> s + COALESCE(x, 0), s -> s) → 75
SELECT reduce(ARRAY[5, 20, NULL, 50], 1, (s, x) -> s * COALESCE(x, 1), s -> s) → 5000

repeat(element, count) → array

  • Repeats element count times.

reverse(x) → array

  • Returns an array which has the reversed order of array x.
  • Example:
SELECT reverse(ARRAY[2, 5]) → [5, 2]

sequence(start, stop) → array(bigint)

  • Generate a sequence of integers from start to stop, increments by 1 if start is less than or equal to stop, otherwise decrements by 1.
  • Example:
SELECT sequence(2, 7) → [2, 3, 4, 5, 6, 7]
SELECT sequence(7, 1) → [7, 6, 5, 4, 3, 2, 1]

sequence(start, stop, step) → array(bigint)

  • Generate a sequence of integers from start to stop, incrementing by step.

sequence(start, stop) → array(date)

  • Generate a sequence of dates from start date to stop date, incrementing by 1 day if start date is less than or equal to stop date, otherwise -1 day.

sequence(start, stop, step) → array(date)

  • Generate a sequence of dates from start to stop, incrementing by step. The type of step can be either INTERVAL DAY TO SECOND or INTERVAL YEAR TO MONTH.

shuffle(x) → array

  • Generate a random permutation of the given array x.
  • Example:
SELECT shuffle(ARRAY[1, 7, 2]) → [7, 1, 2]

slice(x, start, length) → array

  • Subsets array x starting from index start (or starting from the end if start is negative) with a length of length.
  • Example:
SELECT slice(ARRAY[1, 7, 2, 87, 12, 09], 2, 4) → [7, 2, 87, 12]

transform(array(T), function(T, U)) → array(U)

  • Returns an array that is the result of applying function to each element of array
  • Example:
SELECT transform(ARRAY[5, NULL, 6], x -> COALESCE(x, 0) + 1) → [6, 1, 7]
SELECT transform(ARRAY['x', 'abc', 'z'], x -> x || '0') → ['x0', 'abc0', 'z0']

zip(array1, array2[, …]) → array(row)

  • Merges the given arrays, element-wise, into a single array of rows. The M-th element of the N-th argument will be the N-th field of the M-th output element. If the arguments have an uneven length, missing values are filled with NULL.
  • Example:
SELECT zip(ARRAY[1, 2], ARRAY['1b', null, '3b']) → [ROW(1, '1b'), ROW(2, null), ROW(null, '3b')]

zip_with(array(T), array(U), function(T, U, R)) → array(R)

  • Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function:
  • Example:
SELECT zip_with(ARRAY[1, 3, 5], ARRAY['a', 'b', 'c'], (x, y) -> (y, x)) → [ROW('a', 1), ROW('b', 3), ROW('c', 5)]

Window functions

Window functions perform calculations across rows of the query result. They run after the HAVING clause but before the ORDER BY clause. Invoking a window function requires special syntax using the OVER clause to specify the window. A window has three components:

  1. The partition specification, which separates the input rows into different partitions. This is analogous to how the GROUP BY clause separates rows into different groups for aggregate functions.
  2. The ordering specification, which determines the order in which input rows will be processed by the window function.
  3. The window frame, which specifies a sliding window of rows to be processed by the function for a given row. If the frame is not specified, it defaults to RANGE UNBOUNDED PRECEDING, which is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. This frame contains all rows from the start of the partition up to the last peer of the current row.

rank() → bigint

  • Returns the rank of a value in a group of values. The rank is one plus the number of rows preceding the row that are not peer with the row. Thus, tie values in the ordering will produce gaps in the sequence. The ranking is performed for each window partition.
  • As shown below, the rank function produces a numerical rank within the current row’s partition for each distinct ORDER BY value, in the order defined by the ORDER BY clause. rank needs no explicit parameter, because its behavior is entirely determined by the OVER clause.
  • Query:
WITH dataset AS 
    (SELECT 'name' AS rows, ARRAY['a', 'a', 'a', 'b', 'b', 'c', 'c', 'd', 'e'] AS sample )
SELECT rows,
         value,
         rank()
    OVER (ORDER BY value) AS rank
FROM dataset
CROSS JOIN UNNEST(sample) AS t(value)
  • Output:
   rows  value  rank
 1 name  a      1
 2 name  a      1
 3 name  a      1
 4 name  b      4
 5 name  b      4
 6 name  c      6
 7 name  c      6
 8 name  d      8
 9 name  e      9

row_number() → bigint

  • Returns a unique, sequential number for each row, starting with one, according to the ordering of rows within the window partition.
  • Query:
WITH dataset AS 
    (SELECT 'name' AS rows, ARRAY['a', 'a', 'a', 'b', 'b', 'c', 'c', 'd', 'e'] AS sample )
SELECT rows,
         value,
         row_number()
    OVER (ORDER BY value) AS row_num
FROM dataset
CROSS JOIN UNNEST(sample) AS t(value)
  • Output:
   rows  value  row_num
 1 name  a      1
 2 name  a      2
 3 name  a      3
 4 name  b      4
 5 name  b      5
 6 name  c      6
 7 name  c      7
 8 name  d      8
 9 name  e      9

dense_rank() → bigint

  • Returns the rank of a value in a group of values. This is similar to rank(), except that tie values do not produce gaps in the sequence.
  • Query: 
WITH dataset AS 
    (SELECT 'name' AS rows, ARRAY['a', 'a', 'a', 'b', 'b', 'c', 'c', 'd', 'e'] AS sample )
SELECT rows,
         value,
         dense_rank()
    OVER (ORDER BY value) AS dense_ran
FROM dataset
CROSS JOIN UNNEST(sample) AS t(value)
  • Output: 
   rows  value  dense_ran
 1 name  a      1
 2 name  a      1
 3 name  a      1
 4 name  b      2
 5 name  b      2
 6 name  c      3
 7 name  c      3
 8 name  d      4
 9 name  e      5

Map Functions and Operators

map(array(K), array(V)) → map(K, V)

  • Returns a map created using the given key/value arrays.
  • Example:
SELECT map(ARRAY[1, 3], ARRAY[2, 4]) AS a → {1=2, 3=4}
SELECT map(ARRAY['k1', 'k2'], ARRAY['value1', 'value2']) → {k1=value1, k2=value2}

Subscript Operator: [ ]

  • The [ ] operator is used to retrieve the value corresponding to a given key from a map.
  • Example:
SELECT map(ARRAY[1, 3], ARRAY[2, 4]) AS a → {1=2, 3=4}
SELECT a[1] FROM (SELECT map(ARRAY[1, 3], ARRAY[2, 4]) AS a) → 2

element_at(map(K, V), key) → V

  • Returns value for given key, or NULL if the key is not contained in the map.
  • Example:
SELECT element_at(a, 'k1') FROM (SELECT map(ARRAY['k1', 'k2'], ARRAY['value1', 'value2']) AS a)

cardinality(x) → bigint

  • Returns the cardinality (size) of the map x.

map() → map<unknown, unknown>

  • Returns an empty map.
  • Example:
SELECT map() → {}

map_from_entries(array(row(K, V))) → map(K, V)

  • Returns a map created from the given array of entries.
  • Example:
SELECT map_from_entries(ARRAY[(1, 'x'), (2, 'y')]) → {1 -> 'x', 2 -> 'y'}

map_agg(key, value) → map(K, V)

  • Returns a map created from the input key/value pairs.

multimap_from_entries(array(row(K, V))) → map(K, array(V))

  • Returns a multimap created from the given array of entries. Each key can be associated with multiple values.
  • Example:
SELECT multimap_from_entries(ARRAY[(1, 'x'), (2, 'y'), (1, 'z')]) → {1 -> ['x', 'z'], 2 -> ['y']}

map_concat(map1(K, V), map2(K, V), …, mapN(K, V)) → map(K, V)

  • Returns the union of all the given maps. If a key is found in multiple given maps, that key’s value in the resulting map comes from the last one of those maps.

map_filter(map(K, V), function(K, V, boolean)) → map(K, V)

  • Constructs a map from those entries of map for which function returns true.
  • Example:
SELECT map_filter(MAP(ARRAY[], ARRAY[]), (k, v) -> true) → {}
SELECT map_filter(MAP(ARRAY[10, 20, 30], ARRAY['a', NULL, 'c']), (k, v) -> v IS NOT NULL) → {10 -> a, 30 -> c}

map_keys(x(K, V)) → array(K)

  • Returns all the keys in the map x.
  • Example:
SELECT map_keys(map(ARRAY['k1', 'k2'], ARRAY['value1', 'value2'])) → [k1, k2]

map_values(x(K, V)) → array(V)

  • Returns all the values in the map x.
  • Example:
SELECT map_values(map(ARRAY['k1', 'k2'], ARRAY['value1', 'value2'])) → [value1, value2]

transform_keys(map(K1, V), function(K1, V, K2)) → map(K2, V)

  • Returns a map that applies function to each entry of map and transforms the keys.
  • Example:
SELECT transform_keys(MAP(ARRAY[], ARRAY[]), (k, v) -> k + 1) → {}
SELECT transform_keys(MAP(ARRAY[1, 2, 3], ARRAY['a', 'b', 'c']), (k, v) -> k + 1) → {2 -> a, 3 -> b, 4 -> c}
SELECT transform_keys(MAP(ARRAY['a', 'b'], ARRAY[1, 2]), (k, v) -> k || CAST(v AS VARCHAR)) → {a1 -> 1, b2 -> 2}

transform_values(map(K, V1), function(K, V1, V2)) → map(K, V2)

  • Returns a map that applies function to each entry of map and transforms the values.
  • Example:
SELECT transform_values(MAP(ARRAY[], ARRAY[]), (k, v) -> v + 1) → {}
SELECT transform_values(MAP(ARRAY[1, 2, 3], ARRAY[10, 20, 30]), (k, v) -> v + k) → {1 -> 11, 2 -> 22, 3 -> 33}

Thanks for the read. Please feel free to reach out with your comments.

This story is authored by PV Subbareddy. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.

Setting Up a Data Lake on AWS Cloud Using LakeFormation

Setting up a Data Lake involves multiple steps such as collecting, cleansing, moving, and cataloging data, and then securely making that data available for downstream analytics and Machine Learning. AWS LakeFormation simplifies these processes and also automates certain processes like data ingestion. In this post, we shall be learning how to build a very simple data lake using LakeFormation with hypothetical retail sales data.

AWS Lake Formation provides its own permissions model that augments the AWS IAM permissions model. This centrally defined permissions model enables fine-grained access to data stored in the data lake through a simple grant/revoke mechanism. These permissions are enforced at the table and column level on the data catalog and are mapped to the underlying objects in S3. LakeFormation permissions are applicable across the full portfolio of AWS analytics and Machine Learning services, including Amazon Athena and Amazon Redshift.

So, let’s get on with the setup.

Adding an administrator

The first and foremost step in using LakeFormation is to create an administrator. An administrator has full access to the LakeFormation system and initial access to data configuration and access permissions.

After adding an administrator, navigate to the Dashboard using the sidebar. This illustrates the typical process of Data lake setup.

Register location

If you wish to set up data ingestion, that is, import unprocessed/landing data, AWS LakeFormation comes with in-house Blueprints (found under the Register and Ingest sub menu in the sidebar) that one could use to build Workflows. These workflows can be scheduled as per the needs of the end user. Sources of data for these workflows can be a JDBC source, log files and many more. Learn more about importing data using workflows here.

If your ingestion process doesn’t involve any of the above-mentioned ways and writes directly to S3, that’s alright. Either way, we end up registering that S3 location as one of the Data Lake locations.

Once created, you shall see it listed in the Data Lake locations.

You can not only access this location from here but also set permissions on objects stored in that path. If preferred, one could register lake locations separately for each processing zone and set permissions accordingly. I registered the whole bucket.

I created 2 retail datasets (.csv), one with 20 records and the other with 5 records. I have uploaded one of the datasets (20 records) to S3 under the raw/retail_sales prefix.

Creating a Database

Lake Formation internally uses the Glue Data Catalog, so it shows all the databases available. From the Data Catalog sub menu in the sidebar, navigate to Databases to create and manage all the databases. I created a database called merchandise with default permissions.

Once created, you shall see it listed, and you can also manage it, grant/revoke permissions, and view tables in that DB.

Creating Crawlers and ETL jobs

From the Register and Ingest sub menu in the sidebar, navigate to Crawlers and Jobs to create and manage all Glue-related services. Lake Formation redirects to AWS Glue and internally uses it. I created a crawler to get the metadata for objects residing in the raw zone.

After running this crawler manually, the raw data can be queried from Athena.

I created an ETL job to run a transformation on this raw table data. 

All it does is change the data type of purchase_date from string to timestamp, and create partitions while writing to the refined zone in Parquet format. These partitions are created from the processing date, not the purchase date.

retail-raw-refined ETL job python script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "merchandise", table_name = "raw_retail_sales", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]

#convert glue object to sparkDF
sparkDF = datasource0.toDF()
sparkDF = sparkDF.withColumn('purchase_date', unix_timestamp(sparkDF.purchase_date, 'dd/MM/yyyy').cast(TimestampType()))

applymapping1 = DynamicFrame.fromDF(sparkDF, glueContext,"datafields")
# applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("email_id", "string", "email_id", "string"), ("retailer_name", "string", "retailer_name", "string"), ("units_purchased", "long", "units_purchased", "long"), ("purchase_date", "string", "purchase_date", "date"), ("sale_id", "string", "sale_id", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-787/refined/retail_sales"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
now = datetime.datetime.now()
path = "s3://test-787/refined/retail_sales/"+'year='+str(now.year)+'/month='+str(now.month)+'/day='+str(now.day)+'/'
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

The lakeformation:GetDataAccess permission is needed for this job to work. I created a new policy named LakeFormationGetDataAccess and attached it to the AWSGlueServiceRoleDefault role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "lakeformation:GetDataAccess",
            "Resource": "*"
        }
    ]
}

After running the job manually, it will load new transformed data with partitions in the refined zone as specified in the job.

I created another crawler to get the metadata for these objects residing in the refined zone.

After running this crawler manually, the refined data can be queried from Athena.

You can now see the newly added partition columns (year, month, day).

Let us add some new raw data and see how our ETL job processes that delta difference.

We only want to process new data; old data is either moved to an archive location or deleted from the raw zone, whichever is preferred.

Run the ETL job again. See new files being added into refined zone.

Load the new partitions using the MSCK REPAIR TABLE query; a minimal sketch of running it programmatically is shown below.
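
This sketch uses the Athena API via boto3; the table name and query result location are placeholders that should match your catalog and bucket, while the merchandise database is the one created earlier.

import boto3

athena_client = boto3.client('athena')

# Placeholder table name and result location; the database is from this article
athena_client.start_query_execution(
    QueryString='MSCK REPAIR TABLE refined_retail_sales',
    QueryExecutionContext={'Database': 'merchandise'},
    ResultConfiguration={'OutputLocation': 's3://test-787/athena-query-results/'}
)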

Note: Try creating another IAM user and, as an administrator in LakeFormation, give this user limited access to the tables; then try querying using Athena to see if the permissions are working.

Pros and cons of LakeFormation

The UI is simple, with everything under one roof. Most of the time, one needs to keep multiple tabs open, and opening S3 locations is troublesome. This is made easy by the register data lake locations feature: one can not only access these locations directly but also grant/revoke permissions on the objects residing there.

Managing permissions at an object level in S3 is a hectic process. But with LakeFormation, permissions can be managed at the data catalog level. This enables one to grant/revoke permissions to users or roles at a table/column level. These permissions are internally mapped to the underlying objects sitting in S3.

Though managing permissions and data ingestion workflows is made easy, most of the Glue processes like ETL jobs, crawlers, and ML-specific transformations still have to be set up manually.

This story is authored by Koushik. He is a software engineer and a keen data science and machine learning enthusiast.

Serverless Architecture for Lightning Fast Distributed File Transfer on AWS Data Lake

Today, we are very excited to share our insights on setting up a serverless architecture for a lightning fast way* to copy a large number of objects across multiple folders or partitions in an AWS data lake on S3. Typically in a data lake, data is kept across various zones depending on the data lifecycle. For example, as the data arrives from the source, it can be kept in the raw zone and then, post processing, moved to a processed zone, so that the lake is ready for the next influx of data. The rate of object transfer is a crucial factor, as it affects the overall efficiency of the data processing lifecycle in the data lake.

*In our tests, we copied more than 300K objects ranging from 1KB to 10GB in size from the raw zone into the processed zone. Compared to the best known tool for hyper fast file transfer on AWS called s3s3mirror, we were able to finish this transfer of about 24GB of data in about 50% less time. More details have been provided at the end of the post.

We created a Lambda invoke architecture that copies files/objects concurrently. The picture below depicts it.


OMS (Orchestrator-Master-Slave) Lambda Architecture

For example, say we have an S3 bucket with the following folder structure, with the actual objects contained within this hierarchy of folders, sub-folders and partitions.

S3 file structure

Let us look at how we can use the OMS (Orchestrator-Master-Slave) Architecture to achieve hyper-fast distributed/concurrent file transfer. The above architecture can be divided into two halves: Orchestrator-Master and Master-Slave.

Orchestrator-Master

The Orchestrator simply invokes a Master Lambda for each folder. Each Master then iterates the objects in that folder (including all sub-folders and partitions) and invokes a Slave Lambda for each object to copy it to the destination.

Orchestrator-Master Lambda invoke

Let us look at the Orchestrator Lambda code.
Source-to-Destination-File-Transfer-Orchestrator:

import os
import boto3
import json
from datetime import datetime

client_lambda = boto3.client('lambda')
master_lambda = "Source-to-Destination-File-Transfer-Master"

folder_names = ["folder1", "folder2", "folder3", "folder4", "folder5", "folder6", "folder7", "folder8", "folder9"]

def lambda_handler(event, context):
    
    t = datetime.now()
    print("start-time",t)
    
    try:            
        for folder_name in folder_names:
            
            payload_data = {
              'folder_name': folder_name
            }                
        
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = master_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )
            print(payload)
            
    except Exception as e:
        print(e)
        raise e

Master-Slave

Master-Slave Lambda invoke

Let us look at the Master Lambda code.
Source-to-Destination-File-Transfer-Master:

import os
import boto3
import json
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')
client_lambda = boto3.client('lambda')

source_bucket_name = 'source bucket name'
source_bucket = s3.Bucket(source_bucket_name)

slave_lambda = "Source-to-Destination-File-Transfer-Slave"

def lambda_handler(event, context):

    try:
        source_prefix = "" #add if any, ending with "/"
        # The Orchestrator passes the folder name in the event payload
        source_prefix = source_prefix + event['folder_name'] + "/"

        for obj in source_bucket.objects.filter(Prefix = source_prefix):
            path = obj.key
            payload_data = {
               'file_path': path
            }
            payload = json.dumps(payload_data)
            client_lambda.invoke(
                FunctionName = slave_lambda,
                InvocationType = 'Event',
                LogType = 'None',
                Payload = payload
            )

    except Exception as e:
        print(e)
        raise e

Slave

Let us look at the Slave Lambda code.
Source-to-Destination-File-Transfer-Slave:

import os
import boto3
import json
import re
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

source_prefix = "" #add if any
source_bucket_name = "source bucket name"
source_bucket = s3.Bucket(source_bucket_name )

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name )

def lambda_handler(event, context):
    try:
        destination_prefix = "" #add if any
        
        source_obj = { 'Bucket': source_bucket_name, 'Key': event['file_path']}
        file_path = event['file_path']
        
        #copying file
        new_key = file_path.replace(source_prefix, destination_prefix)
        new_obj = source_bucket.Object(new_key)
        new_obj.copy(source_obj)
        
    except Exception as e:
        raise e

You must ensure that these Lambda functions have been configured to meet the maximum execution time and memory limit constraints as per your case. We tested by setting the upper limit of execution time as 5 minutes and 1GB of available memory.
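
If you manage these settings from code rather than the console, a possible sketch using boto3 (the function names are the ones used above, and the values reflect what we tested with) is:

import boto3

lambda_client = boto3.client('lambda')

# Raise timeout and memory for the file-transfer Lambdas
for function_name in [
    'Source-to-Destination-File-Transfer-Orchestrator',
    'Source-to-Destination-File-Transfer-Master',
    'Source-to-Destination-File-Transfer-Slave',
]:
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=300,       # 5 minutes
        MemorySize=1024    # 1 GB
    )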

Calculating the Rate of File Transfer

To calculate the rate of file transfer, we print the start time at the beginning of the Orchestrator Lambda execution. Once the file transfer is complete, we use another Lambda to extract the last-modified date attribute of the last copied object.

Extract-Last-Modified:

import json
import boto3
from datetime import datetime
from dateutil import tz

s3 = boto3.resource('s3')

destination_bucket_name = "destination bucket name"
destination_bucket = s3.Bucket(destination_bucket_name)
destination_prefix = "" #add if any

def lambda_handler(event, context):
    
    #initializing with some old date
    last_modified_date = datetime(1940, 7, 4).replace(tzinfo = tz.tzlocal()) 

    for obj in destination_bucket.objects.filter(Prefix = destination_prefix):
        
        obj_date = obj.last_modified.replace(tzinfo = tz.tzlocal())
        
        if last_modified_date < obj_date:
            last_modified_date = obj_date
    
    print("end-time: ", last_modified_date)

Now we have both start-time from Orchestrator Lambda and end-time from Extract-last-modified Lambda, their difference is the time taken for file transfer.

Before writing this post, we copied 24.1GB of objects using the above architecture; the results are shown in the following screenshots:

duration	=	end-time - start-time
		=	10:04:49 - 10:03:28
		=	00:01:21 (hh-mm-ss)

To check the efficiency of our OMS Architecture, we compared the results of OMS with s3s3mirror, a utility for mirroring content from one S3 bucket to another or to/from the local filesystem. The screenshot below has the file transfer stats of s3s3mirror for the same set of files:

As we can see, the difference was 1 minute and 8 seconds for a total data transfer of about 24GB; the gap can be much higher for large data sets if we add more optimizations. I have only shared a generalized view of the OMS Architecture; it can be further fine-tuned to specific needs to get highly optimized performance. For instance, if you have partitions in each folder, the OMS Architecture could yield much better results if you invoke a Master Lambda for each partition inside the folder instead of invoking the Master just at the folder level. A sketch of this variation is shown below.
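
This is only a sketch of such an Orchestrator variation, assuming the bucket, folder names and Master function name used earlier; it lists the immediate partitions under each folder with a delimiter listing and invokes a Master per partition (pagination is omitted for brevity).

import json
import boto3

s3_client = boto3.client('s3')
client_lambda = boto3.client('lambda')

master_lambda = "Source-to-Destination-File-Transfer-Master"
source_bucket_name = "source bucket name"
folder_names = ["folder1", "folder2", "folder3"]

def lambda_handler(event, context):
    for folder_name in folder_names:
        # List immediate "sub-directories" (partitions) directly under the folder
        response = s3_client.list_objects_v2(
            Bucket=source_bucket_name,
            Prefix=folder_name + "/",
            Delimiter="/"
        )
        for partition in response.get('CommonPrefixes', []):
            # Invoke one Master per partition instead of one per folder
            payload = json.dumps({'folder_name': partition['Prefix'].rstrip('/')})
            client_lambda.invoke(
                FunctionName=master_lambda,
                InvocationType='Event',
                LogType='None',
                Payload=payload
            )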

Thanks for the read. Looking forward to your thoughts.

This story is co-authored by Koushik and Subbareddy. Koushik is a software engineer and a keen data science and machine learning enthusiast. Subbareddy is a Big Data Engineer specializing on Cloud Big Data Services and Apache Spark Ecosystem.