a
    xdd                  	   @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ dZe edZ!dZ"dZ#dZ$ee%Z&e&j'Z(dZ)dZ*dZ+ej,ej-ej.ej/ej0ej1ej2ej3dZ4G dd deZ5e6e5 eddd ddd  Z7d!e"e
e8e9fd"d#Z:d$d% Z;d&d' Z<e<d(G d)d* d*Z=e<d+G d,d- d-Z>G d.d/ d/Z?d0d1 Z@d2d3 ZAdS )4a  In-memory representation of cluster state.

This module implements a data-structure used to keep
track of the state of a cluster of workers and the tasks
it is working on (by consuming events).

For every event consumed the state is updated,
so the state represents the state of the cluster
at the time of the last event.

Snapshots (:mod:`celery.events.snapshot`) can be used to
take "pictures" of this state at regular intervals
to for example, store that in a database.
    N)defaultdict)Callable)datetime)Decimal)islice)
itemgetter)time)WeakSetref	timetuple)cached_property)states)LRUCachememoizepass1)
get_logger)WorkerTaskStateheartbeat_expirespypy_version_info      znSubstantial drift from %s may mean clocks are out of sync.  Current drift is
%s seconds.  [orig: %s recv: %s]
z4<State: events={0.event_count} tasks={0.task_count}>z9<Worker: {0.hostname} ({0.status_string} clock:{0.clock})z4<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>)sentreceivedstartedfailedretried	succeededrevokedrejectedc                       s(   e Zd ZdZ fddZdd Z  ZS )CallableDefaultdicta  :class:`~collections.defaultdict` with configurable __call__.

    We use this for backwards compatibility in State.tasks_by_type
    etc, which used to be a method but is now an index instead.

    So you can do::

        >>> add_tasks = state.tasks_by_type['proj.tasks.add']

    while still supporting the method call::

        >>> add_tasks = list(state.tasks_by_type(
        ...     'proj.tasks.add', reverse=True))
    c                    s   || _ t j|i | d S N)funsuper__init__)selfr$   argskwargs	__class__ K/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/events/state.pyr&   ^   s    zCallableDefaultdict.__init__c                 O   s   | j |i |S r#   )r$   )r'   r(   r)   r,   r,   r-   __call__b   s    zCallableDefaultdict.__call__)__name__
__module____qualname____doc__r&   r.   __classcell__r,   r,   r*   r-   r"   N   s   r"   i  c                 C   s   | d S Nr   r,   )a_r,   r,   r-   <lambda>i       r7   )maxsizeZkeyfunc                 C   s    t t| |t|t| d S r#   )warnDRIFT_WARNINGr   fromtimestamp)hostnamedriftlocal_received	timestampr,   r,   r-   _warn_drifti   s    rA   <   c                 C   s8   |||r||n|}|| |r(|| } | ||d   S )z#Return time when heartbeat expires.g      Y@r,   )r@   freqexpire_windowr   float
isinstancer,   r,   r-   r   q   s    
r   c                 C   s   | f i |S r#   r,   )clsfieldsr,   r,   r-   _depickle_task}   s    rI   c                    s    fdd}|S )Nc                    s6    fdd}|| _ dd }|| _ fdd}|| _| S )Nc                    s$   t || jr t|  t| kS tS r#   )rF   r+   getattrNotImplemented)thisotherattrr,   r-   __eq__   s    z8with_unique_field.<locals>._decorate_cls.<locals>.__eq__c                 S   s   |  |}|tu rdS | S )NT)rP   rK   )rL   rM   resr,   r,   r-   __ne__   s    
z8with_unique_field.<locals>._decorate_cls.<locals>.__ne__c                    s   t t|  S r#   )hashrJ   )rL   rN   r,   r-   __hash__   s    z:with_unique_field.<locals>._decorate_cls.<locals>.__hash__)rP   rR   rT   )rG   rP   rR   rT   rN   r,   r-   _decorate_cls   s    z(with_unique_field.<locals>._decorate_clsr,   )rO   rU   r,   rN   r-   with_unique_field   s    rV   r=   c                   @   s   e Zd ZdZdZeZdZes$ed Z	ddd	Z
d
d Zdd Zdd Zdd Zedd Zedd ZeefddZedd ZdS )r   zWorker State.   )r=   pidrC   
heartbeatsclockactive	processedloadavgsw_identsw_versw_sys)event__dict____weakref__NrB   r   c                 C   s`   || _ || _|| _|d u rg n|| _|p*d| _|| _|| _|| _|	| _|
| _	|| _
|  | _d S r4   )r=   rX   rC   rY   rZ   r[   r\   r]   r^   r_   r`   _create_event_handlerra   )r'   r=   rX   rC   rY   rZ   r[   r\   r]   r^   r_   r`   r,   r,   r-   r&      s    
zWorker.__init__c                 C   s6   | j | j| j| j| j| j| j| j| j| j	| j
| jffS r#   )r+   r=   rX   rC   rY   rZ   r[   r\   r]   r^   r_   r`   r'   r,   r,   r-   
__reduce__   s
    zWorker.__reduce__c                    sP   t j jjjjjjd d d tttt	j
tf fdd	}|S )Nc	                    s   |pi }|  D ]\}	}
 |	|
 q| dkr<g d d < n||rD|sHd S ||||| }||krttj||| |r|}|d krd |r|d kr| n
|| d S )Noffline   r   )itemsrA   r=   )type_r@   r?   rH   Z	max_driftabsintinsortlenkvr>   heartsZ_setZ	hb_appendZhb_popZhbmaxrY   r'   r,   r-   ra      s&    
z+Worker._create_event_handler.<locals>.event)object__setattr__heartbeat_maxrY   popappendHEARTBEAT_DRIFT_MAXrl   rm   bisectrn   ro   r'   ra   r,   rs   r-   rd      s    zWorker._create_event_handlerc                 K   s:   |rt |fi |n|}| D ]\}}t| || q d S r#   )dictrj   setattr)r'   fkwdrp   rq   r,   r,   r-   update   s    zWorker.updatec                 C   s
   t | S r#   )R_WORKERformatre   r,   r,   r-   __repr__   s    zWorker.__repr__c                 C   s   | j r
dS dS )NZONLINEZOFFLINEalivere   r,   r,   r-   status_string   s    zWorker.status_stringc                 C   s   t | jd | j| jS )Nri   )r   rY   rC   rD   re   r,   r,   r-   r      s    
zWorker.heartbeat_expiresc                 C   s   t | jo| | jk S r#   )boolrY   r   )r'   Znowfunr,   r,   r-   r      s    zWorker.alivec                 C   s
   d | S )Nz{0.hostname}.{0.pid})r   re   r,   r,   r-   id   s    z	Worker.id)NNrB   Nr   NNNNNN)r/   r0   r1   r2   rv   HEARTBEAT_EXPIRE_WINDOWrD   _fieldsPYPY	__slots__r&   rf   rd   r   r   propertyr   r   r   r   r   r,   r,   r,   r-   r      s,      
!

r   uuidc                   @   s6  e Zd ZdZd Z Z Z Z Z Z	 Z
 Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z Z ZZejZdZ dZ!e"sdZ#ej$diZ%dZ&d$dd	Z'dddej(e)e*j+ej,fd
dZ-d%ddZ.dd Z/dd Z0dd Z1dd Z2dd Z3dd Z4e5dd Z6e5dd Z7e5dd Z8e9d d! Z:e9d"d# Z;dS )&r   zTask State.Nr   )r   namestater   r   r   r!   r   r   r   r    r(   r)   etaexpiresretriesworkerresult	exceptionr@   runtime	tracebackexchangerouting_keyrZ   clientrootroot_idparent	parent_idchildren)rb   rc   )r   r(   r)   r   r   r   r   r   )r(   r)   r   r   r   r   r   r   r   r   r   r   c                    sd   | _ | _ jd ur4t fdd|p(dD  _nt  _ j j jd _|r` j	| d S )Nc                 3   s(   | ] }| j jv r j j|V  qd S r#   )cluster_statetasksget).0Ztask_idre   r,   r-   	<genexpr>&  s   z Task.__init__.<locals>.<genexpr>r,   )r   r   r   )
r   r   r	   r   _serializable_children_serializable_root_serializable_parent_serializer_handlersrb   r   )r'   r   r   r   r)   r,   re   r-   r&   "  s    
zTask.__init__c	           
         s   |pi }||}	|	d ur&|| || n|  }	|	|kr~| j|kr~||	|| jkr~| j|	  d ur fdd| D }n|j|	|d | j| d S )Nc                    s   i | ]\}}| v r||qS r,   r,   )r   rp   rq   Zkeepr,   r-   
<dictcomp>I  s   zTask.event.<locals>.<dictcomp>)r   r@   )upperr   merge_rulesr   rj   r   rb   )
r'   rk   r@   r?   rH   
precedencer}   Ztask_event_to_stateRETRYr   r,   r   r-   ra   5  s    
z
Task.eventc                    s8    sg n  du rj n fdd}t| S )z;Information about this task suitable for on-screen display.Nc                  3   s8   t t   D ]"} t| d }|d ur| |fV  qd S r#   )listrJ   )keyvalueextrarH   r'   r,   r-   _keysW  s    zTask.info.<locals>._keys)_info_fieldsr|   )r'   rH   r   r   r,   r   r-   infoR  s    z	Task.infoc                 C   s
   t | S r#   )R_TASKr   re   r,   r,   r-   r   _  s    zTask.__repr__c                    s&   t j jj fddjD S )Nc                    s"   i | ]}||t  |qS r,   )r   )r   rp   r   handlerr'   r,   r-   r   e  s   z Task.as_dict.<locals>.<dictcomp>)rt   __getattribute__r   r   r   re   r,   r   r-   as_dictb  s
    zTask.as_dictc                 C   s   dd | j D S )Nc                 S   s   g | ]
}|j qS r,   r   )r   taskr,   r,   r-   
<listcomp>j  r8   z/Task._serializable_children.<locals>.<listcomp>)r   r'   r   r,   r,   r-   r   i  s    zTask._serializable_childrenc                 C   s   | j S r#   )r   r   r,   r,   r-   r   l  s    zTask._serializable_rootc                 C   s   | j S r#   )r   r   r,   r,   r-   r   o  s    zTask._serializable_parentc                 C   s   t | j|  ffS r#   )rI   r+   r   re   r,   r,   r-   rf   r  s    zTask.__reduce__c                 C   s   | j S r#   )r   re   r,   r,   r-   r   u  s    zTask.idc                 C   s   | j d u r| jS | j jS r#   )r   r   r   re   r,   r,   r-   originy  s    zTask.originc                 C   s   | j tjv S r#   r   r   ZREADY_STATESre   r,   r,   r-   ready}  s    z
Task.readyc                 C   s2   z| j o| jjj| j  W S  ty,   Y d S 0 d S r#   )r   r   r   dataKeyErrorre   r,   r,   r-   r     s    zTask.parentc                 C   s2   z| j o| jjj| j  W S  ty,   Y d S 0 d S r#   )r   r   r   r   r   re   r,   r,   r-   r     s    z	Task.root)NNN)NN)<r/   r0   r1   r2   r   r   r   r   r   r   r   r    r!   r(   r)   r   r   r   r   r   r   r@   r   r   r   r   r   r   r   r   PENDINGr   rZ   r   r   r   RECEIVEDr   r   r&   r   r}   TASK_EVENT_TO_STATEr   r   ra   r   r   r   r   r   r   rf   r   r   r   r   r   r   r   r,   r,   r,   r-   r      s   






r   c                
   @   s   e Zd ZdZeZeZdZdZdZd6ddZ	e
d	d
 Zdd Zd7ddZd8ddZd9ddZd:ddZdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zefd$d%Zd;d&d'Zd<d(d)ZeZd=d*d+Zd>d,d-Zd.d/ Zd0d1 Z d2d3 Z!d4d5 Z"dS )?r   zRecords clusters state.r   rW   N  '  c                 C   s   || _ |d u rt|n|| _|d u r,t|n|| _|d u r>g n|| _|| _|| _|| _|| _t	
 | _i | _t | _i | _|   t| jt| _| jt|	| j t| jt| _| jt|
| j d S r#   )event_callbackr   workersr   	_taskheapmax_workers_in_memorymax_tasks_in_memoryon_node_joinon_node_leave	threadingLock_mutexhandlersset_seen_types_tasks_to_resolverebuild_taskheapr"   _tasks_by_typer	   tasks_by_typer   !_deserialize_Task_WeakSet_Mapping_tasks_by_workertasks_by_worker)r'   callbackr   r   taskheapr   r   r   r   r   r   r,   r,   r-   r&     s>    




zState.__init__c                 C   s   |   S r#   )_create_dispatcherre   r,   r,   r-   _event  s    zState._eventc              	   O   sr   | dd}| jL z,||i |W |r0|   W  d    S n|rN|   0 W d    n1 sd0    Y  d S )Nclear_afterF)rw   r   _clear)r'   r$   r(   r)   r   r,   r,   r-   freeze_while  s     zState.freeze_whileTc                 C   s4   | j  | |W  d    S 1 s&0    Y  d S r#   )r   _clear_tasksr'   r   r,   r,   r-   clear_tasks  s    zState.clear_tasksc                 C   sJ   |r.dd |   D }| j  | j| n
| j  g | jd d < d S )Nc                 S   s"   i | ]\}}|j tjvr||qS r,   r   r   r   r   r,   r,   r-   r     s   z&State._clear_tasks.<locals>.<dictcomp>)	itertasksr   clearr   r   )r'   r   Zin_progressr,   r,   r-   r     s    

zState._clear_tasksc                 C   s$   | j   | | d| _d| _d S r4   )r   r   r   event_count
task_countr   r,   r,   r-   r     s    

zState._clearc                 C   s4   | j  | |W  d    S 1 s&0    Y  d S r#   )r   r   r   r,   r,   r-   r     s    zState.clearc                 K   s^   z"| j | }|r|| |dfW S  tyX   | j|fi | }| j |< |df Y S 0 dS )zsGet or create worker by hostname.

        Returns:
            Tuple: of ``(worker, was_created)`` pairs.
        FTN)r   r   r   r   )r'   r=   r)   r   r,   r,   r-   get_or_create_worker  s    


zState.get_or_create_workerc                 C   sH   z| j | dfW S  tyB   | j|| d }| j |< |df Y S 0 dS )zGet or create task by uuid.Fr   TN)r   r   r   )r'   r   r   r,   r,   r-   get_or_create_task  s
    zState.get_or_create_taskc                 C   s4   | j  | |W  d    S 1 s&0    Y  d S r#   )r   r   r{   r,   r,   r-   ra     s    zState.eventc                 C   s    |  t|dd|gdd S )Deprecated, use :meth:`event`.-r   typer   r   r|   joinr'   rk   rH   r,   r,   r-   
task_event  s    zState.task_eventc                 C   s    |  t|dd|gdd S )r   r   r   r   r   r   r   r,   r,   r-   worker_event  s    zState.worker_eventc                    s   j jjtdddtdddddjjjjj 	j	j
jj 
jj  jj jjjj jjjjtttjdf 	
fdd	}|S )	Nr=   r@   r?   r   rZ   Tc                    sV   j d7  _ r|  | d d\}}}z|}W n |yN   Y n0 ||| |fS |dkr>z| \}	}
}W n |y   Y n0 |dk}z|	d }}W n6 |y   |rʈ|	d }}n|	 }|	< Y n0 |||
||  
r|s|dkr
| r.|r.| |	d  ||f|fS n|dkrR| \}}	}
}}|d	k}z|d }}W n, |y    |d
 }|< d}Y n0 |r|	|_nVz|	}W n$ |y   |	 }|	< Y n0 ||_|d ur|r|d ||
 |r|	n|j}t}|d 	kr4d |||
|t|}|rd|d krd| n
|| |dkr j	d7  _	|||
||  |j
}|d urЈ| |rЈ|| |	| |jrzj|j }W n |y   | Y n0 |j| zj|}W n |y8   Y n0 |j| ||f|fS d S )Nrh   r   r   r   rg   FZonliner   r   r   Tr   ri   r   )r   	partitionra   rw   r   r   r   ro   r
   r   r   addr   r   _add_pending_task_childr   r   r   )ra   r   r   rn   createdgroupr6   subjectr   r=   r@   r?   Z
is_offliner   r   rZ   Zis_client_eventr   Ztask_createdr   ZheapsZtimetupZ	task_nameZparent_task	_childrenr   r   add_typer   Zget_handlerZget_taskZget_task_by_type_setZget_task_by_worker_setZ
get_workerZmax_events_in_heapr   r   r'   r   r   ZtfieldsZ	th_appendZth_popZwfieldsr   r,   r-   r   "  s    







z(State._create_dispatcher.<locals>._event)r   __getitem__r   r   r   rx   rw   r   heap_multiplierr   r   r   r   r   r   r   r   r   r   r   r   r   rz   rn   )r'   r   r,   r  r-   r     s*    4^zState._create_dispatcherc                 C   sD   z| j |j }W n$ ty4   t  }| j |j< Y n0 || d S r#   )r   r   r   r	   r   )r'   r   chr,   r,   r-   r     s
    zState._add_pending_task_childc                    s2    fdd| j  D  }| jd d < |  d S )Nc                    s$   g | ]} |j |j|jt|qS r,   )rZ   r@   r   r
   r   tr   r,   r-   r     s   z*State.rebuild_taskheap.<locals>.<listcomp>)r   valuesr   sort)r'   r   heapr,   r   r-   r     s    
zState.rebuild_taskheapc                 c   s6   t | j D ]"\}}|V  |r|d |kr q2qd S )Nrh   )	enumerater   rj   )r'   limitindexrowr,   r,   r-   r     s    zState.itertasksc                 c   sb   | j }|rt|}t }t|d|D ]8}|d  }|dur$|j}||vr$||fV  || q$dS )zkGenerator yielding tasks ordered by time.

        Yields:
            Tuples of ``(uuid, Task)``.
        r      N)r   reversedr   r   r   r   )r'   r  reverseZ_heapseenZevtupr   r   r,   r,   r-   tasks_by_time  s    

zState.tasks_by_timec                    s"   t  fdd| j|dD d|S )zGet all tasks by type.

        This is slower than accessing :attr:`tasks_by_type`,
        but will be ordered by time.

        Returns:
            Generator: giving ``(uuid, Task)`` pairs.
        c                 3   s$   | ]\}}|j  kr||fV  qd S r#   r   r   r  r,   r-   r     s   

z'State._tasks_by_type.<locals>.<genexpr>r  r   r   r  )r'   r   r  r  r,   r  r-   r     s    	zState._tasks_by_typec                    s"   t  fdd| j|dD d|S )znGet all tasks by worker.

        Slower than accessing :attr:`tasks_by_worker`, but ordered by time.
        c                 3   s&   | ]\}}|j j kr||fV  qd S r#   )r   r=   r   r=   r,   r-   r     s   
z)State._tasks_by_worker.<locals>.<genexpr>r  r   r  )r'   r=   r  r  r,   r  r-   r     s    zState._tasks_by_workerc                 C   s
   t | jS )z%Return a list of all seen task types.)sortedr   re   r,   r,   r-   
task_types  s    zState.task_typesc                 C   s   dd | j  D S )z+Return a list of (seemingly) alive workers.c                 s   s   | ]}|j r|V  qd S r#   r   )r   wr,   r,   r-   r     r8   z&State.alive_workers.<locals>.<genexpr>)r   r	  re   r,   r,   r-   alive_workers  s    zState.alive_workersc                 C   s
   t | S r#   )R_STATEr   re   r,   r,   r-   r     s    zState.__repr__c                 C   s8   | j | j| j| jd | j| j| j| jt| j	t| j
f
fS r#   )r+   r   r   r   r   r   r   r   _serialize_Task_WeakSet_Mappingr   r   re   r,   r,   r-   rf     s    zState.__reduce__)
NNNNr   r   NNNN)T)T)T)T)N)NT)NT)NT)#r/   r0   r1   r2   r   r   r   r   r  r&   r   r   r   r   r   r   r   r   r   ra   r   r   r   r   r   r   r   r  Ztasks_by_timestampr   r   r  r  r   rf   r,   r,   r,   r-   r     sH        
!
	



{



r   c                 C   s   dd |   D S )Nc                 S   s    i | ]\}}|d d |D qS )c                 S   s   g | ]
}|j qS r,   r   r  r,   r,   r-   r     r8   z>_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<listcomp>r,   )r   r   r   r,   r,   r-   r     r8   z3_serialize_Task_WeakSet_Mapping.<locals>.<dictcomp>rj   )mappingr,   r,   r-   r    s    r  c                    s   | pi }  fdd|   D S )Nc                    s(   i | ] \}}|t  fd d|D qS )c                 3   s   | ]}| v r | V  qd S r#   r,   )r   ir   r,   r-   r     r8   z?_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>.<genexpr>)r	   )r   r   idsr"  r,   r-   r     s   z5_deserialize_Task_WeakSet_Mapping.<locals>.<dictcomp>r  )r   r   r,   r"  r-   r     s    
r   )Br2   rz   sysr   collectionsr   collections.abcr   r   decimalr   	itertoolsr   operatorr   r   weakrefr	   r
   Zkombu.clocksr   Zkombu.utils.objectsr   Zceleryr   Zcelery.utils.functionalr   r   r   Zcelery.utils.logr   __all__hasattrr   r   ry   r;   r/   loggerwarningr:   r  r   r   r   r   ZSTARTEDFAILUREr   SUCCESSZREVOKEDZREJECTEDr   r"   registerrA   rE   rF   r   rI   rV   r   r   r   r  r   r,   r,   r,   r-   <module>   sn   



]   I