Exploring the Google Cloud Stream Processing Framework

Learn more about how to get started with creating pipelines in Python with Google's Dataflow framework.



Google's Dataflow framework is a data processing engine that supports various types of data-related jobs: ETL, batch jobs and, most importantly, stream processing. It is based on a programming model that takes into account complex problems such as balancing latency against consistency and dealing with out-of-order or late data. You can run Dataflow pipelines on the Google Cloud Platform, where the efficient use of resources is abstracted away, letting you focus on the problems you care about rather than fiddling with moving bits around. The Dataflow engine is implemented in Java, but recently a Python SDK was released. In this article I'll show how to get started with creating Dataflow pipelines in Python. The Dataflow programming model consists of several concepts such as pipelines, PCollections, PTransforms, and data sources and sinks. Before diving in, let's see a quick example.

Dataflow Example

The Dataflow-zodiac project is all about stars. It takes a list of NBA all-star players and, based on their birth dates, clusters them according to their astrological signs. Here is the code of the main program, zodiac.py:

"""
import google.cloud.dataflow as df

zodiac = dict(
    Aries=((3, 21), (4, 19)),
    Taurus=((4, 20), (5, 20)),
    Gemini=((5, 21), (6, 20)),
    Cancer=((6, 21), (7, 22)),
    Leo=((7, 23), (8, 22)),
    Virgo=((8, 23), (9, 22)),
    Libra=((9, 23), (10, 22)),
    Scorpio=((10, 23), (11, 21)),
    Sagittarius=((11, 22), (12, 21)),
    Capricorn=((12, 22), (1, 19)),
    Aquarius=((1, 20), (2, 18)),
    Pisces=((2, 19), (3, 20))
)


def get_zodiac_sign(line):
    name, day, month = line.split(',')

    d = int(day)
    m = int(month)

    # Capricorn wraps around the year boundary, so handle it up front
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'

    for sign, (s, e) in zodiac.items():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None


p = df.Pipeline('DirectPipelineRunner')
(p
 | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv'))
 | df.Map('get zodiac sign', get_zodiac_sign)
 | df.combiners.Count.PerElement('count signs')
 | df.Write('save', df.io.TextFileSink('./results')))
p.run()
"""


In the following sections I'll explain each part.

Pipelines

A Dataflow pipeline is a collection of steps that can read data from various sources, apply various transforms on the data and eventually write it out to data sinks. A pipeline doesn't have to be a linear set of steps. The processing can branch and merge forming a directed acyclic graph. The Python SDK provides a beautiful syntax using the pipe character "|" to construct the pipeline steps.
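The pipe syntax works because transform objects overload Python's "|" operator. Here is a minimal, self-contained sketch of the idea; it is a toy illustration of the chaining mechanism, not the SDK's actual classes:

```python
class Step(object):
    """Toy illustration of pipe-style chaining via __or__ (not the SDK's classes)."""
    def __init__(self, fn):
        self.fns = [fn]

    def __or__(self, other):
        # '|' concatenates the two step lists and returns the combined step,
        # so further '|' applications keep extending the chain
        combined = Step(None)
        combined.fns = self.fns + other.fns
        return combined

    def run(self, data):
        # feed the data through each step's function in order
        for fn in self.fns:
            data = fn(data)
        return data

pipeline = (Step(lambda xs: [x.upper() for x in xs])
            | Step(lambda xs: [x + '!' for x in xs]))
print(pipeline.run(['a', 'b']))  # ['A!', 'B!']
```

Each "|" returns a step object, which is why the chaining can continue indefinitely; the real SDK builds a graph of transforms this way instead of a flat list.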

A pipeline is first defined and then executed using its run() method. There are three pipeline runners. The DirectPipelineRunner runs the pipeline locally on your machine and is super useful for experimentation, development and debugging. The DataflowPipelineRunner and BlockingDataflowPipelineRunner execute the pipeline remotely in the Google cloud (the blocking variant waits for the pipeline to finish).

Here is the definition and execution of the zodiac pipeline:

p = df.Pipeline('DirectPipelineRunner')
(p
 | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv'))
 | df.Map('get zodiac sign', get_zodiac_sign)
 | df.combiners.Count.PerElement('count signs')
 | df.Write('save', df.io.TextFileSink('./results')))
p.run()
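To make the data flow concrete, here is a plain-Python equivalent of the same four steps (read, map, count, write) with no Dataflow machinery involved; sign_of() is a toy stand-in for get_zodiac_sign() that only knows two birth months:

```python
from collections import Counter

def sign_of(line):
    # toy stand-in for get_zodiac_sign(): maps only two birth months
    name, day, month = line.split(',')
    return {'8': 'Leo', '12': 'Capricorn'}.get(month, 'Unknown')

lines = ['Kobe Bryant,22,8', 'LeBron James,29,12']  # "read" step
signs = [sign_of(line) for line in lines]           # "map" step
counts = Counter(signs)                             # "count per element" step
output = sorted(counts.items())                     # "write" step (to a list)
print(output)  # [('Capricorn', 1), ('Leo', 1)]
```

The real pipeline does the same thing, except each step can run in parallel across many machines and the data never has to fit in one process.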

PCollections

The data that flows through the pipeline adheres to a specific protocol and is represented as a PCollection object. The "P" stands for parallel: the data in a PCollection can be processed in parallel and has no internal dependencies. A PCollection belongs exclusively to its pipeline. It is a container for an arbitrary number of items that may not all fit in memory. Steps of the pipeline communicate by consuming and producing PCollections. The initial PCollection in the zodiac pipeline is the list of NBA players and their birth dates in the following format: "player name,day,month". Here are a few examples:

"""
Kobe Bryant,22,8
Tim Duncan,24,4
Kevin Garnett,18,5
Dirk Nowitzki,18,6
LeBron James,29,12
Dwyane Wade,16,1
Chris Bosh,23,3
Ray Allen,19,7
Paul Pierce,12,10
Carmelo Anthony,28,5
Chris Paul,5,5
Vince Carter,25,1
"""

There is no need for a year because it's not required for finding the astrological sign.

Another interim PCollection created along the way is the list of signs, and the final result PCollection holds the count of occurrences of each sign:

"""
('Aries', 6)
('Taurus', 8)
('Gemini', 6)
('Cancer', 9)
('Leo', 6)
('Virgo', 5)
('Libra', 5)
('Scorpio', 3)
('Sagittarius', 3)
('Capricorn', 4)
('Aquarius', 9)
('Pisces', 10)
"""

PTransforms

The actual steps that process the data flowing through the pipeline are represented as PTransforms. A transform can perform arbitrary processing including mathematical or textual transforms, grouping, filtering, splitting and even combining multiple transforms. The nice thing about the Python SDK is that you can provide plain Python functions to operate as transforms or as parts of existing transforms. In the zodiac pipeline, the primary transform maps each line of text containing a player name and a birth date to the corresponding astrological sign. It is invoked in the pipeline via:

| df.Map('get zodiac sign', get_zodiac_sign) 

The df.Map part is a standard Dataflow map transform, but how to map each item is specified by the get_zodiac_sign() function. This function has a lot of logic that has to do with parsing the input data arriving as a string and then checking which sign the birth date belongs to.

def get_zodiac_sign(line):
    name, day, month = line.split(',')

    d = int(day)
    m = int(month)

    # Capricorn wraps around the year boundary, so handle it up front
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'

    for sign, (s, e) in zodiac.items():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None
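Because get_zodiac_sign() is a plain Python function, you can unit test it directly before wiring it into a pipeline. The snippet below is self-contained, repeating the sign table (with Capricorn's range as Dec 22 through Jan 19) so it runs on its own:

```python
zodiac = dict(
    Aries=((3, 21), (4, 19)),
    Taurus=((4, 20), (5, 20)),
    Gemini=((5, 21), (6, 20)),
    Cancer=((6, 21), (7, 22)),
    Leo=((7, 23), (8, 22)),
    Virgo=((8, 23), (9, 22)),
    Libra=((9, 23), (10, 22)),
    Scorpio=((10, 23), (11, 21)),
    Sagittarius=((11, 22), (12, 21)),
    Capricorn=((12, 22), (1, 19)),
    Aquarius=((1, 20), (2, 18)),
    Pisces=((2, 19), (3, 20)),
)

def get_zodiac_sign(line):
    name, day, month = line.split(',')
    d = int(day)
    m = int(month)
    # Capricorn wraps around the year boundary, so handle it up front
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'
    for sign, (s, e) in zodiac.items():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None

print(get_zodiac_sign('Kobe Bryant,22,8'))    # Leo
print(get_zodiac_sign('Tim Duncan,24,4'))     # Taurus
print(get_zodiac_sign('LeBron James,29,12'))  # Capricorn
```

Keeping map functions free of pipeline dependencies like this makes them trivial to exercise with ordinary tests.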

Another standard transform used is the counter that just counts the occurrences of each astrological sign. It is invoked by this line:

| df.combiners.Count.PerElement('count signs')
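Conceptually, Count.PerElement behaves like applying Python's collections.Counter to the collection. This plain-Python analogy shows the shape of the result; it is not how Dataflow implements the combiner:

```python
from collections import Counter

signs = ['Leo', 'Leo', 'Capricorn', 'Leo', 'Taurus']
counts = Counter(signs)  # count occurrences of each distinct element
print(sorted(counts.items()))  # [('Capricorn', 1), ('Leo', 3), ('Taurus', 1)]
```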

Pipeline IO

The transforms get all the glory because they do all the cool stuff, but someone has to get the input data into the pipeline and get the output out of the pipeline for safekeeping, further analysis and visualization. That's the job of the Read and Write transforms, which support various data sources and data sinks such as text files, Google Cloud Storage, BigQuery, Avro files and Google's Pub/Sub service.

In the zodiac pipeline the input data comes from the CSV file: player_birth_dates.csv. It is read by the first step in the pipeline:

| df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv')) 

This ends up as a PCollection in which each item is a line from the CSV file that can be processed by the rest of the pipeline.

Once processing is done the pipeline writes the output to a file called 'results' via:

| df.Write('save', df.io.TextFileSink('./results'))

Conclusion

The Dataflow framework is a solid stream processing engine based on strong conceptual underpinnings. The Python SDK provides a very idiomatic interface to its capabilities and is easy to use and extend. Give it a try.



   
Gigi Sayfan is the chief platform architect of VRVIU, a start-up developing cutting-edge hardware + software technology in the virtual reality space. Gigi has been developing software professionally for 21 years in domains as diverse as instant messaging, morphing, chip fabrication process control, embedded multi-media application for game consoles, brain-inspired machine learning, custom browser development, web services for 3D distributed game platform, IoT/sensors and most recently virtual reality. He has written production code every day in many programming languages such as C, C++, C#, Python, Java, Delphi, Javascript and even Cobol and PowerBuilder for operating systems such as Windows (3.11 through 7), Linux, Mac OSX, Lynx (embedded) and Sony Playstation. His technical expertise includes databases, low-level networking, distributed systems, unorthodox user interfaces and general software development life cycle.