Lolizeppelin's Blog

python kombu与amqp(3)

Posted on By gcy

来看看上一节kombu的connection中的transport

# amqp.transport.TCPTransport
class TCPTransport(_AbstractTransport):
    """Transport that reads and writes directly through ``self.sock``."""

    def _setup_transport(self):
        """Bind the socket I/O callables and reset the carry-over buffer."""
        # Incoming side: start with an empty buffer and read via socket.recv.
        self._read_buffer = EMPTY_BUFFER
        self._quick_recv = self.sock.recv
        # Outgoing side: writes end up in socket.sendall.
        self._write = self.sock.sendall

    def _read(self, n, initial=False, _errnos=(errno.EAGAIN, errno.EINTR)):
        """Return exactly ``n`` bytes, receiving from the socket as needed.

        Bytes left in ``self._read_buffer`` by an earlier call are used
        first; anything beyond ``n`` is stored back for the next call.  This
        is what reassembles split or coalesced TCP segments into frames.

        :param n: number of bytes to return.
        :param initial: true when the caller is starting a new read (the
            frame header); combined with ``self.raise_on_initial_eintr`` it
            turns EAGAIN/EINTR into ``socket.timeout``.
        :param _errnos: errno values that are retried instead of re-raised.
        :raises socket.timeout: EAGAIN/EINTR on an initial read.
        :raises IOError: ``recv`` returned no data (socket closed).
        """
        receive = self._quick_recv
        pending = self._read_buffer
        try:
            # Keep receiving until the requested length is available.
            while len(pending) < n:
                try:
                    chunk = receive(n - len(pending))
                except socket.error as exc:
                    # Anything other than a retryable errno is fatal here.
                    if exc.errno not in _errnos:
                        raise
                    if initial and self.raise_on_initial_eintr:
                        raise socket.timeout()
                    # Retryable error in the middle of a read: try again.
                    continue
                if not chunk:
                    # An empty read means the peer closed the socket.
                    raise IOError('Socket closed')
                pending += chunk
        except BaseException:
            # Keep whatever was received so a later call can resume.
            self._read_buffer = pending
            raise
        # Hand back the first n bytes and keep the surplus for later.
        self._read_buffer = pending[n:]
        return pending[:n]


class _AbstractTransport(object):
    connected = False
    def __init__(self, host, connect_timeout=None,
                 read_timeout=None, write_timeout=None,
                 socket_settings=None, raise_on_initial_eintr=True, **kwargs):
        self.connected = True
        self.sock = None
        self.raise_on_initial_eintr = raise_on_initial_eintr
        self._read_buffer = EMPTY_BUFFER
        self.host, self.port = to_host_port(host)
        self.connect_timeout = connect_timeout
        self.read_timeout = read_timeout
        self.write_timeout = write_timeout
        self.socket_settings = socket_settings

    def connect(self):
        self._connect(self.host, self.port, self.connect_timeout)
        self._init_socket(
            self.socket_settings, self.read_timeout, self.write_timeout,
        )

    def read_frame(self, unpack=unpack):
        read = self._read
        # 初始化保存整帧变量, 不用''是兼容2、3
        read_frame_buffer = EMPTY_BUFFER
        try:
            # 读取帧头7个字节,从头开始读
            # 后面的True配合raise_on_initial_eintr
            # 是为了保证第一个包出错能截掉错误的数据
            frame_header = read(7, True)
            # 真帧数据包括原始包头
            read_frame_buffer += frame_header
            # 从帧头解析出帧长度
            frame_type, channel, size = unpack('>BHI', frame_header)
            # 最大帧长度超过int的上限
            # 差成两部分.....
            # 也就是说...要是长度超过2倍int上限就有问题
            if size > SIGNED_INT_MAX:
                part1 = read(SIGNED_INT_MAX)
                part2 = read(size - SIGNED_INT_MAX)
                payload = ''.join([part1, part2])
            else:
                # 读出包体
                payload = read(size)
            read_frame_buffer += payload
            # 在多读一个字节
            ch = ord(read(1))
        except socket.timeout:
            self._read_buffer = read_frame_buffer + self._read_buffer
            raise
        except (OSError, IOError, SSLError, socket.error) as exc:
            # Don't disconnect for ssl read time outs
            # http://bugs.python.org/issue10272
            if isinstance(exc, SSLError) and 'timed out' in str(exc):
                raise socket.timeout()
            if get_errno(exc) not in _UNAVAIL:
                self.connected = False
            raise
        # 包尾必须是206
        if ch == 206:  # '\xce'
            # 最后一个payload是普通字节
            # 所以把msg再封装的部分在channel的dispatch_method中
            # channel的id没有在客户端connect的时候设置
            # channel的id值是rabbit自己设置的?
            return frame_type, channel, payload
        else:
            raise UnexpectedFrame(
                'Received {0:#04x} while expecting 0xce'.format(ch))

下面我们来看看channel的dispatch_method


# kombu.transport.pyamqp.Channel
# 这是我们使用的类
class Channel(amqp.Channel, base.StdChannel):
    """kombu's pyamqp channel; this is the class applications use."""

    # Class used to wrap raw messages (see message_to_python).
    Message = Message
    # ... (rest of the class body omitted in this excerpt)

    def message_to_python(self, raw_message):
        """Wrap a raw frame message in ``self.Message`` bound to this channel.

        :param raw_message: message object as delivered by the amqp layer.
        :returns: a ``self.Message`` instance with ``channel=self``.
        """
        return self.Message(raw_message, channel=self)

# amqp.channel.Channel
class Channel(AbstractChannel):
    """AMQP channel (amqp.channel.Channel).

    Like amqp.connection.Connection it derives from AbstractChannel, which
    provides ``send_method``, ``wait`` and ``dispatch_method``.
    """

    # A set literal (Python 2.7+ syntax), so this first value is a *set* of
    # method_t tuples, not a dict, e.g.
    #   set([method_t(method_sig=(20, 41), args=None, content=False),
    #        method_t(method_sig=(20, 40), args='BsBB', content=False)])
    _METHODS = {
        spec.method(spec.Channel.Close, 'BsBB'),
        spec.method(spec.Channel.CloseOk),
        spec.method(spec.Channel.Flow, 'b'),
        spec.method(spec.Channel.FlowOk, 'b'),
        # ... (remaining method specs omitted in this excerpt)
    }
    # Re-key the set by method signature to get the lookup table used by
    # dispatch_method, e.g.
    #   {(20, 41): method_t(method_sig=(20, 41), args=None, content=False),
    #    (20, 40): method_t(method_sig=(20, 40), args='BsBB', content=False)}
    _METHODS = {m.method_sig: m for m in _METHODS}

    def __init__(self, connection,
                 channel_id=None, auto_decode=True, on_open=None):
        """Create a channel on ``connection``.

        Author's note: channels are normally created by
        amqp.connection.Connection, and one connection owns many channels.

        :param connection: the owning connection.
        :param channel_id: explicit id to claim; when falsy the connection
            hands out a free one (the usual case).
        :param auto_decode: not used in the part of the constructor shown.
        :param on_open: not used in the part of the constructor shown.
        """
        if channel_id:
            # An id was requested: have the connection remove it from its
            # pool of available ids.
            connection._claim_channel_id(channel_id)
        else:
            # Let the connection allocate a free id.
            channel_id = connection._get_free_channel_id()
        AMQP_LOGGER.debug('using channel_id: %s', channel_id)
        # The parent constructor creates self._callbacks and calls
        # _setup_listeners() to fill it with the internal per-method
        # handlers.  It must NOT be reset to {} afterwards, otherwise
        # dispatch_method would find no listeners.
        super(Channel, self).__init__(connection, channel_id)
        # External consumer callbacks: key is the consumer_tag, value is
        # the callable passed to basic_consume.
        self.callbacks = {}
        ...

    def _setup_listeners(self):
        """Register the internal handler for each method this channel handles."""
        self._callbacks.update({
            spec.Channel.Close: self._on_close,
            spec.Channel.CloseOk: self._on_close_ok,
            spec.Channel.Flow: self._on_flow,
            spec.Channel.OpenOk: self._on_open_ok,
            spec.Basic.Cancel: self._on_basic_cancel,
            spec.Basic.CancelOk: self._on_basic_cancel_ok,
            # Basic.Deliver is routed to _on_basic_deliver, which in turn
            # invokes the external consumer callback.
            spec.Basic.Deliver: self._on_basic_deliver,
            spec.Basic.Return: self._on_basic_return,
            spec.Basic.Ack: self._on_basic_ack,
        })

    def open(self):
        """Send Channel.Open and wait for OpenOk; no-op when already open.

        Author's note: this is normally called right after the channel
        instance is created.
        """
        if self.is_open:
            return
        return self.send_method(
            spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
        )

    # -----------------------------------------------------#
    # Binding a consumer calls the following methods in order:
    # 1 exchange_declare  declare the exchange
    # 2 queue_declare     declare the queue
    # 3 queue_bind        bind the queue to the exchange (with routing_key)
    # 4 basic_consume     subscribe the consumer to the queue
    # However the outer layers wrap things, they end up calling these
    # methods on a channel, so a channel must exist first.
    # -----------------------------------------------------#
    def exchange_declare(self, exchange, type, passive=False, durable=False,
                         auto_delete=True, nowait=False, arguments=None,
                         argsig='BssbbbbbF'):
        """Send Exchange.Declare; waits for DeclareOk unless ``nowait``.

        Emits a VDeprecationWarning when ``auto_delete`` is true.
        """
        if auto_delete:
            warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED))

        self.send_method(
            spec.Exchange.Declare, argsig,
            (0, exchange, type, passive, durable, auto_delete,
             False, nowait, arguments),
            wait=None if nowait else spec.Exchange.DeclareOk,
        )

    def queue_declare(self, queue='', passive=False, durable=False,
                      exclusive=False, auto_delete=True, nowait=False,
                      arguments=None, argsig='BsbbbbbF'):
        """Send Queue.Declare.

        :returns: a ``queue_declare_ok_t`` built from the DeclareOk reply,
            or ``None`` when ``nowait`` is true.
        """
        self.send_method(
            spec.Queue.Declare, argsig,
            (0, queue, passive, durable, exclusive, auto_delete,
             nowait, arguments),
        )
        if not nowait:
            return queue_declare_ok_t(*self.wait(
                spec.Queue.DeclareOk, returns_tuple=True,
            ))

    def queue_bind(self, queue, exchange='', routing_key='',
                   nowait=False, arguments=None, argsig='BsssbF'):
        """Send Queue.Bind; waits for BindOk unless ``nowait``."""
        return self.send_method(
            spec.Queue.Bind, argsig,
            (0, queue, exchange, routing_key, nowait, arguments),
            wait=None if nowait else spec.Queue.BindOk,
        )

    def basic_consume(self, queue='', consumer_tag='', no_local=False,
                      no_ack=False, exclusive=False, nowait=False,
                      callback=None, arguments=None, on_cancel=None,
                      argsig='BssbbbbF'):
        """Declare a consumer and register its external callback.

        Author's note: in OpenStack the callback passed here is
        Consumer._callback, i.e. where AMQPListener.__call__ gets invoked.

        :returns: whatever ``send_method`` returned for Basic.Consume.
        """
        p = self.send_method(
            spec.Basic.Consume, argsig,
            (0, queue, consumer_tag, no_local, no_ack, exclusive,
             nowait, arguments),
            wait=None if nowait else spec.Basic.ConsumeOk,
        )
        if not nowait and not consumer_tag:
            # No tag was supplied: use the value returned for ConsumeOk.
            consumer_tag = p
        # Register the external callback under its consumer_tag.
        self.callbacks[consumer_tag] = callback
        if on_cancel:
            self.cancel_callbacks[consumer_tag] = on_cancel
        if no_ack:
            # Track consumers that do not acknowledge messages.
            # Author's note: OpenStack messages that need confirmation use
            # no_ack=False, so this branch is rarely taken there.
            self.no_ack_consumers.add(consumer_tag)
        return p

    def _on_basic_deliver(self, consumer_tag, delivery_tag, redelivered,
                          exchange, routing_key, msg):
        """Internal Basic.Deliver handler: hand ``msg`` to the consumer.

        Called through dispatch_method.  Attaches the channel and delivery
        info to ``msg`` and invokes the callback registered for
        ``consumer_tag``; when none is registered the message is rejected
        with ``requeue=True``.
        """
        msg.channel = self
        msg.delivery_info = {
            'consumer_tag': consumer_tag,
            'delivery_tag': delivery_tag,
            'redelivered': redelivered,
            'exchange': exchange,
            'routing_key': routing_key,
        }
        # Author's note: in OpenStack ``fun`` is Consumer._callback, the
        # starting point of AMQPListener.__call__.
        try:
            fun = self.callbacks[consumer_tag]
        except KeyError:
            # No callback for this consumer_tag: log and reject.
            # Logger.warning replaces the deprecated Logger.warn alias.
            AMQP_LOGGER.warning(
                REJECTED_MESSAGE_WITHOUT_CALLBACK,
                delivery_tag, consumer_tag, exchange, routing_key,
            )
            self.basic_reject(delivery_tag, requeue=True)
        else:
            # msg is the last argument dispatch_method passed in.
            fun(msg)


class AbstractChannel(object):
    """Base class holding the send / wait / dispatch machinery.

    Several of Channel's key methods live here; per the author's note,
    amqp.connection.Connection inherits the same ``send_method``.
    """

    def __init__(self, connection, channel_id):
        """Register this channel on ``connection`` and set up listeners."""
        self.connection = connection
        self.channel_id = channel_id
        connection.channels[channel_id] = self
        self.method_queue = []  # Higher level queue for methods
        self.auto_decode = False
        # One-shot waiters keyed by method signature (see wait()).
        self._pending = {}
        # Permanent internal handlers, filled in by _setup_listeners().
        self._callbacks = {}
        self._setup_listeners()

    def send_method(self, sig,
                    format=None, args=None, content=None,
                    wait=None, callback=None, returns_tuple=False):
        """Serialize and write one method frame on this channel.

        :param sig: method signature to send.
        :param format: argument format string; when given, ``args`` is
            serialized with ``dumps(format, args)``.
        :param content: optional content passed to the frame writer.
        :param wait: method signature(s) to wait for after sending.
        :param callback: chained onto the returned promise.
        :param returns_tuple: forwarded to :meth:`wait`.
        :returns: the result of :meth:`wait` when ``wait`` is given,
            otherwise the promise.
        :raises RecoverableConnectionError: the connection is gone.
        """
        # promise comes from the vine package.
        sent = promise()
        # For a Channel this is the amqp Connection; per the author's note
        # a Connection's ``connection`` is itself.
        connection = self.connection
        if connection is None:
            raise RecoverableConnectionError('connection already closed')
        if format:
            serialized = dumps(format, args)
        else:
            serialized = bytes_if_py2('')
        try:
            # The frame is written through the connection.
            connection.frame_writer(
                1, self.channel_id, sig, serialized, content)
        except StopIteration:
            raise RecoverableConnectionError('connection already closed')
        # TODO temp: callback should be after write_method ... ;)
        if callback:
            sent.then(callback)
        sent()
        if not wait:
            return sent
        # A reply is expected: block until it arrives.
        return self.wait(wait, returns_tuple=returns_tuple)

    def wait(self, method, callback=None, timeout=None, returns_tuple=False):
        """Drain connection events until one of ``method`` has arrived.

        :param method: a method signature or a list of them.
        :param callback: optional callable wrapped by ``ensure_promise``.
        :param timeout: forwarded to ``connection.drain_events``.
        :param returns_tuple: return all positional args instead of only
            the first one.
        :returns: the args (or first arg) stored in the promise value, or
            ``None`` when the promise value is falsy.
        """
        waiter = ensure_promise(callback)
        pending = self._pending
        methods = method if isinstance(method, list) else [method]

        # Install the waiter, remembering whatever was registered before.
        displaced = []
        for sig in methods:
            displaced.append(pending.get(sig))
            pending[sig] = waiter

        try:
            while not waiter.ready:
                self.connection.drain_events(timeout=timeout)
            if waiter.value:
                args, kwargs = waiter.value
                if returns_tuple:
                    return args
                return args and args[0]
        finally:
            # Restore the earlier waiters, or drop ours if there were none.
            for sig, previous in zip(methods, displaced):
                if previous is None:
                    pending.pop(sig, None)
                else:
                    pending[sig] = previous

    def dispatch_method(self, method_sig, payload, content):
        """Route a received method frame to its listeners.

        The connection calls this to distribute incoming data.  Listeners
        are the permanent handler in ``self._callbacks`` (if any) followed
        by the one-shot waiter popped from ``self._pending`` (if any); when
        neither exists the frame is ignored.

        :raises AMQPNotImplementedError: ``method_sig`` is not in
            ``self._METHODS``.
        """
        wants_decode = (
            content
            and self.auto_decode
            and hasattr(content, 'content_encoding')
        )
        if wants_decode:
            try:
                content.body = content.body.decode(content.content_encoding)
            except Exception:
                pass
        try:
            # e.g. method_t(method_sig=(20, 40), args='BsBB', content=False)
            amqp_method = self._METHODS[method_sig]
        except KeyError:
            raise AMQPNotImplementedError(
                'Unknown AMQP method {0!r}'.format(method_sig))

        listeners = []
        # Permanent handler registered in _setup_listeners, e.g.
        # spec.Basic.Deliver -> _on_basic_deliver.
        if method_sig in self._callbacks:
            listeners.append(self._callbacks[method_sig])
        # One-shot waiter installed by wait(); consumed on first use.
        if method_sig in self._pending:
            listeners.append(self._pending.pop(method_sig))
        if not listeners:
            return

        # amqp_method.args is the format string used to deserialize payload.
        if amqp_method.args:
            call_args, _ = loads(amqp_method.args, payload, 4)
        else:
            call_args = []
        if amqp_method.content:
            # Content-carrying methods get ``content`` as the last argument.
            # Author's note: it is an amqp.basic_message.Message, which
            # kombu later wraps via message_to_python.
            call_args.append(content)
        # Invoke the listeners in order.
        for listener in listeners:
            listener(*call_args)

上一节