AmvTek blog

complex web systems

Making use of twisted coiterate

Twisted provides various ways to integrate CPU bound operations or blocking libraries to the reactor. It provides very clean integration path for threading or external processes.

In this post, we describe an under documented alternative, where the long running task is implemented using an iterator that will be consumed directly in the reactor event loop after passing it to coiterate.

Where usable, coiteration allows to completely avoid using threading, bypassing the well known python GIL bottleneck…

Basic idea

Coiteration requires developers to use a divide and conquer strategy to plan their task execution. In python, we will code the task using a generator function or alternatively a class implementing the iterator protocol.

The task will be executed step by step in the reactor event loop after passing the iterator that represents it to coiterate.

Summing integers

Let’s consider a python function which sums the N first integers, N being arbitrary large.

def sum_all_integers_until(N):

    s = 0
    for i in xrange(N):
        s += i
    return s

For very large value of N, calling such function from the same thread as the one inside which the reactor is running, is not a good idea as the event loop will be blocked for as long as this function needs to return…

Summing using coiteration

A first approach

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from twisted.internet import reactor
from twisted.internet.task import coiterate

def make_iterator_to_sum_all_integers_until(N):

    s = 0
    for i in xrange(N):
        s += i
        print "Adding %i to result" % i
        yield None # event loop can looks after other things...

    print "result is %i " % s

def sum_all_integers_being_nice_to_reactor(N):

    all_sum_steps = make_iterator_to_sum_all_integers_until(N)
    coiterate(all_sum_steps)

reactor.callLater(0, sum_all_integers_being_nice_to_reactor, 8)
reactor.run()

See gist coiterate01.py

At line 4 a generator function is defined, that return an iterator that will calculate the sum of all integers until a certain value N. At line 17, this iterator is passed to coiterate which will result in such iterator being consumed in the reactor event loop in an optimal way.

Note that this does not make your iterator magically non blocking, as everywhere else in Twisted, developer shall ensure that each iteration is non blocking.

Obtaining the result

If you took the time to run the above sum_all_integer …, you have probably been delighted to see the result being printed in the console.

Retrieving such result to make use of it, requires some additional efforts that will be detailled now.

Let’s first observe that coiterate is a well behaved Twisted citizen. As it is starting an operation (consumption of the argument iterator…) that will take some time to complete, it returns a Deferred. As you may expect, this Deferred will fire when iteration is over.

If we attach a callback function to this Deferred we will not receive our result, but the same iterator that coiterate has consumed. Let’s see a possible solution to obtain a result from the iterator.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from twisted.internet import reactor
from twisted.internet.task import coiterate

def make_iterator_to_sum_all_integers_until(N, context):

    s = 0
    for i in xrange(N):
        s += i
        print "Adding %i to result" % i
        yield None # event loop can looks after other things...

    context['result'] = s

def sum_all_integers_being_nice_to_reactor(N):
    "return Deferred firing calculated sum..."

    def extract_result_cb(ignored, context):
        "return context['result']"

        rv = context['result']
        print "Got result = %s" % rv
        return rv

    context = {}
    all_sum_steps = make_iterator_to_sum_all_integers_until(N, context)

    deferred = coiterate(all_sum_steps)
    deferred.addCallback(extract_result_cb, context)

    return deferred

reactor.callLater(0, sum_all_integers_being_nice_to_reactor, 8)
reactor.run()

See gist coiterate_02.py

Let’s summarize how this code proceeds :

At line 4, we define a generator function which returns an iterator that let us execute our task step by step. As we also need a result from such iterator, we pass it an additional context object which provides a way to "return" any result obtained during iteration.

At line 14, we construct a well behaved python function that returns a Deferred that will fire with the result we are awaiting. Internally this function takes care of all the gory details of constructing the iterator that will be passed to coiterate and extracting the result we need.

Waiting for Deferred…

Meanwhile executing a long running task, it is quite common to have to wait some time until some externals operations complete. Twisted let our coiterable tasks indicate that they shall be paused until a certain Deferred fires. To achieve so, the only thing to do is to yield the Deferred of interest out of the task iterator.

Let’s see how we could have our sum_all_integer… wait 1 second in between each iteration step.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from twisted.internet import reactor
from twisted.internet.task import coiterate, deferLater

def make_iterator_to_sum_all_integers_until(N, context):

    def wait_some_time(t):
        "return Deferred firing after t seconds"

        return deferLater(reactor,t,lambda :"I was paused %.02f seconds"%t)

    def print_pause_cb(msg):
        "callback printing result message..."

        print msg

    s = 0
    for i in xrange(N):

        s += i
        print "Adding %i to result" % i

        d = wait_some_time(1.0)
        d.addCallback(print_pause_cb)
        yield d    # we will be paused until d fires...

    context['result'] = s

See gist coiterate_03.py

At line 4 is the modified generator function that will pause some time in between each step. The wait_some_time function at line 6 could be anything that returns a Deferred.

It is our experience that the yield to wait approach which coiterate allows greatly simplify coding complex tasks with Twisted.

Cancelling coiteration

When requiring clients to wait long time to get the result of a long running operation, we shall expect situations where the client will give up. In such situations, we normally want to cleanup as soon as possible any resources allocated to service such client.

Before showing how this can be achieved in the context of this example, let’s mention that if you need to control your task from the outside to pause it or stop it, you should consider using cooperate instead of coiterate. Like coiterate, cooperate shall be called with an iterator which will be consumed in the reactor event loop. Unlike coiterate that returns a Deferred that fires when iteration is completed, cooperate returns a Task object that can be used to pause or stop the ongoing task…

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from twisted.internet import reactor
from twisted.internet.defer import Deferred, CancelledError
from twisted.internet.task import coiterate, deferLater

def make_iterator_to_sum_all_integers_until(N, context):

    def wait_some_time(t):
        "return Deferred firing after t seconds"

        return deferLater(reactor,t,lambda :"I was paused %.02f seconds"%t)

    def print_pause_cb(msg):
        "callback printing result message..."

        print msg

    d = None
    s = 0

    try:

        for i in xrange(N):

            s += i
            print "Adding %i to result" % i

            d = wait_some_time(1.0)
            d.addCallback(print_pause_cb)
            yield d    # we will be paused until d fires...

        context['result'] = s

    except GeneratorExit:

        print "---"
        print "Early termination..."

        # cancel pending Defferred
        if d and not d.called:
            d.cancel()

def sum_all_integers_being_nice_to_reactor(N):
    "return Deferred firing calculated sum..."

    def extract_result_cb(ignored, context):
        "return context['result']"

        rv = context['result']
        return rv

    def suppress_cancel_log_eb(error):
        "trap CancelledError"

        # this suppress UnhandledError warning...
        error.trap(CancelledError)

    context = {}
    all_sum_steps = make_iterator_to_sum_all_integers_until(N, context)

    deferred = Deferred(lambda _:all_sum_steps.close())
    coiterate(all_sum_steps).chainDeferred(deferred)
    deferred.addCallback(extract_result_cb, context)
    deferred.addErrback(suppress_cancel_log_eb)
    return deferred

def main():
    "start summing integers and stop after 3 seconds..."

    def print_result_cb(res):
        "print result if any..."

        if res is not None:
            print "Got result = %s" % res

    # start sum calculation using coiteration...
    d = sum_all_integers_being_nice_to_reactor(8)
    d.addCallback(print_result_cb)

    # schedule cancellation after 3.00 seconds
    reactor.callLater(3.0, d.cancel)


reactor.callLater(0, main)
reactor.run()

See gist coiterate_04.py

At line 4 our generator function was again modified. At line 32, an inner handler block for the GeneratorExit exception was added. This block will be reached in case close is called on iterator objects returned by our generator function. In this block, we are cleaning up any pending deferred that the task may be waiting for.

One would expect that cancelling the Deferred returned by coiterate would automatically close the related iterator, but this is not the case. Let’s modify the sum_all_integer… function for this to happen. At line 60, we construct the Deferred that sum_all_integer… will return, providing it a cancellation function. Such function simply close the iterator returned by the generator function. This helper Deferred is chained to the Deferred that coiterate returns, so that when no cancellation occurs, we get our result…