a
    xd$                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ dZi Zdd ZedG dd dZG dd deZedG dd deZedG dd deZG dd dZG dd dZdS )z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAX)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                    s    fdd}|S )z5Decorator used to register a new result drainer type.c                    s   | t  < | S N)drainers)clsname T/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/backends/asynchronous.py_inner   s    z register_drainer.<locals>._innerr   )r   r   r   r   r   r      s    r   defaultc                   @   s<   e Zd ZdZdd Zdd Zdd Zdd
dZdddZdS )r   zResult draining service.c                 C   s
   || _ d S r   )result_consumer)selfr   r   r   r   __init__$   s    zDrainer.__init__c                 C   s   d S r   r   r   r   r   r   start'   s    zDrainer.startc                 C   s   d S r   r   r   r   r   r   stop*   s    zDrainer.stopN   c                 c   st   |p
| j j}t }|r0t | |kr0t z| j|||dV  W n tjyZ   Y n0 |rf|  |jrqpqd S Ntimeout)r   drain_eventstime	monotonicsocketr    wait_forready)r   pr    intervalon_intervalwaitZ
time_startr   r   r   drain_events_until-   s    zDrainer.drain_events_untilc                 C   s   ||d d S r   r   r   r'   r*   r    r   r   r   r%   >   s    zDrainer.wait_for)Nr   NN)N)	__name__
__module____qualname____doc__r   r   r   r+   r%   r   r   r   r   r       s   
r   c                       s<   e Zd ZdZdZ fddZdd Zdd Zdd	 Z  Z	S )
greenletDrainerNc                    s4   t  j|i | t | _t | _t | _d S r   )superr   	threadingEvent_started_stopped	_shutdown)r   argskwargs	__class__r   r   r   F   s    

zgreenletDrainer.__init__c                 C   sL   | j   | j s>z| jjdd W q
 tjy:   Y q
0 q
| j  d S )Nr   r   )	r5   setr6   is_setr   r!   r$   r    r7   r   r   r   r   runL   s    

zgreenletDrainer.runc                 C   s&   | j  s"| | j| _| j   d S r   )r5   r=   spawnr>   _gr*   r   r   r   r   r   U   s    
zgreenletDrainer.startc                 C   s   | j   | jt d S r   )r6   r<   r7   r*   r	   r   r   r   r   r   Z   s    
zgreenletDrainer.stop)
r-   r.   r/   r?   r@   r   r>   r   r   __classcell__r   r   r:   r   r1   B   s   	r1   eventletc                   @   s   e Zd Zdd ZdddZdS )eventletDrainerc                 C   s$   ddl m}m} ||}|d |S )Nr   )r   r?   )rB   r   r?   )r   funcr   r?   gr   r   r   r?   b   s    zeventletDrainer.spawnNc                 C   s"   |    |js| jjj|d d S r   )r   r&   r@   Z_exit_eventr*   r,   r   r   r   r%   h   s    zeventletDrainer.wait_for)Nr-   r.   r/   r?   r%   r   r   r   r   rC   _   s   rC   geventc                   @   s   e Zd Zdd ZdddZdS )geventDrainerc                 C   s    dd l }||}|d |S )Nr   )rG   r?   r   )r   rD   rG   rE   r   r   r   r?   q   s    

zgeventDrainer.spawnNc                 C   s,   dd l }|   |js(|j| jg|d d S )Nr   r   )rG   r   r&   r*   r@   )r   r'   r*   r    rG   r   r   r   r%   w   s    zgeventDrainer.wait_for)NrF   r   r   r   r   rH   n   s   rH   c                   @   s   e Zd ZdZdd ZdddZddd	Zd
d ZdddZd ddZ	dd Z
dd Zdd Zd!ddZd"ddZedd ZdS )#r
   z.Mixin for backends that enables the async API.c                 C   s   || j j|< d S r   )r   buckets)r   resultbucketr   r   r   _collect_into   s    zAsyncBackendMixin._collect_intoTc                 k   s   |    |j}|st t }|D ]8}t|ds<|| q"|jrN|| q"| || q"| j|fd|i|D ]:}|rr|	 }t|ds|j
|jfV  qv|j
|jfV  qvqr|r|	 }|j
|jfV  qd S )N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendrM   rL   _wait_for_pendingpopleftidchildren)r   rJ   rN   r9   rP   rK   node_r   r   r   iter_native   s(    

zAsyncBackendMixin.iter_nativeFc                 C   sH   |r| j j  z| | W n$ tyB   | j|j||d Y n0 |S )N)weak)r   drainerr   _maybe_resolve_from_bufferr   _add_pending_resultrV   )r   rJ   r[   start_drainerr   r   r   add_pending_result   s    z$AsyncBackendMixin.add_pending_resultc                 C   s   | | j|j d S r   )_maybe_set_cache_pending_messagesZtakerV   r   rJ   r   r   r   r]      s    z,AsyncBackendMixin._maybe_resolve_from_bufferc                 C   s<   | j \}}||vr8|j|vr8||r&|n||< | j| d S r   )_pending_resultsrV   r   consume_from)r   task_idrJ   r[   ZconcreteZweak_r   r   r   r^      s    
z%AsyncBackendMixin._add_pending_resultc                    s     j j   fdd|D S )Nc                    s   g | ]} j |d dqS )F)r[   r_   )r`   ).0rJ   r   r[   r   r   
<listcomp>   s   z9AsyncBackendMixin.add_pending_results.<locals>.<listcomp>)r   r\   r   )r   rP   r[   r   rh   r   add_pending_results   s    z%AsyncBackendMixin.add_pending_resultsc                 C   s   |  |j | | |S r   )_remove_pending_resultrV   on_result_fulfilledrc   r   r   r   remove_pending_result   s    
z'AsyncBackendMixin.remove_pending_resultc                 C   s   | j D ]}||d  qd S r   )rd   popr   rf   mappingr   r   r   rk      s    
z(AsyncBackendMixin._remove_pending_resultc                 C   s   | j |j d S r   )r   
cancel_forrV   rc   r   r   r   rl      s    z%AsyncBackendMixin.on_result_fulfilledNc                 K   s.   |    | j|fi |D ]}q|j||dS )N)callback	propagate)rO   rT   Zmaybe_throw)r   rJ   rr   rs   r9   rY   r   r   r   wait_for_pending   s    z"AsyncBackendMixin.wait_for_pendingc                 K   s   | j j|f|||d|S )N)r    r)   
on_message)r   rT   )r   rJ   r    r)   ru   r9   r   r   r   rT      s    z#AsyncBackendMixin._wait_for_pendingc                 C   s   dS )NTr   r   r   r   r   is_async   s    zAsyncBackendMixin.is_async)T)FT)F)F)NT)NNN)r-   r.   r/   r0   rL   rZ   r`   r]   r^   rj   rm   rk   rl   rt   rT   propertyrv   r   r   r   r   r
   ~   s    

	

 
 
	r
   c                   @   s   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	dd Z
dd Zd ddZd!ddZd"ddZdd Zdd Zdd ZdS )#r   z2Manager responsible for consuming result messages.c                 C   s@   || _ || _|| _|| _|| _d | _t | _tt	  | | _
d S r   )backendappacceptrd   rb   ru   r   rI   r   r   r\   )r   rx   ry   rz   Zpending_resultsZpending_messagesr   r   r   r      s    zBaseResultConsumer.__init__c                 K   s
   t  d S r   NotImplementedError)r   Zinitial_task_idr9   r   r   r   r      s    zBaseResultConsumer.startc                 C   s   d S r   r   r   r   r   r   r      s    zBaseResultConsumer.stopNc                 C   s
   t  d S r   r{   )r   r    r   r   r   r!      s    zBaseResultConsumer.drain_eventsc                 C   s
   t  d S r   r{   r   rf   r   r   r   re      s    zBaseResultConsumer.consume_fromc                 C   s
   t  d S r   r{   r}   r   r   r   rq      s    zBaseResultConsumer.cancel_forc                 C   s$   | j   t | _ d | _|   d S r   )rI   clearr   ru   on_after_forkr   r   r   r   _after_fork   s    
zBaseResultConsumer._after_forkc                 C   s   d S r   r   r   r   r   r   r      s    z BaseResultConsumer.on_after_forkc                 C   s   | j j|||dS )Nr    r)   )r\   r+   )r   r'   r    r)   r   r   r   r+      s    z%BaseResultConsumer.drain_events_untilc                 k   s   | j |fd|i| | j| }| _zRz*| j|j||dD ]}d V  td q:W n tjyl   tdY n0 W || _n|| _0 d S )Nr    r   r   zThe operation timed out.)on_wait_for_pendingru   r+   Zon_readyr   r$   r    r   )r   rJ   r    r)   ru   r9   Z	prev_on_mrY   r   r   r   rT     s    
z$BaseResultConsumer._wait_for_pendingc                 K   s   d S r   r   )r   rJ   r    r9   r   r   r   r     s    z&BaseResultConsumer.on_wait_for_pendingc                 C   s   |  |j| d S r   )on_state_changepayload)r   messager   r   r   on_out_of_band_result  s    z(BaseResultConsumer.on_out_of_band_resultc              	   C   s:   | j D ]&}z|| W   S  ty*   Y q0 qt|d S r   )rd   KeyErrorro   r   r   r   _get_pending_result  s    
z&BaseResultConsumer._get_pending_resultc                 C   s   | j r|  | |d tjv r|d }z| |}W n  tyT   | j|| Y n>0 || | j}z|	|}W n ty   Y n0 |
| td d S )Nstatusrf   r   )ru   r   ZREADY_STATESr   r   rb   putra   rI   rn   rS   r   )r   metar   rf   rJ   rI   rK   r   r   r   r   !  s     


z"BaseResultConsumer.on_state_change)N)NN)NNN)N)r-   r.   r/   r0   r   r   r   r!   re   rq   r   r   r+   rT   r   r   r   r   r   r   r   r   r      s    

 

r   )r0   r$   r3   r"   collectionsr   queuer   r   weakrefr   Zkombu.utils.compatr   Zceleryr   Zcelery.exceptionsr   Zcelery.utils.threadsr	   __all__r   r   r   r1   rC   rH   r
   r   r   r   r   r   <module>   s.   ![