Google’s Dataflow framework?is a data processing engine that supports various types of data-related jobs: ETL, batch jobs and most–importantly stream processing. It is based a programming model that takes into account complex problems such as balancing latency vs. consistency, dealing with out of order or late data. You can run data flow pipelines on the Google cloud platform where efficient use of resources in abstracted and let you focus on solving the problems you care about and not fiddle with moving bits around. The Dataflow engine is implemented in Java, but recently a Python SDK was released. In this article I’ll show how to get started with creating Dataflow pipelines in Python released. The data flow programming model consists of several concepts such as pipelines, PCollections, PTransforms and data sources and sinks. Before diving in let’s see a quick example.
The Dataflow-zodiac project?is all about stars. It takes a list of NBA all-star players and, based on their birth date, clusters them according their astrological signs. Here is the code of the main program zodiac.py
"""import google.cloud.Dataflow as dfzodiac = dict( Aries=((3, 21), (4, 19)), Taurus=((4, 20), (5, 20)), Gemini=((5, 21), (6, 20)), Cancer=((6, 21), (7, 22)), Leo=((7, 23), (8, 22)), Virgo=((8, 23), (9, 22)), Libro=((9, 23), (10, 22)), Scorpio=((10, 23), (11, 21)), Sagittarius=((11, 22), (12, 21)), Capricron=((12, 21), (1, 19)), Aquarius=((1, 20), (2, 18)), Pisces=((2, 19), (3, 20)))def get_zodiac_sign(line): name, day, month = line.split(',') d = int(day) m = int(month) for sign, (s, e) in zodiac.iteritems(): # special case for Capricorn if (m == 12 and d >= 21) or (m == 1 and d <= 19): return 'Capricorn' if s <= m <= e: if (m == s and d >= s) or (m == e and d <= e): return sign returnp = df.Pipeline('DirectPipelineRunner')(p | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv')) | df.Map('get zodiac sign', get_zodiac_sign) | df.combiners.Count.PerElement('count words') | df.Write('save', df.io.TextFileSink('./results')))p.run()"""
In the following sections I'll explain each part.
A Dataflow pipeline is a collection of steps that can read data from various sources, apply various transforms on the data and eventually write it out to data sinks. A pipeline doesn't have to be a linear set of steps. The processing can branch and merge forming a directed acyclic graph. The Python SDK provides a beautiful syntax using the pipe character "|" to construct the pipeline steps.
A pipeline is first defined and executed using the run() method. There are two pipeline runners. The DirectPipelineRunner runs the pipeline locally on your machine and is super useful for experimentation, development and debugging. The DataflowPipelineRunner and BlockingDataflowPipelineRunner execute the pipeline remotely in the Google cloud.
Here is the definition and execution of the zodiac pipeline:
p = df.Pipeline('DirectPipelineRunner')(p | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv')) | df.Map('get zodiac sign', get_zodiac_sign) | df.combiners.Count.PerElement('count signs') | df.Write('save', df.io.TextFileSink('./results')))p.run()
The data that flows through the pipeline adheres to a specific protocol and is represented as a PCollection object. The "P" stands for parallel and means that the data in a PCollection can be processed in parallel and has no internal dependencies. A PCollection belongs exclusively to its pipeline. It is a container for an arbitrary number of items that possibly can't all fit in memory. Steps of the pipeline communicate by consuming and producing PCollections. The PCollections in the zodiac pipeline are the initial lit of NBA players and their birth dates in the following format: "player name,day,month". Here are a few examples:
"""Kobe Bryant,22,8Tim Duncan,24,4Kevin Garnett,18,5Dirk Nowitzki,18,6LeBron James,29,12Dwyane Wade,16,1Chris Bosh,23,3Ray Allen,19,7Paul Pierce,12,10Carmelo Anthony,28,5Chris Paul,5,5Vince Carter,25,1"""
There is no need for a year because it's not required for finding the astrological sign.
Another interim PCollection that is created is the list of signs and finally the result PCollection is a count of the occurrences of each sign.
"""('Aries', 6)('Taurus', 8)('Gemini', 6)('Cancer', 9)('Leo', 6)('Virgo', 5)('Libro', 5)('Scorpio', 3)('Sagittarius', 3)('Capricorn', 4)('Aquarius', 9)('Pisces', 10)"""
The actual steps to process the data that flows through the pipeline are represented as PTransforms. A transform can perform arbitrary processing including mathematical or textual transforms, grouping, filtering, splitting and even combining multiple transforms. The nice thing about the Python SDK is that you can provide plain Python functions to operate as transforms or as parts of existing transforms. In the zodiac pipeline, the primary transform is mapping lines of text that contain a player name and a birth date to its astrological sign. It is invoked in the pipeline via:
| df.Map('get zodiac sign', get_zodiac_sign)
The df.Map part is a standard Dataflow map transform, but how to map each item is specified by the get_zodiac_sign() function. This function has a lot of logic that has to do with parsing the input data arriving as a string and then checking which sign the birth date belongs to.
def get_zodiac_sign(line): name, day, month = line.split(',') d = int(day) m = int(month) for sign, (s, e) in zodiac.iteritems(): # special case for Capricorn if (m == 12 and d >= 21) or (m == 1 and d <= 19): return 'Capricorn' if s <= m <= e: if (m == s and d >= s) or (m == e and d <= e): return sign return
Another standard transform used is the counter that just counts the occurrences of each astrological sign. It is invoked by this line:
| df.combiners.Count.PerElement('count signs')
The transforms get all the glory because they do all the cool stuff, but someone has to get the input data into the pipeline and get the output out of the pipeline for safe keeping, further analysis and visualizations. That's the job of the Read and Write transforms that support various data sources and data sinks such as text files, Google cloud storage, Big Query, Avro files and Google's Pub/Sub service.
In the zodiac pipeline the input data comes from the CSV file: player_birth_dates.csv. It is read by the first step in the pipeline:
| df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv'))
This ends up as a PCollection in which each item is a line from the csv file that can be processed by the rest of the pipeline.
Once processing is done the pipeline writes the output to a file called 'results' via:
| df.Write('save', df.io.TextFileSink('./results')))
The Dataflow framework is a solid stream processing engine based on strong conceptual underpinnings. The Python SDK provide a very idiomatic interface to its capabilities and is easy to use and extend. Give it a try.