The second part

sdiehl.github.io/gevent-tuto…

Data Structures

Events

Events are a form of asynchronous communication between Greenlets.

import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''


evt = Event()

def setter():
    '''After 3 seconds, wake all greenlets waiting on evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()

def waiter():
    '''After 3 seconds the wait call will unblock'''
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])

if __name__ == '__main__': 
    main()
    
"""
A: Hey wait for me, I have to do something
I'll wait for you
I'll wait for you
I'll wait for you
I'll wait for you
I'll wait for you
Ok, I'm done
It's about time
It's about time
It's about time
It's about time
It's about time
"""
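As a small variation on the example above, the sketch below assumes you do not want a greenlet to wait forever: wait also accepts a timeout. The names ready, timed_out and woke are illustrative only, and the delays are arbitrary.

```python
import gevent
from gevent.event import Event

ready = Event()

# Nobody has called set() yet, so this wait gives up after 10 ms.
ready.wait(timeout=0.01)
timed_out = not ready.is_set()

# Schedule set() to run shortly, then wait again; this time the
# wait returns as soon as the event has been set.
gevent.spawn_later(0.01, ready.set)
ready.wait(timeout=1)
woke = ready.is_set()

print(timed_out, woke)
```

Checking is_set() after wait returns is a simple way to tell a timeout apart from a real wakeup.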

An extension of the Event object is the AsyncResult which allows you to send a value along with the wakeup call. This is sometimes called a future or a deferred, since it holds a reference to a future value that can be set on an arbitrary time schedule.

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    """
    After 3 seconds set the result of a.
    """
    gevent.sleep(3)
    a.set('Hello!')

def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

"""
Hello!
"""
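An AsyncResult can carry a failure as well as a value. The following is a minimal sketch, not part of the original tutorial, in which one greenlet stores a value in one AsyncResult and an exception in another; the names result, failure, produce and consume are made up for illustration.

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()
failure = AsyncResult()

def produce():
    gevent.sleep(0.01)
    result.set(42)
    # Store an exception instead of a value.
    failure.set_exception(ValueError("no value available"))

def consume():
    value = result.get()      # blocks until produce() calls set()
    try:
        failure.get()         # re-raises the stored exception
    except ValueError as exc:
        return value, str(exc)

consumer = gevent.spawn(consume)
gevent.joinall([gevent.spawn(produce), consumer])
outcome = consumer.value      # return value of consume()
print(outcome)
```

This is what makes an AsyncResult behave like a future: the waiter sees either the value or the error, whichever the setter recorded.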

Queues

Queues are ordered sets of data that have the usual put / get operations but are written in a way such that they can be safely manipulated across Greenlets.

For example, if one Greenlet grabs an item off of the queue, the same item will not be grabbed by another Greenlet executing simultaneously.

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

"""
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
"""
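To make the "no item is handed out twice" guarantee visible, here is a small sketch that records which worker claimed each task. The claimed dictionary is a hypothetical bookkeeping helper, and using get_nowait with Empty in place of the empty() check is a design choice of this sketch rather than of the original example.

```python
import gevent
from gevent.queue import Queue, Empty

tasks = Queue()
for i in range(9):
    tasks.put_nowait(i)

claimed = {}  # task -> list of workers that received it

def worker(name):
    while True:
        try:
            task = tasks.get_nowait()
        except Empty:
            return            # queue drained, stop this worker
        claimed.setdefault(task, []).append(name)
        gevent.sleep(0)       # yield so the other workers get a turn

gevent.joinall([gevent.spawn(worker, n) for n in ('steve', 'john', 'nancy')])

for task in sorted(claimed):
    print(task, claimed[task])
```

Every task ends up with exactly one worker in its list, which is the property described above.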

Queues can also block on either put or get as the need arises.

Each of the put and get operations has a non-blocking counterpart, put_nowait and get_nowait, which will not block but instead raise gevent.queue.Full or gevent.queue.Empty, respectively, if the operation is not possible.

In this example we have the boss running simultaneously with the workers, and the Queue is restricted to contain no more than three elements. This restriction means that the put operation will block until there is space on the queue. Conversely, the get operation will block if there are no elements on the queue to fetch. It also takes a timeout argument, which lets the worker exit with the exception gevent.queue.Empty if no work can be found within the time frame of the timeout.

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1)  # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until an individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in range(1, 10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in range(10, 20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

"""
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
"""
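The non-blocking variants and the timeout behaviour can also be tried in isolation. The sketch below assumes a queue limited to two items and uses made-up flag names (put_failed, get_failed, timed_out) to record which exception was raised.

```python
from gevent.queue import Queue, Empty, Full

q = Queue(maxsize=2)
q.put_nowait('a')
q.put_nowait('b')

try:
    q.put_nowait('c')        # the queue already holds two items
    put_failed = False
except Full:
    put_failed = True

drained = [q.get_nowait(), q.get_nowait()]

try:
    q.get_nowait()           # nothing left to fetch
    get_failed = False
except Empty:
    get_failed = True

try:
    q.get(timeout=0.01)      # waits up to 10 ms, then gives up
    timed_out = False
except Empty:
    timed_out = True

print(put_failed, drained, get_failed, timed_out)
```

Note that a blocking get with a timeout reports failure with the same Empty exception as get_nowait, which is why the worker above can use a single except clause.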

Groups and Pools

A group is a collection of running greenlets which are managed and scheduled together as a group. It also doubles as a parallel dispatcher that mirrors the Python multiprocessing library.

import gevent
from gevent.pool import Group

def talk(msg):
    for i in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

"""
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz
"""

This is very useful for managing groups of asynchronous tasks.

As mentioned above, Group also provides an API for dispatching jobs to grouped greenlets and collecting their results in various ways.

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group %s' % len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, range(3))


def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)


"""
Size of group 3
Hello from Greenlet 4340152592
Size of group 3
Hello from Greenlet 4340928912
Size of group 3
Hello from Greenlet 4340928592
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
"""
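Besides imap and imap_unordered, a Group also offers map and apply_async, in the spirit of the multiprocessing pool interface. The following sketch is illustrative only; the square function and the variable names are assumptions made for the example.

```python
import gevent
from gevent.pool import Group

def square(n):
    gevent.sleep(0.01)       # stand-in for some I/O-bound work
    return n * n

group = Group()

# map blocks until every call has finished and keeps input order.
squares = group.map(square, range(4))

# apply_async returns a greenlet immediately; get() waits for its result.
pending = group.apply_async(square, (5,))
single = pending.get()

print(squares, single)
```

map is convenient when all results are needed at once, while apply_async suits work that should start now and be collected later.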

A pool is a structure designed for handling dynamic numbers of greenlets which need to be concurrency-limited. This is often desirable in cases where one wants to do many network or IO bound tasks in parallel.

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

pool.map(hello_from, range(3))

"""
Size of pool 2
Size of pool 2
Size of pool 1
"""
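One way to convince yourself that the pool really caps concurrency is to count how many tasks are running at once. This is a rough sketch under the assumption of a pool of size 2; the active and peak counters are hypothetical helpers, not part of gevent.

```python
import gevent
from gevent.pool import Pool

pool = Pool(2)
active = 0   # tasks currently running
peak = 0     # highest value of active seen so far

def task(n):
    global active, peak
    active += 1
    peak = max(peak, active)
    gevent.sleep(0.01)       # yield while "working"
    active -= 1
    return n

results = pool.map(task, range(6))
print(results, peak)
```

Six tasks are submitted, but the peak counter never exceeds the pool size, because the pool waits for a free slot before starting the next greenlet.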

Often when building gevent driven services one will center the entire service around a pool structure. An example might be a class which polls on various sockets.

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        # A Pool needs no explicit start; greenlets run once spawned.
        self.pool = Pool(1000)

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("At maximum pool size")
        else:
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()

Locks and Semaphores

A semaphore is a low-level synchronization primitive that allows greenlets to coordinate and limit concurrent access or execution. A semaphore exposes two methods, acquire and release. It starts with an initial count; each acquire decrements the count and each release increments it. The remaining count is called the bound of the semaphore. When the bound reaches 0, a further acquire blocks until another greenlet releases its acquisition.

from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))


"""
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
"""

A semaphore with a bound of 1 is known as a Lock. It provides exclusive execution to one greenlet. Locks are often used to ensure that a resource is in use by only one greenlet at a time within a program.
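As a minimal sketch of a lock in use, assume several greenlets update a shared balance and yield in the middle of the update. The balance, inside and max_inside variables are invented for this illustration.

```python
import gevent
from gevent.lock import BoundedSemaphore

lock = BoundedSemaphore(1)   # a bound of 1 makes this a lock
balance = 0
inside = 0                   # greenlets currently in the critical section
max_inside = 0

def deposit(amount):
    global balance, inside, max_inside
    with lock:
        inside += 1
        max_inside = max(max_inside, inside)
        current = balance
        gevent.sleep(0)      # yield while still holding the lock
        balance = current + amount
        inside -= 1

gevent.joinall([gevent.spawn(deposit, 10) for _ in range(5)])
print(balance, max_inside)
```

Without the lock, every greenlet would read the balance before any of them wrote it back, and four of the five deposits would be lost.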

Thread Locals

Gevent also allows you to specify data which is local to the greenlet context. Internally, this is implemented as a global lookup which addresses a private namespace keyed by the greenlet’s getcurrent() value.

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

"""
1
2
x is not local to f2
"""

Many web frameworks that use gevent store HTTP session objects inside gevent thread locals. For example, using the Werkzeug utility library and its proxy object we can create Flask-style request objects.

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.pywsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    # WSGI response bodies must be bytes on Python 3
    return [body.encode('utf-8')]

WSGIServer(('', 8000), application).serve_forever()

Flask’s system is a bit more sophisticated than this example, but the idea of using thread locals as local session storage is nonetheless the same.

Subprocess

As of gevent 1.0, gevent.subprocess — a patched version of Python’s subprocess module — has been added. It supports cooperative waiting on subprocesses.

import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.decode().rstrip())  # communicate() returns bytes on Python 3

"""
cron
cron
cron
cron
cron
Linux
"""

Many people also want to use gevent and multiprocessing together. One of the most obvious challenges is that inter-process communication provided by multiprocessing is not cooperative by default. Since multiprocessing.Connection-based objects (such as Pipe) expose their underlying file descriptors, gevent.socket.wait_read and wait_write can be used to cooperatively wait for ready-to-read/ready-to-write events before actually reading/writing:

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in range(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in range(10):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in range(10):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

Note, however, that the combination of multiprocessing and gevent brings along certain OS-dependent pitfalls, among others:

  • After forking on POSIX-compliant systems, gevent’s state in the child is ill-posed. One side effect is that greenlets spawned before multiprocessing.Process creation run in both the parent and the child process.
  • a.send() in put_msg() above might still block the calling thread non-cooperatively: a ready-to-write event only ensures that one byte can be written. The underlying buffer might be full before the attempted write is complete.
  • The wait_write() / wait_read()-based approach as indicated above does not work on Windows (IOError: 3 is not a socket (files are not supported)), because Windows cannot watch pipes for events.

The Python package gipc overcomes these challenges for you in a largely transparent fashion on both POSIX-compliant and Windows systems. It provides gevent-aware multiprocessing.Process-based child processes and gevent-cooperative inter-process communication based on pipes.

Actors

The actor model is a higher level concurrency model popularized by the language Erlang. In short the main idea is that you have a collection of independent Actors which have an inbox from which they receive messages from other Actors. The main loop inside the Actor iterates through its messages and takes action according to its desired behavior.

Gevent does not have a primitive Actor type, but we can define one very simply using a Queue inside of a subclassed Greenlet.

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Actor(Greenlet):

    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplementedError()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)

In a use case:

import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])
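The ping/pong pair above runs until it is interrupted. As a hedged sketch of an actor that terminates, the variant below assumes a sentinel object is sent to signal shutdown; the Accumulator class and the STOP sentinel are hypothetical names introduced only for this illustration.

```python
import gevent
from gevent import Greenlet
from gevent.queue import Queue

STOP = object()  # hypothetical sentinel meaning "no more messages"

class Accumulator(Greenlet):
    """Toy actor that sums the numbers arriving in its inbox."""

    def __init__(self):
        Greenlet.__init__(self)
        self.inbox = Queue()
        self.total = 0

    def _run(self):
        while True:
            message = self.inbox.get()   # blocks until a message arrives
            if message is STOP:
                return self.total        # becomes the greenlet's value
            self.total += message

acc = Accumulator()
acc.start()

for n in (1, 2, 3):
    acc.inbox.put(n)
acc.inbox.put(STOP)

acc.join()
total = acc.value
print(total)
```

Because the inbox is the only way in, the actor's state is touched by a single greenlet, and the sentinel gives the caller a clean point at which to join it.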