a
    xdS7                     @   sd  d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZG dd deZG dd dZd4ddZd5ddZdd ZeddfddZdd Zd6ddZdd Zdd  Z d!d" Z!d#d$ Z"G d%d& d&Z#d7d)d*Z$d+d, Z%d-d. Z&d/d0 Z'd1d2 Z(eeed3Z)ee%ed3Z*ee&ed3Z+ee'ed3Z,dS )8z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                   @   s   e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__ r   r   N/var/www/html/Ranjet/env/lib/python3.9/site-packages/celery/contrib/migrate.pyr      s   r   c                   @   s0   e Zd ZdZdZdZdZedd Zdd Z	dS )r   zMigration progress state.r   c                 C   s   | j s
dS t| j S )N?)	total_apxstrselfr   r   r   strtotal&   s    zState.strtotalc                 C   s$   | j rd| j  S | j d| j S )N^/)filteredcountr%   r#   r   r   r   __repr__,   s    zState.__repr__N)
r   r   r   r   r)   r(   r!   propertyr%   r*   r   r   r   r   r      s   
r   c              	   C   s   |sg d}t |j}|j|j|j  }}}|du r<|d n|}|du rP|d n|}|j|j }	}
|dd}|D ]}||d qr| jt |f|||||	|
d| dS )zRepublish message.)Zapplication_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression)r/   r0   r1   r.   r,   r-   )	r   bodydelivery_infor.   
propertiesr,   r-   poppublish)producermessager/   r0   Zremove_propsr2   infor.   propsctypeencr1   keyr   r   r   r   2   s&    

r   c                 C   s>   |j }|du ri n|}t| |||d ||d d dS )zMigrate single task message.Nr/   r0   r/   r0   )r3   r   get)r7   Zbody_r8   queuesr9   r   r   r   r   K   s    r   c                    s    fdd}|S )Nc                    s   r| d vrd S  | |S NZtaskr   r2   r8   callbacktasksr   r   r(   V   s    z!filter_callback.<locals>.filteredr   )rD   rE   r(   r   rC   r   filter_callbackT   s    rF   c                    sV   t |}t|jj|dd t| d} fdd}t|| |f|d|S )z)Migrate tasks from one broker to another.F)Zauto_declarer@   c                    sh   |  j }| j| j|_|j| jkr:| j|j|_|jj| jkr\| j| j|j_|  d S N)channelr?   namer0   r/   Zdeclare)queueZ	new_queuer7   r@   r   r   on_declare_queuef   s    
z'migrate_tasks.<locals>.on_declare_queue)r@   rM   )r	   prepare_queuesamqpProducerr   r   )sourcedestZmigrateappr@   kwargsrM   r   rL   r   r   ^   s    
r   c                 C   s   t |tr| jj| S |S rH   )
isinstancer"   rO   r@   )rS   qr   r   r   _maybe_queuet   s    
rW   c	              
      s   t    fdd|pg D p d}
 j|ddV jt 	f	dd}t |fd|
i|	W  d   S 1 s0    Y  dS )	aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                    s   g | ]}t  |qS r   )rW   ).0rK   )rS   r   r   
<listcomp>       zmove.<locals>.<listcomp>NF)poolc                    s   | |}|rr|}t |trBt|j |jj|j }}nt|\}}t|||d |	   j
d7  _
 r | | rj
krt d S )Nr>      )rU   r   r   Zdefault_channelr/   rJ   r0   expand_destr   ackr(   r   )r2   r8   retexrk)	rD   connr/   limit	predicater7   r0   state	transformr   r   on_task   s"    

zmove.<locals>.on_taskconsume_from)r	   Zconnection_or_acquirerO   rP   r   r   )rd   
connectionr/   r0   rQ   rS   rD   rc   rf   rT   r@   rg   r   )
rS   rD   rb   r/   rc   rd   r7   r0   re   rf   r   r   z   s    >r   c              	   C   s6   z| \}}W n  t tfy,   || }}Y n0 ||fS rH   )	TypeError
ValueError)r_   r/   r0   r`   ra   r   r   r   r]      s
    r]   c                 C   s   |d | kS )z'Return true if task id equals task_id'.idr   )task_idr2   r8   r   r   r   r      s    r   c                 C   s   |d | v S )z-Return true if task id is member of set ids'.rl   r   )idsr2   r8   r   r   r   r      s    r   c                 C   s@   t | tr| d} t | tr0tdd | D } | d u r<i } | S )N,c                 s   s(   | ] }t tt|d ddV  qdS ):N   )tupler   r   splitrX   rV   r   r   r   	<genexpr>   s   z!prepare_queues.<locals>.<genexpr>)rU   r"   rs   listdictrG   r   r   r   rN      s    


rN   c                   @   sF   e Zd ZdddZdd Zdd	 Zd
d Zdd Zdd Zdd Z	dS )FiltererN      ?Fc                    s   | _ | _| _| _| _| _tt|p0g  _t	| _
|	 _|
 _| _ fdd|pht j
D  _|pxt  _| _d S )Nc                    s   g | ]}t  j|qS r   )rW   rS   rt   r#   r   r   rY     s   z%Filterer.__init__.<locals>.<listcomp>)rS   rb   filterrc   timeoutack_messagessetr   rE   rN   r@   rD   foreverrM   rv   rh   r   re   accept)r$   rS   rb   rz   rc   r{   r|   rE   r@   rD   r~   rM   rh   re   r   rT   r   r#   r   __init__   s     

zFilterer.__init__c              	   C   sx   |  |  T zt| j| j| jdD ]}q&W n$ tjyB   Y n tyR   Y n0 W d    n1 sh0    Y  | jS )N)r{   Zignore_timeouts)	prepare_consumercreate_consumerr   rb   r{   r~   socketr   re   )r$   _r   r   r   start  s    
$zFilterer.startc                 C   s.   | j  jd7  _| jr*| j j| jkr*t d S )Nr\   )re   r)   rc   r   r$   r2   r8   r   r   r   update_state  s    zFilterer.update_statec                 C   s   |   d S rH   )r^   r   r   r   r   ack_message  s    zFilterer.ack_messagec                 C   s   | j jj| j| j| jdS )N)r@   r   )rS   rO   ZTaskConsumerrb   rh   r   r#   r   r   r   r   !  s
    zFilterer.create_consumerc                 C   s   | j }| j}| j}| jr<t|| j}t|| j}t|| j}|| || | jrb|| j | jd urt| j| j	}| jrt|| j}|| | 
| |S rH   )rz   r   r   rE   rF   Zregister_callbackr|   rD   r   re   declare_queues)r$   consumerrz   r   r   rD   r   r   r   r   (  s$    




zFilterer.prepare_consumerc              	   C   s   |j D ]t}| j r|j| j vrq| jd ur2| | z0||jjdd\}}}|r`| j j|7  _W q | jjyx   Y q0 qd S )NT)Zpassive)	r@   rJ   rM   rI   Zqueue_declarere   r!   rb   Zchannel_errors)r$   r   rK   r   Zmcountr   r   r   r   <  s    


zFilterer.declare_queues)Nry   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   rx      s       
rx   ry   Fc                 K   s0   t | ||f|||||||	|
|||d| S )zFilter tasks.)rc   r{   r|   rE   r@   rD   r~   rM   rh   re   r   )rx   r   )rS   rb   rz   rc   r{   r|   rE   r@   rD   r~   rM   rh   re   r   rT   r   r   r   r   L  s"    r   c                 K   s   t | |ifi |S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )rm   rR   rT   r   r   r   r   a  s    r   c                    s$    fdd}t |fdt i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                    s     |jd S )NZcorrelation_id)r?   r4   rB   mapr   r   task_id_in_map{  s    z%move_by_idmap.<locals>.task_id_in_maprc   )r   len)r   rT   r   r   r   r   r   o  s    r   c                    s    fdd}t |fi |S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                    s     | d S rA   )r?   rB   r   r   r   task_name_in_map  s    z)move_by_taskmap.<locals>.task_name_in_map)r   )r   rT   r   r   r   r   r     s    r   c                 K   s   t tjf | |d| d S )N)re   r2   )printMOVING_PROGRESS_FMTformat)re   r2   r8   rT   r   r   r   filter_status  s    r   )rf   )NNN)N)NNNNNNNN)Nry   FNNNFNNNN)-r   r   	functoolsr   	itertoolsr   r   Zkombur   r   Zkombu.commonr   Zkombu.utils.encodingr   Z
celery.appr	   Zcelery.utils.nodenamesr
   Zcelery.utils.textr   __all__r   	Exceptionr   r   r   r   rF   r   rW   r   r]   r   r   rN   rx   r   r   r   r   r   r   r   Zmove_direct_by_idmapZmove_direct_by_taskmapr   r   r   r   <module>   sV     

	

  
[Z    
