Etleap Blog

Integrate Etleap with Amazon Redshift Streaming Ingestion (preview) to make data available in seconds

Written by Etleap Marketing | Dec 1, 2021 8:30:16 PM

This blog post was written in partnership with the Amazon Redshift team, and also posted on the AWS Big Data Blog.

Amazon Redshift is a fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using SQL and your extract, transform, and load (ETL), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads.

Etleap is an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation. Etleap ETL removes the headaches experienced building data pipelines. A cloud-native platform that seamlessly integrates with AWS infrastructure, Etleap ETL consolidates data without the need for coding. Automated issue detection pinpoints problems so data teams can stay focused on business initiatives, not data pipelines.

In this post, we show how Etleap customers are integrating with the new streaming ingestion feature in Amazon Redshift (currently in limited preview) to load data directly from Amazon Kinesis Data Streams. This reduces load times from minutes to seconds and helps you gain faster data insights.

Amazon Redshift streaming ingestion with Kinesis Data Streams

Traditionally, you had to use Amazon Kinesis Data Firehose to land your stream into Amazon Simple Storage Service (Amazon S3) files and then employ a COPY command to move the data into Amazon Redshift. This method incurs latencies in the order of minutes.

Now, the native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams. The new feature enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

Configure Amazon Redshift streaming ingestion with SQL queries

Amazon Redshift streaming ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. In this section, we walk through the steps to configure streaming ingestion.

Create an external schema

We begin by creating an external schema referencing Kinesis using syntax adapted from Redshift’s support for Federated Queries:

CREATE EXTERNAL SCHEMA MySchema
FROM Kinesis
IAM_ROLE { default | 'iam-role-arn' };

This external schema command creates an object inside Amazon Redshift that acts as a proxy to Kinesis Data Streams. Specifically, to the collection of data streams that are accessible via the AWS Identity and Access Management (IAM) role. You can use either the default Amazon Redshift cluster IAM role or a specified IAM role that has been attached to the cluster previously.

Create a materialized view

You can use Amazon Redshift materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The following command creates a materialized view over a stream from the previously defined schema:

CREATE MATERIALIZED VIEW MyView AS
SELECT *
FROM MySchema.MyStream;

Note the use of the dot syntax to pick out the particular stream desired. The attributes of the stream include a timestamp field, partition key, sequence number, and a VARBYTE data payload.

Although the previous materialized view definition simply performs a SELECT *, more sophisticated processing is possible, for instance, applying filtering conditions or shredding JSON data into columns. To demonstrate, consider the following Kinesis data stream with JSON payloads:

{
“player” : “alice 127”,
“region” : “us-west-1”,
action” : “entered shop”,
}

To demonstrate this, write a materialized view that shreds the JSON into columns, focusing only on the entered shop action:

CREATE MATERIALIZED VIEW ShopEntrances AS
SELECT ApproximateArrivalTimestamp, SequenceNumber,
json_extract_path_text(from_varbyte(Data, 'utf-8'), 'player') as Player,
json_extract_path_text(from_varbyte(Data, 'utf-8'), 'region') as Region
FROM MySchema.Actions
WHERE json_extract_path_text(from_varbyte(Data, 'utf-8'), 'action') = 'entered shop';

On the Amazon Redshift leader node, the view definition is parsed and analyzed. On success, it is added to the system catalogs. No further communication with Kinesis Data Streams occurs until the initial refresh.

Refresh the materialized view

The following command pulls data from Kinesis Data Streams into Amazon Redshift:

REFRESH MATERIALIZED VIEW MyView;

You can initiate it manually (via the SQL preceding command) or automatically via a scheduled query. In either case, it uses the IAM role associated with the stream. Each refresh is incremental and massively parallel, storing its progress in each Kinesis shard in the system catalogs so as to be ready for the next round of refresh.

With this process, you can now query near-real-time data from your Kinesis data stream through Amazon Redshift.

Configure Amazon Redshift streaming ingestion with SQL queries

Etleap pulls data from databases, applications, file stores, and event streams, and transforms it before loading it into an AWS data repository. Data ingestion pipelines typically process batches every 5–60 minutes, so when you query your data in Amazon Redshift, it’s at least 5 minutes out of date. For many use cases, such as ad hoc queries and BI reporting, this latency time is acceptable.

But what about when your team demands more up-to-date data? An example is operational dashboards, where you need to track KPIs in near-real time. Amazon Redshift load times are bottlenecked by COPY commands that move data from Amazon S3 into Amazon Redshift, as mentioned earlier.

This is where streaming ingestion comes in: by staging the data in Kinesis Data Streams rather than Amazon S3, Etleap can reduce data latency in Amazon Redshift to less than 10 seconds. To preview this feature, we ingest data from SQL databases such as MySQL and Postgres that support change data capture (CDC). The data flow is shown in the following diagram.

Etleap manages the end-to-end data flow through AWS Database Migration Service (AWS DMS) and Kinesis Data Streams, and creates and schedules Amazon Redshift queries, providing up-to-date data.

AWS DMS consumes the replication logs from the source, and produces insert, update, and delete events. These events are written to a Kinesis data stream that has multiple shards in order to handle the event load. Etleap transforms these events according to user-specified rules, and writes them to another data stream. Finally, a sequence of Amazon Redshift commands load data from the stream into a destination table. This procedure takes less than 10 seconds in real-world scenarios.

Configure Amazon Redshift streaming ingestion with Etleap

Previously, we explored how data in Kinesis Data Streams can be accessed in Amazon Redshift using SQL queries. In this section, we see how Etleap uses the streaming ingestion feature to mirror a table from MySQL into Amazon Redshift, and the end-to-end latency we can achieve.

Etleap customers that are part of the Streaming Ingestion Preview Program can ingest data into Amazon Redshift directly from an Etleap-managed Kinesis data stream. All pipelines from a CDC-enabled source automatically use this feature.

The destination table in Amazon Redshift is Type 1, a mirror of the table in the source database.

For example, say you want to mirror a MySQL table in Amazon Redshift. The table represents the online shopping carts that users have open. In this case, low latency is critical so that the platform marketing strategists can instantly identify abandoned carts and high demand items.

The cart table has the following structure:

CREATE TABLE cart (
id int PRIMARY KEY AUTO_INCREMENT,
user_id INT,
current_price DECIMAL(6,2),
no_items INT,
checked_out TINY_INT(1),
update_date TIMESTAMP
);

Changes from the source table are captured using AWS DMS and then sent to Etleap via a Kinesis data stream. Etleap transforms these records and writes them to another data stream using the following structure:

{
"id": 8322,
"user_id": 443,
"current_price": 22.98,
"no_items": 3,
"checked_out": 0,
"update_date": "2021-11-05 23:11",
"op": "U"
}

The structure encodes the row that was modified or inserted, as well as the operation type (represented by the op column), which can have three values: I (insert), U (update) or D (delete).

This information is then materialized in Amazon Redshift from the data stream:

CREATE EXTERNAL SCHEMA etleap_stream
FROM KINESIS
IAM_ROLE '<redacted>';

CREATE MATERIALIZED VIEW cart_staging
DISTSTYLE KEY
DISTKEY(id)
SORTKEY(etleap_sequence_no)
AS SELECT
CAST(PartitionKey as bigint) AS etleap_sequence_no,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'id') as bigint) AS id,
JSON_PARSE(FROM_VARBYTE(Data, 'utf-8')) AS Data
FROM etleap_stream."cart";

In the materialized view, we expose the following columns:

  • PartitionKey represents an Etleap sequence number, to ensure that updates are processed in the correct order.
  • We shred the primary keys of the table (id in the preceding example) from the payload, using them as a distribution key to improve the update performance.
  • The Data column is parsed out into a SUPER type from the JSON object in the stream. This is shredded into the corresponding columns in the cart table when the data is inserted.

With this staging materialized view, Etleap then updates the destination table (cart) that has the following schema:

CREATE TABLE cart (
id BIGINT PRIMARY KEY,
user_id
BIGINT,
current_price
DECIMAL(6,2),
no_items
INT,
checked_out
BOOLEAN,
update_date
VARCHAR(64)
)
DISTSTYLE
key
distkey
(id);

To update the table, Etleap runs the following queries, selecting only the changed rows from the staging materialized view, and applies them to the cart table:

BEGIN;

REFRESH MATERIALIZED VIEW cart_staging;

UPDATE _etleap_si SET end_sequence_no = (
SELECT COALESCE(MIN(etleap_sequence_no), (SELECT MAX(etleap_sequence_no) FROM cart_staging)) FROM
(
SELECT
etleap_sequence_no,
LEAD(etleap_sequence_no, 1) OVER (ORDER BY etleap_sequence_no) - etleap_sequence_no AS diff
FROM cart_staging
WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
)
WHERE diff > 1
) WHERE table_name = 'cart';

DELETE FROM cart
WHERE id IN (
SELECT id
FROM cart_staging
WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
);

INSERT INTO cart
SELECT
DISTINCT(id),
CAST(Data."timestamp" as timestamp),
CAST(Data.payload as varchar(256)),
CAST(Data.etleap_sequence_no as bigint) from
(SELECT id,
JSON_PARSE(LAST_VALUE(JSON_SERIALIZE(Data)) OVER (PARTITION BY id ORDER BY etleap_sequence_no ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) AS Data
FROM cart_staging
WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart'
AND Data.op != 'D')
);

UPDATE _etleap_si SET start_sequence_no = end_sequence_no WHERE table_name = 'cart';

COMMIT;

We run the following sequence of queries:

  1. Refresh the cart_staging materialized view to get new records from the cart stream.
  2. Delete all records from the cart table that were updated or deleted since the last time we ran the update sequence.
  3. Insert all the updated and newly inserted records from the cart_staging materialized view into the cart table.
  4. Update the _etleap_si bookkeeping table with the current position. Etleap uses this table to optimize the query in the staging materialized view.

This update sequence runs continuously to minimize end-to-end latency. To measure performance, we simulated the change stream from a database table that has up to 100,000 inserts, updates, and deletes. We tested target table sizes of up to 1.28 billion rows. Testing was done on a 2-node ra3.xlplus Amazon Redshift cluster and a Kinesis data stream with 32 shards.

The following figure shows how long the update sequence takes on average over 5 runs in different scenarios. Even in the busiest scenario (100,000 changes to a 1.28 billion row table), the sequence takes just over 10 seconds to run. In our experiment, the refresh time was independent of the delta size, and took 3.7 seconds with a standard deviation of 0.4 seconds.

This shows that the update process can keep up with source database tables that have 1 billion rows and 10,000 inserts, updates, and deletes per second.

Summary

In this post, you learned about the native streaming ingestion feature in Amazon Redshift and how it achieves latency in seconds, while ingesting data from Kinesis Data Streams into Amazon Redshift. You also learned about the architecture of Amazon Redshift with the streaming ingestion feature enabled, how to configure it using SQL commands, and use the capability in Etleap.

To learn more about Etleap, take a look at the Etleap ETL on AWS Quick Start, or visit their listing on AWS Marketplace.

by Caius Brindescu [Etleap], Jobin George [AWS], Maneesh Sharma [AWS], and TJ Green [AWS]