
     hS7                        d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZ G d de          Z G d d          Z	 	 d%dZd&dZd ZeddfdZd Z	 	 d'dZd Zd Z d Z!d Z" G d d          Z#	 	 	 	 d(dZ$d  Z%d! Z&d" Z'd# Z( eee$          Z) ee%e$          Z* ee&e$          Z+ ee'e$          Z,dS ))z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                       e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__     R/var/www/html/Sam_Eipo/venv/lib/python3.11/site-packages/celery/contrib/migrate.pyr   r      s        4444r!   r   c                   :    e Zd ZdZdZdZdZed             Zd Z	dS )r   zMigration progress state.r   c                 <    | j         sdS t          | j                   S )N?)	total_apxstrselfs    r"   strtotalzState.strtotal&   s!    ~ 	34>"""r!   c                 F    | j         r
d| j          S | j         d| j         S )N^/)filteredcountr*   r(   s    r"   __repr__zState.__repr__,   s4    = 	'&t}&&&*..t}...r!   N)
r   r   r   r   r/   r.   r&   propertyr*   r0   r    r!   r"   r   r      sQ        ##EHI# # X#
/ / / / /r!   r   c           
      V   |sg d}t          |j                  }|j        |j        |j        }}}||d         n|}||d         n|}|j        |j        }
}	|                    dd          }|D ]}|                    |d            | j        t          |          f|||||	|
d| dS )zRepublish message.)application_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression)r7   r8   r9   r6   r4   r5   )	r	   bodydelivery_infor6   
propertiesr4   r5   poppublish)producermessager7   r8   remove_propsr:   infor6   propsctypeencr9   keys                r"   r   r   2   s     77 7 7%%D#1#OW-? 'D#+#3tJH)4)<$}%%+K%w'?3E ++mT22K  		#tH\$'' 4(!,+$5&)4 4 .34 4 4 4 4r!   c           	          |j         }|i n|}t          | ||                    |d                   |                    |d                              dS )zMigrate single task message.Nr7   r8   r7   r8   )r;   r   get)r?   body_r@   queuesrB   s        r"   r   r   K   sd     D>RRvFhzz$z"233 **T-%899; ; ; ; ; ;r!   c                       fd}|S )Nc                 8    r| d         vrd S  | |          S Ntaskr    )r:   r@   callbacktaskss     r"   r.   z!filter_callback.<locals>.filteredV   s0     	T&\..Fxg&&&r!   r    )rP   rQ   r.   s   `` r"   filter_callbackrR   T   s)    ' ' ' ' ' '
 Or!   c                     t          |          }t                    |j                            |d          t	          |          }fd}t          || |f|d|S )z)Migrate tasks from one broker to another.F)auto_declarerK   c                     | j                   }                    | j        | j                  |_        |j        | j        k    r%                    | j        |j                  |_        |j        j        | j        k    r*                    | j        | j                  |j        _        |                                 d S N)channelrI   namer8   r7   declare)queue	new_queuer?   rK   s     r"   on_declare_queuez'migrate_tasks.<locals>.on_declare_queuef   s    E(*++	EJ
;;	 EJ..$*JJuz/8/D%F %FI!"ej00&,jjUZ&H&HI#r!   )rK   r]   )r
   prepare_queuesamqpProducerr   r   )sourcedestmigrateapprK   kwargsr]   r?   s       `  @r"   r   r   ^   s     

CF##Fx  E ::Hgx777G      VW EV)9E E=CE E Er!   c                 T    t          |t                    r| j        j        |         S |S rW   )
isinstancer'   r_   rK   )rd   qs     r"   _maybe_queueri   t   s(    !S "xq!!Hr!   c	           
      F    t                    fd|pg D             pd}
                    |d          5 j                                      t	                       f	d}t          |fd|
i|	cddd           S # 1 swxY w Y   dS )aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                 0    g | ]}t          |          S r    )ri   ).0r[   rd   s     r"   
<listcomp>zmove.<locals>.<listcomp>   s#    AAA5l3&&AAAr!   NF)poolc                   	  	| |          }|rr |          }t          |t                    r)t          |j                   |j        j        |j        }}nt          |          \  }}t          
|||           |	                                 xj
        dz  c_
        r | |           rj
        k    rt                      d S d S d S )NrH      )rg   r   r   default_channelr7   rY   r8   expand_destr   ackr.   r   )r:   r@   retexrkrP   connr7   limit	predicater?   r8   state	transforms        r"   on_taskzmove.<locals>.on_task   s   )D'**C * )#)C..Cc5)) E!#t';<<< \.BB(hDDFB(G#%27 7 7 7!# 3HUD'222 *U^u44'//)!* ** *44r!   consume_from)r
   connection_or_acquirer_   r`   r   r   )ry   
connectionr7   r8   ra   rd   rP   rx   r{   re   rK   r|   rw   r?   rz   s   ` `` ````   @@@r"   r   r   z   sH   | 

CAAAAFLbAAAITF		"	":E	"	:	: Od8$$T**	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*( CwNNVNvNN1O O O O O O O O O O O O O O O O O Os   ABBBc                 N    	 | \  }}n# t           t          f$ r ||}}Y nw xY w||fS rW   )	TypeError
ValueError)rt   r7   r8   ru   rv   s        r"   rr   rr      sH    'BBz" ' ' ';B'r6Ms      c                     |d         | k    S )z'Return true if task id equals task_id'.idr    )task_idr:   r@   s      r"   r   r      s    :  r!   c                     |d         | v S )z-Return true if task id is member of set ids'.r   r    )idsr:   r@   s      r"   r   r      s    :r!   c                     t          | t                    r|                     d          } t          | t                    rt	          d | D                       } | i } | S )N,c           
   3      K   | ]B}t          t          t          |                    d                     dd                    V  CdS ):N   )tupler   r   split)rl   rh   s     r"   	<genexpr>z!prepare_queues.<locals>.<genexpr>   s\       ' ' F5#6#6a@@AA ' ' ' ' ' 'r!   )rg   r'   r   listdictrU   s    r"   r^   r^      sq    &# #c""&$ ' ' '%' ' ' ' '~Mr!   c                   B    e Zd Z	 	 	 	 ddZd Zd Zd Zd Zd	 Zd
 Z	dS )FiltererN      ?Fc                 z    | _         | _        | _        | _        | _        | _        t          t          |          pg            _        t          |           _
        |	 _        |
 _        | _         fd|pt           j
                  D              _        |pt!                       _        | _        d S )Nc                 :    g | ]}t          j        |          S r    )ri   rd   )rl   rh   r)   s     r"   rm   z%Filterer.__init__.<locals>.<listcomp>  s5     
 
 
 1%%
 
 
r!   )rd   rw   filterrx   timeoutack_messagessetr   rQ   r^   rK   rP   foreverr]   r   r}   r   rz   accept)r)   rd   rw   r   rx   r   r   rQ   rK   rP   r   r]   r}   rz   r   re   s   `               r"   __init__zFilterer.__init__   s    
 	
(U++1r22
$V,,  0
 
 
 
!6T$+%6%6
 
 
 %egg
r!   c                    |                      |                                           5  	 t          | j        | j        | j                  D ]}n # t          j        $ r Y nt          $ r Y nw xY wd d d            n# 1 swxY w Y   | j        S )N)r   ignore_timeouts)	prepare_consumercreate_consumerr   rw   r   r   socketr   rz   )r)   _s     r"   startzFilterer.start  s    ""4#7#7#9#9:: 		 		"49+/<37<A A A  A  >       		 		 		 		 		 		 		 		 		 		 		 		 		 		 		 zs@   A;$AA;A,A; 	A,)A;+A,,A;;A?A?c                     | j         xj        dz  c_        | j        r#| j         j        | j        k    rt                      d S d S )Nrp   )rz   r/   rx   r   r)   r:   r@   s      r"   update_statezFilterer.update_state  sM    
A: 	"$**dj88//!	" 	"88r!   c                 .    |                                  d S rW   )rs   r   s      r"   ack_messagezFilterer.ack_message  s    r!   c                 d    | j         j                            | j        | j        | j                  S )N)rK   r   )rd   r_   TaskConsumerrw   r}   r   r(   s    r"   r   zFilterer.create_consumer!  s3    x}))I$; * 
 
 	
r!   c                     | j         }| j        }| j        }| j        r?t	          || j                  }t	          || j                  }t	          || j                  }|                    |           |                    |           | j        r|                    | j                   | j        Kt          | j        | j	                  }| j        rt	          || j                  }|                    |           | 
                    |           |S rW   )r   r   r   rQ   rR   register_callbackr   rP   r   rz   declare_queues)r)   consumerr   r   r   rP   s         r"   r   zFilterer.prepare_consumer(  s   (&: 	C$VTZ88F*<DDL)+tzBBK""6***""<000 	9&&t'7888=$t}dj99Hz A*8TZ@@&&x000H%%%r!   c                 (   |j         D ]}| j         r|j        | j         vr| j        |                     |           	  ||j                                      d          \  }}}|r| j        xj        |z  c_        u# | j        j        $ r Y w xY wd S )NT)passive)	rK   rY   r]   rX   queue_declarerz   r&   rw   channel_errors)r)   r   r[   r   mcounts        r"   r   zFilterer.declare_queues<  s    _ 	 	E{ uz<<$0%%e,,,$u$ &  &&3mDm&A&A 61 3J((F2((9+   	 	s   ?A==BBNr   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r    r!   r"   r   r      s         &)8<@D7;	   .  " " "
  
 
 
  (    r!   r   r   Fc                 \    t          | ||f|||||||	|
|||d|                                S )zFilter tasks.)rx   r   r   rQ   rK   rP   r   r]   r}   rz   r   )r   r   )rd   rw   r   rx   r   r   rQ   rK   rP   r   r]   r}   rz   r   re   s                  r"   r   r   L  s]    
 T6!)!    %''r!   c                      t          | |ifi |S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )r   rb   re   s      r"   r   r   a  s     '433F333r!   c                 F      fd}t          |fdt                     i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                 D                         |j        d                   S )Ncorrelation_id)rI   r<   r:   r@   maps     r"   task_id_in_mapz%move_by_idmap.<locals>.task_id_in_map{  s    www)*:;<<<r!   rx   )r   len)r   re   r   s   `  r"   r   r   o  s?    = = = = =
 99c#hh9&999r!   c                 (      fd}t          |fi |S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                 :                         | d                   S rN   )rI   r   s     r"   task_name_in_mapz)move_by_taskmap.<locals>.task_name_in_map  s    wwtF|$$$r!   )r   )r   re   r   s   `  r"   r   r     s5    % % % % %  ++F+++r!   c                 H    t          t          j        d| |d|           d S )N)rz   r:   r    )printMOVING_PROGRESS_FMTformat)rz   r:   r@   re   s       r"   filter_statusr     s/    	

$
F5t
F
Fv
F
FGGGGGr!   )r{   )NNNrW   )NNNNNNNNr   )-r   r   	functoolsr   	itertoolsr   r   kombur   r   kombu.commonr   kombu.utils.encodingr	   
celery.appr
   celery.utils.nodenamesr   celery.utils.textr   __all__r   	Exceptionr   r   r   r   rR   r   ri   r   rr   r   r   r^   r   r   r   r   r   r   r   r   move_direct_by_idmapmove_direct_by_taskmapr    r!   r"   <module>r      s   2 2        # # # # # # # # " " " " " " " " & & & & & & - - - - - - % % % % % % 0 0 0 0 0 0 ) ) ) ) ) ) 5 5 5 5 5I 5 5 5/ / / / / / / /& =A4 4 4 42; ; ; ;   )5$E E E E,   AEEIXO XO XO XOv  ! ! !
  
  W W W W W W W Wt 9<8<@D7;   *4 4 4: : :(, , ,"H H H gdm444GO}EEE w}FFF  MJJJ   r!   