Introduction
This guide assumes an intermediate level of Python but no prior knowledge of concurrency. The goal of this guide is to give you the tools you need to get started with gevent, to help you tame your existing concurrency problems, and to start writing asynchronous applications today.
Contributors
In chronological order of contribution: Stephen Diehl, Jeremy Bethmont, SWW, Bruno Bigras, David Ripton, Travis Cline, Boris Feld, Youngsterxyf, Eddie Hebert, Alexis Metaireau, Daniel Velkov
Thanks also to Denis Bilenko for writing gevent and for his guidance in putting this guide together.
This is a collaborative document published under the MIT license. Want to add something, or spot a typo? Fork the repository and open a pull request on GitHub. Any contributions are welcome.
This page is also available in Japanese.
Core
Greenlets
The primary pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the OS process of the main program, but they are scheduled cooperatively.
Only one greenlet is ever running at any given time. This differs from libraries such as multiprocessing or threading, which provide truly parallel constructs: they spin up processes and POSIX threads that are scheduled by the operating system and are genuinely able to run in parallel.
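To make the cooperative scheduling point concrete, here is a minimal sketch (not from the original text): a greenlet that never yields keeps the only execution slot until it finishes, while one that calls gevent.sleep(0) hands control back between steps.

import gevent

def busy():
    # A CPU-bound loop that never yields: no other greenlet can run
    # until this function returns.
    total = sum(range(1000000))
    print('busy done, total=%s' % total)

def polite():
    # gevent.sleep(0) yields control back to the hub, giving other
    # greenlets a chance to run between iterations.
    for i in range(3):
        print('polite step %s' % i)
        gevent.sleep(0)

# 'busy done' is printed before any 'polite step', because busy()
# never gives up control while it is running.
gevent.joinall([gevent.spawn(busy), gevent.spawn(polite)])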
Synchronous and asynchronous execution
The core idea of concurrency is that large tasks can be broken down into a series of subtasks that can be scheduled to be executed simultaneously or asynchronously, rather than one at a time or synchronously. A switch between two subtasks is a context switch.
In gevent, context switching is done through yielding. In the following example, we have two contexts which yield to each other by calling gevent.sleep(0).
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
The following diagram visualizes the flow of control, as if stepping through the entire program in a debugger, to illustrate how context switches occur.
The real power of gevent comes when we use it for network- and IO-bound functions, which can be scheduled cooperatively. Gevent takes care of all the details to ensure that your network libraries implicitly yield their greenlet contexts whenever possible. It is hard to overstate how powerful an idiom this is, so let's illustrate it with an example.

The select() function in the following example is normally a blocking call that polls on various file descriptors.
import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
Here is another somewhat synthetic example that defines a task function which is non-deterministic (given the same input, its output is not guaranteed to be the same). In this case, the side effect of running the function is that the task pauses its execution for a random interval of a few milliseconds.
import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 3 done
Task 7 done
Task 9 done
Task 2 done
Task 4 done
Task 1 done
Task 8 done
Task 6 done
Task 0 done
Task 5 done
In the synchronous case above, all the tasks run sequentially, which blocks the main program (i.e. temporarily halts its execution) while each task executes.

The important part of the program is gevent.spawn, which wraps the task function inside a greenlet thread. The list of initialized greenlets is stored in the array threads, which is passed to gevent.joinall; that function blocks the current program to run all the given greenlets, and execution only moves forward once they have all finished.

The important thing to notice is that the asynchronous case runs in an essentially random order, and that its total execution time is far less than that of the synchronous case. In fact, the synchronous part takes at most 0.002 seconds per task, or about 0.02 seconds for the whole queue, while the asynchronous part takes roughly 0.002 seconds in total, since no task blocks the execution of the others.
In a more common application scenario, such as asynchronously fetching data from a server, the execution time of the fetch operation depends on the load of the remote server at the time of the request, and the execution time of each request may vary.
import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print('Process %s: %s' % (pid, datetime))
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
Determinism
As mentioned previously, greenlets are deterministic. Given the same configuration of greenlets and the same set of inputs, they always produce the same output. For example, here we spread a task across a multiprocessing pool and compare its results with those of a gevent pool.
import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)
False
True
Even though gevent is normally deterministic, sources of non-determinism can creep into your program once you begin interacting with outside services such as sockets and files. So even though green threads are a form of "deterministic concurrency", they can still run into some of the same problems that POSIX threads and processes do.

The perennial problem with concurrency is the race condition. Simply put, a race condition occurs when two concurrent threads or processes depend on a shared resource and both attempt to modify it. The resulting state of the resource then depends on timing and the order of execution, which is a problem; in general we try very hard to avoid race conditions, since they make overall program behaviour non-deterministic.

The best approach is simply to avoid global state at all times. Global state and import-time side effects will always come back to bite you!
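For a concrete picture, here is a small sketch (not part of the original text) of how a race can appear even with cooperative greenlets, once a context switch happens in the middle of a read-modify-write on shared state:

import gevent

counter = 0

def unsafe_increment():
    global counter
    current = counter       # read shared state
    gevent.sleep(0)         # context switch while holding a stale value
    counter = current + 1   # write back, clobbering any update made in between

gevent.joinall([gevent.spawn(unsafe_increment) for _ in range(10)])

# With ten increments we would expect 10, but every greenlet read the
# counter before any of them wrote it back, so the final value is 1.
print(counter)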
Create Greenlets
Gevent provides a few wrappers around Greenlet initialization. Some of the most common patterns are:
import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)
Hello
I live!
In addition to using the basic Greenlet class, you can subclass the Greenlet class and override its _run method.
import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
Hi there!
Greenlet state
Like any other piece of code, a greenlet can fail in various ways: it may throw an exception, fail to halt, or consume too many system resources.

The internal state of a greenlet is generally a time-dependent property. A greenlet exposes several flags that let you monitor the state of the thread:
- started: Boolean, indicates whether this Greenlet has been started
- ready(): Boolean, indicates whether this Greenlet has halted
- successful(): Boolean, indicates whether this Greenlet has halted without throwing an exception
- value: any value returned by the code in this Greenlet
- exception: the uncaught exception instance thrown inside this Greenlet
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail will not propagate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
True
True
You win!
None
True
True
True
False
You fail at failing.
Program shutdown
Greenlets that fail to yield when the main program receives a SIGQUIT may hold the program's execution longer than expected. This results in so-called "zombie processes" that need to be killed from outside the Python interpreter.

A common pattern is to listen for SIGQUIT in the main program and invoke gevent.shutdown before exiting.
import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.shutdown)
    thread = gevent.spawn(run_forever)
    thread.join()
Timeouts
A timeout is a constraint on the running time of a piece of code or a Greenlet.
import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')
The Timeout class can also be used as a context manager, in a with statement.
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)
In addition, gevent provides timeout arguments for a variety of Greenlet- and data-structure-related calls. For example:
import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
Monkey patching
Here we come to the dark corners of gevent. Until now I have avoided mentioning monkey patching in order to first motivate gevent's powerful coroutine model, but the time has come to discuss the dark art of monkey patching. You may have noticed that we invoked the command monkey.patch_socket() above: a purely side-effecting command used to alter the standard library's socket module.
import socket
print(socket.socket)
print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)
import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)
<class 'socket.socket'>
After monkey patch
<class 'gevent.socket.socket'>
<built-in function select>
After monkey patch
<function select at 0x1924de8>
Python's runtime allows most objects to be modified at run time, including modules, classes, and even functions. This is generally an astoundingly bad idea, since it creates "implicit side effects" that are usually extremely difficult to debug when problems arise. Nevertheless, in extreme situations where a library needs to alter the fundamental behaviour of Python itself, monkey patches can be used. In this case gevent is able to patch most of the blocking system calls in the standard library, including those in the socket, ssl, threading and select modules, so that they instead run cooperatively.

For example, the Redis python bindings normally use regular TCP sockets to communicate with a redis-server instance. Simply by invoking gevent.monkey.patch_all(), we can make the redis bindings schedule their requests cooperatively and work with the rest of our gevent stack.

This lets us integrate libraries that would normally not work with gevent, without ever writing a single line of code. While monkey patching is still evil, in this case it is a "useful evil".
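As a minimal illustration (not from the original text), the patch is applied once, as early as possible, before importing the libraries that should be affected; redis here is just a stand-in for any socket-based client library, and the example assumes a redis-server running locally:

# Apply the patch before importing anything that creates sockets.
from gevent import monkey
monkey.patch_all()

import gevent
import redis  # assumes the redis package is installed

def get_key(n):
    # Each call now yields to other greenlets while waiting on the socket.
    client = redis.StrictRedis(host='localhost', port=6379)
    return client.get('key-%d' % n)

jobs = [gevent.spawn(get_key, n) for n in range(5)]
gevent.joinall(jobs)
print([job.value for job in jobs])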
Data structures
Events
Events are a form of asynchronous communication between greenlets.
import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''

evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()

def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])

if __name__ == '__main__':
    main()
An extension of the Event object is AsyncResult, which allows you to send a value along with the wakeup call. It is sometimes called a future or a deferred, since it holds a reference to a future value that can be set at an arbitrary time.
import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    """
    After 3 seconds set the result of a.
    """
    gevent.sleep(3)
    a.set('Hello!')

def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
Queues
Queues are ordered sets of data with the usual put / get operations, but they are implemented so that they can be safely manipulated across greenlets.

For example, if one greenlet grabs an item off the queue, the same item will not be grabbed by another greenlet executing at the same time.
import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in xrange(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
Queues can also block on put or get if needed.

Both put and get have non-blocking counterparts: put_nowait and get_nowait do not block, but instead raise gevent.queue.Full or gevent.queue.Empty respectively if the operation is not possible.
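A small sketch (not in the original text) of the non-blocking variants and the exceptions they raise:

from gevent.queue import Queue, Empty, Full

q = Queue(maxsize=1)

q.put_nowait('a')       # succeeds; the queue is now full
try:
    q.put_nowait('b')   # queue is full, raises immediately
except Full:
    print('queue is full')

print(q.get_nowait())   # 'a'
try:
    q.get_nowait()      # queue is empty, raises immediately
except Empty:
    print('queue is empty')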
In the example below we run a boss simultaneously with multiple workers and restrict the queue to hold no more than three elements. This restriction means that put blocks until there is space in the queue. Conversely, get blocks if there are no elements in the queue to fetch; it also takes a timeout argument, which allows gevent.queue.Empty to be raised if no work can be found within the timeout.
import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1) # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until an individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in xrange(1,10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in xrange(10,20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
Groups and pools
A group is a collection of running greenlets that are managed and scheduled together as a group. It also doubles as a parallel dispatcher, mirroring Python's multiprocessing library.
import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz
This is very useful for managing groups of asynchronous tasks.

As mentioned above, a Group also provides an API for dispatching jobs to the grouped greenlets and collecting their results in various ways.
import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group %s' % len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, xrange(3))

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)
Size of group 3
Hello from Greenlet 31048720
Size of group 3
Hello from Greenlet 31049200
Size of group 3
Hello from Greenlet 31049040
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
A pool is a structure designed for handling a dynamic number of greenlets whose concurrency needs to be limited. It is often desirable when you want to perform many network- or IO-bound tasks in parallel.
import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

pool.map(hello_from, xrange(3))
Size of pool 2
Size of pool 2
Size of pool 1
Often when building gevent-driven services you will center the entire service around a pool structure. An example would be a class that polls on various sockets.
from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(1000)
        self.pool.start()

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("At maximum pool size")
        else:
            self.pool.spawn(self.listen, socket)

    def shutdown(self):
        self.pool.kill()
Locks and semaphores
A semaphore is a low-level synchronization primitive that allows greenlets to coordinate and limit concurrent access or execution. A semaphore exposes two methods, acquire and release. The difference between the number of times it has been acquired and released is called the bound of the semaphore. If the bound reaches 0, acquire blocks until another greenlet that currently holds the semaphore releases it.
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
A semaphore with a bound of 1 is known as a lock: it provides mutually exclusive execution to one greenlet at a time. Semaphores and locks are often used to make sure that a resource is only used by a single greenlet at a time within a program.
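For illustration, here is a small sketch (not in the original text) of a bound of 1 used as a mutex around a shared counter; it reuses the gevent.coros import from the example above (newer gevent releases expose the same class as gevent.lock.BoundedSemaphore):

from gevent import sleep, spawn, joinall
from gevent.coros import BoundedSemaphore

lock = BoundedSemaphore(1)  # a bound of 1 acts as a mutex
counter = 0

def safe_increment(n):
    global counter
    with lock:      # only one greenlet may hold the lock at a time
        current = counter
        sleep(0)    # a context switch here can no longer interleave updates
        counter = current + 1

joinall([spawn(safe_increment, i) for i in range(10)])
print(counter)      # always 10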
Thread-local variables
Gevent also lets you specify data that is local to the greenlet context. Internally, this is implemented as a global lookup into a private namespace keyed on the greenlet's getcurrent() value.
import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])
1
2
x is not local to f2
Many web frameworks that integrate gevent store HTTP session objects as gevent thread locals. For example, using the Werkzeug utility library and its proxy object, we can create Flask-style request objects.
from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()
The Flask system is a bit more complex than this example, but the idea is the same, using thread-local variables as local session storage.
Subprocesses
As of gevent 1.0, gevent.subprocess, a patched version of Python's subprocess module, is available. It supports cooperatively waiting on subprocesses.
import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
cron
cron
cron
cron
cron
Linux
Many people also want to use gevent together with multiprocessing. One of the most obvious challenges is that the inter-process communication provided by multiprocessing is not cooperative by default. However, since multiprocessing.Connection-based objects (such as Pipe) expose their underlying file descriptors, gevent.socket.wait_read and wait_write can be used to cooperatively wait for ready-to-read / ready-to-write events before actually reading or writing.
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in xrange(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in xrange(10):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in xrange(10):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)
Note, however, that the combination of multiprocessing and gevent necessarily brings along certain OS-dependent pitfalls, among them:

- After forking a child process on POSIX-compliant systems, gevent's state in the child is ill-posed. One side effect is that greenlets spawned before the multiprocessing.Process was created run in both the parent and the child process.
- a.send() in put_msg() above may still block the calling thread non-cooperatively: a ready-to-write event only guarantees that one byte can be written, and the underlying buffer may fill up before the attempted write completes.
- The wait_write() / wait_read()-based approach shown above does not work on Windows (IOError: 3 is not a socket (files are not supported)), because Windows cannot watch pipes for events.
The Python package gipc overcomes these challenges for you in a largely transparent fashion on both POSIX-compliant systems and Windows. It provides gevent-aware child processes based on multiprocessing.Process and gevent-cooperative inter-process communication based on pipes.
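As a rough sketch only (the exact calls belong to the gipc project, so treat gipc.pipe() and gipc.start_process() as assumptions and check the gipc documentation), the usual shape is a cooperative pipe shared between the parent's greenlets and a gevent-aware child process:

# Assumed gipc API: gipc.pipe() and gipc.start_process(); verify against
# the gipc documentation before relying on this sketch.
import gevent
import gipc

def child(reader):
    # Runs in the child process; reads cooperate with gevent there as well.
    while True:
        msg = reader.get()
        if msg is None:
            break
        print('child received: %s' % msg)

def main():
    with gipc.pipe() as (reader, writer):
        proc = gipc.start_process(target=child, args=(reader,))
        for i in range(3):
            writer.put('message %d' % i)  # does not block other greenlets
            gevent.sleep(0)
        writer.put(None)
        proc.join()

if __name__ == '__main__':
    main()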
Actors
The actor model is a higher-level concurrency model popularized by Erlang. In short, the main idea is that you have a collection of independent actors, each with an inbox where it receives messages from other actors. The main loop inside an actor iterates through its messages and acts according to its desired behaviour.
Gevent has no native Actor type, but using queues within a subclassed Greenlet, we can define a very simple one.
import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplementedError()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)
Here is an example in use:
import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])
Real world applications
Gevent ZeroMQ
ZeroMQ is described by its authors as "a socket library that behaves like a concurrency framework." It is a very powerful messaging layer for building concurrent and distributed applications.
ZeroMQ provides a variety of socket primitives, the simplest of which is the request-response socket pair. A socket has two methods of interest, send and recv, both of which are normally blocking operations. But this is remedied by a brilliant library from Travis Cline, which uses gevent.socket to poll ZeroMQ sockets in a non-blocking manner. Via the command:
pip install gevent-zeromq
you can install gevent-zeromq from PyPI.
# Note: Remember to ``pip install pyzmq gevent_zeromq``
import gevent
from gevent_zeromq import zmq

# Global Context
context = zmq.Context()

def server():
    server_socket = context.socket(zmq.REQ)
    server_socket.bind("tcp://127.0.0.1:5000")

    for request in range(1,10):
        server_socket.send("Hello")
        print('Switched to Server for %s' % request)
        # Implicit context switch occurs here
        server_socket.recv()

def client():
    client_socket = context.socket(zmq.REP)
    client_socket.connect("tcp://127.0.0.1:5000")

    for request in range(1,10):
        client_socket.recv()
        print('Switched to Client for %s' % request)
        # Implicit context switch occurs here
        client_socket.send("World")

publisher = gevent.spawn(server)
client = gevent.spawn(client)

gevent.joinall([publisher, client])
Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9
Simple server
# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``

from gevent.server import StreamServer

def handle(socket, address):
    socket.send("Hello from a telnet!\n")
    for i in range(5):
        socket.send(str(i) + '\n')
    socket.close()

server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()
WSGI Servers
Gevent provides two WSGI servers for serving content over HTTP, henceforth called wsgi and pywsgi:
- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer
In earlier versions of gevent (before 1.0.x), gevent used libevent instead of libev. Libevent included a fast HTTP server, which was used by gevent's wsgi server.

In gevent 1.0.x no HTTP server is included. Instead, gevent.wsgi is now simply an alias for the pure-Python server in gevent.pywsgi.
Streaming server
This section does not apply to gevent 1.0.x.

For those familiar with streaming HTTP services, the core idea is that we do not specify a length for the content in the headers. Instead we hold the connection open and flush chunks down the pipe, prefixing each chunk with a hex digit indicating the length of the chunk. The stream is closed when a chunk of size zero is sent.
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
8
<p>Hello
9
World</p>
0
The HTTP connection above cannot be created with the wsgi server, because streaming is not supported; the response would instead have to be fully buffered.
from gevent.wsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'
    body = '<p>Hello World</p>'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()
Using pywsgi, however, we can write our handler as a generator and yield the result chunk by chunk.
from gevent.pywsgi import WSGIServer

def application(environ, start_response):
    status = '200 OK'

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    yield "<p>Hello"
    yield "World</p>"

WSGIServer(('', 8000), application).serve_forever()
Regardless, the performance of gevent's servers compares favourably with that of other Python servers. libev is a very well vetted technology, and servers built on it are known to perform well at scale.

To benchmark, try Apache Benchmark (ab), or see this Benchmark of Python WSGI Servers for a comparison with other servers.
$ ab -n 10000 -c 100 http://127.0.0.1:8000/
Long Polling
import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as json

data_source = Queue()

def producer():
    while True:
        data_source.put_nowait('Hello World')
        gevent.sleep(1)

def ajax_endpoint(environ, start_response):
    status = '200 OK'
    headers = [
        ('Content-Type', 'application/json')
    ]

    start_response(status, headers)

    while True:
        try:
            datum = data_source.get(timeout=5)
            yield json.dumps(datum) + '\n'
        except Empty:
            pass

gevent.spawn(producer)

WSGIServer(('', 8000), ajax_endpoint).serve_forever()
Websockets
Running the websocket example requires the gevent-websocket package.
# Simple gevent-websocket server
import json
import random

from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler

class WebSocketApp(object):
    '''Send random data to the websocket'''

    def __call__(self, environ, start_response):
        ws = environ['wsgi.websocket']
        x = 0
        while True:
            data = json.dumps({'x': x, 'y': random.randint(1, 5)})
            ws.send(data)
            x += 1
            sleep(0.5)

server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),
    handler_class=WebSocketHandler)
server.serve_forever()
HTML page (client side): a minimal page titled "Minimal websocket application", with a "WebSocket Example" heading and a status field reading "Not Connected"; it opens a WebSocket connection to the server above and appends the data it receives once connected.
Chat server
One final motivating example: a realtime chat room. Running it requires Flask (though you could use Django, Pyramid, etc.; it is not strictly necessary). The corresponding JavaScript and HTML files can be found here.
# Micro gevent chatroom.
# ----------------------

from flask import Flask, render_template, request

from gevent import queue
from gevent.pywsgi import WSGIServer

import simplejson as json

app = Flask(__name__)
app.debug = True

class Room(object):

    def __init__(self):
        self.users = set()
        self.messages = []

    def backlog(self, size=25):
        return self.messages[-size:]

    def subscribe(self, user):
        self.users.add(user)

    def add(self, message):
        for user in self.users:
            print(user)
            user.queue.put_nowait(message)
        self.messages.append(message)

class User(object):

    def __init__(self):
        self.queue = queue.Queue()

rooms = {
    'topic1': Room(),
    'topic2': Room(),
}

users = {}

@app.route('/')
def choose_name():
    return render_template('choose.html')

@app.route('/<uid>')
def main(uid):
    return render_template('main.html',
        uid=uid,
        rooms=rooms.keys()
    )

@app.route('/<room>/<uid>')
def join(room, uid):
    user = users.get(uid, None)

    if not user:
        users[uid] = user = User()

    active_room = rooms[room]
    active_room.subscribe(user)
    print('subscribe %s %s' % (active_room, user))

    messages = active_room.backlog()

    return render_template('room.html',
        room=room, uid=uid, messages=messages)

@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
    user = users[uid]
    room = rooms[room]

    message = request.form['message']
    room.add(':'.join([uid, message]))

    return ''

@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
    try:
        msg = users[uid].queue.get(timeout=10)
    except queue.Empty:
        msg = []
    return json.dumps(msg)

if __name__ == "__main__":
    http = WSGIServer(('', 5000), app)
    http.serve_forever()