o
    .i                      @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ zdd	lZW n ey7   d	ZY nw d
ZdZG dd deZd	S )z#Elasticsearch result store backend.    )datetimebytes_to_str)
_parse_url)states)ImproperlyConfigured   )KeyValueStoreBackendN)ElasticsearchBackendzVYou need to install the elasticsearch library to use the Elasticsearch result backend.c                       s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZd	Zd
Zd' fdd	Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z fddZ fddZdd  Zd!d" Zd#d$ Zed%d& Z  ZS )(r
   zElasticsearch Backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`elasticsearch` is not available.
    celerybackendhttp	localhosti#  NF
      c                    s8  t  j|i | || _| jjj}td u rttd  } } } } }	 }
}|rIt	|\}}}	}
}}}|dkr:d }|rI|
d}|d\}}}|pM| j| _|pS| j| _|pY| j| _|p_| j| _|	pe| j| _|
pk| j| _|pq| j| _|dpy| j| _|d}|d ur|| _|d}|d ur|| _|dd| _d | _d S )Nelasticsearch/elasticsearch_retry_on_timeoutelasticsearch_timeoutelasticsearch_max_retrieselasticsearch_save_meta_as_textT)super__init__urlappconfgetr   r   E_LIB_MISSINGr   strip	partitionindexdoc_typeschemehostportusernamepasswordes_retry_on_timeout
es_timeoutes_max_retrieses_save_meta_as_text_server)selfr   argskwargs_getr    r!   r"   r#   r$   r%   r&   path_r(   r)   	__class__ X/var/www/html/philips/venv/lib/python3.10/site-packages/celery/backends/elasticsearch.pyr   ,   s<   


zElasticsearchBackend.__init__c                 C   s    t |tjjr|jdv rdS dS )N>               N/A  TF)
isinstancer   
exceptionsTransportErrorstatus_code)r,   excr4   r4   r5   exception_safe_to_retryU   s   
	z,ElasticsearchBackend.exception_safe_to_retryc              	   C   s`   z#|  |}z|d r|d d W W S W W d S  ttfy#   Y W d S w  tjjy/   Y d S w )Nfound_sourceresult)r/   	TypeErrorKeyErrorr   r?   NotFoundError)r,   keyresr4   r4   r5   r   c   s   
zElasticsearchBackend.getc                 C   s   | j j| j| j|dS N)r    r!   id)serverr   r    r!   r,   rJ   r4   r4   r5   r/   n   s
   zElasticsearchBackend._getc                 C   s\   |d t  d d d}z
| j||d W d S  tjjy-   | ||| Y d S w )Nz{}Z)rF   z
@timestamp)rM   body)	formatr   utcnow	isoformat_indexr   r?   ConflictError_update)r,   rJ   valuestaterQ   r4   r4   r5   _set_with_stateu   s   z$ElasticsearchBackend._set_with_statec                 C   s   |  ||d S N)rZ   )r,   rJ   rX   r4   r4   r5   set   s   zElasticsearchBackend.setc                 K   s<   dd |  D }| jjdt|| j| j|ddid|S )Nc                 S      i | ]	\}}t ||qS r4   r   .0kvr4   r4   r5   
<dictcomp>       z/ElasticsearchBackend._index.<locals>.<dictcomp>op_typecreaterM   r    r!   rQ   paramsr4   )itemsrN   r    r   r!   )r,   rM   rQ   r.   r4   r4   r5   rU      s   zElasticsearchBackend._indexc           
   	   K   s:  dd |  D }z| j|d}|ds | j||fi |W S W n tjjy6   | j||fi | Y S w z| |d d }W n tt	fyM   Y nw |d t
jkrYddiS |d t
jv ri|t
jv riddiS |d	d
}|dd
}| jjdt|| j| jd|i||dd|}	|	d dkrtjddi |	S )au  Update state in a conflict free manner.

        If state is defined (not None), this will not update ES server if either:
        * existing state is success
        * existing state is a ready state and current state in not a ready state

        This way, a Retry state cannot override a Success or Failure, and chord_unlock
        will not retry indefinitely.
        c                 S   r]   r4   r   r^   r4   r4   r5   rb      rc   z0ElasticsearchBackend._update.<locals>.<dictcomp>)rJ   rD   rE   rF   statusnoop_seq_nor   _primary_termdoc)if_primary_term	if_seq_norf   r=   z(conflicting update occurred concurrentlyNr4   )rh   r/   r   rU   r   r?   rI   decode_resultrG   rH   r   SUCCESSREADY_STATESUNREADY_STATESrN   updater   r    r!   rV   )
r,   rM   rQ   rY   r.   res_getmeta_present_on_backendseq_no	prim_termrK   r4   r4   r5   rW      s@   

zElasticsearchBackend._updatec                    sl   | j r	t |S t|tst |S |dr$| |d d |d< |dr4| |d d |d< |S )NrF      	traceback)r*   r   encoder>   dictr   _encode)r,   datar2   r4   r5   r{      s   


zElasticsearchBackend.encodec                    sh   | j r	t |S t|tst |S |dr#t |d |d< |dr2t |d |d< |S )NrF   rz   )r*   r   decoder>   r|   r   )r,   payloadr2   r4   r5   r      s   


zElasticsearchBackend.decodec                    s    fdd|D S )Nc                    s   g | ]}  |qS r4   )r   )r_   rJ   r,   r4   r5   
<listcomp>   s    z-ElasticsearchBackend.mget.<locals>.<listcomp>r4   )r,   keysr4   r   r5   mget   s   zElasticsearchBackend.mgetc                 C   s   | j j| j| j|d d S rL   )rN   deleter    r!   rO   r4   r4   r5   r      s   zElasticsearchBackend.deletec                 C   sH   d}| j r| jr| j | jf}tj| j d| j | j| j| j| j	|dS )z$Connect to the Elasticsearch server.N:)retry_on_timeoutmax_retriestimeoutr"   	http_auth)
r%   r&   r   Elasticsearchr#   r$   r'   r)   r(   r"   )r,   r   r4   r4   r5   _get_server   s   z ElasticsearchBackend._get_serverc                 C   s   | j d u r
|  | _ | j S r[   )r+   r   r   r4   r4   r5   rN      s   

zElasticsearchBackend.serverr[   )__name__
__module____qualname____doc__r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r   rC   r   r/   rZ   r\   rU   rW   r{   r   r   r   r   propertyrN   __classcell__r4   r4   r2   r5   r
      s6    )5r
   )r   r   kombu.utils.encodingr   kombu.utils.urlr   r   r   celery.exceptionsr   baser	   r   ImportError__all__r   r
   r4   r4   r4   r5   <module>   s    