
     h7                        d Z ddlZddlmZ ddlZddlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ dZdZ G d d          Z G d dej                  Z G d dej                  ZdS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    N)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                   <    e Zd ZdZd Zd Zd Zd
dZd Zd Z	e	Z
d	S )BroadcastCursorzCursor for broadcast queues.c                 @    || _         |                     d           d S )NF)rewind)_cursorpurge)selfcursors     S/var/www/html/Sam_Eipo/venv/lib/python3.11/site-packages/kombu/transport/mongodb.py__init__zBroadcastCursor.__init__@   s#    

%
         c                 D    | j                                         | j        z
  S N)r   count_offsetr   s    r   get_sizezBroadcastCursor.get_sizeE   s    |!!##dl22r   c                 8    | j                                          d S r   )r   closer!   s    r   r$   zBroadcastCursor.closeH   s    r   Tc                     |r| j                                          | j                                         | _        | j                             | j                  | _         d S r   )r   r   r   r    skip)r   r   s     r   r   zBroadcastCursor.purgeK   sR     	"L!!! |))++|((66r   c                     | S r    r!   s    r   __iter__zBroadcastCursor.__iter__S   s    r   c                     	 	 t          | j                  }nG# t          j        j        $ r0}dt          |          v r|                                  Y d }~U d }~ww xY w| xj        dz  c_        |S )NTznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r    )r   msgexcs      r   __next__zBroadcastCursor.__next__V   s    	4<((  >2    )CHH44JJLLLHHHH 	
s    A%AAAN)T)__name__
__module____qualname____doc__r   r"   r$   r   r)   r1   r+   r(   r   r   r   r   =   sz        &&! ! !
3 3 3  7 7 7 7    & DDDr   r   c                   p    e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZej        j        dz   Z fdZd Zd Z fdZd Zd Zd Zd Zd Z fdZd*dZd Z d Z!d*dZ"d Z#d Z$d Z%e&d              Z'e&d!             Z(e&d"             Z)e&d#             Z*e&d$             Z+d% Z,d& Z-d' Z.d( Z/d) Z0 xZ1S )+ChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                 V     t                      j        |i | i | _        | j         d S r   )superr   _broadcast_cursorsclient)r   vargskwargs	__class__s      r   r   zChannel.__init__   s4    %*6***"$ 	r   c           	          | j         r9| j                            d|i|||                     |d          dd           d S d S )N_id	x-expires)rN   options	expire_atTupsert)r<   queuesupdate_get_expire)r   queuerK   s      r   
_new_queuezChannel._new_queue   sp    8 	K""..v{CCE E       	 	r   c                 t   || j         v r6	 t          |                     |                    }n># t          $ r d }Y n0w xY w| j                            d|idt          j        fgd          }| j        r| 	                    |           |t                      t          t          |d                             S )NrW   priorityT)querysortremovepayload)_fanout_queuesr+   _get_broadcast_cursorStopIterationr9   find_and_modifyr,   	ASCENDINGr<   _update_queues_expirer   r   r
   )r   rW   r/   s      r   _getzChannel._get   s    D'''455e<<==     -//&!7#456 0  C 8 	.&&u---;''M\#i.11222s   ". ==c                    | j         s!t                                          |          S || j        v r'|                     |                                          S | j                            d|i                                          S NrW   )	rE   rG   _sizer_   r`   r"   r9   findr   )r   rW   rL   s     r   rh   zChannel._size   s{     # 	(77=='''D'''--e44==???}!!7E"23399;;;r   c                     t          |          ||                     |d          d}| j        r|                     |d          |d<   | j                            |           d S )NT)reverse)r^   rW   rZ   zx-message-ttlrQ   )r   _get_message_priorityr<   rV   r9   insert)r   rW   messagerK   datas        r   _putzChannel._put   ss    W~~227D2II
 
 8 	I $ 0 0 H HDT"""""r   c                 Z    | j                             t          |          |d           d S )N)r^   rW   )	broadcastrm   r   )r   exchangern   routing_keyrK   s        r   _put_fanoutzChannel._put_fanout   s;    %..(02 2 	3 	3 	3 	3 	3r   c                     |                      |          }|| j        v r(|                     |                                           n| j                            d|i           |S rg   )rh   r_   r`   r   r9   r]   )r   rW   sizes      r   _purgezChannel._purge   sd    zz%  D'''&&u--335555M  '5!1222r   c                     t          | j        j        |         d                   }| j                            d|i          }|t          d |D                       z  S )Ntablers   c              3   D   K   | ]}|d          |d         |d         fV  dS )rt   patternrW   Nr(   ).0rs     r   	<genexpr>z$Channel.get_table.<locals>.<genexpr>   sJ       '
 '
 }q|QwZ8'
 '
 '
 '
 '
 '
r   )	frozensetstate	exchangesroutingri   )r   rs   localRoutesbrokerRoutess       r   	get_tablezChannel.get_table   st    
 4X >w GHH|(("
 
 Y '
 '
!'
 '
 '
 
 
 
 	
r   c                 6   |                      |          j        dk    r"|                     ||||           || j        |<   ||||d}|                                }| j        r|                     |d          |d<   | j                            ||d           d S )Nfanout)rs   rW   rt   r|   rO   rQ   TrR   )	typeoftype_create_broadcast_cursorr_   copyr<   rV   r   rU   )r   rs   rt   r|   rW   lookupro   s          r   _queue_bindzChannel._queue_bind   s    ;;x  %11))+w7 7 7)1D& !&	
 
 {{}}8 	E $ 0 0 D DDFD66666r   c                    | j                             d|i           | j        r| j                            d|i            t	                      j        |fi | || j        v r\	 | j                            |          }|	                                 | j                            |           d S # t          $ r Y d S w xY wd S )NrW   rN   )r   r]   r<   rT   rG   queue_deleter_   rH   popr$   KeyError)r   rW   rK   r   rL   s       r   r   zChannel.queue_delete   s    We,---8 	/Ku~...U--f---D'''/044U;; #''.....     ('s   %B/ /
B=<B=
mongodb://c                 z   | j         j        }|j        }|                    |          s||z   }|t	          |          d          s
|| j        z  }|j        rEd|vrA|                    d          \  }}|j        }|j        r|d|j        z   z  }|dz   |z   dz   |z   }|j	        r|j	        n| j
        }t          j        ||          }|d         p|j        }	|	dv r| j        }	d| j        | j        rt#          | j        dz            nd d}
|
                    |d	                    |                     |
          }
||	|
fS )
N@z://:database)/NTi  )auto_start_requestr;   connectTimeoutMSrP   )
connectionrI   hostname
startswithlenr>   useridsplitpasswordportr?   r   	parse_urivirtual_hostr@   r;   r:   intrU   _prepare_client_options)r   schemerI   r   headtailcredentialsr   parseddbnamerP   s              r   
_parse_urizChannel._parse_uri  s{    '?""6** 	)(HF% 	.--H= 	?S00!..JD$ -K 5sV_44e|k1C7$>H$k@v{{t/@%h55
#:v':[  *F #'8$($8"CT%9D%@!A!A!A>B	
 
 	vi()))..w77((r   c                     t           j        dk    r`|                    dd            t          |                    d          t
                    r"t           j        j        }||d                  |d<   |S )N   r   readpreference)r,   version_tupler   
isinstancegetr   read_preferences_MONGOS_MODES)r   rP   modess      r   r   zChannel._prepare_client_options5  sk     D((KK,d333'++&677== M0>,1':J2K,L()r   c                     t          |fi |S r   r   )r   	argumentsrK   s      r   prepare_queue_argumentszChannel.prepare_queue_arguments=  s    *9?????r   c                    |                      |          \  }}}||d<   t                      }|dk    rddlm} |                                 n|dk    rddlm}  |             t          di |}||         }	|                                d         }
|
	                    d	          d         }
t          t          t          |
	                    d
                              }|dk     r't          t                              |
                    | j        r-|dk     r't          t"                              |
                    |	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r(   )r   r	   r   r   	patch_allr   r   r   server_infor   tuplemapr   r   E_SERVER_VERSIONformatr<   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   s               r   _openzChannel._open@  sW   !%!?!?&$V!##(??%%%%%%J------LNNN''$''	V$++--i8!'',,Q/C!2!23!7!78899V!"2"9"9+"F"FGGGX 	H'F**!"2"9"9+"F"FGGGr   c                     | j         |                                v rdS |                    | j         | j        d           dS )z0Create capped collection for broadcast messages.NT)rw   capped)rC   collection_namescreate_collectionr=   r   r   s     r   _create_broadcastzChannel._create_broadcast[  sV    $(A(A(C(CCCF""4#<(,(>*. 	# 	0 	0 	0 	0 	0r   c                    || j                  }|                    g dd           || j                                     dg           || j                 }|                    ddg           | j        rU|                    dgd           |                    dgd           || j                                     dgd           d	S d	S )
zEnsure indexes on collections.)rW   r   )rZ   r   )rN   r   T)
backgroundr   )rs   r   )rQ   r   r   )expireAfterSecondsN)rA   ensure_indexrC   rB   r<   rD   )r   r   r9   r   s       r   _ensure_indexeszChannel._ensure_indexesd  s   D45777D 	 	
 	
 	
 	*+88,HHH423lO<===8 	:!!#3"4!KKK  "2!3 JJJT+,99!"q : : : : : :		: 	:r   c                     |                                  }|                     |           |                     |           |S )zActually creates connection.)r   r   r   r   s     r   _create_clientzChannel._create_clientw  s<    ::<<x(((X&&&r   c                 *    |                                  S r   )r   r!   s    r   rI   zChannel.client  s    ""$$$r   c                 &    | j         | j                 S r   )rI   rA   r!   s    r   r9   zChannel.messages  s    {4344r   c                 &    | j         | j                 S r   )rI   rB   r!   s    r   r   zChannel.routing  s    {4233r   c                 &    | j         | j                 S r   )rI   rC   r!   s    r   rr   zChannel.broadcast  s    {4455r   c                 &    | j         | j                 S r   )rI   rD   r!   s    r   rT   zChannel.queues  s    {4122r   c                     	 | j         |         S # t          $ r& |                     | j        |         d d |          cY S w xY wr   )rH   r   r   r_   )r   rW   s     r   r`   zChannel._get_broadcast_cursor  se    	*511 	 	 	 00#E*D$    		s    -??c                     t           j        dk    rd|it          j        d}nd|idd} | j        j        di |}t          |          x}| j        |<   |S )Nr   rW   )filtercursor_typeT)r[   tailabler(   )r,   r   r   TAILABLErr   ri   r   rH   )r   rs   rt   r|   rW   r[   r   rets           r   r   z Channel._create_broadcast_cursor  s     E))"H-)2 EE "8,  E
 %$--u--/>v/F/FFd%e,
r   c                 &   t          |t                    r)| j                            d|i          }|sdS |d         }n|}	 |d         |         }n# t          t
          f$ r Y dS w xY w|                                 t          j        |          z   S )zGet expiration header named `argument` of queue definition.

        Note:
            `queue` must be either queue name or options itself.
        rN   NrP   r   )milliseconds)	r   r.   rT   find_oner   	TypeErrorget_nowdatetime	timedelta)r   rW   argumentdocro   values         r   rV   zChannel._get_expire  s     eS!! 	+&&u~66C y>DDD	%h/EE)$ 	 	 	FF	 ||~~ 2 F F FFFs   A A&%A&c                     |                      |d          }|sdS | j                            d|idd|iid           | j                            d|idd|iid           dS )	z,Update expiration field on queues documents.rO   NrW   z$setrQ   T)multirN   )rV   r   rU   rT   )r   rW   rQ   s      r   rd   zChannel._update_queues_expire  s    $$UK88	 	FevY'?@ 	 	N 	N 	NENVk9%=>d 	 	L 	L 	L 	L 	Lr   c                 >    t           j                                         S )zReturn current time in UTC.)r   utcnowr!   s    r   r   zChannel.get_now  s     '')))r   )r   )2r2   r3   r4   r5   supports_fanoutr_   r;   r<   r:   r=   rE   r>   r?   r@   rA   rB   rC   rD   r   r7   from_transport_optionsr   rX   re   rh   rp   ru   rx   r   r   r   r   r   r   r   r   r   r   r   rI   r9   r   rr   rT   r`   r   rV   rd   r   __classcell__)rL   s   @r   r7   r7   l   s       O N C
COO"L&$+/)%oD H       3 3 3*	< 	< 	< 	< 	<
# 
# 
#3 3 3  	
 	
 	
7 7 7(/ / / / /(') ') ') ')R  @ @ @   60 0 0: : :&   % % _% 5 5 _5 4 4 _4 6 6 _6 3 3 _3	 	 	   G G G.
L 
L 
L* * * * * * *r   r7   c                       e Zd ZdZeZdZdZej        Zej	        j
        ej        fz   Z
ej	        j        ej        ej        fz   ZdZdZej	        j                             eg d                    Zd Zd	S )
	TransportzMongoDB Transport.Tr   mongodbr,   )directtopicr   )exchange_typec                     t           j        S r   )r,   r   r!   s    r   driver_versionzTransport.driver_version  s
    r   N)r2   r3   r4   r5   r7   can_parse_urlpolling_intervalr?   r   r   connection_errorsr   ConnectionFailurechannel_errorsr-   driver_typedriver_name
implementsextendr   r  r(   r   r   r   r     s        GM'L+v/G.II  	($#,% 	% 
 KK"-44i = = =>> 5  J    r   r   )r5   r   rW   r   r,   r   r   r   pymongo.cursorr   kombu.exceptionsr   kombu.utils.compatr	   kombu.utils.encodingr
   kombu.utils.jsonr   r   kombu.utils.objectsr    r   baser   r   r   r   r7   r   r(   r   r   <module>r     s   @         3 3 3 3 3 3 3 3 3 3 % % % % % % , , , , , , 2 2 2 2 2 2 - - - - - - ) ) ) ) ) ) ) ) / / / / / /       - - - - - -  
, , , , , , , ,^g* g* g* g* g*go g* g* g*T    !     r   