a
    xd                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZdd Zdd ZejejeejeefddZ dS )z'Task execution strategy (optimization).    N)to_timestamp)signals)trace)InvalidTaskError)symbol_by_name)
get_logger)saferepr)timezone   )create_request_cls)task_reserved)defaultc                 C   s$  z$| dd| di  }}|j W n2 ty>   tdY n tyV   tdY n0 | d| d| d| d	| d
| d| d| d| d| d| dd| dd| d| d| dd}|| jpi  | d| d| ddd}|||f|d| ddfS )zECreate a fresh protocol 2 message from a hybrid protocol 1/2 message.args kwargs!Message does not have args/kwargs(Task keyword arguments must be a mappinglangtaskidroot_id	parent_idgroupmethshadowetaexpiresretriesr   	timelimit)NNargsrepr
kwargsreprorigin)r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   	callbackserrbackschordNr"   r#   r$   chainTutc)getitemsKeyErrorr   AttributeErrorupdateheaders)messagebodyr   r   r-   embedr   r   N/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/worker/strategy.pyhybrid_to_proto2   s@    



r2   c                 C   s   z$| dd| di  }}|j W n2 ty>   tdY n tyV   tdY n0 |jt|t|| jd z|d |d< W n ty   Y n0 | d	| d
| ddd}|||f|d| ddfS )zConvert Task message protocol 1 arguments to protocol 2.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)``
    r   r   r   r   r   )r   r    r-   Ztasksetr   r"   r#   r$   Nr%   Tr'   )r(   r)   r*   r   r+   r,   r   r-   )r.   r/   r   r   r0   r   r   r1   proto1_to_proto2B   s0    

r3   c	                    s   j jttjjo&j}	o0j|	o:j	j
jjj jj	j
jjtj}
t|
jd jjjtf 	
fdd	S )zDefault task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    )appc                    sV  |d u r0d| j vr0| j| jd f\}}}}n2d| j v rPt| | j \}}}}n| |\}}}}| ||	||||d r j jd}	tj|	d|	id  j	s jv rĈ 
 rd S tjj d r(d j j j j j j jd	d
 jo j  j	o" j	 d
 d }
d } jrz* jrR| j}n| jj}W nR ttfy } z4d j| jdddd  jdd W Y d }~n
d }~0 0 rƈ
j}
|r|
rj  | |
dfddS |rj  | fdd S |
r* |
dS   |rJ fdd|D    d S )Nr   F)Zon_ackZ	on_rejectr4   hostnameeventerr   connection_errorsr/   r-   decodedr'   )r   namedata)extra)Zsenderrequestztask-receivedr   r   )	uuidr9   r   r   r   r   r   r   r   z2Couldn't convert ETA %r to timestamp: %r. Task: %rT)safe)exc_info)Zrequeuer
      )priorityc                    s   g | ]}| qS r   r   ).0callbackreqr   r1   
<listcomp>       z9default.<locals>.task_message_handler.<locals>.<listcomp>)payloadr/   r-   Zuses_utc_timezoner2   r   r9   
_app_traceZLOG_RECEIVEDr   revokedr   Ztask_receivedsendr   r    r   r   Zrequest_dictr(   r   	isoformatr'   r	   OverflowError
ValueErrorinforejectZqosZincrement_eventually)r.   r/   ZackrP   r"   r   r-   r8   r'   contextZbucketr   excZReqZ
_does_infor4   apply_eta_taskcall_atr7   consumererrorr6   Z
get_buckethandler5   rO   Zlimit_post_etaZ
limit_taskr3   Zrate_limits_enabledZrevoked_tasksZ
send_eventr   task_message_handlerr   Ztask_sends_eventsto_system_tzrD   r1   rY      sv    

"


z%default.<locals>.task_message_handler)r5   r7   loggerisEnabledForloggingINFOZevent_dispatcherZenabledrK   Zsend_eventsZtimerrU   rT   Zdisable_rate_limitsZtask_buckets__getitem__Zon_task_requestZ_limit_taskZ_limit_post_etar   Requestr   pool
controllerstaterJ   r   )r   r4   rV   rO   rW   r   rZ   bytesr3   eventsr`   r   rS   r1   r   c   s(    





<Fr   )!__doc__r]   Zkombu.asynchronous.timerr   Zceleryr   Z
celery.appr   rI   Zcelery.exceptionsr   Zcelery.utils.importsr   Zcelery.utils.logr   Zcelery.utils.safereprr   Zcelery.utils.timer	   r<   r   rc   r   __all____name__r[   r2   r3   rO   rW   Z	to_systemrd   r   r   r   r   r1   <module>   s&   )"
