
     hD4                        d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZmZmZ ddlmZmZ d	d
lmZmZ d	dlmZ d	dlmZ d	dlmZ dZdZ ee          Z da!d Z"d Z#d(dZ$ G d de          Z%d Z&d)dZ'd Z(d Z)d Z*d*dZ+	 	 d*dZ,d+dZ-	 d,dZ.d  Z/d! Z0ed"             Z1d-d#Z2d-d$Z3d.d%Z4 G d& d'          Z5dS )/zCommon Utilities.    N)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                  D    t           t                      j        a t           S N)_node_idr	   int     H/var/www/html/Sam_Eipo/venv/lib/python3.11/site-packages/kombu/common.pyget_node_idr"       s    77;Or    c                     d                     | ||t          |                    }	 t          t          t          |                    }n2# t
          $ r% t          t          t          |                    }Y nw xY w|S )Nz{:x}-{:x}-{:x}-{:x})formatidstrr   r   
ValueErrorr
   )node_id
process_id	thread_idinstanceentrets         r!   generate_oidr.   '   s    

&
&Y86 6C-%s++,, - - -%s++,,-Js   "A
 
,A98A9Tc                     t          t                      t          j                    |rt	          j                    nd|           S Nr   )r.   r"   osgetpid	threading	get_ident)r+   threadss     r!   oid_fromr6   1   s=    
	!(/	a	  r    c                   D     e Zd ZdZej        dz   Z	 	 	 	 	 	 d fd	Z xZS )r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc                     |r%d                     |pdt                                }n|pdt                       } t                      j        d|p||||||nt	          |d          d| d S )Nz{}.{}bcastzbcast.fanout)type)aliasr8   nameauto_deleteexchanger   )r$   r   super__init__r   )	selfr>   r8   uniquer?   r@   r=   kwargs	__class__s	           r!   rB   zBroadcast.__init__O   s      	/NN5#3GTVV<<EE..dff..E 	
-4#"*"6hh#Dx888	
 	
 	
 	
 	
 	
 	
r    )NNFTNN)__name__
__module____qualname____doc__r   attrsrB   __classcell__)rF   s   @r!   r   r   :   sj         $ K,,E !
 
 
 
 
 
 
 
 
 
r    r   c                 (    | |j         j        j        v S r   )
connectionclientdeclared_entities)entitychannels     r!   declaration_cachedrS   f   s    W'.@@@r    Fc                 B    |rt          | |fi |S t          | |          S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rQ   rR   retryretry_policys       r!   r   r   j   s3     @vw??,???&'***r    c                 t    | j         }|s.|st          d| d|            |                     |          } | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr   bind)rQ   rR   rZ   s      r!   _ensure_channel_is_boundr\   q   sd     H  	ECwCC6CCE E EW%% r    c                    | }t          | |           |!| j        st          d|  d          | j        }d x}}|j        r-| j        r&|j        j        j        }t          |           }||v rdS |j        st          d          | 
                    |           ||r|                    |           || j        |_        dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rR   T)r\   rZ   r   rR   rN   can_cache_declarationrO   rP   hashr   declareaddr>   )rQ   rR   origdeclaredidents        r!   rV   rV      s    DVW--- 	CAfAAAC C C.Hu f: %,>VH5 A()?@@@
NN7N###UK	4r    c                     t          | |           | j        j        st          d            | j        j        j        j        | t          fi || |          S )Nr^   )r\   rR   rN   r   rO   ensurerV   )rQ   rR   rX   s      r!   rU   rU      sw    VW--->$ A()?@@@026>$+20 0".0 006A A Ar    c              #     K   t                      fd}|g|pg z   | _        | 5  t          | j        j        j        ||d          D ])}	                                 V  # t          $ r Y &w xY w	 ddd           dS # 1 swxY w Y   dS )z&Drain messages from consumer instance.c                 6                         | |f           d S r   )append)bodymessageaccs     r!   
on_messagez"drain_consumer.<locals>.on_message   s    

D'?#####r    T)limittimeoutignore_timeoutsN)r   	callbacksr   rR   rN   rO   popleft
IndexError)consumerro   rp   rr   rn   _rm   s         @r!   r   r      s     
''C$ $ $ $ $ %b9H	  8+6=!&O O O 	 	Akkmm####   		                 s5   %BA#"B#
A0-B/A00BBBc                 F    t           | j        d|g|d||||          S )zIterator over messages.)queuesrR   )ro   rp   rr   r   )r   Consumer)connrR   r8   ro   rp   rr   rE   s          r!   r   r      s@     @eWg@@@@W	   r    c              #      K   |rt          |          pt                      D ]5}	 |                     |          V  # t          j        $ r |r|s Y 2w xY wdS )a  Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples:
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also:
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )rp   N)ranger   drain_eventssocketrp   )rz   ro   rp   rq   is        r!   r   r      s      : #uU||.uww  	##G#444444~ 	 	 	  	 s   >AAc                      |j         |f| ||dt          |j        d         |j                            d          t          j        |j                 |j        dfi |S )a  Send reply for request.

    Arguments:
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r@   rW   rX   reply_tocorrelation_id)routing_keyr   
serializercontent_encoding)publishdict
propertiesgetserializerstype_to_namecontent_typer   )r@   reqmsgproducerrW   rX   propss          r!   r   r      s     8,  s~j9"%."4"45E"F"F)6s7GH$'$8: : D D >CD D  r    c              /   &  K   |                     dd          }d}	 t          | ||g|R i |D ]!\  }}|s|                                 d}|V  "	 |r|                    |j                   dS dS # |r|                    |j                   w w xY w)z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ackafter_reply_message_receivedr>   )	rz   rR   r8   argsrE   r   receivedrk   rl   s	            r!   r   r      s      x..FH	=)$ ;+/; ; ;39; ; 	 	MD' HJJJJ	  	=00<<<<<	= 	=8 	=00<<<<	=s   4A1 1Bc                 B    t                               d| |d           d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervals     r!   _ensure_errbackr     s1    
LL.X      r    c              #   F   K   	 d V  d S # | j         | j        z   $ r Y d S w xY wr   )connection_errorschannel_errors)rz   s    r!   _ignore_errorsr     sE      !D$77   s   
   c                     |r/t          |           5   ||i |cddd           S # 1 swxY w Y   t          |           S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r   )rz   funr   rE   s       r!   ignore_errorsr     s    6  (D!! 	( 	(3'''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	($s   '++c                 $    |r ||           d S d S r   r   )rN   rR   	on_revives      r!   revive_connectionr   @  s*     	' r    c           	      8   |pt           }|                     d          5 }|                    |           |j        }t	          t
          ||          }	 |j        ||f||	d|}
 |
|i t          ||          \  }}|cddd           S # 1 swxY w Y   dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r   )r   r   )rN   N)r   acquireensure_connectiondefault_channelr   r   	autoretryr   )poolr   r   rE   r   r   optsrz   rR   reviver   retvalrv   s                r!   r   r   E  s    (G	D	!	! 	Tw/// &*DIFFF $.g ;w+1; ;59; ;GTCT&T%B%B%BCC		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   A"BBBc                   8    e Zd ZdZdZd Zd	dZd	dZd Zd Z	dS )
QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                 V    || _         t          j                    | _        |pd| _        d S r0   )callbackr3   RLock_mutexvalue)rC   r   initial_values      r!   rB   zQoS.__init__  s(     o''"'a


r    r   c                     | j         5  | j        r| j        t          |d          z   | _        ddd           n# 1 swxY w Y   | j        S )zIncrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxrC   ns     r!   increment_eventuallyzQoS.increment_eventually  s     [ 	4 	4z 4!Z#a))3
	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 zs   %9= =c                     | j         5  | j        r"| xj        |z  c_        | j        dk     rd| _        ddd           n# 1 swxY w Y   | j        S )zDecrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   s     r!   decrement_eventuallyzQoS.decrement_eventually  s     [ 	# 	#z #

a

:>>!"DJ		# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	#
 zs   *>AAc                     || j         k    rg|}|t          k    r"t                              dt                     d}t                              d|           |                     |           || _         |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)prefetch_count)prevPREFETCH_COUNT_MAXr   warningdebugr   )rC   pcount	new_values      r!   setzQoS.set  sv    TYI***I13 3 3	LL8)DDDMMM333DIr    c                 x    | j         5  |                     | j                  cddd           S # 1 swxY w Y   dS )z)Update prefetch count with current value.N)r   r   r   )rC   s    r!   updatez
QoS.update  s    [ 	( 	(88DJ''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   /33)r   )
rG   rH   rI   rJ   r   rB   r   r   r   r   r   r    r!   r   r   Y  s{        & &P D( ( (

 
 
 
     ( ( ( ( (r    r   )T)NF)r   NN)NNF)NFNr   )NN)6rJ   r1   r~   r3   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r   r	   r
   amqpr   r   rQ   r   r   logr   serializationr   r   
utils.uuid__all__r   rG   r   r   r"   r.   r6   r   rS   r   r\   rV   rU   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   <module>r      s     				            % % % % % %             3 3 3 3 3 3 3 3 3 3 3 3 9 9 9 9 9 9 9 9 # # # # # # # #       2 2 2 2 2 2        	H		       )
 )
 )
 )
 )
 )
 )
 )
XA A A+ + + +     :A A A   $ 9=   " " " "L 9=   0= = =             B   
   (Z( Z( Z( Z( Z( Z( Z( Z( Z( Z(r    