a
    xd                     @   s   d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ dd
lmZ dZeeZejejej  ZZZeejddZG dd de	jZG dd deZdS )zPool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
    N)	monotonicsleep)	DummyLock)	bootsteps)
get_logger)bgThread   )state)Pool)
AutoscalerWorkerComponentAUTOSCALE_KEEPALIVE   c                   @   s>   e Zd ZdZdZdZefZdd Zdd Z	dd	 Z
d
d ZdS )r   z?Bootstep that starts the autoscaler thread/timer in the worker.r   Tc                 K   s   |j | _d |_d S N)Z	autoscaleZenabled
autoscaler)selfwkwargs r   O/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/worker/autoscale.py__init__&   s    zWorkerComponent.__init__c                 C   s>   | j |j|j|j|j||jr"t nd d }|_|js:|S d S )N)workermutex)ZinstantiateZautoscaler_clspoolmax_concurrencymin_concurrencyZuse_eventloopr   r   )r   r   Zscalerr   r   r   create*   s    zWorkerComponent.createc                 C   s*   |j j|jj ||jj|jj d S r   )consumerZon_task_messageaddr   maybe_scaleZcall_repeatedly	keepalive)r   r   Zhubr   r   r   register_with_event_loop2   s    z(WorkerComponent.register_with_event_loopc                 C   s   d|j  iS )zReturn `Autoscaler` info.r   )r   info)r   r   r   r   r   r"   8   s    zWorkerComponent.infoN)__name__
__module____qualname____doc__labelZconditionalr
   requiresr   r   r!   r"   r   r   r   r   r      s   r   c                       s   e Zd ZdZddedf fdd	Zdd Zddd	Zdd
dZd ddZ	dd Z
dd Zdd Zdd Zdd Zdd Zedd Zedd Z  ZS )!r   z,Background thread to autoscale pool workers.r   Nc                    sN   t    || _|pt | _|| _|| _|| _d | _	|| _
| jsJJ dd S )Nzcannot scale down too fast.)superr   r   	threadingLockr   r   r   r    _last_scale_upr   )r   r   r   r   r   r    r   	__class__r   r   r   @   s    
zAutoscaler.__init__c                 C   s:   | j  |   W d    n1 s$0    Y  td d S )Ng      ?)r   r   r   r   r   r   r   bodyN   s    &zAutoscaler.bodyc                 C   sZ   | j }t| j| j}||kr.| ||  dS t| j| j}||k rV| ||  dS d S )NT)	processesminqtyr   scale_upmaxr   
scale_down)r   reqZprocscurr   r   r   _maybe_scaleS   s    zAutoscaler._maybe_scalec                 C   s   |  |r| j  d S r   )r9   r   Zmaintain_pool)r   r7   r   r   r   r   ^   s    
zAutoscaler.maybe_scalec                 C   s   | j v |d ur:|| jk r*| | j|  | | || _|d urb|| jkr\| || j  || _| j| jfW  d    S 1 s0    Y  d S r   )r   r1   _shrink_update_consumer_prefetch_countr   _growr   )r   r5   r2   r   r   r   updateb   s    


zAutoscaler.updatec                 C   s   t  | _| |S r   )r   r,   r<   r   nr   r   r   r4   o   s    zAutoscaler.scale_upc                 C   s&   | j r"t | j  | jkr"| |S d S r   )r,   r   r    r:   r>   r   r   r   r6   s   s    zAutoscaler.scale_downc                 C   s   t d| | j| d S )NzScaling up %s processes.)r"   r   Zgrowr>   r   r   r   r<   x   s    
zAutoscaler._growc              
   C   sj   t d| z| j| W nJ ty4   td Y n2 tyd } ztd|dd W Y d }~n
d }~0 0 d S )NzScaling down %s processes.z0Autoscaler won't scale down: all processes busy.zAutoscaler: scale_down: %rT)exc_info)r"   r   shrink
ValueErrordebug	Exceptionerror)r   r?   excr   r   r   r:   |   s    
zAutoscaler._shrinkc                 C   s    || j  }|r| jj| d S r   )r   r   r   Z_update_prefetch_count)r   Znew_maxdiffr   r   r   r;      s
    
z*Autoscaler._update_consumer_prefetch_countc                 C   s   | j | j| j| jdS )N)r5   r2   currentr3   )r   r   r1   r3   r/   r   r   r   r"      s
    zAutoscaler.infoc                 C   s
   t tjS r   )lenr	   Zreserved_requestsr/   r   r   r   r3      s    zAutoscaler.qtyc                 C   s   | j jS r   )r   Znum_processesr/   r   r   r   r1      s    zAutoscaler.processes)N)N)NN)r#   r$   r%   r&   r   r   r0   r9   r   r=   r4   r6   r<   r:   r;   r"   propertyr3   r1   __classcell__r   r   r-   r   r   =   s$   


	
r   )r&   osr*   timer   r   Zkombu.asynchronous.semaphorer   Zceleryr   Zcelery.utils.logr   Zcelery.utils.threadsr    r	   
componentsr
   __all__r#   loggerrC   r"   rE   floatenvirongetr   ZStartStopStepr   r   r   r   r   r   <module>   s   	