#### New Modules

OK, that's enough boring stuff. Python 3.0 introduces some exciting new modules too (these are all distributed with Python 2.6 as well).

### The *processing* Module

Python's threading model (in the CPython implementation) can't utilize multiple cores. This is a serious drawback for a general-purpose programming language in today's world, where multi-core processing is becoming mandatory. Because of Python's design, specifically the infamous Global Interpreter Lock (GIL), it is extremely difficult to fix. A lot of digital ink has been spilled about it. The official position is that there are better ways to do parallel programming than multithreading. I happen to agree with the management this time around.

Enter the processing module, which ships in the standard library under the name multiprocessing. This module presents an interface almost identical to the threading module, which is the standard Python multithreading interface. However, instead of running jobs in different threads in the same process, it launches multiple processes. The processing module is not a panacea, though; it has some stringent requirements. Often, you can't simply replace Thread with Process in multithreaded code.

The processing module supports many programming models and interprocess communication models. The main class you work with is of course the Process, which is the equivalent of the Thread class in the threading module. When you create a Process you pass it a function and arguments. After you call `start()`, it creates a new process and executes the function with the provided arguments in the new process. After the function completes, the processing module terminates the process.
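To make the parallel with the threading module concrete, here is a minimal sketch that drives a Thread and a Process through the same three calls. The `report()` and `run_with()` helpers are names invented for this illustration, and the module is imported under its standard-library name, `multiprocessing`.

```python
import os
import threading
from multiprocessing import Process


def report(label):
    # Runs in whichever worker (thread or process) was started.
    print(f"{label} worker running in PID {os.getpid()}")


def run_with(worker_cls):
    # Thread and Process accept the same target/args keywords
    # and expose the same start()/join() methods.
    worker = worker_cls(target=report, args=(worker_cls.__name__,))
    worker.start()
    worker.join()
    return worker


if __name__ == "__main__":
    print(f"parent PID {os.getpid()}")
    run_with(threading.Thread)   # reports the parent's PID
    child = run_with(Process)    # reports a different PID
    print("child exit code:", child.exitcode)
```

The `if __name__ == "__main__":` guard matters on platforms that start child processes by re-importing the main module; without it, each child would try to launch children of its own.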

The parent process can exchange information with the child processes in multiple ways. Here's an example that uses a Manager object to pass a shared list to multiple child processes that simply write their process ID (obtained via `os.getpid()`). The Manager runs in a server process and communicates via Python objects such as lists and dictionaries:

```
import os
import multiprocessing
from multiprocessing import Process, Manager


def process(id, results):
    # Each child records its own process ID in its slot of the shared list
    results[id] = os.getpid()


if __name__ == '__main__':
    cpu_count = multiprocessing.cpu_count()
    manager = Manager()
    results = manager.list([0] * cpu_count)
    processes = []
    for i in range(cpu_count):
        p = Process(target=process, args=[i, results])
        p.start()
        processes.append(p)
    # Wait for every child to finish before reading the results
    for p in processes:
        p.join()
    print(results)
```

Each child process runs the `process()` function. The important utility function `cpu_count()` tells you how many cores are available. It is often useful to create as many processes as there are available cores: if you create fewer, you underutilize your hardware, and if you create more, the processes must share the available cores. The code creates a process for each available core and passes it both the results list obtained from the manager and the ID of each process. This allows each child process to know its place and write to a separate entry in the list. Each process is started, and then the parent process waits for the processes to finish, using the `join()` method, which blocks until the target process is done.
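The Manager is only one of the available channels. As a rough sketch of an alternative, the same "report your PID" task can be done with a `multiprocessing.Queue`, where each child sends a message instead of writing into a shared list. The function name `report_pid()` and the choice of four workers are assumptions made for this illustration.

```python
import os
from multiprocessing import Process, Queue


def report_pid(worker_id, queue):
    # Each child sends back a (worker_id, pid) tuple; tuples of ints pickle fine.
    queue.put((worker_id, os.getpid()))


if __name__ == "__main__":
    queue = Queue()
    workers = [Process(target=report_pid, args=(i, queue)) for i in range(4)]
    for w in workers:
        w.start()
    # Drain the queue before joining: exactly one message per worker.
    results = dict(queue.get() for _ in workers)
    for w in workers:
        w.join()
    print(results)
```

Because messages can arrive in any order, the worker ID travels with each PID rather than being implied by a list position.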

That's nice code, but it contains a lot of ceremony. The processing module also has a Pool class that works at a slightly higher level of abstraction. It represents a pool of worker processes that allows you to run parallel tasks and collect the results easily via a `map()` function that operates just like the standard `map()`, except that it runs on multiple processes. You'll find this functionality particularly suitable for embarrassingly parallel scenarios where you can divide your data into independent chunks and feed each chunk to a worker process, which processes it and returns the results.

For example, suppose you have a big database of textual works, and you want to find all the palindromes in it. A palindrome is a word or phrase that reads the same forward and backward, such as "dad" or "noon"; this example deals only with single words.

First, you need to make some words. I tried to use generators, but I discovered that you can't pass a generator as input to a process; that's a limitation of the processing module. Generally, when you pass an object to a child process, the object is pickled and the process receives the serialized object, which it unpickles. Unpicklable objects can't be used.
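You can check the pickling constraint directly, without starting any processes. The sketch below uses a hypothetical helper, `is_picklable()`, and a throwaway generator function, `count_up()`, to show that a generator object is rejected while a list holding the same values is accepted.

```python
import pickle


def is_picklable(obj):
    """Return True if pickle.dumps() accepts obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def count_up(n):
    for i in range(n):
        yield i


print(is_picklable(count_up(3)))        # False: a generator object
print(is_picklable(list(count_up(3))))  # True: the same values as a plain list
```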

The following code shows an example. It takes a generator (the `generate_words()` function) and converts it to a function that returns a plain list, using a partial application of the `listify()` helper. The example demonstrates the type of preprocessing you will have to do to prepare your data to be passed to worker processes:

```
import random
from functools import partial


def generate_words(word_count, word_length):
    a = ord('a')
    for i in range(word_count):
        word = ''
        for j in range(word_length):
            word += chr(a + random.randint(0, 25))
        yield word


def listify(f, *args, **kwdargs):
    # Exhaust the generator so the result is a plain, picklable list
    return list(f(*args, **kwdargs))


make_words = partial(listify, generate_words)
```

The goal here is to find all the palindromes in a list of 8,000 four-letter words. The work is split between two processes, so the 8,000 words are split into two 4,000-word lists. This is clearly an embarrassingly parallel problem:

```
input_list = [make_words(4000, 4) for x in range(2)]
```
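The line above generates the two lists independently. When the words already exist as a single list, you would split that list instead. The helper below, `split_into_chunks()`, is a hypothetical name and only one of several reasonable ways to do it; it produces contiguous chunks whose lengths differ by at most one.

```python
def split_into_chunks(items, n_chunks):
    """Split items into n_chunks contiguous lists of nearly equal length."""
    size, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional item each.
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


words = ["noon", "abcd", "abba", "zzzz", "pqrs"]
print(split_into_chunks(words, 2))
# [['noon', 'abcd', 'abba'], ['zzzz', 'pqrs']]
```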

The function that finds palindromes in a word list is simple: it checks whether the first half of each word equals its reversed second half:

```
def find_palindromes(words):
    results = []
    for w in words:
        half = len(w) // 2  # Note the explicit floor division
        if w[:half] == w[-half:][::-1]:
            results.append(w)
    return results
```
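As a sanity check on the half-comparison, the sketch below puts it next to the simpler test of comparing a word with its full reversal. Both function names are invented for this comparison. The two agree for words of two or more letters, which covers the four-letter words used here, but they differ for one-letter words.

```python
def is_palindrome_halves(w):
    # Same test as find_palindromes(): first half vs. reversed second half.
    half = len(w) // 2
    return w[:half] == w[-half:][::-1]


def is_palindrome_full(w):
    # Simpler alternative: compare the word with its full reversal.
    return w == w[::-1]


for word in ["noon", "level", "abca", "ab"]:
    assert is_palindrome_halves(word) == is_palindrome_full(word)

# For a one-letter word, half is 0 and w[-0:] is the whole word,
# so the halves test compares '' with the word and returns False.
print(is_palindrome_halves("a"), is_palindrome_full("a"))  # False True
```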

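Running two such palindrome finders in parallel is as easy as instantiating the Pool and calling `map()` on the input list. There are two items in the input list (the two 4,000-word lists), so two tasks are handed to the pool's worker processes under the covers. By default, the Pool starts one worker per available core, so on a machine with more than two cores the extra workers simply stay idle: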

```
from multiprocessing import Pool

if __name__ == '__main__':
    pool = Pool()
    results = pool.map(find_palindromes, input_list)
    print(results)
```

Passing large amounts of data to and from your processes (as in this case) erodes the performance boost you get from running in parallel, because every argument and result must be pickled and moved between processes. You will have to experiment and measure to verify that your code doesn't actually run more slowly because of this overhead.
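A small timing harness is usually enough for that kind of measurement. The sketch below is one possible setup, not a benchmark: `count_vowels()` is a deliberately cheap stand-in workload, `timed()` is a hypothetical helper, and the data sizes are arbitrary. The pool is created before timing starts, so worker startup is not included in the parallel figure.

```python
import time
from multiprocessing import Pool


def count_vowels(words):
    # Stand-in workload: cheap per item, so transfer cost is noticeable.
    return sum(ch in "aeiou" for w in words for ch in w)


def timed(func, *args):
    """Call func(*args) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    chunks = [["noon", "level", "python"] * 50_000 for _ in range(4)]
    serial, t_serial = timed(lambda: list(map(count_vowels, chunks)))
    with Pool() as pool:
        parallel, t_parallel = timed(pool.map, count_vowels, chunks)
    assert serial == parallel
    print(f"serial: {t_serial:.3f}s  pool: {t_parallel:.3f}s")
```

If the pool version is not clearly faster on your data, the work per item is probably too small relative to the cost of shipping it.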

In many real-world problems you need to interact with the worker processes and send/receive information during the computation. A perfect example is finding prime numbers. One of the earliest and most famous algorithms is the Sieve of Eratosthenes, a very elegant algorithm that uses only addition to filter out non-primes. Here's a little Python implementation that includes a few optimizations, such as starting only with the odd numbers (the prime 2 gets added at the end) and skipping multiples of the current prime.

```
def prime_sieve(n):
    # Start with the odd numbers >= 3 (we'll add 2 at the end)
    primes = list(range(3, n + 1, 2))
    # It's enough to go until the square root of n
    limit = int(n ** 0.5)
    if limit % 2 == 0:
        limit -= 1
    for i in range(limit // 2):
        cur_prime = primes[i]
        if cur_prime == 0:
            continue
        # Index i holds the current prime; skip it and start at its next odd multiple
        for j in range(cur_prime, len(primes) - i, cur_prime):
            # Zero out multiples of the current prime
            primes[j + i] = 0
    # That's it. All the non-zeroes are primes.
    return [2] + [p for p in primes if p > 0]


primes = prime_sieve(80)
print(primes)
```

Now, the question is how to make this algorithm parallel so it can execute using multiple simultaneous processes. It's pretty clear that you will need to divide the integer space between all the processes, letting each process work on a separate range. For example, if you want to find all the primes up to 80, and you have two processors, you can decide that one will be in charge of all the primes up to 40, and the second will take charge of all the primes from 41 to 80. Unfortunately, the sieve algorithm works only when you start from the beginning, because it has to filter out the multiples of every prime found so far. The process responsible for 41-80 needs to know about the primes below 40 found by the first process. This means that a proper parallel sieve algorithm needs to pass information between processes.
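To see what sieving a separate range involves, here is a single-process sketch of one range being filtered with primes supplied from outside. It is not the approach developed in this article: it assumes the small "base" primes are already known up front, whereas the code that follows passes them between processes over pipes. The name `sieve_segment()` is hypothetical.

```python
def sieve_segment(first, last, base_primes):
    """Return the primes in [first, last), given every prime p with p * p < last."""
    numbers = list(range(first, last))
    for p in base_primes:
        # First multiple of p inside the segment, but never p itself.
        start = max(p * p, first + (-first % p))
        for multiple in range(start, last, p):
            numbers[multiple - first] = 0
    return [n for n in numbers if n > 1]


base = [2, 3, 5, 7]                  # all primes up to the square root of 80
low = sieve_segment(2, 41, base)     # could run in one process
high = sieve_segment(41, 81, base)   # could run in another
print(low + high)
```

The upper range never needs to see the numbers 2 through 40 themselves, only the primes whose multiples can land in it.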

In addition, you need to collect the results somehow. The algorithm below uses a main process to divide the ranges and instantiate child processes. It also establishes pipes to communicate with the child processes. The general flow is that whenever a process finishes processing its range, it sends all the primes it found to the main process. The main process sends all these primes to downstream processes. As each process completes, it sends its primes back to the main process, and so forth.

First, here's the code for the `sieve()` function that each child process runs:

```
def sieve(first, last, conn):
    numbers = list(range(first, last))
    while True:
        number, skip = conn.recv()
        if number == -1:
            # No more upstream primes: sieve with this range's own primes
            for i, n in enumerate(numbers):
                if n == 0 or n * n > last:
                    continue
                for j in range(i + n, len(numbers), n):
                    numbers[j] = 0
            conn.send([n for n in numbers if n > 0])
            return
        else:
            # Zero every multiple of skip, starting at number
            index = number - first
            for i in range(index, len(numbers), skip):
                numbers[i] = 0
```

Note the connection object used to receive and send information bidirectionally to the main process. When the process receives `-1`, that means no more primes are coming from upstream processes, so it can finish sieving its own range, send its primes to the main process, and complete. When it receives an initial number and skip value (a prime number), it zeroes every multiple of the skip value, starting with the initial number.

The logic for the main process is in the `collect_primes()` function. It takes a list of ranges, creates a child process and a pipe for each range, and launches each process with the proper range and connection. Then it ignites the process by sending the known primes 2, 3, 5, and 7 to all the processes and starts the main loop, which tells the current process that no more primes are coming, and then waits to get its primes, which in turn are sent to subsequent processes. This procedure repeats until all the child processes have returned their primes, collected in the `all_primes` list. The main process waits for the child processes to finish using the `join()` method:

```
from multiprocessing import Pipe, Process


def collect_primes(ranges):
    """Start multiple processes and calculate all the
    primes in the input ranges

    ranges - a list of pairs that represent a contiguous range of integers
    """
    # Init (prepare processes, ranges and connections)
    processes = []
    conns = []
    for r in ranges:
        parent_conn, child_conn = Pipe()
        conns.append(parent_conn)
        p = Process(target=sieve, args=[r[0], r[1], child_conn])
        processes.append(p)
        p.start()
    # The first and last number each process is responsible for
    first_numbers = [r[0] for r in ranges]
    last_numbers = [r[1] for r in ranges]
    # Initial known primes
    all_primes = [2, 3, 5, 7]
    # The primes to send to the current process
    primes = all_primes
    for i in range(len(conns)):
        # Send primes to all downstream processes
        send_primes(primes, conns[i:], zip(first_numbers[i:],
                                           last_numbers[i:]))
        # Tell the current process no more primes are coming
        conns[i].send((-1, None))
        # Receive the primes of the current process
        primes = conns[i].recv()
        all_primes += primes
    # All processes should be done by now
    for p in processes:
        p.join()
    return all_primes
```

The main process uses a utility function called `send_primes()`, shown below for completeness.

```
def send_primes(primes, conns, limits):
    # In Python 3, zip() returns a one-shot iterator, so materialize
    # limits before looping over it once per prime
    limits = list(limits)
    for prime in primes:
        for conn, limit in zip(conns, limits):
            n = limit[0]
            last = limit[1]
            # Find the first multiple of prime that is >= n
            if n % prime == 0:
                number = n
            else:
                number = n + (prime - n % prime)
            if number < last:
                conn.send((number, prime))
```

Here's `collect_primes()` in action, computing all primes below 50 using two sub-processes:

```
>>> collect_primes(ranges=((10, 30), (30, 50)))
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]
```
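The if/else inside `send_primes()` rounds the start of a range up to the next multiple of a prime. The same arithmetic can be written as a one-liner using Python's modulo on a negative number; `first_multiple_at_or_after()` is a name made up for this sketch.

```python
def first_multiple_at_or_after(n, prime):
    """Smallest multiple of prime that is >= n."""
    return n + (-n % prime)


# Range starting at 30, as for the second child process above.
for prime in (2, 7, 11, 13):
    print(prime, first_multiple_at_or_after(30, prime))
# 2 30
# 7 35
# 11 33
# 13 39
```

These are the (number, prime) pairs a child responsible for 30 through 49 would receive for those primes, since each result is below 50.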

The processing module is very flexible and powerful, but it requires a solid understanding of parallel programming concepts, and also has some special requirements compared to the threading module. For example, as mentioned earlier, objects that you want to share between processes via Pipe or Queue must be picklable. This turns out to be a very harsh requirement in many real-world situations.

**Author's Note**: In a recent project I needed to run some computations in parallel. The Python objects that performed the computation used C++ extensions, which are not picklable. I was in quite a pickle to say the least. I had to implement an additional layer on top of the C++ extensions that extracted the essential information from each unpicklable object into a serialized form, and then implement a factory to instantiate new objects based on that serialized form. This is not only considerable extra work, but it has performance implications, too. In my case, the required objects were very expensive to construct.
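The note above describes a custom layer and factory whose code is not shown. For simple cases, a similar effect can sometimes be had with pickle's own `__getstate__()` and `__setstate__()` hooks. The sketch below is an illustration under assumptions, not the author's implementation: the `Computation` class is hypothetical, and a `threading.Lock` stands in for the unpicklable extension object.

```python
import pickle
import threading


class Computation:
    """Hypothetical stand-in for an object that wraps an unpicklable handle."""

    def __init__(self, factor):
        self.factor = factor
        # A lock plays the role of the unpicklable C++ extension object.
        self._handle = threading.Lock()

    def __getstate__(self):
        # Keep only the essential, picklable configuration.
        return {"factor": self.factor}

    def __setstate__(self, state):
        # Rebuild the handle on the receiving side (the potentially costly part).
        self.factor = state["factor"]
        self._handle = threading.Lock()

    def run(self, x):
        with self._handle:
            return x * self.factor


clone = pickle.loads(pickle.dumps(Computation(3)))
print(clone.run(14))  # 42
```

As in the author's case, the object is reconstructed in every process that receives it, so an expensive constructor is paid for more than once.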

The limitations and restrictions are detailed in the programming guidelines section of the documentation. Make sure you read it if you plan to use this module.