Exploring the Google Cloud Stream Processing Framework


Google’s Dataflow framework is a data processing engine that supports various types of data-related jobs: ETL, batch jobs and, most importantly, stream processing. It is based on a programming model that takes into account complex problems such as balancing latency against consistency and dealing with out-of-order or late data. You can run Dataflow pipelines on the Google Cloud Platform, where resource management is abstracted away so you can focus on the problems you care about instead of fiddling with moving bits around. The Dataflow engine is implemented in Java, but recently a Python SDK was released. In this article I’ll show how to get started with creating Dataflow pipelines in Python. The Dataflow programming model consists of several concepts such as pipelines, PCollections, PTransforms, and data sources and sinks. Before diving in, let’s see a quick example.

Dataflow Example

The Dataflow-zodiac project is all about stars. It takes a list of NBA All-Star players and, based on their birth dates, clusters them according to their astrological signs. Here is the code of the main program, zodiac.py:

import google.cloud.dataflow as df

zodiac = dict(
    Aries=((3, 21), (4, 19)),
    Taurus=((4, 20), (5, 20)),
    Gemini=((5, 21), (6, 20)),
    Cancer=((6, 21), (7, 22)),
    Leo=((7, 23), (8, 22)),
    Virgo=((8, 23), (9, 22)),
    Libra=((9, 23), (10, 22)),
    Scorpio=((10, 23), (11, 21)),
    Sagittarius=((11, 22), (12, 21)),
    Capricorn=((12, 22), (1, 19)),
    Aquarius=((1, 20), (2, 18)),
    Pisces=((2, 19), (3, 20)))

def get_zodiac_sign(line):
    name, day, month = line.split(',')
    d = int(day)
    m = int(month)
    # Special case for Capricorn, whose range wraps around the new year
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'
    for sign, (s, e) in zodiac.iteritems():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None

p = df.Pipeline('DirectPipelineRunner')
(p
 | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv'))
 | df.Map('get zodiac sign', get_zodiac_sign)
 | df.combiners.Count.PerElement('count signs')
 | df.Write('save', df.io.TextFileSink('./results')))
p.run()

In the following sections I'll explain each part.


Dataflow Pipelines

A Dataflow pipeline is a collection of steps that can read data from various sources, apply transforms to the data and eventually write it out to data sinks. A pipeline doesn't have to be a linear sequence of steps: the processing can branch and merge, forming a directed acyclic graph. The Python SDK provides a beautiful syntax using the pipe character "|" to construct the pipeline steps.
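To see why the pipe syntax reads so naturally, here is a tiny plain-Python sketch of the underlying idea: chaining steps by overloading the __or__ operator. This is purely illustrative of the mechanism, not the SDK's actual implementation, and the Step class is a made-up name:

```python
# Illustrative only: a minimal model of "|" chaining via __or__,
# not the Dataflow SDK's actual code.
class Step:
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, next_step):
        # Compose: feed this step's output into the next step.
        return Step(lambda data: next_step.fn(self.fn(data)))

    def run(self, data):
        return self.fn(data)

pipeline = (Step(lambda lines: [s.upper() for s in lines])
            | Step(lambda lines: [s for s in lines if s.startswith('K')]))
print(pipeline.run(['kobe', 'tim', 'kevin']))  # ['KOBE', 'KEVIN']
```

Each "|" produces a new composed step, which is roughly how a linear chain of transforms accumulates into a pipeline graph.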

A pipeline is first defined and then executed using the run() method. There are several pipeline runners. The DirectPipelineRunner runs the pipeline locally on your machine and is super useful for experimentation, development and debugging. The DataflowPipelineRunner and BlockingDataflowPipelineRunner execute the pipeline remotely in the Google cloud.

Here is the definition and execution of the zodiac pipeline:

p = df.Pipeline('DirectPipelineRunner')
(p
 | df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv'))
 | df.Map('get zodiac sign', get_zodiac_sign)
 | df.combiners.Count.PerElement('count signs')
 | df.Write('save', df.io.TextFileSink('./results')))
p.run()


PCollections

The data that flows through the pipeline adheres to a specific protocol and is represented as a PCollection object. The "P" stands for parallel and means that the data in a PCollection can be processed in parallel and has no internal dependencies. A PCollection belongs exclusively to its pipeline. It is a container for an arbitrary number of items that possibly can't all fit in memory. Steps of the pipeline communicate by consuming and producing PCollections. The first PCollection in the zodiac pipeline is the initial list of NBA players and their birth dates in the following format: "player name,day,month". Here are a few examples:

Kobe Bryant,22,8
Tim Duncan,24,4
Kevin Garnett,18,5
Dirk Nowitzki,18,6
LeBron James,29,12
Dwyane Wade,16,1
Chris Bosh,23,3
Ray Allen,19,7
Paul Pierce,12,10
Carmelo Anthony,28,5
Chris Paul,5,5
Vince Carter,25,1

There is no need for a year because it's not required for finding the astrological sign.

Another interim PCollection that is created is the list of signs, and finally the result PCollection is a count of the occurrences of each sign:

('Aries', 6)
('Taurus', 8)
('Gemini', 6)
('Cancer', 9)
('Leo', 6)
('Virgo', 5)
('Libra', 5)
('Scorpio', 3)
('Sagittarius', 3)
('Capricorn', 4)
('Aquarius', 9)
('Pisces', 10)


PTransforms

The actual steps that process the data flowing through the pipeline are represented as PTransforms. A transform can perform arbitrary processing including mathematical or textual transforms, grouping, filtering, splitting and even combining multiple transforms. The nice thing about the Python SDK is that you can provide plain Python functions to operate as transforms or as parts of existing transforms. In the zodiac pipeline, the primary transform maps each line of text, containing a player name and a birth date, to an astrological sign. It is invoked in the pipeline via:

| df.Map('get zodiac sign', get_zodiac_sign) 

The df.Map part is a standard Dataflow map transform, but how each item is mapped is specified by the get_zodiac_sign() function. This function parses the input data, which arrives as a string, and then checks which sign the birth date belongs to.

def get_zodiac_sign(line):
    name, day, month = line.split(',')
    d = int(day)
    m = int(month)
    # Special case for Capricorn, whose range wraps around the new year
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'
    for sign, (s, e) in zodiac.iteritems():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None
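As a quick sanity check, the mapping logic can be exercised on a couple of the sample lines. The snippet below is a self-contained Python 3 version with a deliberately abbreviated zodiac table (only the signs the demo needs), so it is illustrative rather than the full program:

```python
# Abbreviated table for the demo: (start, end) ranges as (month, day).
zodiac = dict(
    Leo=((7, 23), (8, 22)),
    Virgo=((8, 23), (9, 22)),
)

def get_zodiac_sign(line):
    name, day, month = line.split(',')
    d = int(day)
    m = int(month)
    # Capricorn wraps around the new year, so handle it before the lookup.
    if (m == 12 and d >= 22) or (m == 1 and d <= 19):
        return 'Capricorn'
    for sign, (s, e) in zodiac.items():
        if s[0] <= m <= e[0]:
            if (m == s[0] and d >= s[1]) or (m == e[0] and d <= e[1]):
                return sign
    return None

print(get_zodiac_sign('Kobe Bryant,22,8'))  # Leo
print(get_zodiac_sign('Dwyane Wade,16,1'))  # Capricorn
```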

Another standard transform used is the counter that just counts the occurrences of each astrological sign. It is invoked by this line:

| df.combiners.Count.PerElement('count signs')
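Conceptually, Count.PerElement tallies how many times each element appears in the PCollection, much like Python's collections.Counter does for a list. A rough local stand-in (not SDK code):

```python
from collections import Counter

# Tally how many times each element (here, a zodiac sign) appears.
signs = ['Leo', 'Aries', 'Leo', 'Pisces', 'Leo']
counts = list(Counter(signs).items())
print(counts)  # [('Leo', 3), ('Aries', 1), ('Pisces', 1)]
```

The real transform does the same aggregation, but distributed across workers over a PCollection that may not fit in memory.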

Pipeline IO

The transforms get all the glory because they do all the cool stuff, but someone has to get the input data into the pipeline and get the output out of the pipeline for safekeeping, further analysis and visualization. That's the job of the Read and Write transforms, which support various data sources and sinks such as text files, Google Cloud Storage, BigQuery, Avro files and Google's Pub/Sub service.

In the zodiac pipeline the input data comes from the CSV file: player_birth_dates.csv. It is read by the first step in the pipeline:

| df.Read('load messages', df.io.TextFileSource('./player_birth_dates.csv')) 

This ends up as a PCollection in which each item is a line from the CSV file that can be processed by the rest of the pipeline.

Once processing is done the pipeline writes the output to a file called 'results' via:

| df.Write('save', df.io.TextFileSink('./results'))
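To tie the IO steps together, here is a plain-Python sketch that mimics the whole pipeline locally: write a small input file, read it line by line, map, count, and write the results out. The file names and the trivial month-only mapping are illustrative, and tempfile stands in for TextFileSource and TextFileSink:

```python
import os
import tempfile
from collections import Counter

# Illustrative local stand-in for the four pipeline steps, no SDK needed.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'player_birth_dates.csv')
    dst = os.path.join(tmp, 'results')

    with open(src, 'w') as f:  # sample input, as TextFileSource would see it
        f.write('Kobe Bryant,22,8\nRay Allen,19,7\n')

    with open(src) as f:       # "Read" step: one item per line
        lines = [line.strip() for line in f]

    # "Map" step: a trivial mapping that keeps only the birth month.
    months = [line.split(',')[2] for line in lines]

    # "Count.PerElement" step
    counts = Counter(months)

    with open(dst, 'w') as f:  # "Write" step, as TextFileSink would do
        for item in sorted(counts.items()):
            f.write('%s\n' % (item,))

    print(open(dst).read())
```

Swapping the month mapping for get_zodiac_sign() would make this a faithful miniature of the zodiac pipeline.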


The Dataflow framework is a solid stream processing engine based on strong conceptual underpinnings. The Python SDK provides a very idiomatic interface to its capabilities and is easy to use and extend. Give it a try.
