a
    xdD4                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZmZmZ ddlmZmZ d	d
lmZmZ d	dlmZ d	dlmZ d	dlmZ dZdZeeZ da!dd Z"dd Z#d:ddZ$G dd deZ%dd Z&d;ddZ'dd Z(d d! Z)d"d# Z*d<d$d%Z+d=d&d'Z,d>d(d)Z-d?d*d+Z.d,d- Z/d.d/ Z0ed0d1 Z1d@d2d3Z2dAd4d5Z3dBd6d7Z4G d8d9 d9Z5dS )CzCommon Utilities.    N)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                   C   s   t d u rt ja t S N)_node_idr   int r   r   D/var/www/html/Ranjet/env/lib/python3.9/site-packages/kombu/common.pyget_node_id    s    r   c                 C   sL   d | ||t|}zttt|}W n  tyF   ttt|}Y n0 |S )Nz{:x}-{:x}-{:x}-{:x})formatidstrr   r   
ValueErrorr	   )Znode_idZ
process_idZ	thread_idinstanceentretr   r   r   generate_oid'   s    r'   Tc                 C   s    t t t |rt nd| S Nr   )r'   r   osgetpid	threading	get_ident)r$   threadsr   r   r   oid_from1   s    r.   c                       s,   e Zd ZdZejd Zd fdd	Z  ZS )	r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc              
      sb   |rd |pdt }n|p&dt  }t jf |p6|||||d urH|n
t|ddd| d S )Nz{}.{}Zbcastzbcast.Zfanout)type)aliasr/   nameauto_deleteexchange)r    r   super__init__r   )selfr2   r/   uniquer3   r4   r1   kwargs	__class__r   r   r6   O   s    
zBroadcast.__init__)NNFTNN)__name__
__module____qualname____doc__r   attrsr6   __classcell__r   r   r:   r   r   :   s   
      r   c                 C   s   | |j jjv S r   )
connectionclientdeclared_entities)entitychannelr   r   r   declaration_cachedf   s    rG   Fc                 K   s    |rt | |fi |S t| |S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rE   rF   retryretry_policyr   r   r   r   j   s    r   c                 C   s4   | j }|s0|s"td| d|  | |} | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr
   bind)rE   rF   rL   r   r   r   _ensure_channel_is_boundq   s    
rN   c                 C   s   | }t | | |d u r2| js,td|  d| j}d  }}|jrd| jrd|jjj}t| }||v rddS |jsrt	d| j
|d |d ur|r|| |d ur| j|_dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rF   T)rN   rL   r
   rF   rB   Zcan_cache_declarationrC   rD   hashr   Zdeclareaddr2   )rE   rF   origZdeclaredidentr   r   r   rI      s,    



rI   c                 K   s:   t | | | jjstd| jjjj| tfi || |S )NrO   )rN   rF   rB   r   rC   ZensurerI   )rE   rF   rK   r   r   r   rH      s    

rH   c              
   #   s   t    fdd}|g|pg  | _| N t| jjj||ddD ]&}z  V  W q> tyb   Y q>0 q>W d   n1 sz0    Y  dS )z&Drain messages from consumer instance.c                    s     | |f d S r   )append)bodymessageaccr   r   
on_message   s    z"drain_consumer.<locals>.on_messageT)limittimeoutignore_timeoutsN)r   	callbacksr   rF   rB   rC   popleft
IndexError)ZconsumerrZ   r[   r]   rY   _r   rW   r   r      s    

r   c                 K   s$   t | jf |g|d||||dS )zIterator over messages.)ZqueuesrF   )rZ   r[   r]   )r   ZConsumer)connrF   r/   rZ   r[   r]   r9   r   r   r   r      s    r   c              	   c   sN   |rt |pt D ]6}z| j|dV  W q tjyF   |rB|sB Y q0 qdS )a  Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples:
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also:
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )r[   N)ranger   Zdrain_eventssocketr[   )ra   rZ   r[   r\   ir   r   r   r      s    r   c              	   K   sH   |j |f| ||dt|jd |jdtj|j |jdfi |S )a  Send reply for request.

    Arguments:
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r4   rJ   rK   Zreply_tocorrelation_id)Zrouting_keyre   
serializercontent_encoding)publishdictZ
propertiesgetserializersZtype_to_namecontent_typerg   )r4   reqmsgZproducerrJ   rK   propsr   r   r   r      s    


r   c           	   	   o   sv   | dd}d}zNt| ||g|R i |D ]\}}|s@|  d}|V  q,W |rr||j n|rp||j 0 dS )z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ZackZafter_reply_message_receivedr2   )	ra   rF   r/   argsr9   rp   ZreceivedrU   rV   r   r   r   r      s     

r   c                 C   s   t jd| |dd d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervalr   r   r   _ensure_errback  s    rx   c              	   c   s*   z
d V  W n | j | j y$   Y n0 d S r   )Zconnection_errorsZchannel_errors)ra   r   r   r   _ignore_errors  s    
ry   c                 O   sB   |r:t |  ||i |W  d   S 1 s00    Y  t | S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)ry   )ra   funrr   r9   r   r   r   ignore_errors  s    
,r{   c                 C   s   |r|| d S r   r   )rB   rF   	on_reviver   r   r   revive_connection@  s    r}   c                 K   s   |pt }| jddh}|j|d |j}tt||d}	|j||f||	d|}
|
|i t||d\}}|W  d   S 1 s0    Y  dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r|   )r   r|   )rB   N)rx   acquireZensure_connectionZdefault_channelr   r}   Z	autoretryri   )poolrz   rr   r9   r   r|   optsra   rF   Zreviver   retvalr`   r   r   r   r   E  s    r   c                   @   s@   e Zd ZdZdZdd ZdddZddd	Zd
d Zdd Z	dS )QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                 C   s   || _ t | _|pd| _d S r(   )callbackr+   RLock_mutexvalue)r7   r   initial_valuer   r   r   r6     s    
zQoS.__init__r   c                 C   sD   | j ( | jr | jt|d | _W d   n1 s40    Y  | jS )zIncrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxr7   nr   r   r   increment_eventually  s    0zQoS.increment_eventuallyc                 C   sP   | j 4 | jr,|  j|8  _| jdk r,d| _W d   n1 s@0    Y  | jS )zDecrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   r   r   r   decrement_eventually  s    
$zQoS.decrement_eventuallyc                 C   sH   || j krD|}|tkr&tdt d}td| | j|d || _ |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)Zprefetch_count)prevPREFETCH_COUNT_MAXrt   warningdebugr   )r7   Zpcount	new_valuer   r   r   set  s    
zQoS.setc                 C   s6   | j  | | jW  d   S 1 s(0    Y  dS )z)Update prefetch count with current value.N)r   r   r   )r7   r   r   r   update  s    z
QoS.update)r   )r   )
r<   r=   r>   r?   r   r6   r   r   r   r   r   r   r   r   r   Y  s   (

r   )T)NF)r   NN)r   NN)NNF)NFN)N)N)NN)6r?   r)   rc   r+   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r   r   r	   Zamqpr
   r   rE   r   r   logr   Zserializationr   rk   Z
utils.uuid__all__r   r<   rt   r   r   r'   r.   r   rG   r   rN   rI   rH   r   r   r   r   r   rx   ry   r{   r}   r   r   r   r   r   r   <module>   sR   

	,


  
	
& 


!

