o
    .i                     @   s"  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	 ddl
ZddlmZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfddZeddedfddZd ddZ dS )!z'Embedded workers for integration tests.    N)contextmanager)AnyIterableUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       sT   e Zd ZdZdZ fddZG dd dejjZ fddZ	d	d
 Z
dd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.Nc                    s   t  | _t j|i | | jjdd dkrPddlm	} | | _
t | _zddlm} |  W n	 ty=   Y nw tj| j
t | _| j  d S d S )N.preforkr   )Queue)pickling_support)	threadingEvent_on_startedsuper__init__pool_cls
__module__splitbilliardr   logger_queueosgetpidpidtblibr   installImportErrorlogginghandlersQueueListener	getLoggerqueue_listenerstart)selfargskwargsr   r   	__class__ X/var/www/html/philips/venv/lib/python3.10/site-packages/celery/contrib/testing/worker.pyr   #   s   

zTestWorkController.__init__c                   @   s   e Zd Zdd Zdd ZdS )zTestWorkController.QueueHandlerc                 C   s
   d|_ |S )NT)
from_queuer0   recordr5   r5   r6   prepare:   s   z'TestWorkController.QueueHandler.preparec                 C   s   t jr d S )N)r*   raiseExceptionsr8   r5   r5   r6   handleError?   s   z+TestWorkController.QueueHandler.handleErrorN)__name__r    __qualname__r:   r<   r5   r5   r5   r6   QueueHandler9   s    r?   c                    s@    j r  j }| fdd t }|| t  S )Nc                    s   | j  jkot| dd S )Nr7   F)processr&   getattr)rr0   r5   r6   <lambda>F   s    z*TestWorkController.start.<locals>.<lambda>)r#   r?   	addFilterr*   r-   
addHandlerr   r/   )r0   handlerloggerr3   rC   r6   r/   C   s   

zTestWorkController.startc                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r0   r   r5   r5   r6   on_consumer_readyK   s   

z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   waitrC   r5   r5   r6   ensure_startedR   s   z!TestWorkController.ensure_started)r=   r    r>   __doc__r#   r   r*   r+   r?   r/   rM   rO   __classcell__r5   r5   r3   r6   r      s    
r      soloTg      $@c              
   k   s    t j| d d}	z]t| f||||||d|2}	|rAddlm}
 t  |
 j|ddks2J W d   n1 s<w   Y  |	V  W d   n1 sNw   Y  W tj| |	d dS W tj| |	d dS tj| |	d w )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )rI   N)concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutrR   )ping)timeoutpong)rI   r   )	r   rK   _start_worker_threadtasksrZ   r	   delaygetr   )rL   rT   rU   rV   rW   rX   ping_task_timeoutrY   r2   r   rZ   r5   r5   r6   start_worker]   s2   "rb   c                 k   s&   t | || |rd| jv sJ | jtjdd}	|	jj W d   n1 s)w   Y  |d| |t |||d|	ddddd
|}
t
j|
jdd}|  |
  td	 z|
V  W d
dlm} d
|_|| | rttdd|_dS d
dlm} d
|_|| | rtdd|_w )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingTEST_BROKER)hostnameNwithout_heartbeatT)
rL   rT   rd   rU   rV   rW   ready_callbackre   without_minglewithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.r5   )setup_app_for_workerr^   
connectionr$   environr`   default_channelqueue_declarer   popr   Threadr/   rO   r   celery.workerrk   should_terminatejoinis_aliveRuntimeError)rL   rT   rU   rV   rW   WorkControllerrX   rY   r2   connr   trk   r5   r5   r6   r]      sV   




r]   c           	      k   sP    ddl m}m} |   ||dg}|  z
dV  W |  dS |  w )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)celery.apps.multir{   r|   set_currentr/   stopwait)	rL   rT   rU   rV   rW   r2   r{   r|   clusterr5   r5   r6   _start_worker_process   s   r   returnc                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)rV   rW   N)finalizer~   set_defaulttypelog_setupsetup)rL   rV   rW   r5   r5   r6   rl      s
   rl   )r   N)!rP   r*   r$   r   
contextlibr   typingr   r   r   celery.worker.consumerceleryr   r   celery.resultr   r	   celery.utils.dispatchr
   celery.utils.nodenamesr   rn   r`   r   r   r   r   rx   r   rb   r]   r   rl   r5   r5   r5   r6   <module>   sd    ?'8