Lolizeppelin's Blog

OpenStack Mitaka从零开始 openstack里的AMQP使用(1)

Posted on By gcy

先回顾下l3的启动部分

# neutron.service.Service
class Service(n_rpc.Service):
    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 *args, **kwargs):
        .....
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        # manager = neutron.agent.l3.agent.L3NATAgent
        # 这里的manager将作为endpoint传给后面的
        self.manager = manager_class(host=host, *args, **kwargs)
        .....
        # topic=topics.L3_AGENT,也就是字符串"l3_agent"
        super(Service, self).__init__(host, topic, manager=self.manager)

    def start(self):
        # 这个只有dhcp agent有用,其他都是空
        self.manager.init_host()
        # n_rpc.Service.start(),这里就是初始化rpc服务器的地方
        super(Service, self).start()
        ....
        self.manager.after_start()

n_rpc是# neutron.common.rpc

# neutron.common.rpc.Service
class Service(service.Service):

    def __init__(self, host, topic, manager=None, serializer=None):
        # service.Service主要就是初始化了一个绿色线程池组
        # 也就是多了一个属性
        # self.tg = threadgroup.ThreadGroup(threads)
        super(Service, self).__init__()
        self.host = host
        # topic是字符串l3_agent
        self.topic = topic
        self.serializer = serializer
        # manager = neutron.agent.l3.agent.L3NATAgent
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        # 这行没用
        super(Service, self).start()
        # 创建一个connection对象,注意这个Connection对象不是rabbit的Connection
        self.conn = create_connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)
        endpoints = [self.manager]
        # 作为endpoints传入给create_consumer
        # 当接收到消息队列数据的时候,最终调用endponrt中的函数
        # 这里是创建了MessageHandlingServer类并插入到conn.servers列表中
        # l3-agent只有一个MessageHandlingServer实例
        self.conn.create_consumer(self.topic, endpoints)

        # manager里有钩子函数,将当前Service实例注册到里面
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)
        # 调用conn.servers列表中的实例的start函数
        # 也就是调用MessageHandlingServer的start函数
        self.conn.consume_in_threads()

# 这个就是create_connection()函数返回的Connection对象的类
# neutron.common.rpc.Connection
class Connection(object):
    def __init__(self):
        super(Connection, self).__init__()
        # 可以看出这个Connection用于存放多个rpc server
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        # oslo_messaging.Target类实例下面有说明
        # server的取值是默认是当前机器的hostname
        # topic是字符串l3_agent
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        # 每次调用create_consumer都会生成一个server
        # 这个server是MessageHandlingServer的实例
        # 这个server也就是rpc server, 就是一个基于amqp的rpc server
        # rpc server的作用是从amqp收到消息
        # 调用endpoints执行具体的函数并返回
        # rpc server通过target和endpoints生成
        server = get_server(target, endpoints)
        self.servers.append(server)

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers

    def close(self):
        for server in self.servers:
            server.stop()
        for server in self.servers:
            server.wait()

# 我们来看看生成rpc server的必要参数
# oslo_messaging.target.Target
class Target(object):
    def __init__(self, exchange=None, topic=None, namespace=None,
                 version=None, server=None, fanout=None,
                 legacy_namespaces=None):
        # 从属性可以看出Target相当于一份配置文件
        # -----------------
        # 当Target的使用者是server(这里的server表示接收方的意思)的时候
        # topic和server是必要属性
        # exchange可选属性
        # -----------------
        # 与endpoint相关的namespace和version是可选属性
        # -----------------
        # 当Target的使用者是客户端的时候
        # 客户端发送信息, topic是必须的,其他参数都是可选的
        # -----------------
        # 注意,neutron server是发送方、是客户端
        # 各个agent是接收方、是服务端
        self.exchange = exchange
        self.topic = topic
        self.namespace = namespace
        self.version = version
        # Clients can request that a message be directed to a specific
        # server, rather than just one of a pool of servers listening on the topic.
        # ------------原文翻译---------------------
        # 客户端从指定的server请求信息而不是从监听topic的的servers组成的池中选择
        # ----------------------------------------
        # server就是配置文件中default部分的host值
        # 默认取值为当前机器的hostname
        # 下层会声明两个消费者
        # 一个是topic
        # 一个是topic.server
        self.server = server
        self.fanout = fanout
        self.accepted_namespaces = [namespace] + (legacy_namespaces or [])

    def __call__(self, **kwargs):
        # 可以看出这里复制了一份自身
        for a in ('exchange', 'topic', 'namespace',
                  'version', 'server', 'fanout'):
            kwargs.setdefault(a, getattr(self, a))
        return Target(**kwargs)

    ...

def get_server(target, endpoints, serializer=None):
    # TRANSPORT就是rabbitmq的驱动
    # 看后面TRANSPORT的代码
    assert TRANSPORT is not None
    # serializer是RequestContextSerializer
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer)

def get_rpc_server(transport, target, endpoints,
                executor='blocking', serializer=None):
     """Construct an RPC server.
     从amqp到来的消息如何被接收和分发由executor来控制,最简单的executor是阻塞模式
     如果使用eventlet作为executor,threading和time模块必须被monkeypatch过(默认使用eventlet)

     :param transport: the messaging transport
     :param target: the exchange, topic and server to listen on
     :param endpoints: 是manager(在l3 agent中就是L3NATAgent)组成的列表
     :type executor: 指定为eventlet字符串
     :param serializer: 用于将amqp数据序列化
     """
     # dispatcher就是分发器
     dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
     # MessageHandlingServer就是rpc server
     return msg_server.MessageHandlingServer(transport, dispatcher, executor)

从上面我们可以看出,rpc server,也就是MessageHandlingServer实例的两个主要参数

1、dispatcher 2、transport

我们先来看TRANSPORT的初始化,TRANSPORT是一个全局变量

def init(conf):
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    exmods = get_allowed_exmods()
    # 初始化TRANSPORT
    TRANSPORT = oslo_messaging.get_transport(conf,
                                             allowed_remote_exmods=exmods,
                                             aliases=TRANSPORT_ALIASES)
    # 初始化NOTIFICATION_TRANSPORT
    # get_notification_transport是
    # 读取oslo_messaging_notifications部分配置文件的get_transport
    NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
        conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES)
    serializer = RequestContextSerializer()
    # NOTIFIER就是NOTIFICATION_TRANSPORT的再封装
    # 外部直接调用NOTIFIER来发送notification而不是用NOTIFICATION_TRANSPORT
    # NOTIFICATION_TRANSPORT和TRANSPORT为什么要拆分是因为
    # TRANSPORT要封装rpc server,是rpc服务
    # NOTIFICATION_TRANSPORT要封装为NOTIFIER, 只发消息
    # 这次我们重点在TRANSPORT,所以NOTIFIER相关的东西略过
    NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
                                       serializer=serializer)

# TRANSPORT的创建函数get_transport
# get_notification_transport最终也是调用get_transport
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
    ......
    # 初始化rabbit驱动,rabbit驱动,前面省略校验代码

    allowed_remote_exmods = allowed_remote_exmods or []
    conf.register_opts(_transport_opts)

    if not isinstance(url, TransportURL):
        url = url or conf.transport_url
        parsed = TransportURL.parse(conf, url, aliases)
        if not parsed.transport:
            raise InvalidTransportURL(url, 'No scheme specified in "%s"' % url)
        url = parsed

    # 交换机名在这里设置的
    kwargs = dict(default_exchange=conf.control_exchange,
                  allowed_remote_exmods=allowed_remote_exmods)

    try:
       mgr = driver.DriverManager('oslo.messaging.drivers',
                                  url.transport.split('+')[0],
                                  invoke_on_load=True,
                                  invoke_args=[conf, url],
                                  invoke_kwds=kwargs)
    except RuntimeError as ex:
       raise DriverLoadFailure(url.transport, ex)

    return Transport(mgr.driver)

# AMQP
class Transport(object):
    # 代码可以看出Transport是直接封装了rabbit驱动的调用
    # 也就是信息的收发通过Transport
    # 里面没有直接的接收方法
    # 都是通过liensn注册到驱动中接收数据
    def __init__(self, driver):
        self.conf = driver.conf
        self._driver = driver

    def cleanup(self):
        """Release all resources associated with this transport."""
        self._driver.cleanup()

    def _require_driver_features(self, requeue=False):
        self._driver.require_features(requeue=requeue)

    def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
              retry=None):
        # 调用send, target必须有topic属性
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        return self._driver.send(target, ctxt, message,
                                 wait_for_reply=wait_for_reply,
                                 timeout=timeout, retry=retry)
    def _listen(self, target):
        # 调用_listen, 必须同时有topic、和server属性
        if not (target.topic and target.server):
         raise exceptions.InvalidTarget('A server\'s target must have '
                                        'topic and server names specified',
                                        target)
        # 这里初始化rabbit驱动的监听
        # 返回的是AMQPListener累实例,后面几节会分析到
        return self._driver.listen(target)

    # ------------------下面的函数是NOTIFIER用的-----------------------
    # NOTIFIER发信息调用_send_notification
    def _send_notification(self, target, ctxt, message, version, retry=None):
        # 调用_send_notification, target必须有topic属性
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        self._driver.send_notification(target, ctxt, message, version,
                                       retry=retry)


    # NOTIFIER接收信息调用_listen_for_notifications来监听
    def _listen_for_notifications(self, targets_and_priorities, pool):
        # 这个函数应该是notification直接用了内部消息组件的是后调用的
        for target, priority in targets_and_priorities:
            if not target.topic:
                raise exceptions.InvalidTarget('A target must have '
                                               'topic specified',
                                               target)
        return self._driver.listen_for_notifications(
            targets_and_priorities, pool)

RPCDispatcher比较复杂,我们在下一节接着讲