其实当初设计转发模块的时候,是计划在框架内部实现负载均衡功能的。但是经过测试发现效率并没有提升:我做了很多测试,也尝试过在协程与多线程之间对代码进行调优,但速度始终不理想,最终还是使用 Nginx 去实现 Tornado 的多进程转发模式了。多进程配合 Tornado 的协程,速度还是不错的。

socket转发

from tornado.tcpserver import TCPServer
from tornado.ioloop  import IOLoop
import tornado
import socket
import threading
import logging
from concurrent.futures import ThreadPoolExecutor
import random

from gevent import monkey
# NOTE(review): patch_all() runs here after socket, threading and tornado have
# already been imported above. gevent's documentation advises patching as
# early as possible, before other imports -- confirm this ordering is intended.
monkey.patch_all()

class Connection(object):
    """One client connection accepted by the proxy.

    Reads a single HTTP request from the client stream, forwards it to a
    backend picked from the owning proxy's server list, relays the backend's
    reply to the client and then closes the client stream.

    NOTE(review): the backend round-trip in ``proxy_data`` uses plain
    ``socket`` calls from inside a stream callback; whether that blocks the
    IOLoop depends on how gevent's patching interacts with Tornado -- confirm.
    """

    # Chunk size used when reading the backend's reply.
    BUFF_SIZE = 1024

    # Statuses that never carry a body even when Content-Length is present.
    _NO_BODY_STATUSES = (204, 304)

    # Minimal reply sent to the client when the backend cannot be reached.
    _BAD_GATEWAY = (b"HTTP/1.1 502 Bad Gateway\r\n"
                    b"Content-Length: 0\r\n"
                    b"Connection: close\r\n\r\n")

    def __init__(self, proxy, stream, address):
        """Register with *proxy* and start reading the request from *stream*.

        Args:
            proxy: owner exposing ``clients``, ``mode``, ``server_list``,
                ``cur_server``, ``server_count`` and ``mutex``.
            stream: client stream offering ``read_until``, ``read_bytes``,
                ``write``, ``close``, ``closed`` and ``set_close_callback``.
            address: peer address of the client (used only for logging).
        """
        logging.debug("connection thread: %s", threading.current_thread())
        self._proxy = proxy
        self._proxy.clients.add(self)
        logging.debug("[connect] clients count:%d", len(self._proxy.clients))
        self._stream = stream
        self._address = address
        self._stream.set_close_callback(self.on_close)
        self.http_content = b""
        self.read_data()

    @staticmethod
    def _parse_headers(head):
        """Return ``{lower-cased name: stripped value}`` for a message head.

        The first line (request or status line) is skipped.  Only the first
        colon separates name from value, so values such as ``host:port`` are
        kept intact.
        """
        headers = {}
        for line in head.split(b"\r\n")[1:]:
            name, sep, value = line.partition(b":")
            if sep:
                headers[name.strip().lower()] = value.strip()
        return headers

    @classmethod
    def _content_length(cls, head):
        """Return the Content-Length declared in *head*, or None when absent.

        Raises:
            ValueError: the header is present but is not a non-negative int.
        """
        raw = cls._parse_headers(head).get(b"content-length")
        if raw is None:
            return None
        length = int(raw)
        if length < 0:
            raise ValueError("negative Content-Length: %r" % (raw,))
        return length

    @classmethod
    def _response_body_length(cls, head, has_body):
        """Return the body size announced by a response head.

        Returns 0 when no body can follow, the declared Content-Length when
        there is a usable one, and None when the size is unknown.
        """
        if not has_body:
            return 0
        status_line = head.split(b"\r\n", 1)[0].split(None, 2)
        try:
            code = int(status_line[1])
        except (IndexError, ValueError):
            code = 200  # unparsable status line: assume a body may follow
        if code in cls._NO_BODY_STATUSES:
            return 0
        try:
            return cls._content_length(head)
        except ValueError:
            return None

    def read_data(self):
        """Ask the stream for the request head (everything up to CRLF CRLF)."""
        self._stream.read_until(b"\r\n\r\n", self.analysis_head)

    def analysis_head(self, head):
        """Handle the request head and arrange for the body to be read.

        A request without a body (no Content-Length, or a length of 0) is
        forwarded immediately; previously such a request was never forwarded
        and the client was left waiting.  A malformed Content-Length closes
        the client stream.
        """
        self.http_content = head
        try:
            length = self._content_length(head)
        except ValueError:
            logging.warning("invalid Content-Length from %s", self._address)
            self._stream.close()
            return
        if length:
            self._stream.read_bytes(length, self.proxy_data)
        else:
            self.proxy_data(b"")

    def write_finish(self):
        """Close the client stream once the reply has been written."""
        if self._stream.closed():
            return
        self._stream.close()

    def recvall(self, sock, has_body=True):
        """Read one HTTP response from *sock* and return it as bytes.

        Reading continues until the response head is complete.  If the head
        declares a Content-Length, exactly that many body bytes are awaited,
        so a short TCP read no longer truncates the reply.  Without a usable
        Content-Length the original heuristic applies: stop at the first read
        shorter than ``BUFF_SIZE``.  An empty read (peer closed) always ends
        the loop.

        Args:
            sock: connected object providing ``recv(size)``.
            has_body: pass False when the reply cannot carry a body (e.g. the
                reply to a HEAD request) so the head alone is returned.
        """
        chunks = []
        received = 0
        head_seen = False
        expected = None  # total bytes to read once the head is understood
        while True:
            part = sock.recv(self.BUFF_SIZE)
            if not part:
                break
            chunks.append(part)
            received += len(part)
            if not head_seen:
                data = b"".join(chunks)
                head_end = data.find(b"\r\n\r\n")
                if head_end < 0:
                    continue  # head still incomplete, keep reading
                head_seen = True
                body_length = self._response_body_length(data[:head_end],
                                                         has_body)
                if body_length is not None:
                    expected = head_end + 4 + body_length
            if expected is None:
                if len(part) < self.BUFF_SIZE:
                    break
            elif received >= expected:
                break
        return b"".join(chunks)

    def _pick_server(self):
        """Choose the backend ``(host, port)`` according to the proxy mode.

        Mode 0 walks the server list round-robin under the proxy's mutex;
        mode 1 picks a random entry.

        Raises:
            ValueError: the proxy mode is neither 0 nor 1.
        """
        proxy = self._proxy
        if proxy.mode == 0:
            with proxy.mutex:
                server = proxy.server_list[proxy.cur_server]
                proxy.cur_server = (proxy.cur_server + 1) % proxy.server_count
            return server
        if proxy.mode == 1:
            proxy.cur_server = random.randint(0, proxy.server_count - 1)
            return proxy.server_list[proxy.cur_server]
        raise ValueError("unsupported proxy mode: %r" % (proxy.mode,))

    def proxy_data(self, data):
        """Forward the buffered request plus *data* and relay the reply.

        The backend socket is always closed.  If the backend cannot be
        reached or fails mid-exchange, the client receives a 502 reply
        instead of being left without an answer.
        """
        self.http_content += data
        logging.debug("http content:%s", self.http_content)
        cur_server = self._pick_server()
        logging.debug("proxy -> %s:%s", cur_server[0], cur_server[1])

        # A reply to HEAD has headers only, even if it states a length.
        expects_body = not self.http_content.startswith(b"HEAD ")
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(cur_server)
            # sendall keeps writing until the whole request is out.
            s.sendall(self.http_content)
            ret_data = self.recvall(s, has_body=expects_body)
        except socket.error as exc:
            logging.error("proxy to %s:%s failed: %s",
                          cur_server[0], cur_server[1], exc)
            ret_data = self._BAD_GATEWAY
        finally:
            s.close()
        if not self._stream.closed():
            self._stream.write(ret_data, self.write_finish)

    def on_close(self):
        """Unregister from the proxy when the client stream closes."""
        # discard: a repeated close notification must not raise KeyError.
        self._proxy.clients.discard(self)
        logging.debug("[close] clients count:%d", len(self._proxy.clients))


class Proxy(TCPServer):
    """TCP server that hands every accepted stream to a ``Connection``.

    Attributes set here are read by ``Connection``: ``clients`` (the live
    connections), ``server_list`` / ``server_count`` (the backends),
    ``mode`` (0 = round-robin, 1 = random), ``cur_server`` (index of the
    next backend) and ``mutex`` (guards ``cur_server`` in round-robin mode).
    """

    def __init__(self, port, server_list, mode):
        """Store the listening port, backend list and selection mode."""
        super(Proxy, self).__init__()
        # Listening configuration.
        self.port = port
        self.mode = mode

        # Backend bookkeeping.
        self.server_list = server_list
        self.server_count = len(server_list)
        self.cur_server = 0
        self.mutex = threading.Lock()

        # Live client connections.
        self.clients = set()

        # Thread pool kept for the thread-based experiment; handle_stream
        # does not currently submit work to it.
        self.executor = ThreadPoolExecutor()

    def worker(self, stream, address):
        """Create the ``Connection`` for one client (thread-pool entry point)."""
        Connection(self, stream, address)

    def handle_stream(self, stream, address):
        """Tornado hook for a new client: wrap the stream in a ``Connection``."""
        # The connection is built inline rather than via self.executor.
        Connection(self, stream, address)

    def start(self):
        """Listen on the configured port and run the IOLoop."""
        self.listen(self.port)
        loop = IOLoop.instance()
        loop.start()

tornado.httpclient.AsyncHTTPClient()转发

import tornado.ioloop
import tornado.web
import tornado.httpclient
import tornado.gen
import logging
import threading

from gevent import monkey
# NOTE(review): patch_all() runs here after tornado, logging and threading
# have already been imported above. gevent's documentation advises patching
# as early as possible, before other imports -- confirm this ordering.
monkey.patch_all()


class MainHandler(tornado.web.RequestHandler):
    """Forward POST requests to a backend chosen by the application.

    The handler reads ``mode``, ``servers``, ``cur_server``, ``server_count``
    and ``mutex`` from ``self.application``.
    """

    def _log_request(self):
        """Write the peer address and raw request body to the debug log."""
        rule = "-" * 80
        logging.debug(str(self.request.remote_ip) + "\n" + rule + "\n"
                      + str(self.request.body) + "\n" + rule)

    def _pick_server(self):
        """Return the backend URL for this request, or None for a bad mode.

        Mode 0 walks ``application.servers`` round-robin under the
        application's mutex; mode 1 picks a random entry, matching the
        socket-based proxy.
        """
        app = self.application
        if app.mode == 0:
            # ``with`` releases the lock even if the lookup raises.
            with app.mutex:
                server = app.servers[app.cur_server]
                app.cur_server = (app.cur_server + 1) % app.server_count
            return server
        if app.mode == 1:
            import random  # local: this module's import block lacks it
            return random.choice(app.servers)
        return None

    def get(self, *args, **kwargs):
        """Pass the request body to ``application.analysis.decrypt_http``.

        NOTE(review): ``application.analysis`` is not assigned anywhere in
        the visible code; if it is missing, the AttributeError is logged and
        the request completes with an empty response -- confirm intent.
        """
        try:
            self._log_request()
            self.application.analysis.decrypt_http(self.request.body, self)
        except Exception as e:
            logging.error(str(e))

    def handle_request(self, response):
        """Relay the backend's reply to the client and finish the request.

        An upstream failure is now reported with an error status rather than
        an empty 200: connection-level failures (Tornado's pseudo-status 599)
        become 502, and HTTP error replies keep their own status code.
        """
        if response.error:
            if response.code == 599:
                self.set_status(502)
            else:
                self.set_status(response.code, reason=response.reason)
        if response.body:
            self.write(response.body)
        self.finish()

    @tornado.web.asynchronous
    def post(self, *args, **kwargs):
        """Forward the POST body and headers to the selected backend.

        The request is asynchronous, so every path must finish it: an
        unsupported mode or an exception while preparing the upstream call
        now answers with a 500 instead of leaving the client waiting.
        """
        try:
            self._log_request()
            cur_server = self._pick_server()
            if cur_server is None:
                logging.error("unsupported proxy mode: %r",
                              self.application.mode)
                self.send_error(500)
                return
            logging.debug("proxy -> %s", cur_server)
            # NOTE(review): validate_cert=False turns off TLS certificate
            # checks for the upstream call -- confirm this is acceptable.
            request = tornado.httpclient.HTTPRequest(
                cur_server,
                method="POST",
                headers=self.request.headers,
                body=self.request.body,
                validate_cert=False)
            tornado.httpclient.AsyncHTTPClient().fetch(request,
                                                       self.handle_request)
        except Exception as e:
            logging.error(str(e))
            self.send_error(500)

def make_app():
    """Build the Tornado application that serves ``/`` with ``MainHandler``."""
    routes = [(r"/", MainHandler)]
    return tornado.web.Application(routes)

class Proxy():
    """Runs the HTTP forwarding application on a single port."""

    def __init__(self, port, servers, mode):
        """Remember the listening port, backend URLs and selection mode."""
        self._port, self._servers, self._mode = port, servers, mode

    def start(self):
        """Create the application, attach proxy state and run the IOLoop."""
        application = make_app()
        application.listen(self._port)

        # State that MainHandler reads back through ``self.application``.
        backends = self._servers
        application.mode = self._mode
        application.servers = backends
        application.cur_server = 0
        application.server_count = len(backends)
        application.mutex = threading.Lock()

        loop = tornado.ioloop.IOLoop.current()
        loop.start()