a
    xd                     @   sX  d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	m
Z
 ddlmZ zddlZddlmZ dd	lmZ ejjejjejjejjejjejjejjejjejjf	Zejjejjejjejjejjejjejj ejj!ejjejj"ejj#ejj$ejjejj%ej&fZ'W n  e(y&   dZd
 ZZ'Y n0 dZ)dZ*G dd dej+Z+G dd dej,Z,dS )a  Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

    N)Empty)bytes_to_strensure_bytes)dumpsloads   )virtual)KazooClient)Queue i  z!Mahendra M <mahendra.m@gmail.com>c                       s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zedd Z  ZS )ChannelzZookeeper Channel.Nc                    s4   t  j|fi | | jjj}d|d| _d S )Nz/{}/)super__init__
connectionclientZvirtual_hostformatstrip_vhost)selfr   kwargsZvhost	__class__r   Q/var/www/html/Ranjet/env/lib/python3.9/site-packages/kombu/transport/zookeeper.pyr   g   s    
zChannel.__init__c                 C   s   t j| j|S N)ospathjoinr   )r   
queue_namer   r   r   	_get_pathl   s    zChannel._get_pathc                 C   s>   | j |d }|d u r:t| j| |}|| j |< t| |S r   )_queuesgetr
   r   r   len)r   r   queuer   r   r   
_get_queueo   s    
zChannel._get_queuec                 K   s&   |  |jtt|| j|dddS )NT)reverse)priority)r$   putr   r   Z_get_message_priority)r   r#   messager   r   r   r   _put{   s    

zChannel._putc                 C   s,   |  |}| }|d u r t tt|S r   )r$   r!   r   r   r   )r   r#   msgr   r   r   _get   s
    
zChannel._getc                 C   s.   d}|  |}| }|d u r q*|d7 }q|S )Nr   r   )r$   r!   )r   r#   countr*   r   r   r   _purge   s    

zChannel._purgec                 O   s*   |  |r&| | | j| | d S r   )
_has_queuer-   r   deleter   )r   r#   argsr   r   r   r   _delete   s    

zChannel._deletec                 C   s   |  |}t|S r   )r$   r"   r   r#   r   r   r   _size   s    
zChannel._sizec                 K   s   |  |s| |}d S r   )r.   r$   )r   r#   r   r   r   r   
_new_queue   s    
zChannel._new_queuec                 C   s   | j | |d uS r   )r   existsr   r2   r   r   r   r.      s    zChannel._has_queuec              	   C   s   | j j}g }|jr|jD ]}|dr6|tdd  }|s<qz |dd\}}|t|f}W n4 ty   ||jkr||j	p~t
f}n|t
f}Y n0 || q|j|j	pt
f}||vr|d| ddd |D }t|}|  |S )Nzzookeeper://:r   r   ,c                 S   s   g | ]\}}| d | qS )r6   r   ).0hpr   r   r   
<listcomp>       z!Channel._open.<locals>.<listcomp>)r   r   Zalt
startswithr"   splitint
ValueErrorhostnameportDEFAULT_PORTappendinsertr   r	   start)r   Zconninfohosts	host_porthostrB   Zconn_strconnr   r   r   _open   s.    


zChannel._openc                 C   s   | j d u r|  | _ | j S r   )_clientrK   r   r   r   r   r      s    

zChannel.client)__name__
__module____qualname____doc__rL   r    r   r   r$   r)   r+   r-   r1   r3   r4   r.   rK   propertyr   __classcell__r   r   r   r   r   a   s    	r   c                       sT   e Zd ZdZeZdZeZej	j
e Z
ej	je ZdZdZ fddZdd Z  ZS )		TransportzZookeeper Transport.r   Z	zookeeperkazooc                    s&   t d u rtdt j|i | d S )Nz"The kazoo library is not installed)rU   ImportErrorr   r   )r   r0   r   r   r   r   r      s    zTransport.__init__c                 C   s   t jS r   )rU   __version__rM   r   r   r   driver_version   s    zTransport.driver_version)rN   rO   rP   rQ   r   Zpolling_intervalrC   default_portr   rT   Zconnection_errorsKZ_CONNECTION_ERRORSZchannel_errorsKZ_CHANNEL_ERRORSZdriver_typeZdriver_namer   rX   rS   r   r   r   r   rT      s   

rT   )-rQ   r   socketr#   r   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r    r   rU   Zkazoo.clientr	   Zkazoo.recipe.queuer
   
exceptionsZSystemErrorExceptionZConnectionLossExceptionZMarshallingErrorExceptionZUnimplementedExceptionZOperationTimeoutExceptionZNoAuthExceptionZInvalidACLExceptionZAuthFailedExceptionZSessionExpiredExceptionrZ   ZRuntimeInconsistencyExceptionZDataInconsistencyExceptionZBadArgumentsExceptionZApiErrorExceptionZNoNodeExceptionZNodeExistsExceptionZ NoChildrenForEphemeralsExceptionZNotEmptyExceptionZInvalidCallbackExceptionerrorr[   rV   rC   
__author__r   rT   r   r   r   r   <module>   sV   )f