a
    xd                     @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZejej ZZG dd dejZdS )z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc                       s   e Zd ZdZd ZefZedddddddZd	d
hZ	d- fdd	Z
dd Zd.ddZdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Z  ZS )/r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncverZamqpZredisF      @       @c                    s  | o|  |j| _|j| _| |_|jjj| _|j| _d| jt|j	g| _
tt t t d| _|j| _| jr|jjj| j| jdd| _|jrt |_| jj| _|| _|| _d | _tt| _i | _| j| j d| _!|jj"| _"d| j#i| _$t% j&|fi | d S )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leaveZmax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappZenabledZgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater   r   stateZhubr   Z_mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcZwithout_gossipr.   r/   kwargs	__class__ U/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/worker/consumer/gossip.pyr:   $   sB    


zGossip.__init__c                 C   s:   |  }|jj| jv W  d    S 1 s,0    Y  d S N)Zconnection_for_read	transportZdriver_typecompatible_transports)r;   r!   connr@   r@   rA   r    M   s    
zGossip.compatible_transportNc                 C   s$   g | j |< | jjd|||dd d S )Nzworker-electr   )r   r   r   r   )r3   
dispatchersend)r;   r   r   r   r@   r@   rA   electionQ   s
    
zGossip.electionc              
   C   sJ   z| j |  W n0 tyD } ztd| W Y d }~n
d }~0 0 d S )NzCould not call task: %r)r!   	signatureZapply_async	Exceptionlogger	exception)r;   r   excr@   r@   rA   r7   X   s    zGossip.call_taskc           
   
   C   s   z|  |\}}}}}}}W n0 tyL }	 ztd|	W  Y d }	~	S d }	~	0 0 t| j| || d| ||f | jjd|d d S )Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorrK   rL   r   r2   rF   rG   )
r;   r,   Zid_r   r   r   r   r   _rM   r@   r@   rA   r4   ^   s    
"zGossip.on_electc                    s   t  | |j| _d S rB   )r9   startZevent_dispatcherrF   )r;   r<   r>   r@   rA   rQ   j   s    zGossip.startc           
      C   s   |d }z| j | }W n ty*   Y d S 0 t| j }||d  t|t|kr| j| j	| \}}}}|| j
krtd| z| j| }	W n ty   td| Y q0 |	| ntd|| | j	|d  | j |d  d S )Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r3   rO   r'   r+   alive_workersappendlenr   Z	sort_heapr2   r&   infor8   rK   rL   pop)
r;   r,   r   ZrepliesrR   rP   Zleaderr   r   handlerr@   r@   rA   r5   n   s*    


zGossip.on_elect_ackc                 C   s    t d|j | | jj| d S )Nz%s joined the party)debugr   _call_handlersr(   r   r;   workerr@   r@   rA   r      s    zGossip.on_node_joinc                 C   s    t d|j | | jj| d S )Nz%s left)rX   r   rY   r(   r   rZ   r@   r@   rA   r      s    zGossip.on_node_leavec                 C   s    t d|j | | jj| d S )Nzmissed heartbeat from %s)rU   r   rY   r(   r   rZ   r@   r@   rA   on_node_lost   s    zGossip.on_node_lostc                 O   sT   |D ]J}z||i | W q t yL } ztd|| W Y d }~qd }~0 0 qd S )Nz!Ignored error from handler %r: %r)rJ   rK   rL   )r;   handlersargsr=   rW   rM   r@   r@   rA   rY      s    zGossip._call_handlersc                 C   s,   | j d ur| j   | j| j| j| _ d S rB   )r0   cancelr)   Zcall_repeatedlyr.   periodic)r;   r@   r@   rA   register_timer   s    

zGossip.register_timerc                 C   sR   | j j}t }| D ]}|js|| | | q|D ]}||jd  q:d S rB   )	r+   workersr'   valuesaliveaddr\   rV   r   )r;   rb   Zdirtyr[   r@   r@   rA   r`      s    
zGossip.periodicc                 C   s:   |    | j|d| jd}t||jgt| j|jddgS )Nzworker.#)routing_keyZ	queue_ttlT)Zqueues
on_messageZno_ack)ra   r#   r/   r   queuer   rg   Zevent_from_message)r;   ZchannelZevr@   r@   rA   get_consumers   s    zGossip.get_consumersc           	   
   C   s   |j d }|ddd dkr"d S z| j| }W n tyB   Y n0 ||jS |jdpb|jd }|| jkrz||j\}}| | W q t	t
tfy } zt| W Y d }~qd }~0 0 n
| j  d S )Nrf   r   r   r   r   r   )Zdelivery_infosplitr6   rO   payloadheadersgetr   r-   r	   r   	TypeErrorrK   errorr   Zforward)	r;   preparemessage_typerW   r   rP   r,   rM   r@   r@   rA   rg      s$    


"zGossip.on_message)Fr   r   )N)__name__
__module____qualname____doc__labelr   requiresr   rN   rD   r:   r    rH   r7   r4   rQ   r5   r   r   r\   rY   ra   r`   ri   rg   __classcell__r@   r@   r>   rA   r      s0     )

r   N)rv   collectionsr   	functoolsr   heapqr   operatorr   Zkombur   Zkombu.asynchronous.semaphorer   Zkombu.exceptionsr   r	   Zceleryr
   Zcelery.utils.logr   Zcelery.utils.objectsr   Zmingler   __all__rs   rK   rX   rU   ZConsumerStepr   r@   r@   r@   rA   <module>   s   