a
    xdb                     @   s   d Z ddlZddlZddlZddlZddlmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ d	Zd
ZdZdZG dd dejZG dd dejejZG dd dejZG dd dejZdS )zT`librabbitmq`_ transport.

.. _`librabbitmq`: https://pypi.org/project/librabbitmq/
    N)ChannelErrorConnectionError)get_manager)version_string_as_tuple   )base)to_rabbitmq_queue_argumentsz
    librabbitmq version too old to detect RabbitMQ version information
    so make sure you are using librabbitmq 1.5 when using rabbitmq > 3.3
i(  i'  zAssl not supported by librabbitmq, please use pyamqp:// or stunnelc                       s    e Zd ZdZ fddZ  ZS )MessagezAMQP Message (librabbitmq).c                    s8   t  j|||||d|d|d|dd d S )Ndelivery_tagcontent_typecontent_encodingheaders)channelbodyZdelivery_info
propertiesr
   r   r   r   )super__init__get)selfr   propsinfor   	__class__ S/var/www/html/Ranjet/env/lib/python3.9/site-packages/kombu/transport/librabbitmq.pyr   "   s    zMessage.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r	      s   r	   c                   @   s&   e Zd ZdZeZdddZdd ZdS )ChannelzAMQP Channel (librabbitmq).Nc                 C   s:   |dur|ni }| |||d |dur2||d< ||fS )z%Encapsulate data into a AMQP message.N)r   r   r   priority)update)r   r   r!   r   r   r   r   r   r   r   prepare_message3   s    zChannel.prepare_messagec                 K   s"   t |fi |}dd | D S )Nc                 S   s   i | ]\}}| d |qS )utf8)encode).0kvr   r   r   
<dictcomp>D       z3Channel.prepare_queue_arguments.<locals>.<dictcomp>)r   items)r   	argumentskwargsr   r   r   prepare_queue_argumentsB   s    zChannel.prepare_queue_arguments)NNNNN)r   r   r   r   r	   r#   r.   r   r   r   r   r    .   s      
r    c                   @   s   e Zd ZdZeZeZdS )
ConnectionzAMQP Connection (librabbitmq).N)r   r   r   r   r    r	   r   r   r   r   r/   G   s   r/   c                   @   s   e Zd ZdZeZeZeZe	j
jeejeef Ze	j
jef ZdZdZe	j
jjdddZdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z dd Z!e"dd Z#dS ) 	TransportzAMQP Transport (librabbitmq).amqplibrabbitmqTF)ZasynchronousZ
heartbeatsc                 K   s4   || _ |dp| j| _|dp&| j| _d | _d S )Ndefault_portdefault_ssl_port)clientr   r3   r4   Z_Transport__reader)r   r5   r-   r   r   r   r   e   s    
zTransport.__init__c                 C   s   t jS N)r1   __version__r   r   r   r   driver_versionl   s    zTransport.driver_versionc                 C   s   |  S r6   )r   r   
connectionr   r   r   create_channelo   s    zTransport.create_channelc                 K   s   |j f i |S r6   )drain_events)r   r;   r-   r   r   r   r=   r   s    zTransport.drain_eventsc              
   C   s   | j }| j D ] \}}t||dst||| q|jr@ttt|j	|j
|j|j|j|j|j|jdfi |jpri }| jf i |}| j |_ |j| j _|S )z(Establish connection to the AMQP broker.N)hostuseridpasswordvirtual_hostlogin_methodinsistsslconnect_timeout)r5   default_connection_paramsr+   getattrsetattrrD   NotImplementedErrorNO_SSL_ERRORdictr>   r?   r@   rA   rB   rC   rE   Ztransport_optionsr/   r=   )r   Zconninfonamedefault_valueoptsconnr   r   r   establish_connectionu   s,    	

zTransport.establish_connectionc                 C   s   d| j _|  dS )z!Close the AMQP broker connection.N)r5   r=   closer:   r   r   r   close_connection   s    zTransport.close_connectionc              	   C   sn   |d ur\|j  D ]
}d |_qzt|  W n ttfyF   Y n0 |j   |j	  d | j
_d | _
d S r6   )Zchannelsvaluesr;   osrQ   filenoOSError
ValueErrorclear	callbacksr5   r=   )r   r;   r   r   r   r   _collect   s    

zTransport._collectc                 C   s   |j S r6   )	connectedr:   r   r   r   verify_connection   s    zTransport.verify_connectionc                 C   s   | | | j|| d S r6   )Z
add_readerrU   Zon_readable)r   r;   Zloopr   r   r   register_with_event_loop   s    z"Transport.register_with_event_loopc                 O   s   t | jg|R i |S r6   )r   r5   )r   argsr-   r   r   r   r      s    zTransport.get_managerc                 C   sN   z
|j }W n  ty*   ttt Y n 0 |ddkrJt|d dk S dS )NproductZRabbitMQversion)   ra   T)Zserver_propertiesAttributeErrorwarningswarnUserWarning	W_VERSIONr   r   )r   r;   r   r   r   r   qos_semantics_matches_spec   s    
z$Transport.qos_semantics_matches_specc                 C   s    dd| j jr| jn| jdddS )NZguest	localhostZPLAIN)r?   r@   porthostnamerB   )r5   rD   r4   r3   r8   r   r   r   rF      s    z#Transport.default_connection_paramsN)$r   r   r   r   r/   DEFAULT_PORTr3   DEFAULT_SSL_PORTr4   r   r0   Zconnection_errorsr   socketerrorIOErrorrV   Zchannel_errorsr   Zdriver_typeZdriver_nameZ
implementsextendr   r9   r<   r=   rP   rR   rZ   r\   r]   r   rg   propertyrF   r   r   r   r   r0   N   s:   

r0   )r   rT   rm   rc   r2   r1   r   r   Zkombu.utils.amq_managerr   Zkombu.utils.textr    r   r   rf   rk   rl   rJ   r	   r    Z
StdChannelr/   r0   r   r   r   r   <module>   s"   