o
    .i|8                     @   s.  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z ddlmZ dd	lmZmZ dd
lmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% zddl&Z&W n e'y   dZ&Y nw dZ(dZ)dZ*dZ+G dd dZ,dS )a  WorkController can be used to instantiate in-process workers.

The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.

The worker program is responsible for adding signal handlers,
setting up logging, etc.  This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).

The worker consists of several components, all managed by bootsteps
(mod:`celery.bootsteps`).
    N)datetime)	cpu_count)detect_environment)	bootsteps)concurrency)signals)RUN	TERMINATE)ImproperlyConfiguredTaskRevokedErrorWorkerTerminate)
EX_FAILUREcreate_pidlock)reload_from_cwd)mlevel)worker_logger)default_nodenameworker_direct)str_to_list)default_socket_timeout   state)WorkControllerg      @z
Trying to select queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.

If you want to automatically declare unknown queues you can
enable the `task_create_missing_queues` setting.
ze
Trying to deselect queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.
c                   @   s  e Zd ZdZdZdZdZdZdZdZ	G dd de
jZdHddZ		dIddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd ZdJddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdKd,d-ZdLd.d/Z dMd1d2Z!dNd3d4Z"dJd5d6Z#dKd7d8Z$d9d: Z%d;d< Z&d=d> Z'd?d@ Z(dAdB Z)e*dCdD Z+																					dOdFdGZ,dS )Pr   zUnmanaged worker instance.Nc                   @   s   e Zd ZdZdZh dZdS )zWorkController.BlueprintzWorker bootstep blueprint.Worker>   celery.worker.components:Hubcelery.worker.components:Beatcelery.worker.components:Poolcelery.worker.components:Timer celery.worker.components:StateDB!celery.worker.components:Consumer'celery.worker.autoscale:WorkerComponentN)__name__
__module____qualname____doc__namedefault_steps r(   r(   O/var/www/html/philips/venv/lib/python3.10/site-packages/celery/worker/worker.py	BlueprintK   s    r*   c                 K   s|   |p| j | _ t|| _t | _| j j  | jdi | | j	di | | j
di | | jdi | jdi | d S )Nr(   )appr   hostnamer   utcnowstartup_timeloaderinit_workeron_before_initsetup_defaultson_after_initsetup_instanceprepare_args)selfr+   r,   kwargsr(   r(   r)   __init__Y   s   

 zWorkController.__init__c                 K   s   || _ | || | t| | js&zt | _W n ty%   d| _Y nw t| j| _|p0| j	| _
| j | _|d u r@|  n|| _|| _tjj| d t| j| _g | _|   | j| jjd | j| j| jd| _| jj| fi | d S )N   senderworker)stepson_starton_close
on_stopped)pidfilesetup_queuessetup_includesr   r   r   NotImplementedErrorr   loglevelon_consumer_readyready_callbackr+   connection_for_read	_conninfoshould_use_eventloopuse_eventloopoptionsr   worker_initsend_concurrencyget_implementationpool_clsr=   on_init_blueprintr*   r>   r?   r@   	blueprintapply)r6   queuesrG   rA   includerK   exclude_queuesr7   r(   r(   r)   r4   d   s6   

zWorkController.setup_instancec                 C      d S Nr(   r6   r(   r(   r)   rR         z WorkController.on_init_blueprintc                 K   rX   rY   r(   r6   r7   r(   r(   r)   r1      r[   zWorkController.on_before_initc                 K   rX   rY   r(   r\   r(   r(   r)   r3      r[   zWorkController.on_after_initc                 C   s   | j rt| j | _d S d S rY   )rA   r   pidlockrZ   r(   r(   r)   r>      s   zWorkController.on_startc                 C   rX   rY   r(   )r6   consumerr(   r(   r)   rF      r[   z WorkController.on_consumer_readyc                 C   s   | j j  d S rY   )r+   r/   shutdown_workerrZ   r(   r(   r)   r?      s   zWorkController.on_closec                 C   s,   | j   | j  | jr| j  d S d S rY   )timerstopr^   shutdownr]   releaserZ   r(   r(   r)   r@      s
   

zWorkController.on_stoppedc              
   C   s   t |}t |}z
| jjj| W n ty( } z
tt 	||d }~ww z
| jjj
| W n tyI } z
tt 	||d }~ww | jjjr\| jjjt| j d S d S rY   )r   r+   amqprU   selectKeyErrorr
   SELECT_UNKNOWN_QUEUEstripformatdeselectDESELECT_UNKNOWN_QUEUEconfr   
select_addr,   )r6   rV   excludeexcr(   r(   r)   rB      s*   
zWorkController.setup_queuesc                    sf   t  jjj}|r|t |7 } fdd|D  | _dd  jj D }t t||B  jj_d S )Nc                    s   g | ]	} j j|qS r(   )r+   r/   import_task_module.0mrZ   r(   r)   
<listcomp>   s    z1WorkController.setup_includes.<locals>.<listcomp>c                 S   s   h | ]}|j jqS r(   )	__class__r#   )rr   taskr(   r(   r)   	<setcomp>   s    z0WorkController.setup_includes.<locals>.<setcomp>)tupler+   rl   rV   tasksvaluesset)r6   includesprevtask_modulesr(   rZ   r)   rC      s   
zWorkController.setup_includesc                 K   s   |S rY   r(   r\   r(   r(   r)   r5      r[   zWorkController.prepare_argsc                 C   s   t jj| d d S )Nr:   )r   worker_shutdownrN   rZ   r(   r(   r)   _send_worker_shutdown   s   z$WorkController._send_worker_shutdownc              
   C   s   z	| j |  W d S  ty   |   Y d S  ty7 } ztjd|dd | jtd W Y d }~d S d }~w t	yP } z| j|j
d W Y d }~d S d }~w ty_   | jtd Y d S w )NzUnrecoverable error: %rT)exc_info)exitcode)rS   startr   	terminate	Exceptionloggercriticalra   r   
SystemExitcodeKeyboardInterrupt)r6   ro   r(   r(   r)   r      s   zWorkController.startc                 C   s   | j j| d|fdd d S )Nregister_with_event_loopzhub.register)argsdescription)rS   send_all)r6   hubr(   r(   r)   r      s   
z'WorkController.register_with_event_loopc                 C   s   |  | j|S rY   )_quick_acquire_process_taskr6   reqr(   r(   r)   _process_task_sem   s   z WorkController._process_task_semc                 C   sJ   z	| | j W dS  ty$   z|   W Y dS  ty#   Y Y dS w w )z2Process task by sending it to the pool of workers.N)execute_using_poolpoolr   _quick_releaseAttributeErrorr   r(   r(   r)   r      s   zWorkController._process_taskc                 C   s&   z| j   W d S  ty   Y d S w rY   )r^   closer   rZ   r(   r(   r)   signal_consumer_close   s
   z$WorkController.signal_consumer_closec                 C   s    t  dko| jjjjo| jj S )Ndefault)r   rI   	transport
implementsasynchronousr+   
IS_WINDOWSrZ   r(   r(   r)   rJ      s
   

z#WorkController.should_use_eventloopFc                 C   sF   |dur|| _ | jjtkr|   |r| jjr| jdd |   dS )z'Graceful shutdown of the worker server.NTwarm)	r   rS   r   r   r   r   signal_safe	_shutdownr   )r6   in_sighandlerr   r(   r(   r)   ra      s   zWorkController.stopc                 C   s8   | j jtkr|   |r| jjr| jdd dS dS dS )z.Not so graceful shutdown of the worker server.Fr   N)rS   r   r	   r   r   r   r   )r6   r   r(   r(   r)   r      s   zWorkController.terminateTc                 C   sX   | j d ur*tt | j j| | d | j   W d    d S 1 s#w   Y  d S d S )N)r   )rS   r   SHUTDOWN_SOCKET_TIMEOUTra   join)r6   r   r(   r(   r)   r     s   

"zWorkController._shutdownc                 C   sT   t | j|||d | jr| j  | j  z| j  W d S  ty)   Y d S w )N)force_reloadreloader)list_reload_modulesr^   update_strategiesreset_rate_limitsr   restartrD   )r6   modulesreloadr   r(   r(   r)   r     s   

zWorkController.reloadc                    s4    fddt |d u rjjjD S |pdD S )Nc                 3   s"    | ]}j |fi  V  qd S rY   )_maybe_reload_modulerq   r7   r6   r(   r)   	<genexpr>  s
    
z1WorkController._reload_modules.<locals>.<genexpr>r(   )r{   r+   r/   r~   )r6   r   r7   r(   r   r)   r     s   
zWorkController._reload_modulesc                 C   sH   |t jvrtd| | jj|S |r"td| tt j| |S d S )Nzimporting module %szreloading module %s)sysr   r   debugr+   r/   import_from_cwdr   )r6   moduler   r   r(   r(   r)   r     s   
z#WorkController._maybe_reload_modulec                 C   s4   t  | j }| jjt t| jj	t
| dS )N)totalpidclockuptime)r   r-   r.   r   total_countosgetpidstrr+   r   roundtotal_seconds)r6   r   r(   r(   r)   info'  s   

zWorkController.infoc                 C   s   t d u rtdt t j}i d|jd|jd|jd|jd|jd|j	d|j
d	|jd
|jd|jd|jd|jd|jd|jd|jd|jS )Nz%rusage not supported by this platformutimestimemaxrssixrssidrssisrssminfltmajfltnswapinblockoublockmsgsndmsgrcvnsignalsnvcswnivcsw)resourcerD   	getrusageRUSAGE_SELFru_utimeru_stime	ru_maxrssru_ixrssru_idrssru_isrss	ru_minflt	ru_majfltru_nswap
ru_inblock
ru_oublock	ru_msgsnd	ru_msgrcvru_nsignalsru_nvcsw	ru_nivcsw)r6   sr(   r(   r)   rusage.  sH   	
zWorkController.rusagec                 C   s`   |   }|| j |  || jj | j z	|  |d< W |S  ty/   d|d< Y |S w )Nr   zN/A)r   updaterS   r^   r   rD   )r6   r   r(   r(   r)   statsE  s   
zWorkController.statsc                 C   s"   dj | | jr| j dS ddS )z``repr(worker)``.z#<Worker: {self.hostname} ({state})>INIT)r6   r   )ri   rS   human_staterZ   r(   r(   r)   __repr__O  s   zWorkController.__repr__c                 C   s   | j S )z#``str(worker) == worker.hostname``.)r,   rZ   r(   r(   r)   __str__V  s   zWorkController.__str__c                 C   s   t S rY   r   rZ   r(   r(   r)   r   Z  s   zWorkController.stateWARNc                 K   s  | j j}|| _|| _|d|| _|d|| _|d||| _|d|| _|d|| _|d|| _	|p2|| _
|d|	| _|d|
| _|d	|| _|d
||| _|d|| _|d||| _|d||| _|d||| _|d|| _|d|| _t|d|| _|d|| _|d|| _d S )Nworker_concurrencyworker_send_task_eventsworker_poolworker_consumerworker_timerworker_timer_precisionworker_autoscalerworker_pool_putlocksworker_pool_restartsworker_state_dbbeat_schedule_filenamebeat_schedulertask_time_limittask_soft_time_limitworker_max_tasks_per_childworker_max_memory_per_childworker_prefetch_multiplierworker_disable_rate_limitsworker_lost_wait)r+   eitherrE   logfiler   task_eventsrQ   consumer_cls	timer_clstimer_precisionoptimizationautoscaler_clspool_putlockspool_restartsstatedbschedule_filename	scheduler
time_limitsoft_time_limitmax_tasks_per_childmax_memory_per_childintprefetch_multiplierdisable_rate_limitsr   )r6   r   rE   r  r  r   r  r  r  r  r	  r
  r  Or  r  r  r  rQ   state_dbr   r   scheduler_clsr  r  r  r  r   r  _kwr  r(   r(   r)   r2   ^  sN   
zWorkController.setup_defaults)NN)NNNNNNrY   )FN)F)T)NFN)Nr   NNNNNNNNNNNNNNNNNNNNNNNNNN)-r"   r#   r$   r%   r+   r]   rS   r   	semaphorer   r   r*   r8   r4   rR   r1   r3   r>   rF   r?   r@   rB   rC   r5   r   r   r   r   r   r   rJ   ra   r   r   r   r   r   r   r   r   r   r   propertyr   r2   r(   r(   r(   r)   r   >   s    

(










r   )-r%   r   r   r   billiardr   kombu.utils.compatr   celeryr   r   rO   r   celery.bootstepsr   r	   celery.exceptionsr
   r   r   celery.platformsr   r   celery.utils.importsr   celery.utils.logr   r   r   celery.utils.nodenamesr   r   celery.utils.textr   celery.utils.threadsr    r   r   ImportError__all__r   rg   rk   r   r(   r(   r(   r)   <module>   s:    