, , ,

Introduction to Apache Flink for Data Engineers

Not going to lie. I’ve been trying to figure out for awhile where Apache Flink fits in the Data Engineering world for awhile now. A year or two ago I didn’t seem much content posted about it, but it seems to be picking up stream. I’ve mostly managed to avoid understanding what Flink is or does, but I figured it’s time to give my brain a much needed workout. When I was ignoring Flink, I just chalked it up as another messaging/streaming system like Kafka or Pulsar. Apparently I was wrong … no surprise there.

Introduction to Apache Flink

So what is Flink if it’s not a messaging system?

“….framework and distributed processing engine for stateful computations over unbounded and bounded data streams”

– Flink docs

I don’t know why it is, but every time I hear something about data streams, I think messaging systems … which it appears Flink isn’t. Flink isn’t for passing messages around, it’s a big data processing tool. I mean the first description you get in the docs is that it’s for “computations.” I think that’s key to understand what Flink is, and what I can be best used for.

My first impressions, and best way I’ve found to think about Flink … is that it’s like Apache Spark streaming on steroids. It was designed with a different type of data processing in mind …. treating incoming data more like a stream … instead of the classic ETL approach of Spark.

What more can we know about Flink before diving in?

  • can run on common distributed platforms like YARN, Kubernetes, Mesos, etc.
  • unbounded (start and no end) and bounded (starts and ends) of data.
  • lots of memory usage ( like Spark) to make things fast.
  • Flink can process realtime or already stored (persisted) data.
  • has State (aka can apply business logic, or remember what happened etc)
  • has Time (aggregations etc usually are related to time)

What’s Flink used for?

Even after all that, my mind was still a little fuzzy on Flink. It’s not a messaging system … it’s a distributed system for event processing. In the word’s of the Flink documentation it can also be used for analytics, and data pipelines. In the past those things have always seemed very different …. event processing vs big data analytics. But I guess that’s what Flink’s trying to do? Both?

Flink talk’s a lot about moving from the from the paradigm of batch triggered ETL and analytics to continuous data processing.

In the real world that’ not much different from Spark Streaming … but what do I know.

Apache Flink APIs

At a very high level it appears Flink offers us 3 different methods for interacting with our data.

  • Table API (SQL)
  • DataStream API
  • UDF’s for the above.

Apache Flink + Docker

It does look like Flink has an official Docker image. That’s a nice change of pace, it’s amazing how many Apache projects lack that.

Installing Apache Flink

For local messing around you can either run the Docker image above, or use the install instructions. Which are super simple.

>> curl https://mirrors.ocf.berkeley.edu/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz -o flink-1.12.2-bin-scala_2.11.tgz
>> tar -xzf flink-1.12.2-bin-scala_2.11.tgz
>> cd flink-1.12.2
>> ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host Daniels-......
Starting taskexecutor daemon on host Daniels-.....

Flink + Python = PyFlink

Having a good Python package with documentation is always a great sign in my book.

pip3 install apache-flink

Quick PyFlink Table/SQL API Example

Let’s take a quick look at a simple batch ETL job. Can we do it with PyFlink and how easy is it?

My first few minute impressions about writing a PyFlink script is that it doesn’t feel smooth and Pythonic. But hey, no one’s perfect.

Firstly, when reading all the documentation and example’s, there is bunch of environment setup in Python that seems strange and remind of using Hadoop and Python. I downloaded a file from the free open source Divvy Bike Trips data set.

It looks like such…

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
A847FADBBC638E45,docked_bike,2020-04-26 17:45:14,2020-04-26 18:12:03,Eckhart Park,86,Lincoln Ave & Diversey Pkwy,152,41.8964,-87.661,41.9322,-87.6586,member
5405B80E996FF60D,docked_bike,2020-04-17 17:08:54,2020-04-17 17:17:03,Drake Ave & Fullerton Ave,503,Kosciuszko Park,499,41.9244,-87.7154,41.9306,-87.7238,member
5DD24A79A4E006F4,docked_bike,2020-04-01 17:54:13,2020-04-01 18:08:36,McClurg Ct & Erie St,142,Indiana Ave & Roosevelt Rd,255,41.8945,-87.6179,41.8679,-87.623,member

So now let’s write some PyFlink. Try to read in the sample data, and do a simple aggregation.

from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

After those strange imports, it’s time to write the ETL part.

input = f"""
        create table bike_trips (
            ride_id varchar, rideable_type varchar, started_at varchar,
            ended_at varchar, start_station_name varchar, start_station_id varchar,
            end_station_name varchar, end_station_id varchar,
            start_lat varchar, start_lng varchar, end_lat varchar, end_lng varchar, member_casual varchar
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{input_file}'
        )
        """
table_env.execute_sql(input)

output = f"""
    create table `result`(
        rideable_type varchar,
        cnt BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{output_file}'
    )
    """
table_env.execute_sql(output)

trips = table_env.from_path("bike_trips")

trips.group_by(col('rideable_type')).select(trips.rideable_type, trips.ride_id.count.alias('cnt')) \
    .execute_insert('result').wait()

Anyone who is used to Spark and SparkSQL will recognize this right away. Registering a table with ANSI SQL, not big deal. Executing the SQL tied to a file seems straight forward.

Even the actual aggregations themselves are just like what any Spark user would expect.

trips.group_by(col('rideable_type')).select(trips.rideable_type, trips.ride_id.count.alias('cnt'))

To actually run the code, where Flink was installed.

>> ./bin/flink run --python test.py
Job has been submitted with JobID 7b3c04a8cfe8d602123f7da055540570

The results are pretty much what we were looking for. CSV file output with the following lines, just as expected, a count of rideable_types

rideable_type,1
docked_bike,84776

Musings

I know I just scratched the surface of Flink, but it was good to kick the tires. I wasn’t that impressed with the lack of documentation and directions on how to setup the imports and table and environment settings in PyFlink. It’s confusing and pretty much glossed over.

Once you get into reading data sources, and actually running transforms … it’s pretty much a mirror of Spark. Which is good and bad in a way. I’m sure Flink shines when it comes to event based processing … but it does make me wonder why exactly I should choose Flink over Spark unless i really have a use case centered around events and streaming transforms or aggregations … but then there is Spark Streaming.

Overall I look forward to learning more about Flink, it seems better then a lot of other new distributed systems, easy to use, good and obvious SQL apis and Python API’s will probably help it catch on more then some other tools.