Building a distributed data pipeline on AWS

Data pipelines are common in any business that works with large amounts of data. A pipeline is a crucial piece of infrastructure that fetches data from a source, transforms it and stores it for internal use. For example, a data pipeline can ingest all the tweets from the Twitter Firehose API, extract the pieces of information relevant to sentiment analysis and store a user sentiment score for each topic for internal analytics.

Modern data pipelines have several things in common. They are all distributed, which simply means that different pieces of the pipeline run on different computing hardware. Because of this distributed nature, they all need a notification mechanism to orchestrate each step of the pipeline. And they all need persistent data storage, most commonly a database, though ephemeral storage can work as well.

In this post we will build a distributed data pipeline using core services in Amazon Web Services (AWS).

Traditionally, building a distributed data pipeline on your own hardware is complicated and carries large engineering and networking costs. The advent of cloud computing has dramatically changed the landscape of large-scale computing infrastructure. Amazon Web Services (AWS) offers flexible on-demand computing infrastructure that lets us focus on building what’s important for our business instead of worrying about the underlying computing infrastructure.

In this post we will introduce some of the core services of AWS, including Elastic Compute Cloud (EC2), Simple Storage Service (S3), Simple Queue Service (SQS) and Relational Database Service (RDS). We will also build a data importer and transformer on top of these services and tie our data pipeline together.

The architecture of our data pipeline looks like this:

[Figure: data pipeline architecture]

First of all, our data importer and transformer run on two EC2 instances. EC2 stands for Elastic Compute Cloud. It’s the backbone of AWS, offering elastic on-demand computing resources that can scale up and down in a matter of minutes. For this demo we spun up two t2.micro instances, each with 1 vCPU and 1 GB of RAM.
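Provisioning those instances can itself be scripted with boto3, the AWS Python SDK. A minimal sketch, assuming valid AWS credentials are configured; the AMI ID and key pair name below are placeholders you would substitute with your own:

```python
import boto3


def launch_workers(ami_id, key_name, count=2, region='us-east-1'):
    """Launch t2.micro instances (1 vCPU, 1 GB RAM) for the pipeline workers."""
    ec2 = boto3.resource('ec2', region_name=region)
    return ec2.create_instances(
        ImageId=ami_id,          # placeholder: an Amazon Linux AMI ID for your region
        InstanceType='t2.micro',
        KeyName=key_name,        # placeholder: your SSH key pair name
        MinCount=count,
        MaxCount=count,
    )
```

You could equally create the instances by hand in the AWS console, which is what we did for this demo.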

The data importer runs as a standalone Python application on one EC2 instance. It fetches mock data from an external API that serves fictitious blog post data in JSON format and backs up the raw JSON into S3. S3 is Amazon’s high-performance object storage service that can store any amount of data, no matter how large. It has become the de facto solution for large-scale data backup and archiving.

After the data importer has backed up the data into S3, it sends a message to a queue on SQS with information on how to find the data in S3, notifying the transformer to start processing as the next step in the pipeline. SQS is an AWS message queue service. On one end, applications called producers send messages to the queue; on the other end, applications called consumers receive messages from the queue. A standard SQS queue delivers messages with best-effort ordering, while SQS also offers FIFO (first-in-first-out) queues that guarantee messages are received in the order the producers sent them. We use SQS to orchestrate the different parts of the data pipeline, allowing one task to notify the next of its status.
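The message carries the data's location as an s3:// URI, so the consumer needs to split that URI back into a bucket and a key. A small helper sketching this with Python's standard urllib (this helper is illustrative and not part of the pipeline code shown later):

```python
from urllib.parse import urlparse


def split_s3_path(s3_path):
    """Split an 's3://bucket/key' URI into a (bucket, key) tuple."""
    parsed = urlparse(s3_path)              # scheme='s3', netloc=bucket, path='/key'
    return parsed.netloc, parsed.path.lstrip('/')


bucket, key = split_s3_path('s3://cloudboxlabs/posts_20240101120000.json')
print(bucket, key)  # cloudboxlabs posts_20240101120000.json
```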

Our transformer is another Python application that runs on the second EC2 instance. Once the transformer receives a message indicating that the importer has finished its work, it fetches the JSON data from S3 using the address encoded in the message and starts its own work. The transformer is simple: it fetches the JSON data, parses it and stores it in the database. The database we use is a Postgres instance on RDS. RDS stands for Relational Database Service, which simply means AWS manages the setup, provisioning and scaling of the database for you. Once it’s spun up, you can use it as a normal Postgres database, as if it were running on your own hardware. Storing the data in the database is the last stage of our data pipeline.
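The mock API's posts endpoint returns a list of objects with userId, id, title and body fields. The heart of the transformer's parsing step can therefore be sketched as a pure function turning the raw JSON into rows ready for insertion (the column layout here is our assumption about the table, not a fixed schema):

```python
import json


def posts_to_rows(raw_json):
    """Parse the raw JSON payload into (id, user_id, title, body) tuples
    ready for insertion into a relational table."""
    posts = json.loads(raw_json)
    return [(p['id'], p['userId'], p['title'], p['body']) for p in posts]


sample = '[{"userId": 1, "id": 1, "title": "hello", "body": "world"}]'
print(posts_to_rows(sample))  # [(1, 1, 'hello', 'world')]
```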

Because we chose to run our data pipeline on AWS services, the Python application code we have to write is very simple.

For our data importer, we use boto3 to interact with S3 and SQS, and the requests library to make HTTP requests to the external API that gives us fictitious data. boto3 is the Python client library for AWS services; requests is a popular Python library for making HTTP requests to a web service.

import json
import logging
from datetime import datetime

import boto3
import requests


EXTERNAL_API_URL = 'https://jsonplaceholder.typicode.com/posts'
BUCKET_NAME = 'cloudboxlabs'
QUEUE_NAME = 'cloudboxlabs_datapipe_tutorial'


class Importer(object):
    def __init__(self):
        self.s3 = boto3.resource('s3')
        sqs = boto3.resource('sqs')
        self.queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)

    def run(self):
        # Fetch the fictitious blog posts from the external API
        response = requests.get(EXTERNAL_API_URL)
        response.raise_for_status()

        # Write the raw JSON to a timestamped local file
        file_name = 'posts_{}.json'.format(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S'))
        with open(file_name, 'w') as file_obj:
            file_obj.write(json.dumps(response.json()))

        logging.info('Received API response')

        # Back up the raw JSON into S3; open in binary mode for the upload
        with open(file_name, 'rb') as file_obj:
            self.s3.Bucket(BUCKET_NAME).put_object(Key=file_name, Body=file_obj)

        logging.info('Put json into S3')

        # Notify the transformer, embedding the S3 location in a message attribute
        self.queue.send_message(MessageBody='post', MessageAttributes={
            's3_path': {
                'StringValue': 's3://{}/{}'.format(BUCKET_NAME, file_name),
                'DataType': 'String'
            }
        })


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    Importer().run()

For our transformer, we also use boto3 for S3 and SQS interaction. In addition, we use the psycopg2 library to connect to the Postgres RDS database and insert data into a relational table.

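A minimal sketch of what such a transformer might look like, mirroring the structure of the importer. The `posts` table schema, the polling loop and the database connection parameters below are our assumptions for illustration, not a definitive implementation; running it requires valid AWS credentials and a reachable Postgres instance:

```python
import json
import logging

import boto3
import psycopg2


BUCKET_NAME = 'cloudboxlabs'
QUEUE_NAME = 'cloudboxlabs_datapipe_tutorial'

# Assumed table layout for the parsed posts
CREATE_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS posts (
        id      INTEGER PRIMARY KEY,
        user_id INTEGER,
        title   TEXT,
        body    TEXT
    )
"""
INSERT_SQL = 'INSERT INTO posts (id, user_id, title, body) VALUES (%s, %s, %s, %s)'


class Transformer(object):
    def __init__(self, db_params):
        self.s3 = boto3.resource('s3')
        sqs = boto3.resource('sqs')
        self.queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)
        # db_params: host, dbname, user, password of the RDS instance
        self.conn = psycopg2.connect(**db_params)

    def run(self):
        while True:
            # Long-poll the queue for notifications from the importer
            messages = self.queue.receive_messages(
                MessageAttributeNames=['s3_path'], WaitTimeSeconds=20)
            for message in messages:
                # Recover the S3 key from the s3:// URI in the message attribute
                s3_path = message.message_attributes['s3_path']['StringValue']
                key = s3_path.replace('s3://{}/'.format(BUCKET_NAME), '')

                # Fetch and parse the backed-up JSON
                obj = self.s3.Object(BUCKET_NAME, key).get()
                posts = json.loads(obj['Body'].read())
                logging.info('Fetched %s posts from S3', len(posts))

                # Insert the parsed rows; `with conn` commits the transaction
                with self.conn:
                    with self.conn.cursor() as cursor:
                        cursor.execute(CREATE_TABLE_SQL)
                        for post in posts:
                            cursor.execute(INSERT_SQL, (
                                post['id'], post['userId'],
                                post['title'], post['body']))

                # Acknowledge the message so SQS does not redeliver it
                message.delete()
```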

That is it! A fully functional distributed data import pipeline in less than 100 lines of code. AWS did the heavy lifting of provisioning computing resources, establishing network policies and scaling the infrastructure, while we focused on writing and deploying our application.

Distributed data infrastructure has become the new normal in today’s world. At Cloudbox Labs, we are passionate about offering practical solutions to help businesses — big or small — benefit from big data technologies.

As always, you can find the full code discussed in this post on the Cloudbox Labs GitHub.