AWS: Running a Flink Application on Kinesis Data Analytics (KDA) - Part 1

Arjun Sunil Kumar · Published in Cloud Engineering · 5 min read · Apr 12, 2020


[Image: KDA + Flink]

Update [05/12/2020]:

Amazon Kinesis Data Analytics (KDA) now supports Apache Flink v1.11, so please skip the Flink 1.8.2 workaround described in this article.

If you are here, you have probably already heard of Apache Flink. It is primarily used for distributed stream processing and aggregation. Amazon Kinesis Data Analytics (KDA) is a PaaS on which you can run your Flink application.

Tutorial List:

  1. Best practices for building your KDA Flink app. [Part 1]
  2. Deploying your Flink jar in KDA. [Part 2]

Download:

Please fork the complete project from my GitHub.

AWS Kinesis Data Analytics:

As mentioned, KDA is a Platform as a Service: a Flink cluster running on AWS Fargate that can scale based on load. (Another way to run a Flink app on AWS is on EMR.) KDA currently supports Flink versions 1.6 and 1.8, and we will be using Flink 1.8 throughout this series. KDA gives you a lot out of the box.

  1. We can set an IAM role to restrict our Flink application's resource access, clearly specifying the resources it needs (Kinesis, DynamoDB, Cassandra) in an IAM policy (see the sketch after this list).
  2. KDA provides an integrated RocksDBStateBackend by default.
  3. KDA comes with a restart strategy already configured.
  4. We can enable application snapshotting.
  5. It provides autoscaling based on CPU usage.
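For example, a policy attached to the application's IAM role might grant read access to the source stream. This is a minimal sketch; the region, account id, and stream name are placeholders:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadSourceStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-input-stream"
    }
  ]
}
```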

Flink 1.8.2

We will be using Flink 1.8.2 as the base version for these projects. As of today (12-04-2020), KDA supports Flink 1.8 (and not Flink 1.10).

Note: In Flink 1.8.2, flink-connector-kinesis had a licensing issue, and consequently the connector was removed from Maven Central. As per the docs, we need to pull the Flink 1.8.2 source code and build the project to get flink-connector-kinesis into our local .m2; from then on, we can reference it in our POM. But this was not feasible for local execution on everyone's machine, so we devised a different approach to solve this.
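For reference, the documented source build looks roughly like this (a sketch; the include-kinesis profile is what produces the connector, but check the Flink 1.8 docs for the exact steps):

```bash
# Download and unpack the Flink 1.8.2 sources.
wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz
tar -xzf flink-1.8.2-src.tgz
cd flink-1.8.2

# Build with the include-kinesis profile so flink-connector-kinesis
# lands in the local .m2 repository.
mvn clean install -Pinclude-kinesis -DskipTests
```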

Hello World!

As every programmer does, we start off with a hello world. The application is a simple decorator chain: a Kinesis message is read, a timestamp is appended to it, and a greeting message is appended to that. For now we just print the result stream instead of passing it to a sink. The full pipeline is sketched under "Pipeline code" below.

Breaking the main class into smaller chunks

Source

  • KINESIS_SOURCE_STREAM_NAME_KEY is the key used to fetch the stream name from the environment properties.
  • .name("Operator Name") assigns the operator name that will be shown in the job graph dashboard.
  • .uid("operator_id") is used while saving and recovering the state of an operator in Flink. Make sure you provide a unique id for every operator. (See the sketch below.)
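A sketch of the source setup, as a fragment of the main class; the helper name, region, and operator names are illustrative, not fixed by the project:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Illustrative helper: builds the Kinesis source from the loaded properties.
private static DataStream<String> createKinesisSource(
        StreamExecutionEnvironment sEnv, Properties appProperties) {
    // KINESIS_SOURCE_STREAM_NAME_KEY fetches the stream name from the
    // environment properties.
    String streamName = appProperties.getProperty(KINESIS_SOURCE_STREAM_NAME_KEY);

    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    return sEnv
            .addSource(new FlinkKinesisConsumer<>(
                    streamName, new SimpleStringSchema(), consumerConfig))
            .name("Kinesis Source")    // shown on the job graph dashboard
            .uid("kinesis_source_id"); // stable id for state save/restore
}
```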

Properties Map

This code fetches its properties from application-properties-dev.json based on the execution environment.

As you can see, the properties are defined in a JSON structure. A conditional check loads the properties from the resources folder when running locally; otherwise they are loaded from the KDA environment. (See the sketch below.)
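A sketch of the loading logic using the KDA runtime library; the property group name, file path, and the shape of the JSON file are assumptions for illustration:

```java
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

// application-properties-dev.json (illustrative shape):
// [ { "PropertyGroupId": "FlinkAppProperties",
//     "PropertyMap": { "kinesis.source.stream.name": "my-input-stream" } } ]

// Illustrative helper: loads one property group either from the local
// resource file (IDE run) or from the KDA runtime (cloud run).
private static Properties loadApplicationProperties(boolean isLocal) throws IOException {
    Map<String, Properties> groups = isLocal
            ? KinesisAnalyticsRuntime.getApplicationProperties(
                    HelloWorldJob.class.getResource(
                            "/application-properties-dev.json").getPath())
            : KinesisAnalyticsRuntime.getApplicationProperties();
    return groups.get("FlinkAppProperties");
}
```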

Pipeline code

  • We create a stream execution environment (sEnv).
  • We fetch the properties from application-properties-dev.json.
  • There are three operators in this chain: source → time appender → greet appender.
  • .startNewChain() is used to break the operators into separate blocks in the job graph.
  • Execution is lazy, i.e. unless a print/execute is invoked, the result won't be computed. (See the sketch after this list.)
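Tying the pieces together, a sketch of the main class. It reuses the createKinesisSource and loadApplicationProperties helpers sketched above; the class name and the local-run check are illustrative:

```java
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HelloWorldJob {

    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment.
        final StreamExecutionEnvironment sEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Fetch the properties (local file vs. KDA runtime, as above).
        // Illustrative check; the project gates this on the actual environment.
        boolean isLocal = System.getenv("KDA_APPLICATION_NAME") == null;
        Properties appProperties = loadApplicationProperties(isLocal);

        // 3. Chain: source -> time appender -> greet appender.
        DataStream<String> source = createKinesisSource(sEnv, appProperties);
        source
                .map(value -> System.currentTimeMillis() + " " + value)
                .name("Time Appender").uid("time_appender_id")
                .startNewChain() // separate block in the job graph
                .map(value -> "Hello World! " + value)
                .name("Greet Appender").uid("greet_appender_id")
                .startNewChain()
                .print();        // lazy: nothing runs until print/execute

        sEnv.execute("KDA Hello World");
    }
}
```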

Logging:

According to Flink best practices, it is not good to log once per entry that arrives as Kinesis input. We can log per output if the input rate is greater than the output rate.

To keep it simple, we can use log.debug() and run the application with the log level set to INFO. When there is an issue, we can set the log level to DEBUG and see the full logs.
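A minimal sketch using Lombok's @Slf4j (the operator class is illustrative):

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

@Slf4j
public class GreetAppender implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        // Silent at the default INFO level; switch the logger to DEBUG
        // only while investigating an issue.
        log.debug("Greeting message: {}", value);
        return "Hello World! " + value;
    }
}
```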

Now let's look at the application's POM.

Maven POM

Let's break that down too.

Dependencies:

  • In the properties section of the parent module, we have defined the dependency versions to use (1.8.2).
  • We will cover the flink-connector-kinesis part later in this article.
  • aws-kinesisanalytics-runtime is required to run the application in KDA.
  • Note that some of the dependency scopes are provided because those jars are not required in the built jar when running in KDA; the Flink/KDA cluster already provides them on the classpath, hence the provided scope.
  • With those dependencies scoped as provided, you won't be able to run the application locally; you would need to make the scope compile in order to run it from IDEA. There is a simple trick to solve this, which is covered later in this article.
  • Lombok is used to get the benefits of annotations like @Getter, @Setter, and @Slf4j.
  • The Flink test dependencies provide the test harness and MiniCluster, both very powerful tools for testing windowing logic and end-to-end flows. (A sketch of the dependencies section follows.)
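A sketch of how the dependencies section might look; the artifact ids and versions follow the article's 1.8.2 setup, but verify them against the repository:

```xml
<dependencies>
    <!-- Provided: the Flink/KDA cluster already has this on the classpath. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.2</version>
        <scope>provided</scope>
    </dependency>

    <!-- Installed into the local .m2 via maven-install-plugin (see Build). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_2.11</artifactId>
        <version>1.8.2</version>
    </dependency>

    <!-- Required to read runtime properties inside KDA. -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-kinesisanalytics-runtime</artifactId>
        <version>1.0.1</version>
    </dependency>

    <!-- Lombok for @Getter, @Setter, @Slf4j. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```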

Profile

As mentioned earlier, in order to run the application in your IDEA, you need to make these dependencies scoped as compile. A simpler solution is to create a Maven profile that does exactly that, and to activate this profile whenever you run the application from IDEA.

IDEA Profile Activation

When building the jar for running in KDA, make sure to turn this profile off so that the unwanted dependencies are not added to the jar.
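A sketch of such a profile, using the add-dependencies-for-IDEA id referenced later in the build steps (the single dependency shown is just one example):

```xml
<profiles>
    <profile>
        <!-- Activate in IDEA to pull provided-scope dependencies onto the
             runtime classpath for local runs; keep it off for KDA builds. -->
        <id>add-dependencies-for-IDEA</id>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.8.2</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>
```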

Build

Now let's see the build plugins.

  • maven-compiler-plugin is used to compile the source code into Java 1.8-compatible bytecode.
  • maven-shade-plugin is used to build the uber jar. Using shade, classes with the same name coming from different dependencies are managed properly and won't be overwritten in the uber jar.
  • maven-install-plugin is used to install flink-connector-kinesis:1.8.2 into your local .m2. The dependency jar is present in the project's lib folder; when you perform mvn validate, the dependency is installed into your local .m2 folder. Please do read the README. (A sketch follows this list.)
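A sketch of the install-file binding; the jar filename under lib is an assumption, so match it to what the repository actually ships:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-install-plugin</artifactId>
    <executions>
        <execution>
            <id>install-flink-connector-kinesis</id>
            <!-- Bound to validate, so `mvn validate` installs the jar. -->
            <phase>validate</phase>
            <goals>
                <goal>install-file</goal>
            </goals>
            <configuration>
                <file>${project.basedir}/lib/flink-connector-kinesis_2.11-1.8.2.jar</file>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kinesis_2.11</artifactId>
                <version>1.8.2</version>
                <packaging>jar</packaging>
            </configuration>
        </execution>
    </executions>
</plugin>
```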

.gitignore

This file is very important in this project. We need to include the flink-connector-kinesis jar in our source tree while excluding every other .jar.

As you can see below, the flink-connector-kinesis jar is exempted from .gitignore.
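A sketch of the relevant rules (the jar filename is an assumption):

```gitignore
# Ignore all jars...
*.jar
# ...except the bundled Kinesis connector, which must ship with the repo.
!lib/flink-connector-kinesis_2.11-1.8.2.jar
```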

Now let's see how to build the app.

Building the APP

  • Before we begin, we need our local .m2 to have flink-connector-kinesis. For that, perform mvn validate.
  • Now make sure you have unchecked add-dependencies-for-IDEA and perform mvn clean install. (See the commands below.)
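In shell form, both commands straight from the steps above:

```bash
# Install flink-connector-kinesis from lib/ into the local .m2.
mvn validate

# Build the uber jar; keep the add-dependencies-for-IDEA profile off.
mvn clean install
```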

The next part will cover deploying this Flink app to Kinesis Data Analytics.

Found it Interesting?
Please show your support by 👏.


Arjun Sunil Kumar · Cloud Engineering

Writes on Database Kernel, Distributed Systems, Cloud Technology, Data Engineering & SDE Paradigm. github.com/arjunsk