Small programming challenge No. 6 – nblocks

I came up with this challenge when I had to write a function to divide a sequence into percentiles. I needed this to calculate some statistics over trips for plnnr.com. “This sounds trivial,” I thought, and reached for my simple blocks function:

def blocks(seq, block_len):
    """blocks(range(5),2) -> [[0, 1], [2, 3], [4]]"""
    seq_len = len(seq)
    if seq_len%block_len == 0:
        num_blocks = seq_len/block_len
    else:
        num_blocks = 1 + (seq_len/block_len)
 
    result = [[] for i in xrange(num_blocks)]
    for idx, obj in enumerate(seq):
        result[idx/block_len].append(obj)
    return result

So I set block_len to len(seq)/10 and called blocks(seq, block_len). Unfortunately, according to the docs of blocks (which I wrote…), when there is extra data, a “partial” block is added – which is exactly what we don’t want when calculating percentiles.
Instead, the behavior we want is nblocks(seq, number_of_blocks), for example: nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9].
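
To make the mismatch concrete, here is a minimal Python 3 sketch (the post's code is Python 2, so this uses `//` for integer division). The slicing-based `blocks` helper is a stand-in with the same behavior as the function above, not the original code. With 25 items and a block length of `25 // 10 = 2`, it returns 13 blocks rather than the 10 we wanted.

```python
def blocks(seq, block_len):
    """Consecutive blocks of block_len items; the last one may be shorter."""
    return [seq[i:i + block_len] for i in range(0, len(seq), block_len)]

data = list(range(25))
block_len = len(data) // 10          # 2
result = blocks(data, block_len)
print(len(result))                   # 13, not 10
print(result[-1])                    # [24], the "partial" block
```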

This function is surprisingly hard to write, or rather, harder than you’d expect. I’ll be especially glad if someone writes it elegantly. My solution works well enough, but isn’t the prettiest.

So, you have the definition – let’s see if you can do better than me. Once enough solutions are presented, I will present my own.

IMPORTANT EDIT: the required signature is nblocks(seq, num_blocks). So for nblocks(range(10), 3), the return value should be 3 blocks, with the last one having an extra item. As a general rule, the extra items should be spread as evenly as possible.
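
One way to pin the requirements down is as a check that any candidate must pass. This is a sketch in Python 3; the name `check_nblocks` and the reading of “as evenly as possible” as “block sizes differ by at most one” are assumptions of this sketch, not part of the original challenge. It does not constrain where the larger blocks go.

```python
def check_nblocks(nblocks, seq, num_blocks):
    """Return True if nblocks(seq, num_blocks) looks like a valid answer."""
    result = [list(block) for block in nblocks(seq, num_blocks)]
    sizes = [len(block) for block in result]
    flattened = [x for block in result for x in block]
    return (
        len(result) == num_blocks          # right number of blocks
        and flattened == list(seq)         # nothing lost or reordered
        and max(sizes) - min(sizes) <= 1   # assumed meaning of "evenly"
    )


def fixed_len(seq, num_blocks):
    # Deliberately naive candidate: fixed block length, so it can fail the check.
    n = len(seq) // num_blocks
    return [seq[i:i + n] for i in range(0, len(seq), n)]


print(check_nblocks(fixed_len, list(range(10)), 3))  # False: 4 blocks come back
print(check_nblocks(fixed_len, list(range(9)), 3))   # True
```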

22 replies on “Small programming challenge No. 6 – nblocks”

First stab at it:

def blocks(seq, block_len):
     out = []
     curr = []
     for item in seq:
             curr.append(item)
             if len(curr) == block_len:
                     out.append(curr)
                     curr = []
     return out

Then I did a recursive solution to see if I could make it more compact:

def rblocks(seq, curr_block, block_len):
     if len(curr_block) == block_len:
             return [curr_block] + rblocks(seq, [], block_len)
     if seq:
             return rblocks(seq[1:], curr_block+seq[0:1], block_len)
     return []

How’s this?

def blocks(seq, block_len):
    return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
 
 
def nblocks(seq, block_len):
    result = blocks(seq, block_len)
    if not (len(result) in (0, 1) or len(result[-1]) == block_len):
        last_block = result[-1]
        del(result[-1])
        result[-1].extend(last_block)
    return result

Thank you all for your solutions!

Jeremy: I think I didn’t explain myself well enough: I wasn’t aiming for a better “blocks” function. I was aiming for a “nblocks” function, whose signature is nblocks(seq, num_blocks).

Lucas: your solution is problematic, as for nblocks(range(10), 3), the last element was dropped.

Yuv: try to write it as nblocks(seq, num_blocks) :)

I will add an edit to the blog post to make the required signature more clear :)

Ok, let’s try…

from itertools import islice, groupby, count
 
def blocks_islice(seq, seq_len, block_len):
    remainder = seq_len % block_len
    max_index = (seq_len / block_len) - 1
    for index in count():
        if index < max_index:
            yield list(islice(seq, 0, block_len))
        else:
            yield list(islice(seq, 0, block_len + remainder))
            break
 
def blocks_groupby(seq, seq_len, block_len):
    last_block = []
    max_index = (seq_len / block_len) - 1
    for k, block in groupby(enumerate(seq), lambda x: x[0] / block_len):
        result_block = [i[1] for i in block]
        if k < max_index:
            yield result_block
        else:
            last_block += result_block
    yield last_block
 
x = range(10)
print list(blocks_islice(iter(x), len(x), 3))
print list(blocks_groupby(x, len(x), 3))

Hmm, not really elegant. Having to know the length of the sequence seems unavoidable. And izip(*[seq] * block_len) looks promising, but I dunno how to make it work :).
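
For what it's worth, that idiom needs one shared iterator rather than the sequence itself: `[it] * block_len` repeats the same iterator object, so each tuple pulls `block_len` consecutive items. A Python 3 sketch (where `zip` plays the role of Python 2's `izip`). Note that it silently drops an incomplete final group, so on its own it does not solve the challenge.

```python
seq = range(10)
block_len = 3

# The same iterator object appears block_len times, so zip consumes
# block_len consecutive items for every tuple it builds.
it = iter(seq)
groups = [list(group) for group in zip(*[it] * block_len)]
print(groups)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]; the 9 is dropped
```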

Ephes:
Thanks for the solution! However, that’s not quite what I was looking for :) See again my update and my comment above: the signature should be nblocks(seq, num_blocks). So for the “usual” percentiles, it’d be nblocks(seq, 10).
Regarding the length of the sequence: you are right, it’s unavoidable to know it but that’s ok. (It’s avoidable only if you’re willing to do some slightly ugly hacks :)

def blocks(seq, block_len):
    return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
 
 
def nblocks(seq, num_blocks):
    r = len(seq) % num_blocks
    block_len = len(seq) / num_blocks
    end_small_blocks = block_len * (num_blocks - r)
    result = blocks(seq[:end_small_blocks], block_len)
    result.extend(blocks(seq[end_small_blocks:], block_len + 1))
    return result
 
>>> nblocks(range(10), 3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
>>> map(len, nblocks(range(17*9), 10))
[15, 15, 15, 15, 15, 15, 15, 16, 16, 16]

Is that the function you’re looking for?

I’m not sure what you mean by “As a general rule, the extra items should be spread as evenly as possible.”

Here’s my solution with my own interpretation of that statement:


def blocks(seq, block_len):
    rest = seq
    result = []
    while len(rest) > block_len:
        result.append(rest[:block_len])
        rest = rest[block_len:]
    #now distribute
    for i, v in enumerate(rest):
        result[-(i+1)].append(v)
    return result

Bresenham to the rescue!

def nblocks( seq, block_len ):
    assert seq
    l = []
    error = 0.0
    deltaerr = float(block_len) / len(seq)
 
    for item in seq:
        l.append( item )
        error += deltaerr
        if error >= 1.0:
            yield l
            l = []
            error -= 1.0
 
    if l:
        yield l

>>> list( try2.nblocks( range(10), 3) )
[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> list( try2.nblocks( range(9), 3) )
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> map(len, try2.nblocks(range(17*9), 10))
[16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

( Provides better distribution )
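
The same error-accumulation idea can be written with integers only, which sidesteps any float rounding in the `error >= 1.0` comparison. A Python 3 sketch, with `nblocks_int` as a hypothetical name and the second parameter read as the number of blocks: each item adds `num_blocks` to the error, and a block is emitted whenever the error reaches `len(seq)`.

```python
def nblocks_int(seq, num_blocks):
    """Bresenham-style split using an integer error term (a sketch)."""
    n = len(seq)
    block = []
    error = 0
    for item in seq:
        block.append(item)
        error += num_blocks      # integer counterpart of adding num_blocks / n
        if error >= n:           # integer counterpart of error >= 1.0
            yield block
            block = []
            error -= n
    if block:                    # only reached if num_blocks > len(seq) or similar
        yield block


print(list(nblocks_int(range(10), 3)))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(list(nblocks_int(range(10), 7)))  # [[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]
```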

Everyone: It seems I wasn’t clear in my explanation of the challenge – it is to write a function that receives as parameter the *number of blocks*, and not the length of the block.

I tried to improve the situation in the blog post by emphasizing the required signature; let me know if you think it’s still unclear.

Yeah, my bad. Rename block_len to block_count. It does what you asked, I just didn’t pay enough attention to the naming.

For instance:

>>> list( nblocks(range(10), 7) )
[[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]

here are my two cents :

def lblocks(length, number_of_blocks):
    average, extra = divmod(length, number_of_blocks)
    nx = number_of_blocks - extra
    return [(i+1)*average + (i-nx+1 if i >= nx else 0) for i in range(number_of_blocks)]
 
def nblocks(seq, number_of_blocks) :
    """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
    lengths = lblocks(len(seq), number_of_blocks)
    return [    seq[j:k] for j,k in zip([0]+lengths, lengths )]

The idea is to work on the block lengths first: average is length/number_of_blocks, and the extra items are given out one each to the last blocks.
I then accumulate the lengths to turn them into indices; that’s the lblocks function.

With the indices in hand, nblocks is straightforward.
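
The lengths-then-indices idea reads a little more directly with `itertools.accumulate`. This is a Python 3 sketch of the same approach, not the commenter's code; `block_lengths` is a hypothetical helper name, and the extra items go to the last blocks as in `lblocks`.

```python
from itertools import accumulate


def block_lengths(length, number_of_blocks):
    """Per-block sizes: the last `extra` blocks get one more item."""
    average, extra = divmod(length, number_of_blocks)
    return [average] * (number_of_blocks - extra) + [average + 1] * extra


def nblocks(seq, number_of_blocks):
    lengths = block_lengths(len(seq), number_of_blocks)
    ends = list(accumulate(lengths))      # running totals are the end indices
    starts = [0] + ends[:-1]
    return [seq[j:k] for j, k in zip(starts, ends)]


print(nblocks(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```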

little improvement. I didn’t get the “even” part !

def nblocks(seq, number_of_blocks) :
    """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
    length = len(seq)
    lengths = [int(i*length*1./number_of_blocks) for i in range(number_of_blocks+1)] 
    return [    seq[j:k] for j,k in zip(lengths, lengths[1:] )]

This time it relies on int() truncating the fractional block boundaries to spread the extras evenly.

Oops, sorry, it needed a little renaming to keep the code readable:

def nblocks(seq, number_of_blocks) :
    """nblocks(range(10), 3) - &gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
    step = len(seq)*1./number_of_blocks
    indices = [int(i*step) for i in range(number_of_blocks+1)] 
    return [    seq[j:k] for j,k in zip(indices, indices[1:] )]
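
The same boundaries can be computed without floats: `i * len(seq) // number_of_blocks` is the exact floor of the fractional boundary, so no float rounding can shift an index. A Python 3 sketch (the name `nblocks_floor` is hypothetical):

```python
def nblocks_floor(seq, number_of_blocks):
    n = len(seq)
    # Boundary i is floor(i * n / number_of_blocks), computed exactly.
    indices = [i * n // number_of_blocks for i in range(number_of_blocks + 1)]
    return [seq[j:k] for j, k in zip(indices, indices[1:])]


print(nblocks_floor(list(range(10)), 3))                    # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print([len(b) for b in nblocks_floor(list(range(10)), 4)])  # [2, 3, 2, 3]
```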

Maybe I’m late to the party, but I wrote this last night:

def nblocks(seq, num_blocks):
    seq = list(seq)
    seq_len = len(seq)
    blen, num_extra = divmod(seq_len, num_blocks)
    num_norm = num_blocks - num_extra
    bins = [seq[i*blen:(i+1)*blen] for i in range(num_norm)]
    o = num_norm*blen
    bins.extend([seq[(o + i*(blen+1)):(o + (i+1)*(blen+1))]
                 for i in range(num_extra)])
    return bins

Ah ok, hope to understand the problem better, now. A straightforward approach (thanks Thilo):

def nblocks(seq, num_blocks):
    result = [[] for i in range(num_blocks)]
    for i, item in enumerate(seq):
        result[i * num_blocks / len(seq)].append(item)
    return result

A generator of blocks:

def nblocks(seq, num_blocks):
    slen = len(seq)
    def min_index(i):
        index, r = divmod(i * slen, num_blocks)
        return index + (1 if r &gt; 0 else 0)
    return (seq[min_index(i):min_index(i + 1)] for i in range(num_blocks))

So that seq doesn’t have to be a list:

def nblocks(seq, num_blocks):
    block = []
    num_block = 0
    for i, item in enumerate(seq):
        if i * num_blocks / len(seq) == num_block:
            block.append(item)
        else:
            yield block
            num_block += 1
            block = [item]
    yield block
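
The generator above still calls `len(seq)`, so `seq` needs a length even though it is never sliced. For a plain iterator, one option is to pass the length separately and cut blocks with `itertools.islice`. A Python 3 sketch; the signature with an explicit `length` argument and the name `nblocks_iter` are assumptions, not something from the thread, and the larger blocks are placed last.

```python
from itertools import islice


def nblocks_iter(iterable, length, num_blocks):
    it = iter(iterable)
    base, extra = divmod(length, num_blocks)
    for i in range(num_blocks):
        # The last `extra` blocks take one item more than the rest.
        size = base + 1 if i >= num_blocks - extra else base
        yield list(islice(it, size))


print(list(nblocks_iter(iter(range(10)), 10, 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```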

Here is my solution:

def nblocks(seq, num_blocks):
    """nblocks(range(5), 2) -> [0, 1], [2, 3, 4]"""
    result = [[] for i in xrange(num_blocks)]
    cur_block_idx = 0
    switch_over = num_blocks - (len(seq) % num_blocks)
    block_len = len(seq) / num_blocks
    switch_idx = switch_over*block_len
    for idx, obj in enumerate(seq):
        if switch_idx and idx > switch_idx:
            cur_block_idx = switch_over + (idx - switch_idx)/(block_len + 1)
        else:
            cur_block_idx = idx/block_len
        result[cur_block_idx].append(obj)
    return result

I do believe that there were other solutions prettier than this.
What I really liked about this challenge is that it sounds really easy, and even once you define the problem, it takes more time than you expected.

I took a small liberty here and relaxed the requirements by saying that nblocks must return an iterator (which is not necessarily a list). This is the best I came up with:

from itertools import chain, repeat

def nblocks(seq, num):
    size = len(seq)
    larger = size % num
    sizes = chain(repeat(size / num, num - larger), repeat(size / num + 1, larger))
    start = 0
    for size in sizes:
        yield seq[start:start+size]
        start += size
