a
    xdfS                     @   s   d dl Z ddlmZ ddlmZ ddlmZ ddlmZmZ G dd dZ	G d	d
 d
eej
dZG dd deej
dZddddZdd Zdd Zdd ZG dd dZG dd deZG dd deZdd Zdd  ZdS )!    N   )_coreStapledStream)_util)
SendStreamReceiveStreamc                   @   sP   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdddZ	dddZ
dS )_UnboundedByteQueuec                 C   s(   t  | _d| _t | _td| _d S )NFz%another task is already fetching data)		bytearray_data_closedr   
ParkingLot_lotr   ConflictDetector_fetch_lockself r   T/var/www/html/Ranjet/env/lib/python3.9/site-packages/trio/testing/_memory_streams.py__init__   s    
z_UnboundedByteQueue.__init__c                 C   s   d| _ | j  d S NT)r   r   
unpark_allr   r   r   r   close   s    z_UnboundedByteQueue.closec                 C   s   t  | _|   d S N)r
   r   r   r   r   r   r   close_and_wipe   s    z"_UnboundedByteQueue.close_and_wipec                 C   s,   | j rtd|  j|7  _| j  d S )Nzvirtual connection closed)r   r   ClosedResourceErrorr   r   r   r   datar   r   r   put"   s    
z_UnboundedByteQueue.putc                 C   s*   |d u rd S t |}|dk r&tdd S )N   max_bytes must be >= 1)operatorindex
ValueErrorr   	max_bytesr   r   r   _check_max_bytes(   s
    
z$_UnboundedByteQueue._check_max_bytesc                 C   sX   | j s| jsJ |d u r"t| j}| jrN| jd | }| jd |= |sJJ |S t S d S r   )r   r   lenr
   )r   r%   chunkr   r   r   	_get_impl/   s    
z_UnboundedByteQueue._get_implNc                 C   sP   | j 6 | | | js$| js$tj| |W  d    S 1 sB0    Y  d S r   )r   r&   r   r   r   
WouldBlockr)   r$   r   r   r   
get_nowait;   s
    
z_UnboundedByteQueue.get_nowaitc                    sj   | j P | | | js0| js0| j I d H  nt I d H  | |W  d    S 1 s\0    Y  d S r   )	r   r&   r   r   r   parkr   
checkpointr)   r$   r   r   r   getB   s    
z_UnboundedByteQueue.get)N)N)__name__
__module____qualname__r   r   r   r   r&   r)   r+   r.   r   r   r   r   r	      s   
r	   c                   @   sN   e Zd ZdZdddZdd Zdd Zd	d
 Zdd ZdddZ	dddZ
dS )MemorySendStreama  An in-memory :class:`~trio.abc.SendStream`.

    Args:
      send_all_hook: An async function, or None. Called from
          :meth:`send_all`. Can do whatever you like.
      wait_send_all_might_not_block_hook: An async function, or None. Called
          from :meth:`wait_send_all_might_not_block`. Can do whatever you
          like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: send_all_hook
                   wait_send_all_might_not_block_hook
                   close_hook

       All of these hooks are also exposed as attributes on the object, and
       you can change them at any time.

    Nc                 C   s*   t d| _t | _|| _|| _|| _d S )N!another task is using this stream)r   r   _conflict_detectorr	   	_outgoingsend_all_hook"wait_send_all_might_not_block_hook
close_hook)r   r6   r7   r8   r   r   r   r   a   s    zMemorySendStream.__init__c                    sj   | j P t I dH  t I dH  | j| | jdurH|  I dH  W d   n1 s\0    Y  dS )z}Places the given data into the object's internal buffer, and then
        calls the :attr:`send_all_hook` (if any).

        N)r4   r   r-   r5   r   r6   r   r   r   r   send_allo   s    
zMemorySendStream.send_allc                    sj   | j P t I dH  t I dH  | jd | jdurH|  I dH  W d   n1 s\0    Y  dS )znCalls the :attr:`wait_send_all_might_not_block_hook` (if any), and
        then returns immediately.

        N    )r4   r   r-   r5   r   r7   r   r   r   r   wait_send_all_might_not_block}   s    
z.MemorySendStream.wait_send_all_might_not_blockc                 C   s    | j   | jdur|   dS )z^Marks this stream as closed, and then calls the :attr:`close_hook`
        (if any).

        N)r5   r   r8   r   r   r   r   r      s    

zMemorySendStream.closec                    s   |    t I dH  dS z!Same as :meth:`close`, but async.Nr   r   r-   r   r   r   r   aclose   s    zMemorySendStream.aclosec                    s   | j |I dH S )a  Retrieves data from the internal buffer, blocking if necessary.

        Args:
          max_bytes (int or None): The maximum amount of data to
              retrieve. None (the default) means to retrieve all the data
              that's present (but still blocks until at least one byte is
              available).

        Returns:
          If this stream has been closed, an empty bytearray. Otherwise, the
          requested data.

        N)r5   r.   r$   r   r   r   get_data   s    zMemorySendStream.get_datac                 C   s   | j |S )zRetrieves data from the internal buffer, but doesn't block.

        See :meth:`get_data` for details.

        Raises:
          trio.WouldBlock: if no data is available to retrieve.

        )r5   r+   r$   r   r   r   get_data_nowait   s    	z MemorySendStream.get_data_nowait)NNN)N)N)r/   r0   r1   __doc__r   r9   r;   r   r>   r?   r@   r   r   r   r   r2   L   s      

r2   )	metaclassc                   @   sD   e Zd ZdZdddZdddZdd Zd	d
 Zdd Zdd Z	dS )MemoryReceiveStreama  An in-memory :class:`~trio.abc.ReceiveStream`.

    Args:
      receive_some_hook: An async function, or None. Called from
          :meth:`receive_some`. Can do whatever you like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: receive_some_hook
                   close_hook

       Both hooks are also exposed as attributes on the object, and you can
       change them at any time.

    Nc                 C   s*   t d| _t | _d| _|| _|| _d S )Nr3   F)r   r   r4   r	   	_incomingr   receive_some_hookr8   )r   rE   r8   r   r   r   r      s    zMemoryReceiveStream.__init__c                    s   | j r t I dH  t I dH  | jr0tj| jdurH|  I dH  | j|I dH }| jrftj|W  d   S 1 s~0    Y  dS )zCalls the :attr:`receive_some_hook` (if any), and then retrieves
        data from the internal buffer, blocking if necessary.

        N)r4   r   r-   r   r   rE   rD   r.   )r   r%   r   r   r   r   receive_some   s    
z MemoryReceiveStream.receive_somec                 C   s&   d| _ | j  | jdur"|   dS )zfDiscards any pending data from the internal buffer, and marks this
        stream as closed.

        TN)r   rD   r   r8   r   r   r   r   r      s    

zMemoryReceiveStream.closec                    s   |    t I dH  dS r<   r=   r   r   r   r   r>      s    zMemoryReceiveStream.aclosec                 C   s   | j | dS )z.Appends the given data to the internal buffer.N)rD   r   r   r   r   r   put_data   s    zMemoryReceiveStream.put_datac                 C   s   | j   dS )z2Adds an end-of-file marker to the internal buffer.N)rD   r   r   r   r   r   put_eof  s    zMemoryReceiveStream.put_eof)NN)N)
r/   r0   r1   rA   r   rF   r   r>   rG   rH   r   r   r   r   rC      s   
	

rC   )r%   c                C   sf   z|  |}W n tjy$   Y dS 0 z|s6|  n
|| W n tjy`   tdY n0 dS )a  Take data out of the given :class:`MemorySendStream`'s internal buffer,
    and put it into the given :class:`MemoryReceiveStream`'s internal buffer.

    Args:
      memory_send_stream (MemorySendStream): The stream to get data from.
      memory_receive_stream (MemoryReceiveStream): The stream to put data into.
      max_bytes (int or None): The maximum amount of data to transfer in this
          call, or None to transfer all available data.

    Returns:
      True if it successfully transferred some data, or False if there was no
      data to transfer.

    This is used to implement :func:`memory_stream_one_way_pair` and
    :func:`memory_stream_pair`; see the latter's docstring for an example
    of how you might use it yourself.

    FzMemoryReceiveStream was closedT)r@   r   r*   rH   rG   r   BrokenResourceError)Zmemory_send_streamZmemory_receive_streamr%   r   r   r   r   memory_stream_pump  s    
rJ   c                     s:   t  t fdd  fdd} | _ _fS )uQ  Create a connected, pure-Python, unidirectional stream with infinite
    buffering and flexible configuration options.

    You can think of this as being a no-operating-system-involved
    Trio-streamsified version of :func:`os.pipe` (except that :func:`os.pipe`
    returns the streams in the wrong order – we follow the superior convention
    that data flows from left to right).

    Returns:
      A tuple (:class:`MemorySendStream`, :class:`MemoryReceiveStream`), where
      the :class:`MemorySendStream` has its hooks set up so that it calls
      :func:`memory_stream_pump` from its
      :attr:`~MemorySendStream.send_all_hook` and
      :attr:`~MemorySendStream.close_hook`.

    The end result is that data automatically flows from the
    :class:`MemorySendStream` to the :class:`MemoryReceiveStream`. But you're
    also free to rearrange things however you like. For example, you can
    temporarily set the :attr:`~MemorySendStream.send_all_hook` to None if you
    want to simulate a stall in data transmission. Or see
    :func:`memory_stream_pair` for a more elaborate example.

    c                      s   t   d S r   )rJ   r   )recv_streamsend_streamr   r   $pump_from_send_stream_to_recv_streamC  s    zHmemory_stream_one_way_pair.<locals>.pump_from_send_stream_to_recv_streamc                      s
      d S r   r   r   )rM   r   r   *async_pump_from_send_stream_to_recv_streamF  s    zNmemory_stream_one_way_pair.<locals>.async_pump_from_send_stream_to_recv_stream)r2   rC   r6   r8   )rN   r   )rM   rK   rL   r   memory_stream_one_way_pair(  s    rO   c                 C   s0   |  \}}|  \}}t ||}t ||}||fS r   r   )Zone_way_pairZ
pipe1_sendZ
pipe1_recvZ
pipe2_sendZ
pipe2_recvZstream1Zstream2r   r   r   _make_stapled_pairN  s
    



rP   c                   C   s   t tS )a  Create a connected, pure-Python, bidirectional stream with infinite
    buffering and flexible configuration options.

    This is a convenience function that creates two one-way streams using
    :func:`memory_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    This is like a no-operating-system-involved, Trio-streamsified version of
    :func:`socket.socketpair`.

    Returns:
      A pair of :class:`~trio.StapledStream` objects that are connected so
      that data automatically flows from one to the other in both directions.

    After creating a stream pair, you can send data back and forth, which is
    enough for simple tests::

       left, right = memory_stream_pair()
       await left.send_all(b"123")
       assert await right.receive_some() == b"123"
       await right.send_all(b"456")
       assert await left.receive_some() == b"456"

    But if you read the docs for :class:`~trio.StapledStream` and
    :func:`memory_stream_one_way_pair`, you'll see that all the pieces
    involved in wiring this up are public APIs, so you can adjust to suit the
    requirements of your tests. For example, here's how to tweak a stream so
    that data flowing from left to right trickles in one byte at a time (but
    data flowing from right to left proceeds at full speed)::

        left, right = memory_stream_pair()
        async def trickle():
            # left is a StapledStream, and left.send_stream is a MemorySendStream
            # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
            while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
                # Pause between each byte
                await trio.sleep(1)
        # Normally this send_all_hook calls memory_stream_pump directly without
        # passing in a max_bytes. We replace it with our custom version:
        left.send_stream.send_all_hook = trickle

    And here's a simple test using our modified stream objects::

        async def sender():
            await left.send_all(b"12345")
            await left.send_eof()

        async def receiver():
            async for data in right:
                print(data)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(receiver)

    By default, this will print ``b"12345"`` and then immediately exit; with
    our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then
    sleeps 1 second, then prints ``b"2"``, etc.

    Pro-tip: you can insert sleep calls (like in our example above) to
    manipulate the flow of data across tasks... and then use
    :class:`MockClock` and its :attr:`~MockClock.autojump_threshold`
    functionality to keep your test suite running quickly.

    If you want to stress test a protocol implementation, one nice trick is to
    use the :mod:`random` module (preferably with a fixed seed) to move random
    numbers of bytes at a time, and insert random sleeps in between them. You
    can also set up a custom :attr:`~MemoryReceiveStream.receive_some_hook` if
    you want to manipulate things on the receiving side, and not just the
    sending side.

    )rP   rO   r   r   r   r   memory_stream_pairV  s    JrQ   c                   @   sN   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dddZ
dS )_LockstepByteQueuec                 C   s@   t  | _d| _d| _d| _t | _t	d| _
t	d| _d S )NFzanother task is already sendingz!another task is already receiving)r
   r   _sender_closed_receiver_closed_receiver_waitingr   r   _waitersr   r   _send_conflict_detector_receive_conflict_detectorr   r   r   r   r     s    
z_LockstepByteQueue.__init__c                 C   s   | j   d S r   )rV   r   r   r   r   r   _something_happened  s    z&_LockstepByteQueue._something_happenedc                    s:   | rq(| j s(| jrq(| j I d H  q t I d H  d S r   )rS   rT   rV   r,   r   r-   )r   fnr   r   r   	_wait_for  s    z_LockstepByteQueue._wait_forc                 C   s   d| _ |   d S r   )rS   rY   r   r   r   r   close_sender  s    z_LockstepByteQueue.close_senderc                 C   s   d| _ |   d S r   )rT   rY   r   r   r   r   close_receiver  s    z!_LockstepByteQueue.close_receiverc                    s    j ~  jrtj jr tj jr*J   j|7  _     fddI d H   jrdtj jrv jrvtjW d    n1 s0    Y  d S )Nc                      s    j  S r   r   r   r   r   r   <lambda>  r:   z-_LockstepByteQueue.send_all.<locals>.<lambda>)	rW   rS   r   r   rT   rI   r   rY   r[   r   r   r   r   r9     s    
z_LockstepByteQueue.send_allc                    s~    j d  jrtj jr8t I d H  W d    d S   fddI d H   jr\tjW d    n1 sp0    Y  d S )Nc                      s    j S r   )rU   r   r   r   r   r_     r:   zB_LockstepByteQueue.wait_send_all_might_not_block.<locals>.<lambda>)rW   rS   r   r   rT   r-   r[   r   r   r   r   r;     s    z0_LockstepByteQueue.wait_send_all_might_not_blockNc              	      s    j  |d ur*t|}|dk r*td jr6tjd _   z" 	 fddI d H  W d _nd _0  jr|tj j
r j
d | } j
d |=    |W  d    S  jsJ W d    dS W d    n1 s0    Y  d S )Nr   r    Tc                      s    j S r   r^   r   r   r   r   r_     r:   z1_LockstepByteQueue.receive_some.<locals>.<lambda>Fr:   )rX   r!   r"   r#   rT   r   r   rU   rY   r[   r   rS   )r   r%   gotr   r   r   rF     s*    

z_LockstepByteQueue.receive_some)N)r/   r0   r1   r   rY   r[   r\   r]   r9   r;   rF   r   r   r   r   rR     s   	rR   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )_LockstepSendStreamc                 C   s
   || _ d S r   _lbqr   lbqr   r   r   r     s    z_LockstepSendStream.__init__c                 C   s   | j   d S r   )rc   r\   r   r   r   r   r   
  s    z_LockstepSendStream.closec                    s   |    t I d H  d S r   r=   r   r   r   r   r>     s    z_LockstepSendStream.aclosec                    s   | j |I d H  d S r   )rc   r9   r   r   r   r   r9     s    z_LockstepSendStream.send_allc                    s   | j  I d H  d S r   )rc   r;   r   r   r   r   r;     s    z1_LockstepSendStream.wait_send_all_might_not_blockN)r/   r0   r1   r   r   r>   r9   r;   r   r   r   r   ra     s
   ra   c                   @   s.   e Zd Zdd Zdd Zdd Zd
dd	ZdS )_LockstepReceiveStreamc                 C   s
   || _ d S r   rb   rd   r   r   r   r     s    z_LockstepReceiveStream.__init__c                 C   s   | j   d S r   )rc   r]   r   r   r   r   r     s    z_LockstepReceiveStream.closec                    s   |    t I d H  d S r   r=   r   r   r   r   r>     s    z_LockstepReceiveStream.acloseNc                    s   | j |I d H S r   )rc   rF   r$   r   r   r   rF   #  s    z#_LockstepReceiveStream.receive_some)N)r/   r0   r1   r   r   r>   rF   r   r   r   r   rf     s   rf   c                  C   s   t  } t| t| fS )a  Create a connected, pure Python, unidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple
      (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`).

    This stream has *absolutely no* buffering. Each call to
    :meth:`~trio.abc.SendStream.send_all` will block until all the given data
    has been returned by a call to
    :meth:`~trio.abc.ReceiveStream.receive_some`.

    This can be useful for testing flow control mechanisms in an extreme case,
    or for setting up "clogged" streams to use with
    :func:`check_one_way_stream` and friends.

    In addition to fulfilling the :class:`~trio.abc.SendStream` and
    :class:`~trio.abc.ReceiveStream` interfaces, the return objects
    also have a synchronous ``close`` method.

    )rR   ra   rf   )re   r   r   r   lockstep_stream_one_way_pair'  s    rg   c                   C   s   t tS )a  Create a connected, pure-Python, bidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple (:class:`~trio.StapledStream`, :class:`~trio.StapledStream`).

    This is a convenience function that creates two one-way streams using
    :func:`lockstep_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    )rP   rg   r   r   r   r   lockstep_stream_pairB  s    rh   )r!    r   Z_highlevel_genericr   r   abcr   r   r	   Finalr2   rC   rJ   rO   rP   rQ   rR   ra   rf   rg   rh   r   r   r   r   <module>   s    ?rI!&R^