o
    rhD                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlZd dlZd dlm	Z	 d dl
Z
d dlZd dlZd dlZd dlmZ G dd dZG dd dZd	ejejd d d
dddddd ddfddZdS )    N)
ThreadPool)Queue)get_invlistc                   @   s~   e Zd ZdZ		dddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd ZdS ) BigBatchSearcherz
    Object that manages all the data related to the computation
    except the actual within-bucket matching and the organization of the
    computation (parallel or not)
    r   Fc                 C   s`   || _ g | _|| _|| _|| _t|j}tjt	|||d| _
dgd | _t  | _| _d S )N)keep_maxr      )verbosetictocxqindexuse_float16faissis_similarity_metricmetric_type
ResultHeaplenrht_accutime	t_displayt0)selfr   r
   kr   r   r    r   \/var/www/html/alpaca_bot/venv/lib/python3.10/site-packages/faiss/contrib/big_batch_search.py__init__   s   zBigBatchSearcher.__init__c                 C   s   t   | _d S N)r   	t_accu_t0r   r   r   r   start_t_accu0   s   zBigBatchSearcher.start_t_accuc                 C   s    | j |  t | j 7  < d S r   )r   r   r   )r   nr   r   r   stop_t_accu3   s    zBigBatchSearcher.stop_t_accuc                 C   s.   |t   f| _| jdkrt|ddd d S d S )Nr   Tendflush)r   r	   r   print)r   namer   r   r   tic6   s   
zBigBatchSearcher.ticc                 C   s:   | j \}}t | }| jdkrt| d|dd |S )Nr   z: .3fz s)r	   r   r   r&   )r   r'   r   dtr   r   r   toc;   s
   

zBigBatchSearcher.tocc                 C   s  | j dks| j dkr|dkrt | jd k rd S t | j }td|dd| d| jj d	| jd
 dd| jd dd| jd dd| jd dd| jd dd| jd ddtj	|| jj |d  | d dt
  | j dkrudnddd t | _d S )N      i  g      ?[z.1fz	 s] list /z times prep q r   r)   z prep b z comp z res    z	 wait in    z
 wait out    z eta )secondsz mem r"   
Tr#   )r   r   r   r   r&   r   nlistr   datetime	timedeltar   get_mem_usage_kb)r   ltr   r   r   reportB   s6   







zBigBatchSearcher.reportc                 C   s   |  d d}t| j}tj|| jjfdd}td||D ] }t||| }| jj	
| j|| | jj\}}||||< q|   || _d S )Nzcoarse quantizationi   int32)dtyper   )r(   r   r
   npemptyr   nproberangemin	quantizersearchr+   q_assign)r   bsnqrE   i0i1q_dis_i
q_assign_ir   r   r   coarse_quantizationW   s   


z$BigBatchSearcher.coarse_quantizationc                 C   sz   |  d | j}|d7 }tj| j| jjd dd| _| j | _| j	dkr-t
d| jd  | jdd  | _| `|   d S )Nzbucket sortr,      )nbucketntr   z  number of -1s:)r(   rE   r   matrix_bucket_sort_inplacer   r5   bucket_limsravel	query_idsr   r&   r+   )r   rE   r   r   r   reorder_assigne   s   

zBigBatchSearcher.reorder_assignc                 C   s   t   }| j}| j| | j|d  }}| j|| }| j| }| jr+||j| }t   }t|j	|\}	}
| j
du rA|
 }
n| 
|
}
| jrS|
d}
|d}t   }| jd  || 7  < | jd  || 7  < |||	|
fS )z4 prepare the queries and database items for bucket lr,   Nfloat16r   )r   r   rQ   rS   r
   by_residualrC   reconstructr   invlistsdecode_funcrR   r   astyper   )r   r9   r   r   rH   rI   q_subsetxq_lt1list_idsxb_lt2r   r   r   prepare_bucketr   s&   





zBigBatchSearcher.prepare_bucketc                 C   sX   |du rdS t   }|du r|}n|| }| j||| | jd  t   | 7  < dS )z,add the bucket results to the heap structureNr0   )r   r   add_result_subsetr   )r   r[   Dr^   Ir   r   r   r   add_results_to_heap   s   z$BigBatchSearcher.add_results_to_heapc                 C   s   | j j| jj| jjfS r   )r
   shaper   r@   r5   r   r   r   r   sizes_in_checkpoint   s   z$BigBatchSearcher.sizes_in_checkpointc                 C   sh   |d }t |d}t|  || jj| jjfd|d W d    n1 s'w   Y  t|| d S )Nz.tmpwb)sizes	completedr   )	openpickledumprg   r   rc   rd   osreplace)r   fnamerj   tmpnamefr   r   r   write_checkpoint   s   z!BigBatchSearcher.write_checkpointc                 C   s   t |d}t|}W d    n1 sw   Y  |d |  ks$J |d d | jjd d < |d d | jjd d < |d S )Nrbri   r   r   r,   rj   )rl   rm   loadrg   r   rc   rd   )r   rq   rs   ckpr   r   r   read_checkpoint   s   z BigBatchSearcher.read_checkpointN)r   F)__name__
__module____qualname____doc__r   r   r!   r(   r+   r;   rL   rT   ra   re   rg   rt   rx   r   r   r   r   r      s"    	
r   c                   @   s,   e Zd ZdZdejejfddZdd ZdS )BlockComputerz computation within one bucket knn_functionc                 C   s   || _ |jtjkrt|j|j}dd }d}nL|jtjkr;t|j|j	j
|j	j|j}|j	|_	|j	j}d|_|j}n)|jtjkr[t|j|jj|j}|j|_|jj}d|_|j}n	td|j d|| _|dkrmd n|| _|| _|| _|| _|| _d S )Nc                 S   s
   |  dS )Nfloat32)view)xr   r   r   <lambda>   s   
 z(BlockComputer.__init__.<locals>.<lambda>FTzindex type z not supportedr   )r   	__class__r   IndexIVFFlat	IndexFlatdr   
IndexIVFPQIndexPQpqMnbitsdecode
is_trainedrV   IndexIVFScalarQuantizerIndexScalarQuantizersqqtypeRuntimeError
index_helprY   methodpairwise_distancesknn)r   r   r   r   r   r   rY   rV   r   r   r   r      s8   
zBlockComputer.__init__c           	      K   s   | j j}|jdks|jdkrd  }}||fS | jdkr6t|| jj t|| j_	| j
||\}}||fS | jdkrI| j|||d}d }||fS | jdkr]| j|||fd|i|\}}||fS )Nr   r   r   )metricr~   r   )r   r   sizer   r   copy_array_to_vectorr   codesr   ntotalrD   r   r   )	r   r\   r_   r^   r   
extra_argsr   rc   rd   r   r   r   block_search   s    


zBlockComputer.block_searchN)	ry   rz   r{   r|   r   r   r   r   r   r   r   r   r   r}      s    
#r}   r~   Fr,   i   rk   c           1         sT  j }|dv s	J |j}t|| tdj }t| tdjtdj  }|| | }|dkrLtd| d| d| d	| d
|d dd t|||d t	|||dj
 _
j _|du rn   n| _   |du r|j}t }|dur||fdjfksJ tj|rtd|   |}tdt|  ntd |dkrt||D ]3} |  |\}}}}t }|||\}} jd  t | 7  <  |||| qn$|dkrU fdd} |} d}!td}"t||D ]>} | |"||!|d f}#| \}}}}   |||\}} d ||||f}!   |#  }  d q j|!  |"!  ndd fdd}$d( fdd	}%fdd }&t"d}'t"d}(|$|&dt |(|'})|$|%|	||||'d}*t }+	 td" |(  },|,sno|,\}-}.}/}0}}}}|-|krdd   jd  |07  <  jd  |.7  <  jd#  |/7  < td$|-   |||| td%|-  |#|-  |- |dur
t |+ |kr
td&  $|| t }+q|*%  |)%   &d'  j'(   )   j'j* j'j+fS ))a  
    Search queries xq in the IVF index, with a search function that collects
    batches of query vectors per inverted list. This can be faster than the
    regular search indexes.
    Supports IVFFlat, IVFPQ and IVFScalarQuantizer.

    Supports three computation methods:
    method = "index":
        build a flat index and populate it separately for each index
    method = "pairwise_distances":
        decompress codes and compute all pairwise distances for the queries
        and index and add result to heap
    method = "knn_function":
        decompress codes and compute knn results for the queries

    threaded=0: sequential execution
    threaded=1: prefetch next bucket while computing the current one
    threaded=2: prefetch prefetch_threads buckets at a time.

    compute_threads>1: the knn function will get an additional thread_no that
        tells which worker should handle this.

    In threaded mode, the computation is tiled with the bucket perparation and
    the writeback of results (useful to maximize GPU utilization).

    use_float16: convert all matrices to float16 (faster for GPU gemm)

    q_assign: override coarse assignment, should be a matrix of size nq * nprobe

    checkpointing (only for threaded > 1):
    checkpoint: file where the checkpoints are stored
    checkpoint_freq: when to perform checkpoinging. Should be a multiple of threaded

    start_list, end_list: process only a subset of invlists
    )r   r   r~   r<   int64r   r   zmemory: queries z assign z result z total z = i   @r)   z GiB)r   r   )r   r   r   Nzrecovering checkpoint: z   already completed: z$no checkpoint: starting from scratchr-   r,   c                    s*   | dur	 j |   |jk r |S dS )z` perform the addition for the previous bucket and
            prefetch the next (if applicable) N)re   r5   ra   )to_addr9   )bbsr   r   r   add_results_and_prefetcha  s
   


z2big_batch_search.<locals>.add_results_and_prefetchr1   c           	         s   z=t |) fddt||D }|D ]}|  q    W d    n1 s1w   Y  d  W d S    t  t	   )Nc                    s(   g | ]}| vrj |fd qS ))args)apply_async).0irj   input_queueoutput_queuepooltaskr   r   
<listcomp>  s    zAbig_batch_search.<locals>.task_manager_thread.<locals>.<listcomp>)
r   rA   getclosejoinput	traceback	print_exc_threadinterrupt_main)	r   	pool_size
start_taskend_taskrj   r   r   resrr   r   r   task_manager_thread~  s   	



z-big_batch_search.<locals>.task_manager_threadc                     s    t j | d}d|_|  |S )N)targetr   T)	threadingThreaddaemonstart)r   task_manager)r   r   r   r     s   z&big_batch_search.<locals>.task_managerc                    sf   z&t d|    | \}}}}|| ||||f t d|   W d S    t  t   )NzPrepare start: zPrepare end: )logginginfora   r   r   r   r   r   )task_idr   r   r[   r\   r^   r_   )r   r   r   prepare_task  s   z&big_batch_search.<locals>.prepare_taskc              
      s:  zt d|   d}	 t }t d|   | }t | }|d u r,|d  nZ|\}}}	}
}t d|  d|  t }dkrS j|	||
| d\}}n
 |	||
\}}t | }t d	|  d|  t }||||||||
|f t | }qt d
|   W d S    t  t	   )NzCompute start: r   TzCompute input: task zCompute work: task z, centroid r,   )	thread_idzCompute output: task zCompute end: )
r   r   r   r   r   r   r   r   r   r   )r   r   r   
t_wait_outr   input_value	t_wait_incentroidr[   r\   r^   r_   rc   rd   	t_compute)compcomputation_threadsr   r   r   compute_task  s@   

z&big_batch_search.<locals>.compute_taskTzWaiting for resultr2   zAdding to heap start: centroid zAdding to heap end: centroid zwriting checkpointzfinalize heapr   ),r@   nbytesr   r>   r=   itemsizer   r   r   r}   rY   rV   rL   rE   rT   r5   setro   pathexistsrx   rA   r;   ra   r   r   r   re   r   r   r   r!   r   r   r   addrt   r   r(   r   finalizer+   rc   rd   )1r   r
   r   r   r   r   r   threadedr   prefetch_threadsr   rE   
checkpointcheckpoint_freq
start_listend_listcrash_atr@   mem_queries
mem_assignmem_resmem_totrj   r9   r[   r\   r^   r_   t0irc   rd   r   prefetched_bucketr   r   prefetched_bucket_ar   r   r   prepare_to_compute_queuecompute_to_main_queuecompute_task_managerprepare_task_managert_checkpointvaluer   r   r   r   r   )r   r   r   r   r   r   r   big_batch_search   s  4












	#	








r   )r   rm   ro   r   multiprocessing.poolr   r   r   queuer   r   r6   numpyr>   r   faiss.contrib.inspect_toolsr   r   r}   r   r   r   r   r   r   r   <module>   s>    :