a
    xd                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' zddl(Z(W n e)y   dZ(Y n0 zddl(m*Z* W n e)yF   dZ*Y n0 edZ+e+j,e+j- Z.Z-dZ/dZ0dZ1g dZ2eddZ3dd Z4dd Z5G d d! d!e6Z7ed"d# Z8d$d% Z9G d&d' d'Z:G d(d) d)e:e(j;Z<G d*d+ d+e:e(j=j>Z?G d,d- d-e(j=j@ZAG d.d/ d/e'jBZBG d0d1 d1ZCG d2d3 d3e'jDZDG d4d5 d5e'jEZEe*r\G d6d7 d7e*jFe(jGZHG d8d9 d9eDZIG d:d; d;eEZJdS )<a  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    N)bisect)
namedtuple)contextmanager)Empty)time)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtual)sentinelzkombu.transport.redisi     )r         	   error_classes_t)connection_errorschannel_errorsc               	   C   s^   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)redisr!   hasattrr"   	DataErrorr   r   	Transportr   r   socketerrorIOErrorOSErrorConnectionErrorAuthenticationErrorTimeoutErrorr   ZInvalidResponseResponseError)r!   r%    r/   M/var/www/html/Ranjet/env/lib/python3.9/site-packages/kombu/transport/redis.pyget_redis_error_classesw   s(    
r1   c                  C   s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r    )r#   r!   r+   r    r/   r/   r0   get_redis_ConnectionError   s    r2   c                   @   s   e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r/   r/   r/   r0   r3      s   r3   c                 c   s   | j ||d}d}zJ|jdd}|r,dV  nt W |rz|  W q tjjyZ   Y q0 n*|rz|  W n tjjy   Y n0 0 dS )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockacquirer3   releaser#   r!   ZLockNotOwnedError)clientnameZexpirer<   Zlock_acquiredr/   r/   r0   Mutex   s"    rA   c                 C   s   |    d S N)_after_forkchannelr/   r/   r0   _after_fork_cleanup_channel   s    rF   c                       sd   e Zd ZdZg dZdddddddddd	Zd
d Z fddZ fddZdddZ	  Z
S )GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    )ZHDELZHGETZHSETZLLENZLPUSHZPUBLISHZRPUSHZRPOPZSADDZSREMZSETZSMEMBERSZZADDZZREMZZREVRANGEBYSCOREr   N)
args_startargs_end   r   )ZDELBRPOPZEVALSHAc                    s   t |}|d}| jv r4 jt|d  |d< nx| jv r j| d } j| d }|dkrn|d | ng }g }|d ur||d  }| fdd||| D  | }|g|S )Nr   rH   rI   c                    s   g | ]} j t| qS r/   global_keyprefixstr.0argselfr/   r0   
<listcomp>   s   z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrN   rO   PREFIXED_COMPLEX_COMMANDS)rT   argscommandrH   rI   Zpre_argsZ	post_argsr/   rS   r0   _prefix_args   s"    



z!GlobalKeyPrefixMixin._prefix_argsc                    sH   t  j||fi |}|dkrD|rD|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rL   N)superparse_responselenrN   )rT   
connectioncommand_nameoptionsretkeyvalue	__class__r/   r0   r^      s    z#GlobalKeyPrefixMixin.parse_responsec                    s   t  j| |i |S rB   r]   execute_commandr\   rT   rZ   kwargsrf   r/   r0   ri      s    z$GlobalKeyPrefixMixin.execute_commandTc                 C   s   t | j| j||| jdS )NrN   )PrefixedRedisPipelineconnection_poolZresponse_callbacksrN   )rT   transactionZ
shard_hintr/   r/   r0   pipeline   s    zGlobalKeyPrefixMixin.pipeline)TN)r5   r6   r7   r8   rX   rY   r\   r^   ri   rp   __classcell__r/   r/   rf   r0   rG      s   rG   c                   @   s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O   s,   | dd| _tjj| g|R i | d S NrN    )rW   rN   r#   Redis__init__rj   r/   r/   r0   rv   	  s    zPrefixedStrictRedis.__init__c                 K   s   t | jfd| ji|S )NrN   )PrefixedRedisPubSubrn   rN   )rT   rk   r/   r/   r0   pubsub  s    zPrefixedStrictRedis.pubsubN)r5   r6   r7   r8   rv   rx   r/   r/   r/   r0   rr     s   rr   c                   @   s   e Zd ZdZdd ZdS )rm   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O   s.   | dd| _tjjj| g|R i | d S rs   )rW   rN   r#   r?   Pipelinerv   rj   r/   r/   r0   rv     s    zPrefixedRedisPipeline.__init__N)r5   r6   r7   r8   rv   r/   r/   r/   r0   rm     s   rm   c                       sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )rw   zCRedis pubsub client that takes global_keyprefix into consideration.)Z	SUBSCRIBEZUNSUBSCRIBEZ
PSUBSCRIBEZPUNSUBSCRIBEc                    s$   | dd| _t j|i | d S rs   )rW   rN   r]   rv   rj   rf   r/   r0   rv   ,  s    zPrefixedRedisPubSub.__init__c                    s8   t |}|d}| jv r. fdd|D }|g|S )Nr   c                    s   g | ]} j t| qS r/   rM   rP   rS   r/   r0   rU   5  s   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rV   rW   PUBSUB_COMMANDS)rT   rZ   r[   r/   rS   r0   r\   0  s    


z PrefixedRedisPubSub._prefix_argsc                    sF   t  j|i |}|du r|S |^}}}|g fdd|D |S )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                    s   g | ]}|t  jd  qS rB   )r_   rN   )rQ   rE   rS   r/   r0   rU   N      z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)r]   r^   )rT   rZ   rk   rc   Zmessage_typeZchannelsmessagerf   rS   r0   r^   <  s    z"PrefixedRedisPubSub.parse_responsec                    s   t  j| |i |S rB   rh   rj   rf   r/   r0   ri   R  s    z#PrefixedRedisPubSub.execute_command)
r5   r6   r7   r8   rz   rv   r\   r^   ri   rq   r/   r/   rf   r0   rw   "  s   rw   c                       s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ddZ	e
d%ddZd&ddZd'ddZd(ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS ))QoSzRedis Ack Emulation.Tc                    s   t  j|i | d| _d S )Nr   )r]   rv   _vrestore_countrj   rf   r/   r0   rv   [  s    zQoS.__init__c              	      s   |j }|d |d  }}tjd dkr4|t ig}n
t |g}|  N}|j| jg|R  | j|t	|j
||g  t || W d    n1 s0    Y  d S )Nexchangerouting_keyr   r   )delivery_infor#   VERSIONr   pipe_or_acquireZzaddunacked_index_keyZhsetunacked_keyr   _rawexecuter]   append)rT   r|   delivery_tagZdeliveryEXRKZ	zadd_argspiperf   r/   r0   r   _  s    

z
QoS.appendNc                 C   sT   | j |*}| jD ]}| j||d qW d    n1 s<0    Y  | j  d S )Nr?   )rE   conn_or_acquireZ
_deliveredrestore_by_tagclear)rT   r?   tagr/   r/   r0   restore_unackedp  s    
.zQoS.restore_unackedc                    s   |  |  t | d S rB   )_remove_from_indicesr   r]   ack)rT   r   rf   r/   r0   r   v  s    zQoS.ackFc                 C   s    |r| j |dd | | d S NT)leftmost)r   r   )rT   r   Zrequeuer/   r/   r0   rejectz  s    z
QoS.rejectc                 c   sF   |r|V  n6| j |}| V  W d    n1 s80    Y  d S rB   )rE   r   rp   )rT   r   r?   r/   r/   r0   r     s    zQoS.pipe_or_acquirec                 C   sF   |  |(}|| j|| j|W  d    S 1 s80    Y  d S rB   )r   Zzremr   hdelr   )rT   r   r   r/   r/   r0   r     s    zQoS._remove_from_indicesr   
   c           	   
   C   s   |  j d7  _ | j d | r d S | j }t | j }znt|| j| jJ |j| j	|d|o^||dd}|png D ]\}}| 
|| qpW d    n1 s0    Y  W n ty   Y n0 W d    n1 s0    Y  d S )Nr   r   T)startnumZ
withscores)r~   rE   r   r   visibility_timeoutrA   unacked_mutex_keyunacked_mutex_expireZzrevrangebyscorer   r   r3   )	rT   r   r   intervalr?   ceilZvisibler   Zscorer/   r/   r0   restore_visible  s"    
0zQoS.restore_visiblec                    sN    fdd}j |}||j W d    n1 s@0    Y  d S )Nc                    sP   |  j}|   |  |rLtt|\}}}j||||   d S rB   )hgetr   multir   r   r   rE   _do_restore_message)r   pMr   r   r   rT   r   r/   r0   restore_transaction  s    z/QoS.restore_by_tag.<locals>.restore_transaction)rE   r   ro   r   )rT   r   r?   r   r   r/   r   r0   r     s    zQoS.restore_by_tagc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_index_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_mutex_keyc                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.unacked_mutex_expirec                 C   s   | j jS rB   )rE   r   rS   r/   r/   r0   r     s    zQoS.visibility_timeout)N)F)NN)N)r   r   r   )NF)r5   r6   r7   r8   Zrestore_at_shutdownrv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rq   r/   r/   rf   r0   r}   V  s,   








r}   c                   @   s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C   s(   t  | _i | _i | _t | _t  | _d S rB   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrS   r/   r/   r0   rv     s
    zMultiChannelPoller.__init__c              
   C   sZ   | j  D ],}z| j| W q
 ttfy4   Y q
0 q
| j  | j  | j   d S rB   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rT   fdr/   r/   r0   close  s    

zMultiChannelPoller.closec                 C   s   | j | d S rB   )r   addrT   rE   r/   r/   r0   r     s    zMultiChannelPoller.addc                 C   s   | j | d S rB   )r   discardr   r/   r/   r0   r     s    zMultiChannelPoller.discardc              	   C   s.   z| j |j W n ttfy(   Y n0 d S rB   )r   r   _sockAttributeError	TypeErrorrT   r`   r/   r/   r0   _on_connection_disconnect  s    z,MultiChannelPoller._on_connection_disconnectc                 C   sr   |||f| j v r| ||| |jjd u r4|j  |jj}||f| j| < || j |||f< | j|| j	 d S rB   )
r   _unregisterr`   r   connectr   filenor   register
eventflags)rT   rE   r?   typesockr/   r/   r0   	_register  s    
zMultiChannelPoller._registerc                 C   s   | j | j|||f  d S rB   )r   r   r   )rT   rE   r?   r   r/   r/   r0   r     s    zMultiChannelPoller._unregisterc                 C   s:   t |dd d u r|jd|_|jjd uo8|||f| jv S )Nr`   _)getattrrn   get_connectionr`   r   r   )rT   rE   r?   cmdr/   r/   r0   _client_registered  s
    z%MultiChannelPoller._client_registeredc                 C   s>   ||j df}| ||j ds,d|_| j|  |js:|  dS )zEnable BRPOP mode for channel.rL   FN)r?   r   _in_pollr   _brpop_start)rT   rE   identr/   r/   r0   _register_BRPOP  s    
z"MultiChannelPoller._register_BRPOPc                 C   s8   |  ||jds&d|_| ||jd |js4|  dS )zEnable LISTEN mode for channel.LISTENFN)r   	subclient
_in_listenr   
_subscriber   r/   r/   r0   _register_LISTEN  s
    z#MultiChannelPoller._register_LISTENc                 C   s:   | j D ].}|jr$|j r$| | |jr| | qd S rB   )r   active_queuesqoscan_consumer   active_fanout_queuesr   r   r/   r/   r0   on_poll_start  s    


z MultiChannelPoller.on_poll_startc                 C   s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r   r   unacked_restore_limit)rT   r   rE   r/   r/   r0   on_poll_init  s
    
zMultiChannelPoller.on_poll_initc                 C   s*   | j D ]}|jr|jj|jd  S qd S r   )r   r   r   r   r   r   r/   r/   r0   maybe_restore_messages  s
    
z)MultiChannelPoller.maybe_restore_messagesc                 C   s<   | j D ]0}|jd}|d urtt|dd r|  qd S )Nr   check_health)r   __dict__getcallabler   r   )rT   rE   r?   r/   r/   r0   maybe_check_subclient_health'  s    
z/MultiChannelPoller.maybe_check_subclient_healthc                 C   s(   | j | \}}|j r$|j|   d S rB   )r   r   r   handlers)rT   r   chanr   r/   r/   r0   on_readable/  s    
zMultiChannelPoller.on_readablec                 C   s:   |t @ r| || fS |t@ r6| j| \}}|| d S rB   )r   r   r   r   _poll_error)rT   r   eventr   r   r/   r/   r0   handle_event4  s
    zMultiChannelPoller.handle_eventc           	      C   sB  d| _ z| jD ].}|jr,|j r,| | |jr| | q| j	|}|r|D ]Z\}}| 
||}|rR W d| _ | jrz| j }W n ty   Y qY qt0 |  qtd S qR|   t W d| _ | jrz| j }W n ty   Y qY q0 |  qnDd| _ | jr<z| j }W n ty0   Y q<Y n0 |  q 0 d S )NTF)_in_protected_readr   r   r   r   r   r   r   r   r   r   r   rW   r   r   r   )	rT   callbackr:   rE   eventsr   r   rc   Zfunr/   r/   r0   r   ;  sL    





zMultiChannelPoller.getc                 C   s   | j S rB   )r   rS   r/   r/   r0   fdsY  s    zMultiChannelPoller.fds)N)r5   r6   r7   r8   r   r   r   r   r   rv   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   propertyr   r/   r/   r/   r0   r     s.   

	
r   c                       sz  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)re)j*ndZ+e)re)j,ndZ- fddZ.dd Z/dd Z0dd Z1drddZ2ds fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dtd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdudFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZPdvdXdYZQdwdZd[ZRdxd\d]ZSdyd^d_ZTd`da ZUeVdzdbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS ){ChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zZunackedZunacked_indexZunacked_mutexi,  i  r   rt   Zround_robin)sepack_emulationr   r   r   r   r   r   fanout_prefixfanout_patternsrN   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                    s   t  j|i | | js tj| _t| j | _|  | _	| 
 | _t | _t | _i | _| j| jd| _| jrt| jtr| j| _nd| _z| j  W n ty   |    Y n0 | jj|  | jj| _td urt| t  d S )N)rL   r   rt   )!r]   rv   r   r   r}   r   r   _queue_cycle_get_clientClient_get_response_errorr.   r   r   auto_delete_queues_fanout_to_queue_brpop_read_receiver   r   
isinstancerO   keyprefix_fanoutr?   Zping	Exception_disconnect_poolsr`   cycler   r   r   rF   rj   rf   r/   r0   rv     s.    



zChannel.__init__c                 C   s   |    d S rB   )r  rS   r/   r/   r0   rC     s    zChannel._after_forkc                 C   s<   | j }| j}d  | _| _ |d ur(|  |d ur8|  d S rB   )_pool_async_pool
disconnect)rT   pool
async_poolr/   r/   r0   r    s    zChannel._disconnect_poolsc                 C   s@   | j |u rd | _ | j|u r d | _| jr<| jjr<| jj| d S rB   )r   r   r`   r  r   r   r/   r/   r0   r     s    

z!Channel._on_connection_disconnectc                 C   s   zdz d|d d< d|d d d< W n t y4   Y n0 | ||D ]}|rP|jn|j|t| qBW n  ty   td|dd Y n0 d S )NTheadersZredeliveredZ
propertiesr   zCould not restore message: %rexc_info)r   Z_lookuplpushZrpushr   r  crit)rT   payloadr   r   r   r   queuer/   r/   r0   r     s    
zChannel._do_restore_messagec                    sb   j st |S |j fdd} }||j W d    n1 sT0    Y  d S )Nc                    sP   |  j}|   | j |rLtt|\}}}||||   d S rB   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r/   r0   r     s    z-Channel._restore.<locals>.restore_transaction)r   r]   _restorer   r   ro   r   )rT   r|   r   r   r?   rf   r   r0   r    s    
zChannel._restorec                 C   s   | j |ddS r   )r  )rT   r|   r/   r/   r0   _restore_at_beginning$  s    zChannel._restore_at_beginningc                    sT   || j v r.| j | \}}| j| || j|< t j|g|R i |}|   |S rB   )_fanout_queuesr   r   r   r]   basic_consume_update_queue_cycle)rT   r  rZ   rk   r   r   rc   rf   r/   r0   r  '  s    

zChannel.basic_consumec                 C   s8   | j }|r4|jjr*|jjt| j|fS | |S d S rB   )r`   r  r   r   r   r   _basic_cancel)rT   consumer_tagr`   r/   r/   r0   basic_cancel;  s    zChannel.basic_cancelc                    s   z| j | }W n ty"   Y d S 0 z| j| W n tyF   Y n0 | | z| j| \}}| j| W n ty   Y n0 t 	|}| 
  |S rB   )Z_tag_to_queuer   r   remove_unsubscribe_fromr  r   rW   r]   r  r  )rT   r  r  r   r   rc   rf   r/   r0   r  H  s"    
zChannel._basic_cancelc                 C   s.   |r| j rd| j|d|gS d| j|gS )Nrt   /)r   joinr  )rT   r   r   r/   r/   r0   _get_publish_topic\  s    
zChannel._get_publish_topicc                 C   s   | j | \}}| ||S rB   )r  r!  )rT   r  r   r   r/   r/   r0   _get_subscribe_topica  s    zChannel._get_subscribe_topicc                    sN    fdd j D }|sd S  j}|jjd u r8|j  |j _|| d S )Nc                    s   g | ]}  |qS r/   )r"  rQ   r  rS   r/   r0   rU   f  s   z&Channel._subscribe.<locals>.<listcomp>)r   r   r`   r   r   r   Z
psubscribe)rT   keyscr/   rS   r0   r   e  s    

zChannel._subscribec                 C   s.   |  |}| j}|jr*|jjr*||g d S rB   )r"  r   r`   r   unsubscribe)rT   r  topicr%  r/   r/   r0   r  p  s    
zChannel._unsubscribe_fromc                 C   s   t |d dkr&|d dkr&d|_d S t |d dkr\|d |d |d |d f\}}}}n |d d |d |d f\}}}}||||dS )	Nr   r&  rK   FZpmessager   r   )r   patternrE   data)r   Z
subscribed)rT   r?   rr   r(  rE   r)  r/   r/   r0   _handle_messagev  s    & zChannel._handle_messagec                 C   sd   | j }g }z|| | W n ty0   Y n0 |jd ur\|jjddr\|| | q2t|S )Nr   r9   )r   r   _receive_oner   r`   Zcan_readany)rT   r%  rc   r/   r/   r0   r    s    zChannel._receivec              	   C   s
  d }z|  }W n | jy,   d | _ Y n0 t|ttfr| ||}t|d drt|d }|d r|d dkr|	d\}}}zt
t|d }W n8 ttfy   td|t|d d	 d
d t Y n0 |dd
d }| j|| j|  dS d S )Nr   r|   rE   r)  r   r  .z&Cannot process event on channel %r: %si   r   r  T)r^   r   r   r  rV   tupler+  r   endswith	partitionr   r   r   warnreprr   splitr`   _deliverr   )rT   r%  responser  rE   r   r|   r   r/   r/   r0   r,    s2    

zChannel._receive_oner   c                    sr   j tj  sd S  fddjD |p4dg }jj_dg|}jr`j	|}jjj
|  d S )Nc                    s"   g | ]} D ]} ||qqS r/   )
_q_for_pri)rQ   prir  ZqueuesrT   r/   r0   rU     s   z(Channel._brpop_start.<locals>.<listcomp>r   rL   )r   consumer_   r   r   r?   r`   r   rN   r\   Zsend_command)rT   r:   r$  command_argsr/   r9  r0   r     s    

zChannel._brpop_startc                 K   s   zz| j j| j jdfi |}W n" | jyB   | j j   Y n0 |r|\}}t|| jdd }| j	| | j
tt|| W d | _dS t W d | _nd | _0 d S )NrL   r   r   T)r?   r^   r`   r   r
  r   rsplitr   r   rotater5  r   r   r   )rT   rb   Z
dest__itemdestitemr/   r/   r0   r    s(    
zChannel._brpop_readc                 K   s*   |dkr| j   n| j| jj| d S )Nr   )r   r^   r?   r`   )rT   r   rb   r/   r/   r0   r     s    zChannel._poll_errorc                 C   sr   |   V}| jD ]8}|| ||}|rtt|  W  d    S qt W d    n1 sd0    Y  d S rB   )r   r   Zrpopr7  r   r   r   )rT   r  r?   r8  r?  r/   r/   r0   _get  s    

 zChannel._getc              	   C   s   |   ~}| V}| jD ]}|| ||}q| }tdd |D W  d    W  d    S 1 sn0    Y  W d    n1 s0    Y  d S )Nc                 s   s   | ]}t |tjr|V  qd S rB   )r  numbersIntegral)rQ   sizer/   r/   r0   	<genexpr>  s   z Channel._size.<locals>.<genexpr>)r   rp   r   llenr7  r   sum)rT   r  r?   r   r8  sizesr/   r/   r0   _size  s    


zChannel._sizec                 C   s$   |  |}|r | | j | S |S rB   )priorityr   )rT   r  r8  r/   r/   r0   r7    s    
zChannel._q_for_pric                 C   s   | j }|t||d  S )Nr   )r   r   )rT   nZstepsr/   r/   r0   rI    s    zChannel.priorityc                 K   sR   | j |dd}|  (}|| ||t| W d   n1 sD0    Y  dS )zDeliver message.F)reverseN)Z_get_message_priorityr   r  r7  r   )rT   r  r|   rk   r8  r?   r/   r/   r0   _put  s    
zChannel._putc                 K   sD   |   (}|| ||t| W d   n1 s60    Y  dS )zDeliver fanout message.N)r   publishr!  r   )rT   r   r|   r   rk   r?   r/   r/   r0   _put_fanout  s
    

zChannel._put_fanoutc                 K   s   |r| j | d S rB   )r   r   )rT   r  Zauto_deleterk   r/   r/   r0   
_new_queue  s    zChannel._new_queuec              	   C   s   |  |jdkr&||ddf| j|< |  >}|| j|f | j|pJd|pPd|pVdg W d    n1 sr0    Y  d S )Nfanout#*rt   )	Ztypeofr   replacer  r   Zsaddkeyprefix_queuer   r   )rT   r   r   r(  r  r?   r/   r/   r0   _queue_bind  s    

zChannel._queue_bindc           
   	   O   s   | j | | j|dd}|| j|f | j|p:d|p@d|pFdg | 6}| j	D ]}	|
| ||	}q^|  W d    n1 s0    Y  W d    n1 s0    Y  d S )Nr?   r   rt   )r   r   r   r   ZsremrT  r   r   rp   r   deleter7  r   )
rT   r  r   r   r(  rZ   rk   r?   r   r8  r/   r/   r0   _delete  s    

zChannel._deletec              	   K   s   |   p}| H}| jD ]}|| ||}qt| W  d    W  d    S 1 s`0    Y  W d    n1 s~0    Y  d S rB   )r   rp   r   existsr7  r-  r   )rT   r  rk   r?   r   r8  r/   r/   r0   
_has_queue  s
    


zChannel._has_queuec                    sh    j | }  B}||}|s4g W  d    S  fdd|D W  d    S 1 sZ0    Y  d S )Nc                    s    g | ]}t t| jqS r/   )r/  r   r4  r   )rQ   valrS   r/   r0   rU   )  r{   z%Channel.get_table.<locals>.<listcomp>)rT  r   Zsmembers)rT   r   rd   r?   r   r/   rS   r0   	get_table!  s    


zChannel.get_tablec              	   C   s   |   }| `}| jD ] }| ||}|||}q| }t|d d d W  d    W  d    S 1 sx0    Y  W d    n1 s0    Y  d S )NrK   )r   rp   r   r7  rE  rV  r   rF  )rT   r  r?   r   r8  ZpriqrG  r/   r/   r0   _purge+  s    


zChannel._purgec                    sp   d| _ | jsb| jj|  | jd}|d urR| jD ]}|| jv r4| j	||d q4| 
  |   t   d S )NTr?   r   )_closingclosedr`   r  r   r   r   r  r   Zqueue_deleter  _close_clientsr]   r   )rT   r?   r  rf   r/   r0   r   4  s    

zChannel.closec                 C   sN   dD ]D}z$| j | }|jd  }|_|  W q tt| jfyF   Y q0 qd S )N)r?   r   )r   r`   r
  r   r   r.   )rT   attrr?   r`   r/   r/   r0   r_  D  s    
zChannel._close_clientsc                 C   sf   t |tjsb|r|dkrt}n|dr4|dd  }zt|}W n  ty`   td|Y n0 |S )Nr  r   z/Database is int between 0 and limit - 1, not {})r  rA  rB  
DEFAULT_DB
startswithintr   format)rT   Zvhostr/   r/   r0   _prepare_virtual_hostN  s    

zChannel._prepare_virtual_hostc                 K   s   |S rB   r/   )rT   r   r   paramsr/   r/   r0   _filter_tcp_connparams]  s    zChannel._filter_tcp_connparamsc                    s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|drpt|jdsp|d |jrz||j | j|d< W n ty   Y n0 |d }d|v rPt|\}}}}}	}
}|dkr(| jf i |}|jtjd	|
 d
fi | |dd  |dd  |dd  ||d< |	|d< |dd  |dd  | |dd |d< |  |dpz| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr   r   r   r   r   r   r   rv   r   connection_classrh  z://r'   r  )rm  pathr   r   r   rk  rl  ri  rj  dbc                       s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                    s   t    |  d S rB   )r]   r
  r   rS   )rg   rE   r/   r0   r
    s    
z2Channel._connparams.<locals>.Connection.disconnect)r5   r6   r7   r
  rq   r/   rD   rf   r0   
Connection  s   rp  )r`   r?   hostnameri  default_portrj  Zuseridrl  r   r   r   r   r   r   r   rm  r$   r   rv   rW   sslupdateconnection_class_sslr   r   rg  r#   ZUnixDomainSocketConnectionre  r   )rT   asynchronousZconninfo
connparamsZ
conn_classrh  schemer   rk  rl  rn  queryZconnection_clsrp  r/   rD   r0   _connparamsa  sp    




zChannel._connparamsc                 C   s    |r| j | jdS | j | jdS )N)rn   )r   r  r  rT   rv  r/   r/   r0   _create_client  s    zChannel._create_clientc                 C   s0   | j |d}| jj|d d| _tjf i |S )Nrv  ro  )ro  )rz  r  rd  r#   ConnectionPool)rT   rv  rf  r/   r/   r0   	_get_pool  s    zChannel._get_poolc                 C   s4   t jdk rtdt | jr.tjt| jdS t jS )N)r   rK   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}rl   )	r#   r   r	   rd  rN   	functoolspartialrr   ZStrictRedisrS   r/   r/   r0   r     s    
zChannel._get_clientc                 c   s   |r|V  n
|   V  d S rB   r|  rT   r?   r/   r/   r0   r     s    zChannel.conn_or_acquirec                 C   s   | j d u r|  | _ | j S rB   )r  r  rS   r/   r/   r0   r    s    

zChannel.poolc                 C   s   | j d u r| jdd| _ | j S )NTr}  )r	  r  rS   r/   r/   r0   r    s    
zChannel.async_poolc                 C   s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr}  r  rS   r/   r/   r0   r?     s    zChannel.clientc                 C   s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr}  )r|  rx   r  r/   r/   r0   r     s    zChannel.subclientc                 C   s   | j | j d S rB   )r   rt  r   rS   r/   r/   r0   r    s    zChannel._update_queue_cyclec                 C   s   ddl m} |jS )Nr   r    )r#   r!   r.   )rT   r!   r/   r/   r0   r     s    zChannel._get_response_errorc                    s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                    s   h | ]}| j vr|qS r/   )r   r#  rS   r/   r0   	<setcomp>  s   
z(Channel.active_queues.<locals>.<setcomp>)Z_active_queuesrS   r/   rS   r0   r     s    zChannel.active_queues)F)F)r   )F)NN)F)F)F)N)br5   r6   r7   r8   r}   Z_clientZ
_subclientr]  Zsupports_fanoutrT  r  r   r   r   r  r   r   r   r   r   r   r   PRIORITY_STEPSr   r   r   r   r   r   r   DEFAULT_HEALTH_CHECK_INTERVALr   r   r   rN   r   r	  r  r   r   from_transport_optionsr#   rp  rm  SSLConnectionru  rv   rC   r  r   r   r  r  r  r  r  r!  r"  r   r  r+  r  r,  r   r  r   r@  rH  r7  rI  rL  rN  rO  rU  rW  rY  r[  r\  r   r_  re  rg  rz  r|  r  r   r   r   r   r  r  r   r?   r   r  r   r   rq   r/   r/   rf   r0   r   ^  s   %	 

	

	
  

H





r   c                       st   e Zd ZdZeZdZeZdZdZ	e
jjjdeg ddZerHe \ZZ fddZd	d
 Zdd Zdd Z  ZS )r&   zRedis Transport.Nr#   T)directr'  rP  )rv  Zexchange_typec                    s.   t d u rtdt j|i | t | _d S )Nz)Missing redis library (pip install redis))r#   ImportErrorr]   rv   r   r  rj   rf   r/   r0   rv     s    zTransport.__init__c                 C   s   t jS rB   )r#   __version__rS   r/   r/   r0   driver_version  s    zTransport.driver_versionc                    s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                    s@   | j r| j   jr<zj W n ty:   Y n0 d S rB   )r   r  r   on_tickr   )r`   )r  loopr   r/   r0   _on_disconnect  s    z:Transport.register_with_event_loop.<locals>._on_disconnectc                      s       fddj D  d S )Nc                    s   g | ]} ||qS r/   r/   )rQ   r   )
add_readerr   r/   r0   rU      r{   zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r   r/   )r  r  cycle_poll_startr   r/   r0   r     s    z9Transport.register_with_event_loop.<locals>.on_poll_startr   r   )r  r   r   r   r  r   r   r  r   Zcall_repeatedlyr   r?   Ztransport_optionsr   r  r   )rT   r`   r  r  r   r/   )r  r  r  r  r   r   r0   register_with_event_loop
  s$    z"Transport.register_with_event_loopc                 C   s   | j | dS )z1Handle AIO event for one of our file descriptors.N)r  r   )rT   r   r/   r/   r0   r   ,  s    zTransport.on_readable)r5   r6   r7   r8   r   Zpolling_intervalDEFAULT_PORTrr  Zdriver_typeZdriver_namer   r&   Z
implementsextend	frozensetr#   r1   r   r   rv   r  r  r   rq   r/   r/   rf   r0   r&     s    

"r&   c                   @   s   e Zd ZdZdS )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr4   r/   r/   r/   r0   r  2  s   r  c                   @   sH   e Zd ZdZejd Zer ejndZer,e	ndZ
d	ddZd
ddZdS )SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:

    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C   s   |  |}| }|dd  |dd  g }| jjjD ]4}t|}|jdkr8|jpZ| jj	}|
|j|f q8|s|
|d |d f tj|ft| ddt| dd d|}t| dd }|d u rtd	||| jjS )
Nrh  ri  r   r  r   r  )r  r  r  z1'master_name' transport option must be specified.)rz  copyrW   r`   r?   Zaltr   rx  ri  rr  r   rq  r   ZSentinelr   r   Z
master_forr   rn   )	rT   rv  rw  Zadditional_paramsZ	sentinelsurlri  Zsentinel_instr  r/   r/   r0   _sentinel_managed_poold  s:    



z&SentinelChannel._sentinel_managed_poolc                 C   s
   |  |S rB   )r  r{  r/   r/   r0   r    s    zSentinelChannel._get_pool)F)F)r5   r6   r7   r8   r   r  r   SentinelManagedConnectionrm  r  ru  r  r  r/   r/   r/   r0   r  ?  s   

%r  c                   @   s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r5   r6   r7   r8   rr  r  r   r/   r/   r/   r0   r    s   r  )Kr8   r  rA  r'   r   collectionsr   
contextlibr   r  r   r   Zviner   Zkombu.exceptionsr   r	   Z	kombu.logr
   Zkombu.utils.compatr   Zkombu.utils.encodingr   Zkombu.utils.eventior   r   r   Zkombu.utils.functionalr   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr   Zkombu.utils.schedulingr   Zkombu.utils.urlr   rt   r   r#   r  r   loggercriticalr2  r  r  ra  r  r  r   r1   r2   r  r3   rA   rF   rG   ru   rr   r?   ry   rm   ZPubSubrw   r}   r   r   r&   r  r  r  r  r  r/   r/   r/   r0   <module>   s|   5



Q4i       D
N