• Skip to primary navigation
  • Skip to main content

Algorithm.co.il

  • About
  • Best Posts
  • Origami
  • Older Projects

Small programming challenge No. 6 – nblocks

Posted on 2011-01-25 by lorg 22 Comments

I came up with this challenge when I had to write a function to divide a sequence to percentiles. I needed this to calculate some statistics over trips for plnnr.com. “This sounds trivial” I thought, and reached for my simple blocks function:

def blocks(seq, block_len):
    """blocks(range(5),2) -> [[0, 1], [2, 3], [4]]"""
    seq_len = len(seq)
    if seq_len%block_len == 0:
        num_blocks = seq_len/block_len
    else:
        num_blocks = 1 + (seq_len/block_len)
 
    result = [[] for i in xrange(num_blocks)]
    for idx, obj in enumerate(seq):
        result[idx/block_len].append(obj)
    return result

def blocks(seq, block_len): """blocks(range(5),2) -> [[0, 1], [2, 3], [4]]""" seq_len = len(seq) if seq_len%block_len == 0: num_blocks = seq_len/block_len else: num_blocks = 1 + (seq_len/block_len) result = [[] for i in xrange(num_blocks)] for idx, obj in enumerate(seq): result[idx/block_len].append(obj) return result

So I set block_len to len(seq)/10 and called blocks(seq, block_len). Unfortunately, according to the docs of blocks (which I wrote…), when there is extra data, a “partial” block is added – which is exactly what we don’t want when calculating percentiles.
Instead, the behavior we want is nblocks(seq, number_of_blocks), for example: nblocks(range(10), 3) -> [0, 1, 2], [3, 4, 5], [6, 7, 8, 9].

This is a surprisingly hard to write function, or rather, harder than you’d expect. I’ll be especially glad if someone writes it elegantly. My solution works well enough, but isn’t the prettiest.

So, you have the definition – let’s see if you can do better than me. Once enough solutions are presented, I will present my own.

IMPORTANT EDIT: the required signature is nblocks(seq, num_blocks). So for seq(range(10), 3), the return value should be 3 blocks, with the last one having an extra item. As a general rule, the extra items should be spread as evenly as possible.

Filed under: Challenges, Programming, Python, Statistics

Reader Interactions

Comments

  1. Jeremy says

    2011-01-25 at 1:49 am

    First stab at it:

    def blocks(seq, block_len):
         out = []
         curr = []
         for item in seq:
                 curr.append(item)
                 if len(curr) == block_len:
                         out.append(curr)
                         curr = []
         return out

    def blocks(seq, block_len): out = [] curr = [] for item in seq: curr.append(item) if len(curr) == block_len: out.append(curr) curr = [] return out

    Then I did a recursive solution to see if I could make it more compact:

    def rblocks(seq, curr_block, block_len):
         if len(curr_block) == block_len:
                 return [curr_block] + rblocks(seq, [], block_len)
         if seq:
                 return rblocks(seq[1:], curr_block+seq[0:1], block_len)
         return []

    def rblocks(seq, curr_block, block_len): if len(curr_block) == block_len: return [curr_block] + rblocks(seq, [], block_len) if seq: return rblocks(seq[1:], curr_block+seq[0:1], block_len) return []

    Reply
  2. Lucas says

    2011-01-25 at 2:04 am

    How’s this?

    def blocks(seq, block_len):
        return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
     
     
    def nblocks(seq, block_len):
        result = blocks(seq, block_len)
        if not (len(result) in (0, 1) or len(result[-1]) == block_len):
            last_block = result[-1]
            del(result[-1])
            result[-1].extend(last_block)
        return result

    def blocks(seq, block_len): return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len)) def nblocks(seq, block_len): result = blocks(seq, block_len) if not (len(result) in (0, 1) or len(result[-1]) == block_len): last_block = result[-1] del(result[-1]) result[-1].extend(last_block) return result

    Reply
  3. Yuv says

    2011-01-25 at 2:46 am

    A quickie, I’m not sure if it’s elegant enough…

    http://codepad.org/Di6Z3POC

    Reply
  4. lorg says

    2011-01-25 at 8:20 am

    Thank you all for your solutions!

    Jeremy: I think I didn’t explain myself well enough: I wasn’t aiming for a better “blocks” function. I was aiming for a “nblocks” function, whose signature is nblocks(seq, num_blocks).

    Lucas: your solution is problematic, as for nblocks(range(10), 3), the last element was dropped.

    Yuv: try to write it as nblocks(seq, num_blocks) :)

    I will add an edit to the blog post to make the requried signature more clear :)

    Reply
  5. ephes says

    2011-01-25 at 8:36 am

    Ok, let’s try…

    from itertools import islice, groupby, count
     
    def blocks_islice(seq, seq_len, block_len):
        remainder = seq_len % block_len
        max_index = (seq_len / block_len) - 1
        for index in count():
            if index < max_index:
                yield list(islice(seq, 0, block_len))
            else:
                yield list(islice(seq, 0, block_len + remainder))
                break
     
    def blocks_groupby(seq, seq_len, block_len):
        last_block = []
        max_index = (seq_len / block_len) - 1
        for k, block in groupby(enumerate(seq), lambda x: x[0] / block_len):
            result_block = [i[1] for i in block]
            if k < max_index:
                yield result_block
            else:
                last_block += result_block
        yield last_block
     
    x = range(10)
    print list(blocks_islice(iter(x), len(x), 3))
    print list(blocks_groupby(x, len(x), 3))

    from itertools import islice, groupby, count def blocks_islice(seq, seq_len, block_len): remainder = seq_len % block_len max_index = (seq_len / block_len) - 1 for index in count(): if index < max_index: yield list(islice(seq, 0, block_len)) else: yield list(islice(seq, 0, block_len + remainder)) break def blocks_groupby(seq, seq_len, block_len): last_block = [] max_index = (seq_len / block_len) - 1 for k, block in groupby(enumerate(seq), lambda x: x[0] / block_len): result_block = [i[1] for i in block] if k < max_index: yield result_block else: last_block += result_block yield last_block x = range(10) print list(blocks_islice(iter(x), len(x), 3)) print list(blocks_groupby(x, len(x), 3))

    Hmm, not really elegant. Having to know the length of the sequence seems unavoidable. And izip(*[seq] * block_len) looks promising, but i dunno how to make it work :).

    Reply
  6. lorg says

    2011-01-25 at 8:44 am

    Ephes:
    Thanks for the solution! However, that’s not quite what I was looking for :) See again my update and my comment above: the signature should be nblocks(seq, num_blocks). So for the “usual” percentiles, it’d be nblocks(seq, 10).
    Regarding the length of the sequence: you are right, it’s unavoidable to know it but that’s ok. (It’s avoidable only if you’re willing to do some slightly ugly hacks :)

    Reply
  7. Lucas says

    2011-01-25 at 8:58 am

    Oh, I see I misread what you were trying to do. Will work on a better solution.

    Reply
  8. Lucas says

    2011-01-25 at 9:27 am

    def blocks(seq, block_len):
        return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len))
     
     
    def nblocks(seq, num_blocks):
        r = len(seq) % num_blocks
        block_len = len(seq) / num_blocks
        end_small_blocks = block_len * (num_blocks - r)
        result = blocks(seq[:end_small_blocks], block_len)
        result.extend(blocks(seq[end_small_blocks:], block_len + 1))
        return result
     
    &gt;&gt;&gt; nblocks(range(10), 3)
    [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
    &gt;&gt;&gt; map(len, nblocks(range(17*9), 10))
    [15, 15, 15, 15, 15, 15, 15, 16, 16, 16]

    def blocks(seq, block_len): return map(lambda block_start: seq[block_start:block_start + block_len], xrange(0, len(seq), block_len)) def nblocks(seq, num_blocks): r = len(seq) % num_blocks block_len = len(seq) / num_blocks end_small_blocks = block_len * (num_blocks - r) result = blocks(seq[:end_small_blocks], block_len) result.extend(blocks(seq[end_small_blocks:], block_len + 1)) return result &gt;&gt;&gt; nblocks(range(10), 3) [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]] &gt;&gt;&gt; map(len, nblocks(range(17*9), 10)) [15, 15, 15, 15, 15, 15, 15, 16, 16, 16]

    Is that the function you’re looking for?

    Reply
  9. Timothy Kim says

    2011-01-25 at 3:04 pm

    I’m not sure what you mean by “As a general rule, the extra items should be spread as evenly as possible.”

    Here’s my solution with my own interpretation of that statement:


    def blocks(seq, block_len):
    rest = seq
    result = []
    while len(rest) > block_len:
    result.append(rest[:block_len])
    rest = rest[block_len:]
    #now distribute
    for i, v in enumerate(rest):
    result[-(i+1)].append(v)
    return result

    Reply
  10. Timothy Kim says

    2011-01-25 at 3:05 pm

    Oops. Let me try it again with the pre tag.

    def blocks(seq, block_len):
        rest = seq
        result = []
        while len(rest) &gt; block_len:
            result.append(rest[:block_len])
            rest = rest[block_len:]
        #now distribute
        for i, v in enumerate(rest):
            result[-(i+1)].append(v)
        return result

    def blocks(seq, block_len): rest = seq result = [] while len(rest) &gt; block_len: result.append(rest[:block_len]) rest = rest[block_len:] #now distribute for i, v in enumerate(rest): result[-(i+1)].append(v) return result

    Reply
  11. Erez says

    2011-01-25 at 4:46 pm

    Breshenham to the rescue!


    def nblocks( seq, block_len ):
    assert seq
    l = []
    error = 0.0
    deltaerr = float(block_len) / len(seq)

    for item in seq:
    l.append( item )
    error += deltaerr
    if error >= 1.0:
    yield l
    l = []
    error -= 1.0

    if l:
    yield l

    &gt;&gt;&gt; list( nblocks( range(10), 3) )
    [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    &gt;&gt;&gt; list( nblocks( range(9), 3) )
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    &gt;&gt;&gt; map(len, nblocks(range(17*9), 10))
    [16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

    &gt;&gt;&gt; list( nblocks( range(10), 3) ) [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]] &gt;&gt;&gt; list( nblocks( range(9), 3) ) [[0, 1, 2], [3, 4, 5], [6, 7, 8]] &gt;&gt;&gt; map(len, nblocks(range(17*9), 10)) [16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

    ( Provides better distribution )

    Reply
  12. Erez says

    2011-01-25 at 4:48 pm

    Shouldn’t have used the code tag… Trying again:

    Breshenham to the rescue!

    def nblocks( seq, block_len ):
        assert seq
        l = []
        error = 0.0
        deltaerr = float(block_len) / len(seq)
     
        for item in seq:
            l.append( item )
            error += deltaerr
            if error &gt;= 1.0:
                yield l
                l = []
                error -= 1.0
     
        if l:
            yield l

    def nblocks( seq, block_len ): assert seq l = [] error = 0.0 deltaerr = float(block_len) / len(seq) for item in seq: l.append( item ) error += deltaerr if error &gt;= 1.0: yield l l = [] error -= 1.0 if l: yield l

    >>> list( try2.nblocks( range(10), 3) )
    [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    >>> list( try2.nblocks( range(9), 3) )
    [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    >>> map(len, try2.nblocks(range(17*9), 10))
    [16, 15, 15, 16, 15, 15, 16, 15, 15, 15]

    ( Provides better distribution )

    Reply
  13. lorg says

    2011-01-25 at 9:13 pm

    Everyone: It seems I wasn’t clear in my explanation of the challenge – it is to write a function that receives as parameter the *number of blocks*, and not the length of the block.

    I tried to improve the situation in the blog post, by emphasizing the required signature, let me know if you think it’s still unclear.

    Reply
  14. Erez says

    2011-01-26 at 12:06 am

    Yeah, my bad. Rename block_len to block_count. It does what you asked, I just didn’t pay enough attention to the naming.

    For instance:

    &gt;&gt;&gt; list( nblocks(range(10), 7) )
    [[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]

    &gt;&gt;&gt; list( nblocks(range(10), 7) ) [[0, 1], [2], [3, 4], [5], [6, 7], [8], [9]]

    Reply
  15. eric says

    2011-01-26 at 1:10 am

    here are my two cents :

    def lblocks(length, number_of_blocks):
    	average, extra = divmod(length, number_of_blocks)
    	nx= number_of_blocks - extra
    	return [(i+1)*average +(i-nx+1 if i&gt;=nx else 0) for i in range(number_of_blocks)]
     
    def nblocks(seq, number_of_blocks) :
        """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        lengths = lblocks(len(seq), number_of_blocks)
        return [    seq[j:k] for j,k in zip([0]+lengths, lengths )]

    def lblocks(length, number_of_blocks): average, extra = divmod(length, number_of_blocks) nx= number_of_blocks - extra return [(i+1)*average +(i-nx+1 if i&gt;=nx else 0) for i in range(number_of_blocks)] def nblocks(seq, number_of_blocks) : """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]""" lengths = lblocks(len(seq), number_of_blocks) return [ seq[j:k] for j,k in zip([0]+lengths, lengths )]

    the idea is to work on “lengths” first: average is length/nb_blocks, and extra, is to be dispatched.
    Then I cumulate the lengths to turn them in indices

    that’s the lblock function.

    the nblock is then straightforward.

    Reply
  16. eric says

    2011-01-26 at 9:51 am

    little improvement. I didn’t get the “even” part !

    def nblocks(seq, number_of_blocks) :
        """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        length = len(seq)
        lengths = [int(i*length*1./number_of_blocks) for i in range(number_of_blocks+1)] 
        return [    seq[j:k] for j,k in zip(lengths, lengths[1:] )]

    def nblocks(seq, number_of_blocks) : """nblocks(range(10), 3) -&gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]""" length = len(seq) lengths = [int(i*length*1./number_of_blocks) for i in range(number_of_blocks+1)] return [ seq[j:k] for j,k in zip(lengths, lengths[1:] )]

    it uses rounding error to dispatch evenly the extras, this time.

    Reply
  17. eric says

    2011-01-26 at 9:53 am

    oups, sorry, needed a little renaming to keep the code readable

    def nblocks(seq, number_of_blocks) :
        """nblocks(range(10), 3) - &gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]"""
        step = len(seq)*1./number_of_blocks
        indices = [int(i*step) for i in range(number_of_blocks+1)] 
        return [    seq[j:k] for j,k in zip(indices, indices[1:] )]

    def nblocks(seq, number_of_blocks) : """nblocks(range(10), 3) - &gt; [0, 1, 2], [3, 4, 5], [6, 7, 8, 9]""" step = len(seq)*1./number_of_blocks indices = [int(i*step) for i in range(number_of_blocks+1)] return [ seq[j:k] for j,k in zip(indices, indices[1:] )]

    Reply
  18. Kamil Kisiel says

    2011-01-26 at 6:19 pm

    Maybe I’m late to the party, but I wrote this last night:

    def nblocks(seq, num_blocks):
        seq = list(seq)
        seq_len = len(seq)
        blen, num_extra = divmod(seq_len, num_blocks)
        num_norm = num_blocks - num_extra
        bins = [seq[i*blen:(i+1)*blen] for i in range(num_norm)]
        o = num_norm*blen
        bins.extend([seq[(o + i*(blen+1)):(o + (i+1)*(blen+1))].
                     for i in range(num_extra)])
        return bins

    def nblocks(seq, num_blocks): seq = list(seq) seq_len = len(seq) blen, num_extra = divmod(seq_len, num_blocks) num_norm = num_blocks - num_extra bins = [seq[i*blen:(i+1)*blen] for i in range(num_norm)] o = num_norm*blen bins.extend([seq[(o + i*(blen+1)):(o + (i+1)*(blen+1))]. for i in range(num_extra)]) return bins

    Reply
  19. ephes says

    2011-01-28 at 1:44 pm

    Ah ok, hope to understand the problem better, now. A straightforward approach (thanks Thilo):

    def nblocks(seq, num_blocks):
        result = [[] for i in range(num_blocks)]
        for i, item in enumerate(seq):
            result[i * num_blocks / len(seq)].append(item)
        return result

    def nblocks(seq, num_blocks): result = [[] for i in range(num_blocks)] for i, item in enumerate(seq): result[i * num_blocks / len(seq)].append(item) return result

    A generator of blocks:

    def nblocks(seq, num_blocks):
        slen = len(seq)
        def min_index(i):
            index, r = divmod(i * slen, num_blocks)
            return index + (1 if r &gt; 0 else 0)
        return (seq[min_index(i):min_index(i + 1)] for i in range(num_blocks))

    def nblocks(seq, num_blocks): slen = len(seq) def min_index(i): index, r = divmod(i * slen, num_blocks) return index + (1 if r &gt; 0 else 0) return (seq[min_index(i):min_index(i + 1)] for i in range(num_blocks))

    So that seq doesn’t have to be a list:

    def nblocks(seq, num_blocks):
        block = []
        num_block = 0
        for i, item in enumerate(seq):
            if i * num_blocks / len(seq) == num_block:
                block.append(item)
            else:
                yield block
                num_block += 1
                block = [item]
        yield block

    def nblocks(seq, num_blocks): block = [] num_block = 0 for i, item in enumerate(seq): if i * num_blocks / len(seq) == num_block: block.append(item) else: yield block num_block += 1 block = [item] yield block

    Reply
  20. lorg says

    2011-02-05 at 6:51 pm

    Here is my solution:

    def nblocks(seq, num_blocks):
        """nblocks(range(5), 2) -> [0, 1], [2, 3, 4]"""
        result = [[] for i in xrange(num_blocks)]
        cur_block_idx = 0
        switch_over = num_blocks - (len(seq) % num_blocks)
        block_len = len(seq) / num_blocks
        switch_idx = switch_over*block_len
        for idx, obj in enumerate(seq):
            if switch_idx and idx > switch_idx:
                cur_block_idx = switch_over + (idx - switch_idx)/(block_len + 1)
            else:
                cur_block_idx = idx/block_len
            result[cur_block_idx].append(obj)
        return result

    def nblocks(seq, num_blocks): """nblocks(range(5), 2) -> [0, 1], [2, 3, 4]""" result = [[] for i in xrange(num_blocks)] cur_block_idx = 0 switch_over = num_blocks - (len(seq) % num_blocks) block_len = len(seq) / num_blocks switch_idx = switch_over*block_len for idx, obj in enumerate(seq): if switch_idx and idx > switch_idx: cur_block_idx = switch_over + (idx - switch_idx)/(block_len + 1) else: cur_block_idx = idx/block_len result[cur_block_idx].append(obj) return result

    I do believe that there were other solutions prettier than this.
    What I really liked about this challenge is that it sounds really easy, and even once you define the problem, it takes more time than you expected.

    Reply
  21. Adomas Paltanavicius says

    2011-02-05 at 11:54 pm

    I took a small liberty here and relaxed the requirements by saying that nblocks must return an iterator (which is not necessarily a list). This is the best I came up with:

    from itertools import chain, repeat
    
    def nblocks(seq, num):
        size = len(seq)
        larger = size % num
        sizes = chain(repeat(size / num, num - larger), repeat(size / num + 1, larger))
        start = 0
        for size in sizes:
            yield seq[start:start+size]
            start += size
    
    Reply
  22. Boris says

    2013-07-13 at 11:00 am

    Quite magical solution that uses same idea as grouper example in itertools docs
    http://docs.python.org/2/library/itertools.html#recipes

    [PYTHON]
    def superpercentile(l, n):
    border = len(l) / n * n
    args = [iter(l[:border])] * n
    r = zip(*args)
    r[-1] += tuple(l[border:])
    return r
    [/PYTHON]

    Reply

Leave a Reply Cancel reply

© 2022 Algorithm.co.il - Algorithms, for the heck of it