Small programming challenge No. 6 – nblocks

I came up with this challenge when I had to write a function to divide a sequence into percentiles. I needed this to calculate some statistics over trips for plnnr.com. “This sounds trivial,” I thought, and reached for my simple blocks function:

def blocks(seq, block_len):
    """blocks(range(5),2) -> [[0, 1], [2, 3], [4]]"""
    seq_len = len(seq)
    if seq_len%block_len == 0:
        num_blocks = seq_len/block_len
    else:
        num_blocks = 1 + (seq_len/block_len)
 
    result = [[] for i in xrange(num_blocks)]
    for idx, obj in enumerate(seq):
        result[idx/block_len].append(obj)
    return result

So I set block_len to len(seq)/10 and called blocks(seq, block_len). Unfortunately, according to the docs of blocks (which I wrote…), when there is extra data, a “partial” block is added – which is exactly what we don’t want when calculating percentiles.
Instead, the behavior we want is nblocks(seq, number_of_blocks), for example: nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9].
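To make the problem concrete, here is a minimal sketch of why fixed-length blocks do not give exactly ten groups. It assumes Python 3 (the code in this post is Python 2, where `/` on integers already truncates), and the slicing-based `blocks` below is a compact stand-in for the function above, not the original code.

```python
def blocks(seq, block_len):
    """Python 3 stand-in for blocks(): fixed-size slices, the last one may be partial."""
    return [seq[i:i + block_len] for i in range(0, len(seq), block_len)]

seq = list(range(25))
block_len = len(seq) // 10       # 2 items per block
result = blocks(seq, block_len)

print(len(result))               # 13 blocks instead of the 10 we asked for
print(result[-1])                # [24], the "partial" block
```

With 25 items the block length rounds down to 2, so the leftovers spill into extra blocks rather than being absorbed into the ten we wanted.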

This function is surprisingly hard to write, or rather, harder than you’d expect. I’ll be especially glad if someone writes it elegantly. My solution works well enough, but isn’t the prettiest.

So, you have the definition – let’s see if you can do better than me. Once enough solutions are presented, I will present my own.

IMPORTANT EDIT: the required signature is nblocks(seq, num_blocks). So for nblocks(range(10), 3), the return value should be 3 blocks, with the last one having an extra item. As a general rule, the extra items should be spread as evenly as possible.
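One way to pin the requirement down is a small checker that any candidate can be run against. This is only a sketch of one reading of the rules: `check_nblocks` and `naive` are hypothetical names, and "as evenly as possible" is interpreted here as block sizes differing by at most one. It is written for Python 3.

```python
def check_nblocks(nblocks, seq, num_blocks):
    """Return True if nblocks(seq, num_blocks) meets the contract as read above."""
    result = [list(block) for block in nblocks(seq, num_blocks)]
    sizes = [len(block) for block in result]
    flat = [item for block in result for item in block]
    return (
        len(result) == num_blocks                              # exact block count
        and flat == list(seq)                                  # nothing lost or reordered
        and max(sizes, default=0) - min(sizes, default=0) <= 1  # evenly spread
    )

# A deliberately wrong candidate: fixed-size slicing leaves a partial block.
def naive(seq, num_blocks):
    size = len(seq) // num_blocks
    return [seq[i:i + size] for i in range(0, len(seq), size)]

print(check_nblocks(naive, list(range(10)), 3))  # False: 4 blocks come back
print(check_nblocks(naive, list(range(9)), 3))   # True: 9 divides evenly by 3
```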


21 Responses to Small programming challenge No. 6 – nblocks

  1. Jeremy says:

    First stab at it:

    def blocks(seq, block_len):
         out = []
         curr = []
         for item in seq:
                 curr.append(item)
                 if len(curr) == block_len:
                         out.append(curr)
                         curr = []
         return out

    Then I did a recursive solution to see if I could make it more compact:

    def rblocks(seq, curr_block, block_len):
         if len(curr_block) == block_len:
                 return [curr_block] + rblocks(seq, [], block_len)
         if seq:
                 return rblocks(seq[1:], curr_block+seq[0:1], block_len)
         return []
  2. Lucas says:

    How’s this?

    def blocks(seq, block_len):
        return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
     
     
    def nblocks(seq, block_len):
        result = blocks(seq, block_len)
        if not (len(result) in (0, 1) or len(result[-1]) == block_len):
            last_block = result[-1]
            del(result[-1])
            result[-1].extend(last_block)
        return result
  3. Yuv says:

    A quickie, I’m not sure if it’s elegant enough…

    http://codepad.org/Di6Z3POC

  4. lorg says:

    Thank you all for your solutions!

    Jeremy: I think I didn’t explain myself well enough: I wasn’t aiming for a better “blocks” function. I was aiming for a “nblocks” function, whose signature is nblocks(seq, num_blocks).

    Lucas: your solution is problematic, as for nblocks(range(10), 3), the last element was dropped.

    Yuv: try to write it as nblocks(seq, num_blocks) :)

    I will add an edit to the blog post to make the required signature clearer :)

  5. ephes says:

    Ok, let’s try…

    from itertools import islice, groupby, count
     
    def blocks_islice(seq, seq_len, block_len):
        remainder = seq_len % block_len
        max_index = (seq_len / block_len) - 1
        for index in count():
            if index < max_index:
                yield list(islice(seq, 0, block_len))
            else:
                yield list(islice(seq, 0, block_len + remainder))
                break
     
    def blocks_groupby(seq, seq_len, block_len):
        last_block = []
        max_index = (seq_len / block_len) - 1
        for k, block in groupby(enumerate(seq), lambda x: x[0] / block_len):
            result_block = [i[1] for i in block]
            if k < max_index:
                yield result_block
            else:
                last_block += result_block
        yield last_block
     
    x = range(10)
    print list(blocks_islice(iter(x), len(x), 3))
    print list(blocks_groupby(x, len(x), 3))

    Hmm, not really elegant. Having to know the length of the sequence seems unavoidable. And izip(*[seq] * block_len) looks promising, but i dunno how to make it work :).
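For reference, the zip-based idiom mentioned above can be sketched as follows. It is written for Python 3 (`zip` and `zip_longest` instead of `izip` and `izip_longest`), and the function names are hypothetical. The trick is that the list must repeat the same iterator object, not the sequence itself. Note that it produces fixed-length blocks, so it is a `blocks` replacement rather than an `nblocks` solution.

```python
from itertools import zip_longest

def fixed_blocks(seq, block_len):
    """Group items block_len at a time; leftover items are silently dropped."""
    it = iter(seq)  # one shared iterator, repeated block_len times below
    return [list(group) for group in zip(*[it] * block_len)]

def fixed_blocks_padded(seq, block_len):
    """Same idea, but keep the leftovers as a shorter final block."""
    it = iter(seq)
    sentinel = object()  # private fill value, stripped again afterwards
    return [[x for x in group if x is not sentinel]
            for group in zip_longest(*[it] * block_len, fillvalue=sentinel)]

print(fixed_blocks(range(10), 3))         # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
print(fixed_blocks_padded(range(10), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```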

  6. lorg says:

    Ephes:
    Thanks for the solution! However, that’s not quite what I was looking for :) See again my update and my comment above: the signature should be nblocks(seq, num_blocks). So for the “usual” percentiles, it’d be nblocks(seq, 10).
    Regarding the length of the sequence: you are right, it’s unavoidable to know it but that’s ok. (It’s avoidable only if you’re willing to do some slightly ugly hacks :)

  7. Lucas says:

    Oh, I see I misread what you were trying to do. Will work on a better solution.

  8. Lucas says:
    def blocks(seq, block_len):
        return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
     
     
    def nblocks(seq, num_blocks):
        r = len(seq) % num_blocks
        block_len = len(seq) / num_blocks
        end_small_blocks = block_len * (num_blocks - r)
        result = blocks(seq[:end_small_blocks], block_len)
        result.extend(blocks(seq[end_small_blocks:], block_len + 1))
        return result
     
    >>> nblocks(range(10), 3)
    [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
    >>> map(len, nblocks(range(17*9), 10))
    [15, 15, 15, 15, 15, 15, 15, 16, 16, 16]

    Is that the function you’re looking for?

  9. Timothy Kim says:

    I’m not sure what you mean by “As a general rule, the extra items should be spread as evenly as possible.”

    Here’s my solution with my own interpretation of that statement:


    def blocks(seq, block_len):
    rest = seq
    result = []
    while len(rest) > block_len:
    result.append(rest[:block_len])
    rest = rest[block_len:]
    #now distribute
    for i, v in enumerate(rest):
    result[-(i+1)].append(v)
    return result

  10. Timothy Kim says:

    Oops. Let me try it again with the pre tag.

    def blocks(seq, block_len):
        rest = seq
        result = []
        while len(rest) > block_len:
            result.append(rest[:block_len])
            rest = rest[block_len:]
        #now distribute
        for i, v in enumerate(rest):
            result[-(i+1)].append(v)
        return result
  11. Erez says:

    Bresenham to the rescue!


    def nblocks( seq, block_len ):
    assert seq
    l = []
    error = 0.0
    deltaerr = float(block_len) / len(seq)

    for item in seq:
    l.append( item )
    error += deltaerr
    if error >= 1.0:
    yield l
    l = []
    error -= 1.0

    if l:
    yield l

    >>> list( nblocks( range(10), 3) )
    [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    >>> list( nblocks( range(9), 3) )
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    >>> map(len, nblocks(range(17*9), 10))
    [16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

    ( Provides better distribution )

  12. Erez says:

    Shouldn’t have used the code tag… Trying again:

    Bresenham to the rescue!

    def nblocks( seq, block_len ):
        assert seq
        l = []
        error = 0.0
        deltaerr = float(block_len) / len(seq)
     
        for item in seq:
            l.append( item )
            error += deltaerr
            if error >= 1.0:
                yield l
                l = []
                error -= 1.0
     
        if l:
            yield l

    >>> list( try2.nblocks( range(10), 3) )
    [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    >>> list( try2.nblocks( range(9), 3) )
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    >>> map(len, try2.nblocks(range(17*9), 10))
    [16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

    ( Provides better distribution )

  13. lorg says:

    Everyone: It seems I wasn’t clear in my explanation of the challenge – it is to write a function that receives as parameter the *number of blocks*, and not the length of the block.

    I tried to improve the situation in the blog post, by emphasizing the required signature, let me know if you think it’s still unclear.

  14. Erez says:

    Yeah, my bad. Rename block_len to block_count. It does what you asked, I just didn’t pay enough attention to the naming.

    For instance:

    >>> list( nblocks(range(10), 7) )
    [[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]
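The error-accumulation idea above can also be sketched with integers only, which sidesteps any floating-point drift over long sequences. This is a Python 3 variant under that assumption, not Erez's code; the name `nblocks_bresenham` is hypothetical. Instead of adding `num_blocks / len(seq)` and comparing against 1.0, it adds `num_blocks` and compares against `len(seq)`.

```python
def nblocks_bresenham(seq, num_blocks):
    """Yield blocks, closing one whenever the accumulated error reaches len(seq)."""
    total = len(seq)
    block = []
    error = 0
    for item in seq:
        block.append(item)
        error += num_blocks          # integer stand-in for num_blocks / total
        if error >= total:           # integer stand-in for error >= 1.0
            yield block
            block = []
            error -= total
    if block:                        # only reachable if the loop left items behind
        yield block

print(list(nblocks_bresenham(range(10), 3)))
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(list(nblocks_bresenham(range(10), 7)))
# [[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]
```

When `len(seq)` is at least `num_blocks`, the total error added is exactly `len(seq) * num_blocks`, so the loop closes exactly `num_blocks` blocks and ends with zero error.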
  15. eric says:

    here are my two cents :

    def lblocks(length, number_of_blocks):
        average, extra = divmod(length, number_of_blocks)
        nx = number_of_blocks - extra
        return [(i+1)*average + (i-nx+1 if i >= nx else 0) for i in range(number_of_blocks)]
     
    def nblocks(seq, number_of_blocks):
        """nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        lengths = lblocks(len(seq), number_of_blocks)
        return [seq[j:k] for j, k in zip([0] + lengths, lengths)]

    The idea is to work on “lengths” first: average is length/nb_blocks, and the extra items are dispatched among the last blocks.
    Then I accumulate the lengths to turn them into indices.

    that’s the lblock function.

    the nblock is then straightforward.
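The same lengths-first idea can be sketched with the block lengths written out explicitly and `itertools.accumulate` doing the cumulative sum. This assumes Python 3, and `nblocks_by_lengths` is a hypothetical name rather than the code above.

```python
from itertools import accumulate

def nblocks_by_lengths(seq, number_of_blocks):
    """Build the list of block lengths, then turn it into slice boundaries."""
    average, extra = divmod(len(seq), number_of_blocks)
    # The first blocks get `average` items, the last `extra` blocks get one more.
    lengths = [average] * (number_of_blocks - extra) + [average + 1] * extra
    ends = list(accumulate(lengths))   # cumulative sums are the end indices
    starts = [0] + ends[:-1]
    return [seq[j:k] for j, k in zip(starts, ends)]

print(nblocks_by_lengths(list(range(10)), 3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```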

  16. eric says:

    little improvement. I didn’t get the “even” part !

    def nblocks(seq, number_of_blocks):
        """nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        length = len(seq)
        lengths = [int(i*length*1./number_of_blocks) for i in range(number_of_blocks+1)]
        return [seq[j:k] for j, k in zip(lengths, lengths[1:])]

    it uses rounding error to dispatch evenly the extras, this time.
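The boundaries can also be computed with integer floor division, which gives the same truncation without going through floats. This is a sketch assuming Python 3 (where `//` is floor division), and `nblocks_int` is a hypothetical name. Boundary `i` sits at `floor(i * length / number_of_blocks)`, so the last boundary is always exactly `length`.

```python
def nblocks_int(seq, number_of_blocks):
    """Slice seq at the integer boundaries floor(i * len(seq) / number_of_blocks)."""
    length = len(seq)
    indices = [i * length // number_of_blocks for i in range(number_of_blocks + 1)]
    return [seq[j:k] for j, k in zip(indices, indices[1:])]

print(nblocks_int(list(range(10)), 3))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print([len(b) for b in nblocks_int(list(range(10)), 7)])
# [1, 1, 2, 1, 2, 1, 2]
```

Staying in integers avoids the risk of a float product landing just below a whole number and being truncated one index short.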

  17. eric says:

    Oops, sorry, needed a little renaming to keep the code readable

    def nblocks(seq, number_of_blocks):
        """nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        step = len(seq)*1./number_of_blocks
        indices = [int(i*step) for i in range(number_of_blocks+1)]
        return [seq[j:k] for j, k in zip(indices, indices[1:])]
  18. Kamil Kisiel says:

    Maybe I’m late to the party, but I wrote this last night:

    def nblocks(seq, num_blocks):
        seq = list(seq)
        seq_len = len(seq)
        blen, num_extra = divmod(seq_len, num_blocks)
        num_norm = num_blocks - num_extra
        bins = [seq[i*blen:(i+1)*blen] for i in range(num_norm)]
        o = num_norm*blen
        bins.extend([seq[(o + i*(blen+1)):(o + (i+1)*(blen+1))]
                     for i in range(num_extra)])
        return bins
  19. ephes says:

    Ah ok, hope to understand the problem better, now. A straightforward approach (thanks Thilo):

    def nblocks(seq, num_blocks):
        result = [[] for i in range(num_blocks)]
        for i, item in enumerate(seq):
            result[i * num_blocks / len(seq)].append(item)
        return result

    A generator of blocks:

    def nblocks(seq, num_blocks):
        slen = len(seq)
        def min_index(i):
            index, r = divmod(i * slen, num_blocks)
            return index + (1 if r > 0 else 0)
        return (seq[min_index(i):min_index(i + 1)] for i in range(num_blocks))

    So that seq doesn’t have to be a list:

    def nblocks(seq, num_blocks):
        block = []
        num_block = 0
        for i, item in enumerate(seq):
            if i * num_blocks / len(seq) == num_block:
                block.append(item)
            else:
                yield block
                num_block += 1
                block = [item]
        yield block
  20. lorg says:

    Here is my solution:

    def nblocks(seq, num_blocks):
        """nblocks(range(5), 2) -> [0, 1], [2, 3, 4]"""
        result = [[] for i in xrange(num_blocks)]
        cur_block_idx = 0
        switch_over = num_blocks - (len(seq) % num_blocks)
        block_len = len(seq) / num_blocks
        switch_idx = switch_over*block_len
        for idx, obj in enumerate(seq):
            if switch_idx and idx > switch_idx:
                cur_block_idx = switch_over + (idx - switch_idx)/(block_len + 1)
            else:
                cur_block_idx = idx/block_len
            result[cur_block_idx].append(obj)
        return result

    I do believe that there were other solutions prettier than this.
    What I really liked about this challenge is that it sounds really easy, and even once you define the problem, it takes more time than you expected.

  21. I took a small liberty here and relaxed the requirements by saying that nblocks must return an iterator (which is not necessarily a list). This is the best I came up with:

    from itertools import chain, repeat
    
    def nblocks(seq, num):
        size = len(seq)
        larger = size % num
        sizes = chain(repeat(size / num, num - larger), repeat(size / num + 1, larger))
        start = 0
        for size in sizes:
            yield seq[start:start+size]
            start += size