基于coroutine的gevent

coroutine也是一种并发模型,但不同于thread和callback,它的所有task都是可以在一个线程里面执行,然后可以通过在一个task里面主动放弃执行来切换到另一个task执行,它的调度是程序级的,不像thread是系统级的调度。gevent就是一个基于coroutine的python网络开发框架,不像twisted那样集成了很多库和协议,gevent非常精简,当然文档也很少,在性能上的话,看了PyCon上的视频,对比了twisted和其他的几个库,在内存的性能上非常地优秀,代码简洁而且也支持多核。

做了个相当简单的ChatService,有登录、群发消息、退出这3个消息,协议就是用简单的json了,有时间再试试xmpp看看。”monkey.patch_all()” 这个调用就把python原生的一些socket对象和方法替换成非阻塞的异步调用,写起来感觉还是很方便。

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import simplejson
from gevent import monkey; monkey.patch_all();
from gevent.server import StreamServer
from common import Protocol, gen_pro_data, parse_pro_data

class ChatService:
    def __init__(self):
        self._clients = {}
        self._handlers = {
            Protocol.Connect : self.connect,
            Protocol.SendMsg : self.sendmsg,
            Protocol.Quit : self.quit,
        }

    def connect(self, sock, data):
        userinfo = self._clients.get(sock, None)
        if userinfo: return
        self._clients[sock] = data['name']
        print "New User : %s" % (data['name'],)

    def sendmsg(self, sock, data):
        msg = gen_pro_data(data)
        for s, u in self._clients.items():
            if sock != s:
                s.send(msg)

    def quit(self, sock, data=None):
        if sock in self._clients:
            print "User [%s] quit." % (self._clients[sock],)
            del self._clients[sock]
            sock.close()

    def process(self, sock, promsg):
        cmd = promsg['cmd']
        func = self._handlers[cmd]
        func(sock, promsg)

# server chat service
chatservice = ChatService()

def serve(sock, addr):
    while True:
        try:
            length = ord(sock.recv(1))
            content = sock.recv(length)
            # protocol :  length + json({'cmd' : 'sendmsg', 'msg' : 'hallo'})
            promsg = parse_pro_data(content)
            chatservice.process(sock, promsg)
        except:
            chatservice.quit(sock)
            break

if __name__ == "__main__":
    host = sys.argv[1]
    port = int(sys.argv[2])
    print "Start chat server on %s:%s" % (host, port)
    try:
        chatserver = StreamServer((host, port), serve)
        chatserver.serve_forever()
    except:
        chatserver.kill()

写了一个简单的测试,创建了5000个连接,然后定时发消息,服务器也没显得有什么压力。

def test():
    import gevent
    from gevent.pool import Pool

    def robot(idx, host, port):
        # client chat service
        chatservice = ChatService(host, port)
        chatservice.connect("user%s" % (idx,))
        while True:
            chatservice.sendmsg("hallo from %s" % (idx,))
            gevent.sleep(3)
    host = sys.argv[1]
    port = int(sys.argv[2])
    count = 5000
    pool = Pool(5000)
    for i in range(count):
        pool.spawn(robot, i, host, port)
    pool.join()
This entry was posted in Python. Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>