Partitioning values in an array

You have a large, potentially huge array of objects in random order. You want to split the array into two parts: the lower part with the objects that match a condition, the upper part with the objects that do not. This operation is called partitioning the array.

The above paragraph is a very abstract formulation, so let's look at a concrete example: you have an array of random integers in the range [0..99]. The first part of the array should hold all numbers < 50, the second part all numbers >= 50.

Now, if you wonder why this topic is filed under the heading Searching, bear with me: partitioning lies at the heart of the quicksort algorithm.

Ask yourself: Do I understand the problem?

These conditions are fairly similar to those in the problem Finding a duplicate value in an array, but we'll use a totally different approach.

Setting up a test harness

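As usual, our first step is to prepare a testbed. Our code strategy for the test will be this: create an array of random integers, partition it, and then verify that the result is indeed partitioned.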

Implementing these test requirements borders on the trivial:

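# -*- coding: latin-1 -*-
import random

def selftest():
    # 1,000 random integers in the range 0..99
    A = [random.randrange(100) for i in range(1000)]
    # placeholder: substitute the partition function under test
    A, n = your_partition_function_here(A, 50)
    assert is_partitioned(A, n, 50)
    
if __name__ == "__main__":
    selftest()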

This code creates 1,000 random integers in the range 0..99, partitions them, and then verifies that the result is indeed partitioned. How do we verify this?

def is_partitioned(A, n, pel_value):
    for index, item in enumerate(A):
        if index < n: # lower half
            if item >= pel_value: # -> Items must be < pel_value
                return False

        else: # upper half
            if item < pel_value: # -> Items must be >= pel_value
                return False
            
    return True # if no item violates our condition, we have verified the code
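
The same check can also be written more compactly. The following is only a sketch, written for Python 3; the name is_partitioned_compact is an assumption made for this example and not part of the original code. It relies on the fact that the lower part is the slice A[:n] and the upper part is the slice A[n:].

```python
def is_partitioned_compact(A, n, pel_value):
    # Lower part A[:n] must hold only items < pel_value,
    # upper part A[n:] must hold only items >= pel_value.
    return (all(item < pel_value for item in A[:n]) and
            all(item >= pel_value for item in A[n:]))

print(is_partitioned_compact([10, 20, 60, 70], 2, 50))  # True
print(is_partitioned_compact([10, 60, 20, 70], 2, 50))  # False: 60 sits in the lower part
print(is_partitioned_compact([], 0, 50))                # True: an empty array is trivially partitioned
```

Note that the slices create temporary copies, so the loop version above is the better choice for a really huge array.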

That was not too bad. Now, let's try to actually partition the data.

Brute-Force approach

Let's try brute-force first: There are two lists: one for items less than the pivot, and one for items greater than or equal to the pivot. We test each item in the array: if it is less than the pivot, it goes to that list, or else to the other list. At the end, we concatenate the two lists, and return the new list plus the number of items in the lower half. With this description, the code is deceptively simple:

def partition_bf(A, pivot_value):
    items_less_than = []
    items_greater_than_or_equal = []
    
    for item in A:
        if item < pivot_value:
            items_less_than.append( item )
        else:
            items_greater_than_or_equal.append( item )
    
    A = items_less_than + items_greater_than_or_equal
    return A, len(items_less_than)
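
To see the brute-force idea at work on a small input, here is a sketch for Python 3 that expresses the two lists as list comprehensions. The name partition_bf_compact is hypothetical; unlike partition_bf it walks the input twice, but the result is the same.

```python
def partition_bf_compact(A, pivot_value):
    # One pass collects the lower part, a second pass the upper part.
    lower = [item for item in A if item < pivot_value]
    upper = [item for item in A if item >= pivot_value]
    return lower + upper, len(lower)

A = [22, 65, 31, 50, 77, 49]
B, n = partition_bf_compact(A, 50)
print(B, n)   # [22, 31, 49, 65, 50, 77] 3
print(A)      # [22, 65, 31, 50, 77, 49], the input list is left untouched
```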

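Some notes on this implementation: it returns a new list and leaves the caller's array untouched, it preserves the relative order of the items within each part, and it needs additional memory for the two helper lists, which together hold a copy of every item.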

Now, think about how this algorithm could be improved. There is little we can do about the time complexity: after all, if we want to partition all items, we must visit each item at least once, so any optimal solution has O(N) time complexity. But can we reduce the amount of space we need?

Other strategies

Let's briefly revisit other common strategies and see if they could help with our problem.

So for now we are going to stick with our basic approach and see how we can improve upon it.

Inplace partitioning

Well, we somehow need to find a way to partition the array inplace. The first step is to change our call interface: right now, the code returns a new array and the length of the lower part. The new interface will receive start / stop indices into the array and will not return an updated array. This changes our test suite:

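# -*- coding: latin-1 -*-
import random

def selftest():
    # 1,000 random integers in the range 0..99
    A = [random.randrange(100) for i in range(1000)]
    # placeholder: substitute the inplace partition function under test
    n = your_partition_function_here(A, 0, len(A)-1, 50)
    assert is_partitioned(A, n, 50)
    
if __name__ == "__main__":
    selftest()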

You will notice that we pass a stop index that is within the boundaries of the array A, so the range from start to stop is inclusive. As a first step, we shall keep our main partition logic intact, but change the way we return the results, like this:

def partition_inplace_with_additional_memory(A, start, stop, pivot_value):
    items_less_than = []
    items_greater_than_or_equal = []
    
    read_index = start
    while read_index <= stop:
        item = A[read_index]
        if item < pivot_value:
            items_less_than.append( item )
        else:
            items_greater_than_or_equal.append( item )
        read_index += 1

    write_index = start

    for item in items_less_than:
        A[write_index] = item
        write_index += 1

    for item in items_greater_than_or_equal:
        A[write_index] = item
        write_index += 1

    # index of the first item of the upper part (also correct when start > 0)
    return start + len(items_less_than)
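
The start / stop interface also allows partitioning only a window of the array. The following Python 3 sketch illustrates this with a hypothetical helper, partition_range_with_copy, that uses slice assignment instead of explicit write loops. It is an assumption of this sketch that the function returns the absolute index of the first item of the upper part, which is also what partition_inplace_value further down returns.

```python
def partition_range_with_copy(A, start, stop, pivot_value):
    # stop is inclusive, so the window is A[start:stop + 1]
    window = A[start:stop + 1]
    lower = [item for item in window if item < pivot_value]
    upper = [item for item in window if item >= pivot_value]
    # write the partitioned window back into the same positions
    A[start:stop + 1] = lower + upper
    return start + len(lower)

A = [99, 1, 65, 31, 50, 49, 0, 98]
n = partition_range_with_copy(A, 2, 5, 50)
print(A, n)   # [99, 1, 31, 49, 65, 50, 0, 98] 4
```

The items outside the window (99, 1 at the front and 0, 98 at the back) stay where they are.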

We haven't reduced the space complexity just yet; we have merely verified that our solution can also work inplace. Reducing the memory is the focus of the next section.

Inplace partitioning without additional memory

Now, it turns out that you can do the modification without needing any additional memory. I think the most instructive way to see this is by studying the following sequence. We have an array of 6 items, and we want to partition that array around 50. So we want all values < 50 in the first half, then all values >= 50.

We need to keep one insertion index and one current element pointer. For each step below, both are given in parentheses after the array.

Step 0 before: 22 65 31 50 77 49 (insertion index 0, current index 0). Current item 22 is < 50, so we need to swap it with the item at the insertion position, which happens to be 22 as well.
Step 0 after: 22 65 31 50 77 49 (insertion index 1). Plus we move the insertion index one up.
Step 1: 22 65 31 50 77 49 (insertion index 1, current index 1). Current item 65 is >= 50, so we skip it.
Step 2 before: 22 65 31 50 77 49 (insertion index 1, current index 2). Current item 31 is < 50, so we need to swap it with the item at the insertion position, which happens to be 65.
Step 2 after: 22 31 65 50 77 49 (insertion index 2). Plus we move the insertion index one up.
Step 3: 22 31 65 50 77 49 (insertion index 2, current index 3). Current item 50 is >= 50, so we skip it.
Step 4: 22 31 65 50 77 49 (insertion index 2, current index 4). Current item 77 is >= 50, so we skip it.
Step 5 before: 22 31 65 50 77 49 (insertion index 2, current index 5). Current item 49 is < 50, so we need to swap it with the item at the insertion position, which happens to be 65.
Step 5 after: 22 31 49 50 77 65 (insertion index 3). Plus we move the insertion index one up.

Given this informal description, the code is actually pretty intuitive:

def partition_inplace_value(A, start, stop, pel_value):

    # enumerate each item
    read_index = start
    while read_index <= stop:
        item = A[read_index]

        # if the current item is less than the pivot value
        if item < pel_value:

            # swap it at the insertion position
            A[start], A[read_index] = A[read_index], A[start]

            # and move the insertion position one up
            start += 1

        read_index += 1

    return start
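
To replay the walkthrough above, the following Python 3 sketch adds a print statement to the same loop. The name partition_inplace_trace and the separate insert_index variable are assumptions made for readability; the logic is meant to mirror partition_inplace_value.

```python
def partition_inplace_trace(A, start, stop, pel_value):
    insert_index = start
    for read_index in range(start, stop + 1):   # stop is inclusive
        if A[read_index] < pel_value:
            # swap the current item to the insertion position
            A[insert_index], A[read_index] = A[read_index], A[insert_index]
            insert_index += 1
            action = "swap"
        else:
            action = "skip"
        print("step %d: %s -> %s (insertion index %d)"
              % (read_index - start, action, A, insert_index))
    return insert_index

A = [22, 65, 31, 50, 77, 49]
n = partition_inplace_trace(A, 0, len(A) - 1, 50)
print(A[:n], A[n:])   # [22, 31, 49] [50, 77, 65]
```

The final output shows one property worth knowing: the upper part no longer has its original order (65 now comes after 77), so this kind of partitioning is not stable.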

Abstraction time: from less-than to arbitrary functions

The actual topic of the initial problem was to partition elements around an abstract criterion. But soon we introduced less-than as a concrete criterion. Let's consider the impact of using our algorithms with different types of criteria. Why don't we relax, take a deep breath, and think things over?

Looking at the code for partition_inplace_value, it becomes pretty obvious that we can replace less-than by an abstract criterion, like this:

def partition_inplace_policy(A, start, stop, policy):

    # enumerate each item
    read_index = start
    while read_index <= stop:
        item = A[read_index]

        # if the policy matches the current item
        if policy(item):

            # swap it at the insertion position
            A[start], A[read_index] = A[read_index], A[start]

            # and move the insertion position one up
            start += 1

        read_index += 1

    return start

For example, to partition the array into a section of even numbers and a section of odd numbers, we could define a simple criterion like this:

def even_or_odd(a):
    return a % 2 == 0

Or, we could partition the array in prime numbers, and non-prime numbers:

def is_prime(a):

    if a < 2:
        return False

    if a == 2:
        return True

    # trial division by every candidate up to the square root of a
    for x in range(2, int(a**0.5)+1):
        if a % x == 0:
            return False
    return True
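
Here is how the two policies could be used on small arrays. This is a Python 3 sketch: partition_by is a hypothetical, compact restatement of partition_inplace_policy that always works on the whole list, and is_even is simply a more descriptive name for the even/odd criterion. Both are included only so that the snippet runs on its own.

```python
def partition_by(A, policy):
    insert_index = 0
    for read_index, item in enumerate(A):
        if policy(item):
            # matching items are swapped down to the insertion position
            A[insert_index], A[read_index] = A[read_index], A[insert_index]
            insert_index += 1
    return insert_index

def is_even(a):
    return a % 2 == 0

def is_prime(a):
    if a < 2:
        return False
    for x in range(2, int(a**0.5) + 1):
        if a % x == 0:
            return False
    return True

A = [3, 8, 5, 2, 7, 4]
n = partition_by(A, is_even)
print(A[:n], A[n:])   # [8, 2, 4] [3, 7, 5]

P = list(range(1, 11))
m = partition_by(P, is_prime)
print(P[:m], P[m:])   # [2, 3, 5, 7] [1, 6, 4, 8, 9, 10]
```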

And so on. Pretty nifty, actually! And because Python allows local functions, you can even do stuff like this:

    ...
    A = [random.randrange(5) for i in range(20)]
    pel_value = A[10]

    def partition_around_pel(x):
        return x < pel_value

    n = partition_inplace_policy(A, 0, len(A)-1, partition_around_pel)
    print(A[:n], A[n:])
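
The local function works because it captures pel_value from the enclosing scope. A few equivalent ways to build such a policy are sketched below for Python 3; the factory name less_than is an assumption made for this example. Any of the resulting callables could be passed as the policy argument.

```python
import functools
import operator

def less_than(pel_value):
    # factory: returns a policy that remembers pel_value (a closure)
    def policy(x):
        return x < pel_value
    return policy

below_50_closure = less_than(50)
below_50_lambda = lambda x: x < 50
# operator.gt(50, x) computes 50 > x, which is the same as x < 50
below_50_partial = functools.partial(operator.gt, 50)

values = [22, 65, 31, 50]
print([below_50_closure(x) for x in values])   # [True, False, True, False]
print([below_50_lambda(x) for x in values])    # [True, False, True, False]
print([below_50_partial(x) for x in values])   # [True, False, True, False]
```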

Partitioning with three conditions

Now, assume that you have not only two conditions, but three. For example, you could want to have primes, even non-primes and odd non-primes. Or, you could have an array of colors, and you want to separate by Red hues, Green hues, and Blue hues. Can we do this with our generalized approach?

Let us first define the test function:

    ...
    A = [random.randrange(5) for i in range(20)]
    pel_value = A[10]

    def partition_around_tripel(x):
        if x < pel_value:
            return 0

        if x == pel_value:
            return 1

        assert x > pel_value
        return 2

    n, m = partition_inplace_three_conditions(A, 0, len(A)-1, partition_around_tripel)
    print(A[:n], A[n:m], A[m:])
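
Printing the three sections is fine for a quick look, but an automated check in the spirit of is_partitioned is more reliable. The following Python 3 sketch shows what such a check could look like; the name is_three_way_partitioned is hypothetical, and it assumes that the policy returns 0, 1 or 2 as described above.

```python
def is_three_way_partitioned(A, n, m, policy):
    # A[:n] must be condition 0, A[n:m] condition 1, A[m:] condition 2
    return (all(policy(x) == 0 for x in A[:n]) and
            all(policy(x) == 1 for x in A[n:m]) and
            all(policy(x) == 2 for x in A[m:]))

def around_three(x):
    # example policy: compare against the fixed value 3
    if x < 3:
        return 0
    if x == 3:
        return 1
    return 2

print(is_three_way_partitioned([1, 2, 3, 3, 4], 2, 4, around_three))  # True
print(is_three_way_partitioned([1, 3, 2, 3, 4], 2, 4, around_three))  # False
```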

That was the easy part. Now the difficult part: the partition function. We need a policy function that returns one of three values, identifying the three conditions. Items with condition 0 are placed at the bottom of the array, items with condition 1 in the middle and, the tricky bit, items with condition 2 are placed from the top down. This means that a) the upper boundary moves down, because we never revisit items that were actively moved to the top of the array, and b) the main loop must not advance after such a swap, because the item that was swapped down into the current position has not been examined yet. Maybe looking at the code will make this clearer:

def partition_inplace_three_conditions(A, start, stop, policy):
    insert_pos = start
    top_pos = stop
    index = start
    while index <= stop:
        v = policy(A[index])
        if v == 0:
            A[insert_pos], A[index] = A[index], A[insert_pos]
            insert_pos += 1
            index += 1

        elif v == 1:
            index += 1

        elif v == 2:
            A[top_pos], A[index] = A[index], A[top_pos]
            top_pos -= 1
            stop -= 1

    return insert_pos, top_pos+1

There is a famous problem in Computer Science, called the Dutch National Flag Problem, which we have just solved.
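
As an illustration, here is the flag itself, sketched in Python 3. The function dutch_flag is a hypothetical restatement of the three-way partition for a whole list, with the variable names low, mid and high that are commonly used for this problem; the dictionary order is an assumption that maps each color to its condition.

```python
def dutch_flag(A, rank):
    # rank(item) must return 0, 1 or 2
    low, mid, high = 0, 0, len(A) - 1
    while mid <= high:
        r = rank(A[mid])
        if r == 0:
            A[low], A[mid] = A[mid], A[low]
            low += 1
            mid += 1
        elif r == 1:
            mid += 1
        else:
            # the item swapped down from the top is unexamined, so mid stays
            A[mid], A[high] = A[high], A[mid]
            high -= 1
    return low, high + 1

flag = ["white", "blue", "red", "white", "red", "blue", "red"]
order = {"red": 0, "white": 1, "blue": 2}
n, m = dutch_flag(flag, order.get)
print(flag[:n], flag[n:m], flag[m:])
# ['red', 'red', 'red'] ['white', 'white'] ['blue', 'blue']
```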

Special case: array of zeros and ones

So far our solutions have considered general cases. Now let's briefly turn to special cases. One is the following: assume that rather than an array of random integers, you have an array of only zeros and ones, and you should partition it around the pivot element 1, meaning you have the 0s first and the 1s next. The algorithms above will work for this, but there is an even simpler way: count the number of 0s and 1s, then recreate the array with n zeros followed by m ones. This is really very much the same idea as the one behind Counting Sort. The Python code for this reads:

def partition_zero_ones(A):

    count = [0, 0]
    for index, item in enumerate(A):
        count[item] += 1

    for i in range(count[0]):
        A[i] = 0

    for i in range(count[0], len(A)):
        A[i] = 1

    return count[0]

You can use a similar approach any time when a) you can easily recreate items from scratch, and b) there are very few distinct values in the array, even though the array itself is potentially huge.
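
The counting idea carries over to more than two distinct values. The following Python 3 sketch shows one way to do this; the function partition_by_counting and its parameter ordered_values are assumptions made for this example, and the sketch assumes that every item in the array appears in ordered_values.

```python
from collections import Counter

def partition_by_counting(A, ordered_values):
    # ordered_values lists the few distinct values in the desired order;
    # assumption: every item of A is one of them
    counts = Counter(A)
    boundaries = []
    write_index = 0
    for value in ordered_values:
        # recreate the items for this value from scratch
        for _ in range(counts[value]):
            A[write_index] = value
            write_index += 1
        boundaries.append(write_index)
    return boundaries

A = [2, 0, 1, 2, 0, 0, 1]
ends = partition_by_counting(A, [0, 1, 2])
print(A, ends)   # [0, 0, 0, 1, 1, 2, 2] [3, 5, 7]
```

Each entry of the returned list is the index just past the section for the corresponding value.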