这节主要讲解Connection类以及相关的Consumer类,同时确认了AMQPListener.__call__传入的变量是什么
# oslo_messaging._drivers.impl_rabbit.Connection
class Connection(object):
"""Connection object."""
pools = {}
def __init__(self, conf, url, purpose):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
......
# 省略属性部分
# 当前最大重试次数
# 来源是配置文件的rabbit_max_retries
# 用于控制ensure中的默认retry次数
if self.max_retries <= 0:
self.max_retries = None
# rabbit的 vhost
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.virtual_host
self._url = ''
if self.fake_rabbit:
# 假冒rabbit方式
....
elif url.hosts:
# 常规配置走这里
if url.transport.startswith('kombu+'):
LOG.warning(_LW('Selecting the kombu transport through the '
'transport url (%s) is a experimental feature '
'and this is not yet supported.'),
url.transport)
if len(url.hosts) > 1:
random.shuffle(url.hosts)
for host in url.hosts:
# 我们的rabbit走这里
transport = url.transport.replace('kombu+', '')
# rabbit被替换成amqp
# 所以我们的最终驱动其实是amqp,和py-amqp有关
transport = transport.replace('rabbit', 'amqp')
# 用字符串";"合并url
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
parse.quote(host.username or ''),
parse.quote(host.password or ''),
self._parse_url_hostname(host.hostname) or '',
str(host.port or 5672),
virtual_host)
elif url.transport.startswith('kombu+'):
....
else:
.....
self._initial_pid = os.getpid()
self._consumers = {}
self._new_tags = set()
self._active_tags = {}
self._tags = itertools.count(1)
self._consume_loop_stopped = False
# 当前connection所在的channel
self.channel = None
self.purpose = purpose
# send和listen的锁不一样
if purpose == rpc_common.PURPOSE_SEND:
self._connection_lock = ConnectionLock()
else:
self._connection_lock = DummyConnectionLock()
# 初始化kombu的Connection
# socket数据的收发,amqp消息的封装生成都在kombu里完成
# kombu最终是调用py-amqp封装成amqp数据包的
# 所以url里把rabbit替换成了amqp
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
# login_method没什么大用,不同登陆模式区别看
# https://www.rabbitmq.com/authentication.html
login_method=self.login_method,
heartbeat=self.heartbeat_timeout_threshold,
failover_strategy=self.kombu_failover_strategy,
# 这个是py-amqp用的参数
transport_options={
'confirm_publish': True,
'client_properties': {'capabilities': {
'authentication_failure_close': True,
'connection.blocked': True,
'consumer_cancel_notify': True}},
'on_blocked': self._on_connection_blocked,
'on_unblocked': self._on_connection_unblocked,
},
)
LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s',
self.connection.info())
# 心跳相关
# 单次心跳超时是心跳阈值时间/(心跳阈值时间内发送心跳次数/2)
self._heartbeat_wait_timeout = (
float(self.heartbeat_timeout_threshold) /
float(self.heartbeat_rate) / 2.0)
self._heartbeat_support_log_emitted = False
# self.connection.connection的初始化
# 这里socket开始连接
self.ensure_connection()
# 心跳线程,当我们用的是eventlet的时候,走的也是绿色线程
self._heartbeat_thread = None
# 为send类型,启动心跳监听
if purpose == rpc_common.PURPOSE_SEND:
# 在这里生成线程赋值给_heartbeat_thread
self._heartbeat_start()
LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
'via [%(transport)s] client',
self.connection.info())
# 超时设置相关
if self._heartbeat_supported_and_enabled():
self._poll_timeout = self._heartbeat_wait_timeout
else:
self._poll_timeout = 1
....
# 判断是否支持心跳
def _heartbeat_supported_and_enabled(self):
# 配置文件里心跳阈值<=0
# 配置里默认值是60
if self.heartbeat_timeout_threshold <= 0:
return False
# 这里是看kombu是否支持心跳
if self.connection.supports_heartbeats:
return True
# 如果当前kombu版本的connection不支持心跳
# 设置_heartbeat_support_log_emitted参数为true后再返回False
# _heartbeat_support_log_emitted我没发现有什么用
elif not self._heartbeat_support_log_emitted:
LOG.warning(_LW("Heartbeat support requested but it is not "
"supported by the kombu driver or the broker"))
self._heartbeat_support_log_emitted = True
return False
def ensure_connection(self):
self._set_current_channel(None)
# ensure比较复杂,method本身需要callable,所以这里直接弄了个匿名函数
# 返回值是self.connection.connection
# self.connection是kombu的connection
# self.connection.connection就是amqp的connection
# 访问self.connection.connection的时候
# socket会开始连接
self.ensure(method=lambda: self.connection.connection)
self.set_transport_socket_timeout()
def _set_current_channel(self, new_channel):
# 设置当前正在使用的channel
# 在connection还没链接的时候这个方法调用没有任何效果
if new_channel == self.channel:
return
if self.channel is not None:
self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None)
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
if (new_channel is not None and
self.purpose == rpc_common.PURPOSE_LISTEN):
# 监听型设置qos,默认是没有qos限制
self._set_qos(new_channel)
def ensure(self, method, retry=None,
recoverable_error_callback=None, error_callback=None,
timeout_is_error=True):
"""
底层执行函数,基本不会直接调用,被其他函数调用
用于执行method,重试次数根据retry的值来定
retry = None 相当于 retry=rabbit_max_retries
retry = -1 则无限重试
retry = 0 不重试
retry = N 重试N次
这个函数调用前必须调用connection lock
ensure执行的method不能有参数
"""
# 当前pid
current_pid = os.getpid()
# pid不是原始pid
if self._initial_pid != current_pid:
LOG.warning(_LW("Process forked after connection established! "
"This can result in unpredictable behavior. "
"See: http://docs.openstack.org/developer/"
"oslo.messaging/transport.html"))
self._initial_pid = current_pid
if retry is None:
retry = self.max_retries
if retry is None or retry < 0:
retry = None
# 失败的时候调用
def on_error(exc, interval):
LOG.debug("Received recoverable error from kombu:",
exc_info=True)
# 有recoverable_errors,调用recoverable_errors函数
recoverable_error_callback and recoverable_error_callback(exc)
# 计算失败间隔
interval = (self.kombu_reconnect_delay + interval
if self.kombu_reconnect_delay > 0
else interval)
info = {'err_str': exc, 'sleep_time': interval}
info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
LOG.error(_LE('AMQP server %(hostname)s:%(port)s closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
LOG.error(_LE('AMQP server on %(hostname)s:%(port)s is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
# sleep到kombu_reconnect_delay<=0
# 整个on_error主要就是用来sleep的
if self.kombu_reconnect_delay > 0:
LOG.trace('Delaying reconnect for %1.1f seconds ...',
self.kombu_reconnect_delay)
time.sleep(self.kombu_reconnect_delay)
# 重连调用
def on_reconnection(new_channel):
self.set_transport_socket_timeout()
self._set_current_channel(new_channel)
for consumer in self._consumers:
# 重新声明消费者
consumer.declare(self)
LOG.info(_LI('Reconnected to AMQP server on '
'%(hostname)s:%(port)s via [%(transport)s] client'),
self.connection.info())
# 执行method
def execute_method(channel):
self._set_current_channel(channel)
method()
# 我们用rabbit mq的时候
# 这里相当于找py-amqp有没有recoverable_connection_errors
# recoverable_errors是多个异常组成的tuple
# 用于后面捕获异常
# recoverable_errors是可恢复的error
has_modern_errors = hasattr(
self.connection.transport, 'recoverable_connection_errors',
)
if has_modern_errors:
recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
else:
recoverable_errors = ()
try:
# 通过kombu的autoretry生成一个自动retry的实例
# 在还没有设置过channel的情况下(self.channel为None)
# kombu的autoretry会调用下层connection的
# default_channel方法生成一个默认channel
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
interval_max=self.interval_max,
on_revive=on_reconnection)
# 调用生成的autoretry_method并返回
# 返回的autoretry_method是一个闭包
ret, channel = autoretry_method()
# 设置当前channel(第一次调用过后当前channel就有值了)
self._set_current_channel(channel)
# ret是execute_method的返回值
return ret
except recoverable_errors as exc:
# recoverable_errors可恢复的错误
LOG.debug("Received recoverable error from kombu:",
exc_info=True)
# 触发错误回调
error_callback and error_callback(exc)
# 清理掉当前channel
self._set_current_channel(None)
# 注释说明走到这里说明重试过指定retry次数
info = {'err_str': exc, 'retry': retry}
info.update(self.connection.info())
msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)s after %(retry)s '
'tries: %(err_str)s') % info
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
except rpc_amqp.AMQPDestinationNotFound:
# 外层有对这个异常的捕获和处理
# 一般处理是重试直至外部定义好的超时
raise
except Exception as exc:
# 触发错误回调
error_callback and error_callback(exc)
# 抛出异常
raise
# 注册一个topic类型的consumer
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
# 生成Consumer实例
# Consumer类初始化的时候会生成kombu的Exchange
# declare_consumer的时候会生成kombu的Queue
# RabbitDriver调用declare_topic_consumer时
# 传入的callback是AMQPListener实例
# Consumer传入的callback参数是AMQPListener实例
consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer)
def declare_consumer(self, consumer):
def _connect_error(exc):
log_info = {'topic': consumer.routing_key, 'err_str': exc}
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
# 实际declare的函数
def _declare_consumer():
# 这里把self传入
# 声明消费者, 这里其实只是声明了队列
# 还没把消费者绑定到队列上
consumer.declare(self)
tag = self._active_tags.get(consumer.queue_name)
if tag is None:
tag = next(self._tags)
self._active_tags[consumer.queue_name] = tag
# 添到_new_tags列表里
self._new_tags.add(tag)
# 每声明一个consumer
# 就会添加到self._consumers字典中
self._consumers[consumer] = tag
return consumer
with self._connection_lock:
# 用ensure去执行_declare_consumer
return self.ensure(_declare_consumer,
error_callback=_connect_error)
def consume(self, timeout=None):
.....
# consume函数内容较多就不全贴了
# 这个函数在AMQPListener的pool函数中被循环调用
# 通过self.connection.drain_events接收数据
def _consume():
....
# 有新消费者
# self._new_tags中的消费者是
# declare_consumer的时候添进去的
if self._new_tags:
for consumer, tag in self._consumers.items():
if tag in self._new_tags:
# 新来的消费者调用一次consume,把消费者绑定到队列
# 这里就是把callback, 也就是注入AMQPListener实例
# 注入到kombu的地方
# 我们在后面的drain_events会从socket中获取到amqp的帧
# 但是kombu调用AMQPListener.__call__的部分需要到kombu中找
consumer.consume(tag=tag)
# 将消费者的tag从_new_tags中移除
self._new_tags.remove(tag)
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))
# 这个循环是socket超时重试用的
while True:
if self._consume_loop_stopped:
return
if self._heartbeat_supported_and_enabled():
self._heartbeat_check()
try:
# 这里就是从获取到一帧ampq数据并调用
# 数据帧只收一帧
# 收完退出
self.connection.drain_events(timeout=poll_timeout)
return
except socket.timeout as exc:
poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self._poll_timeout)
with self._connection_lock:
# 用ensure函数确保执行
self.ensure(_consume,
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
# ----------------------下面是发送函数-----------------------------
def direct_send(self, msg_id, msg):
# exchange对象
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
durable=False,
auto_delete=True,
passive=True)
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
def _ensure_publishing(self, method, exchange, msg, routing_key=None,
timeout=None, retry=None):
def _error_callback(exc):
log_info = {'topic': exchange.name, 'err_str': exc}
LOG.error(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
# 因为ensure的method不能带参数
# 所以这里用functools.partial把参数绑定了
# method是绑定了参数的_publish_and_raises_on_missing_exchange
method = functools.partial(method, exchange, msg, routing_key, timeout)
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
timeout=None):
# exchange的passive属性必须是true
if not exchange.passive:
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
"be called with an passive exchange.")
try:
# 调用下面的_publish推送消息
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
# 404是AMQPDestinationNotFound错误
raise rpc_amqp.AMQPDestinationNotFound(
"exchange %s doesn't exists" % exchange.name)
raise
def _publish(self, exchange, msg, routing_key=None, timeout=None):
# 创建一个kombu的producer对象
producer = kombu.messaging.Producer(exchange=exchange,
channel=self.channel,
auto_declare=not exchange.passive,
routing_key=routing_key)
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
# 在socket超时时间内publish数据
with self._transport_socket_timeout(timeout):
producer.publish(msg, expiration=self._get_expiration(timeout),
compression=self.kombu_compression)
.....
# 省略其他代码
# Consumer类
class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0):
# Consumer的属性大部分是amqp相关的参数
self.queue_name = queue_name
self.exchange_name = exchange_name
self.routing_key = routing_key
self.exchange_auto_delete = exchange_auto_delete
self.queue_auto_delete = queue_auto_delete
self.durable = durable
# RabbitDriver中declare_topic_consumer时
# 传入的callback是AMQPListener实例
self.callback = callback
self.type = type
self.nowait = nowait
# 这里是把两个配置文件组成字典
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
self.queue = None
# 交换机在生成Consumer实例的时候生成交换机对象
self.exchange = kombu.entity.Exchange(
name=exchange_name,
type=type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
def declare(self, conn):
# 声明消费者就是声明一个队列, 交换机同时被声明(队列声明前)
# conn是oslo_messaging._drivers.impl_rabbit.Connection
# 先生成kombu的Queue实例
self.queue = kombu.entity.Queue(
name=self.queue_name,
# Connection链接后就会设置channel
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
try:
LOG.trace('ConsumerBase.declare: '
'queue %s', self.queue_name)
# 声明当前queue
self.queue.declare()
except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race
# condition. Simply retrying will solve the error most of the time
# and should work well enough as a workaround until the race
# condition itself can be fixed.
# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
if exc.code == 404:
self.queue.declare()
else:
raise
def consume(self, tag):
# 当前消费者订阅队列
# 这里最终调用的是basic_consume
# 将消费者绑定到队列上
# 除非重连否则listen方只执行一次
# 参考前面Connection类的consume方法说明
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
def cancel(self, tag):
# 取消队列订阅
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
self.queue.cancel(six.text_type(tag))
def _callback(self, message):
# 回调函数
# 当前函数是在consume函数订阅队列时候作为callback
# 传入kombu.entity.Queue中,所以message的类型需要到kombu中看
# ---------------------------------#
# channel中有message_to_python方法就调用message_to_python
# 转换传入的message
m2p = getattr(self.queue.channel, 'message_to_python', None)
if m2p:
message = m2p(message)
# 调用回调
try:
# 也就是说,AMQPListener的__call__的参数是
# 传入的是RabbitMessage的实例
# 也就是说AMQPListener中
# AMQPIncomingMessage封装的message
# 是RabbitMessage
# 但是这个message是什么还要看kombu的代码
self.callback(RabbitMessage(message))
except Exception:
LOG.exception(_LE("Failed to process message"
" ... skipping it."))
# 转化为RabbitMessage或者调用callback报错
# 回复ack
message.ack()
class RabbitMessage(dict):
# RabbitMessage是个字典,简单的封装了一下原始message
# 主要功能作为一个普通字典, 数据来源raw_message.payload['oslo.message']
# 多了两个方法
# acknowledge和requeue
def __init__(self, raw_message):
super(RabbitMessage, self).__init__(
# deserialize_msg主要顺便检查了消息的版本号
# 然后取出raw_message.payload['oslo.message']部分数据
# 并转化这部分数据为dict
rpc_common.deserialize_msg(raw_message.payload))
LOG.trace('RabbitMessage.Init: message %s', self)
self._raw_message = raw_message
def acknowledge(self):
LOG.trace('RabbitMessage.acknowledge: message %s', self)
self._raw_message.ack()
def requeue(self):
LOG.trace('RabbitMessage.requeue: message %s', self)
self._raw_message.requeue()
我们来看看序列化和反序列化
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
def deserialize_msg(msg):
# 外部调用的是rpc_common.deserialize_msg(raw_message.payload))
# 这里也就是说raw_message.payload一定是字典
if not isinstance(msg, dict):
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
# 这个写法是确保msg的字典包含有base_envelope_keys
# 相当于双循环找key,这个写法比较漂亮
if not all(map(lambda key: key in msg, base_envelope_keys)):
return msg
# 从oslo.version中取出版本号,确认版本号兼容
if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
# 从oslo.message中取出数据转成dict并返回
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg
# AMQPListener从message里获取context_dict的方法也看一下
# 这里传入的msg也就是RabbitMessage
def unpack_context(msg):
context_dict = {}
# 从RabbitMessage中取出所有_context_打头的key和value
for key in list(msg.keys()):
key = six.text_type(key)
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
# 取出key是_msg_id和_reply_q的key
context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['reply_q'] = msg.pop('_reply_q', None)
# 转成RpcContext实例
# 也就是说unpack_context返回的不是dict而是
# RpcContext实例
# 主要内容上面也比较明了
# 取出部分数据, 也就是把RabbitMessage的数据再拆分走一部分
return RpcContext.from_dict(context_dict)
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
super(RpcContext, self).__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
values['reply_q'] = self.reply_q
return self.__class__(**values)
class CommonRpcContext(object):
def __init__(self, **kwargs):
# self.values就是前面的context_dict字典
self.values = kwargs
def __getattr__(self, key):
# 模拟字典
try:
return self.values[key]
except KeyError:
raise AttributeError(key)
def to_dict(self):
# 可以看到to_dict方法是个
# self.values就是前面的context_dict字典
return copy.deepcopy(self.values)
@classmethod
def from_dict(cls, values):
return cls(**values)
def deepcopy(self):
return self.from_dict(self.to_dict())
def update_store(self):
# local.store.context = self
pass
接下来要完全搞清楚原生message是什么,需要看kombu和py-amqp