o
    tBh_                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZmZmZmZ d dlZd dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ ddl m!Z! ddl"m#Z#m$Z$m%Z%m&Z&m'Z' e(dZ)ee*ee f Z+G dd dZ,G dd de!Z-dS )    N)abstractmethod)suppress)
SSLContext)AnyAsyncGeneratorDictListOptionalTupleUnioncast)DocumentNodeExecutionResult)WebSocketClientProtocol)HeadersHeadersLike)ConnectionClosed)DataSubprotocol   )AsyncTransport)TransportAlreadyConnectedTransportClosedTransportProtocolErrorTransportQueryErrorTransportServerErrorzgql.transport.websocketsc                   @   sX   e Zd ZdZdededdfddZdefdd	Zd
eddfddZ	de
ddfddZdS )ListenerQueuea&  Special queue used for each query waiting for server answers

    If the server is stopped while the listener is still waiting,
    Then we send an exception to the queue and this exception will be raised
    to the consumer once all the previous messages have been consumed from the queue
    query_id	send_stopreturnNc                 C   s    || _ || _t | _d| _d S )NF)r   r   asyncioQueue_queue_closed)selfr   r    r%   t/var/www/html/riverr-enterprise-integrations-main/venv/lib/python3.10/site-packages/gql/transport/websockets_base.py__init__&   s   

zListenerQueue.__init__c                    st   z| j  }W n tjy   | j  I d H }Y nw | j   t|tr*d| _||\}}|dkr8d| _	d| _|S )NTcompleteF)
r"   
get_nowaitr    
QueueEmptyget	task_done
isinstance	Exceptionr#   r   )r$   itemanswer_typeexecution_resultr%   r%   r&   r+   ,   s   

zListenerQueue.getr/   c                    s"   | j s| j|I d H  d S d S N)r#   r"   put)r$   r/   r%   r%   r&   r3   C   s   zListenerQueue.put	exceptionc                    s$   | j |I d H  d| _d| _d S )NFT)r"   r3   r   r#   )r$   r4   r%   r%   r&   set_exceptionH   s   
zListenerQueue.set_exception)__name__
__module____qualname____doc__intboolr'   ParsedAnswerr+   r3   r.   r5   r%   r%   r%   r&   r      s    r   c                   @   sd  e Zd ZdZddi ddddi fdedee deee	f de
eef d	eeeef  d
eeeef  deeeef  deeeef  de
eef ddfddZdd ZdefddZdd Zdd Zdd Zdd Zdeddfdd Zdefd!d"Ze		dJd#ed$ee
eef  d%ee defd&d'Zed(edeeee ee f fd)d*ZdKd+d,ZdKd-d.Zd/ed0ee d1ee ddfd2d3Z			4dLd#ed$ee
eef  d%ee d5ee	 de edf f
d6d7Z!		dJd#ed$ee
eef  d%ee defd8d9Z"dKd:d;Z#dKd<d=Z$d>e%ddfd?d@Z&dMd>e%dAe	ddfdBdCZ'dMd>e%dAe	ddfdDdEZ(dKdFdGZ)dKdHdIZ*dS )NWebsocketsTransportBasezabstract :ref:`Async Transport <async_transports>` used to implement
    different websockets protocols.

    This transport uses asyncio and the websockets library in order to send requests
    on a websocket connection.
    NF
   urlheaderssslinit_payloadconnect_timeoutclose_timeoutack_timeoutkeep_alive_timeoutconnect_argsr   c
           
      C   s2  || _ || _|| _|| _|| _|| _|| _|| _|	| _d| _	d| _
i | _d| _d| _d| _z"t  tjddd t | _W d   n1 sIw   Y  W n tyc   t | _t| j Y nw t | _| j  t | _| j  | jdurt | _| j  i | _	 d| _d| _g | _ d| _!dS )a  Initialize the transport with the given parameters.

        :param url: The GraphQL server URL. Example: 'wss://server.com:PORT/graphql'.
        :param headers: Dict of HTTP Headers.
        :param ssl: ssl_context of the connection. Use ssl=False to disable encryption
        :param init_payload: Dict of the payload sent in the connection_init message.
        :param connect_timeout: Timeout in seconds for the establishment
            of the websocket connection. If None is provided this will wait forever.
        :param close_timeout: Timeout in seconds for the close. If None is provided
            this will wait forever.
        :param ack_timeout: Timeout in seconds to wait for the connection_ack message
            from the server. If None is provided this will wait forever.
        :param keep_alive_timeout: Optional Timeout in seconds to receive
            a sign of liveness from the server.
        :param connect_args: Other parameters forwarded to websockets.connect
        Nr   ignorezThere is no current event loop)messageF)"r?   r@   rA   rB   rC   rD   rE   rF   rG   	websocketnext_query_id	listenersreceive_data_taskcheck_keep_alive_task
close_taskwarningscatch_warningsfilterwarningsr    get_event_loop_loopRuntimeErrornew_event_loopset_event_loopEvent_wait_closedset_no_more_listeners_next_keep_alive_messagepayloads_connectingclose_exceptionsupported_subprotocolsresponse_headers)
r$   r?   r@   rA   rB   rC   rD   rE   rF   rG   r%   r%   r&   r'   Z   sP   









z WebsocketsTransportBase.__init__c                       dS )zxHook to send the initialization messages after the connection
        and potentially wait for the backend ack.
        Nr%   r$   r%   r%   r&   _initialize      z#WebsocketsTransportBase._initializer   c                    rb   )ziHook to stop to listen to a specific query.
        Will send a stop message in some subclasses.
        Nr%   )r$   r   r%   r%   r&   _stop_listener   re   z&WebsocketsTransportBase._stop_listenerc                    rb   )zbHook to add custom code for subclasses after the connection
        has been established.
        Nr%   rc   r%   r%   r&   _after_connect   re   z&WebsocketsTransportBase._after_connectc                    rb   z_Hook to add custom code for subclasses after the initialization
        has been done.
        Nr%   rc   r%   r%   r&   _after_initialize   re   z)WebsocketsTransportBase._after_initializec                    rb   )z?Hook to add custom code for subclasses for the connection closeNr%   rc   r%   r%   r&   _close_hook   s   z#WebsocketsTransportBase._close_hookc                    rb   rh   r%   rc   r%   r%   r&   _connection_terminate   re   z-WebsocketsTransportBase._connection_terminaterI   c              
      sj   | j s
td| jz| j |I dH  td| W dS  ty4 } z| j|ddI dH  |d}~ww )zISend the provided message to the websocket connection and log the messagezTransport is not connectedNz>>> %sFclean_close)rJ   r   r_   sendloginfor   _fail)r$   rI   er%   r%   r&   _send   s   zWebsocketsTransportBase._sendc                    sJ   | j du r
td| j  I dH }t|tstd|}td| |S )zFWait the next message from the websocket connection and log the answerNzTransport is already closedz%Binary data received in the websocketz<<< %s)rJ   r   recvr-   strr   ro   rp   )r$   dataanswerr%   r%   r&   _receive   s   

z WebsocketsTransportBase._receivedocumentvariable_valuesoperation_namec                    s   t r2   NotImplementedError)r$   ry   rz   r{   r%   r%   r&   _send_query   s   z#WebsocketsTransportBase._send_queryrw   c                 C   s   t r2   r|   )r$   rw   r%   r%   r&   _parse_answer   s   z%WebsocketsTransportBase._parse_answerc                    s~   z	 t | j | jI dH  | j  q t jy4   | jdu r1| jt	dddI dH  Y dS Y dS  t j
y>   Y dS w )zsCoroutine which will periodically check the liveness of the connection
        through keep-alive messages
        TNzeNo keep-alive message has been received within the expected interval ('keep_alive_timeout' parameter)Frl   )r    wait_forr\   waitrF   clearTimeoutErrorrO   rq   r   CancelledErrorrc   r%   r%   r&   _check_ws_liveness  s(   


	z*WebsocketsTransportBase._check_ws_livenessc                    sp  z	 z	|   I dH }W n& ttfy* } z| j|ddI dH  W Y d}~ntd}~w ty2   Y npw z
| |\}}}W nQ tyq } z(t|jt	sOJ dz| j
|j |I dH  W n	 tyf   Y nw W Y d}~qd}~w ttfy } z| j|ddI dH  W Y d}~n d}~ww | |||I dH  qW td dS W td dS W td dS td w )zMain asyncio task which will listen to the incoming messages and will
        call the parse_answer and handle_answer methods of the subclass.TNFrl   z7TransportQueryError should have a query_id defined herezExiting _receive_data_loop())rx   r   r   rq   r   r   r   r-   r   r:   rL   r5   KeyErrorr   _handle_answerro   debug)r$   rw   rr   r0   	answer_idr1   r%   r%   r&   _receive_data_loop%  sT   #!z*WebsocketsTransportBase._receive_data_loopr0   r   r1   c                    sF   z|d ur| j | ||fI d H  W d S W d S  ty"   Y d S w r2   )rL   r3   r   )r$   r0   r   r1   r%   r%   r&   r   U  s    z&WebsocketsTransportBase._handle_answerTr   c           
   
   C  s6  |  |||I dH }t||du d}|| j|< | j  zoz#	 | I dH \}}|dur1|V  n|dkr?td| d nq W n+ tj	t
fyl }	 ztd|	 |jrb| |I dH  d|_W Y d}	~	nd}	~	ww W td	|  | | dS W td	|  | | dS td	|  | | w )
zSend a query and receive the results using a python async generator.

        The query can be a graphql query, mutation or subscription.

        The results are sent as an ExecutionResult object.
        NTr   r(   zComplete received for query z --> exit without errorzException in subscribe: Fz"In subscribe finally for query_id )r~   r   rL   r[   r   r+   ro   r   r    r   GeneratorExitr   rf   _remove_listener)
r$   ry   rz   r{   r   r   listenerr0   r1   rr   r%   r%   r&   	subscribed  sD   



	z!WebsocketsTransportBase.subscribec                    sT   d}| j |||dd}|2 z3 dH W }|}| I dH   n6 |du r(td|S )a  Execute the provided document AST against the configured remote server
        using the current session.

        Send a query but close the async generator as soon as we have the first answer.

        The result is sent as an ExecutionResult object.
        NFr   z;Query completed without any answer received from the server)r   acloser   )r$   ry   rz   r{   first_result	generatorresultr%   r%   r&   execute  s   	zWebsocketsTransportBase.executec              
      s  t d | jdu r| jsd| _| jr| j}n
| jdr dnd}|| j| jd}|	| j
 zttjj| jfi || jI dH | _W d| _nd| _w tt| j| _| jj| _|  I dH  d| _d| _| j  z	|  I dH  W n( ty } z|d}~w ttjfy } z| j|ddI dH  |d}~ww |  I dH  | jdurt | ! | _"t | # | _$nt%d	t d
 dS )aO  Coroutine which will:

        - connect to the websocket address
        - send the init message
        - wait for the connection acknowledge from the server
        - create an asyncio task which will be used to receive
          and parse the websocket answers

        Should be cleaned with a call to the close coroutine
        zconnect: startingNTwss)rA   extra_headerssubprotocolsFr   rl   zTransport is already connectedzconnect: done)&ro   r   rJ   r^   rA   r?   
startswithr@   r`   updaterG   r    r   
websocketsclientconnectrC   r   r   ra   rg   rK   r_   rY   r   rd   r   r   r   rq   ri   rF   ensure_futurer   rN   r   rM   r   )r$   rA   rG   rr   r%   r%   r&   r     sT   



zWebsocketsTransportBase.connectc                 C   sN   || j v r	| j |= t| j }td| d| d |dkr%| j  dS dS )zAfter exiting from a subscription, remove the listener and
        signal an event if this was the last listener for the client.
        z	listener z
 deleted, z
 remainingr   N)rL   lenro   r   r[   rZ   )r$   r   	remainingr%   r%   r&   r     s   

z(WebsocketsTransportBase._remove_listenerrr   c                    s   | j  D ]\}}|jr| |I dH  d|_qzt| j | jI dH  W n tj	y7   t
d Y nw |  I dH  dS )zCoroutine which will:

        - send stop messages for each active subscription to the server
        - send the connection terminate message
        NFzTimer close_timeout fired)rL   itemsr   rf   r    r   r[   r   rD   r   ro   r   rk   )r$   rr   r   r   r%   r%   r&   _clean_close&  s   z$WebsocketsTransportBase._clean_closerm   c              
      s  t d zz| jdusJ | jdur4| j  ttj | jI dH  W d   n1 s/w   Y  |  I dH  || _	|rkt d z
| 
|I dH  W n tyj } zt dt|  W Y d}~nd}~ww t d | j D ]\}}||I dH  qut d | j I dH  t d W n ty } zt dt|  W Y d}~nd}~ww W t d	 d| _d| _d| _| j  nt d	 d| _d| _d| _| j  w t d
 dS )a%  Coroutine which will:

        - do a clean_close if possible:
            - send stop messages for each active query to the server
            - send the connection terminate message
        - close the websocket connection
        - send the exception to all the remaining listeners
        z_close_coro: startingNz!_close_coro: starting clean_closez$Ignoring exception in _clean_close: z+_close_coro: sending exception to listenersz'_close_coro: close websocket connectionz(_close_coro: websocket connection closedz"Exception catched in _close_coro: z_close_coro: start cleanupz_close_coro: exiting)ro   r   rJ   rN   cancelr   r    r   rj   r_   r   r.   warningreprrL   r   r5   closerO   rY   rZ   )r$   rr   rm   excr   r   r%   r%   r&   _close_coro=  sR   








z#WebsocketsTransportBase._close_coroc                    sz   t dt|  | jd u r+| jd u rt d d S tt| j||d| _d S t dt| j	 d t|  d S )Nz _fail: starting with exception: z;_fail started with self.websocket == None -> already closedrl   z8close_task is not None in _fail. Previous exception is: z New exception is: )
ro   r   r   rO   rJ   r    shieldr   r   r_   )r$   rr   rm   r%   r%   r&   rq   }  s"   


zWebsocketsTransportBase._failc                    s<   t d | tdI d H  |  I d H  t d d S )Nzclose: startingz*Websocket GraphQL transport closed by userzclose: done)ro   r   rq   r   wait_closedrc   r%   r%   r&   r     s
   
zWebsocketsTransportBase.closec                    s*   t d | j I d H  t d d S )Nzwait_close: startingzwait_close: done)ro   r   rY   r   rc   r%   r%   r&   r     s   
z#WebsocketsTransportBase.wait_closed)NN)r   N)NNT)T)+r6   r7   r8   r9   ru   r	   r   r   r   r;   r   r   r:   floatr'   rd   rf   rg   ri   rj   rk   rs   rx   r   r   r~   r
   r   r   r   r   r   r   r   r   r   r   r.   r   r   rq   r   r   r%   r%   r%   r&   r=   R   s    


	


T

"0


<

#
Y@
r=   ).r    loggingrP   abcr   
contextlibr   rA   r   typingr   r   r   r   r	   r
   r   r   r   graphqlr   r   websockets.clientr   websockets.datastructuresr   r   websockets.exceptionsr   websockets.typingr   r   async_transportr   
exceptionsr   r   r   r   r   	getLoggerro   ru   r<   r   r=   r%   r%   r%   r&   <module>   s&    (
4