a
    Šxd(  ã                   @   s²   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ G d	d
„ d
eƒZeG dd„ dejƒƒZdS )é    N)Úgen)ÚIOStream)Úapp_log)Ú	TCPServer)ÚskipIfNonUnix)ÚAsyncTestCaseÚ	ExpectLogÚbind_unused_portÚgen_test)ÚTuplec                   @   s8   e Zd Zedd„ ƒZedd„ ƒZdd„ Zedd„ ƒZd	S )
ÚTCPServerTestc                 c   sâ   G dd„ dt ƒ}d  }}z¢tƒ \}}|ƒ }| |¡ tt ¡ ƒ}ttdƒ> | d|f¡V  | d¡V  | 	¡ V  t
jV  W d   ƒ n1 sŽ0    Y  W |d urª| ¡  |d urÞ| ¡  n"|d urÌ| ¡  |d urÜ| ¡  0 d S )Nc                   @   s   e Zd Zejdd„ ƒZdS )zFTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServerc                 s   s$   |  tdƒ¡V  | ¡  dd  d S )Nó   helloé   r   )Ú
read_bytesÚlenÚclose©ÚselfÚstreamÚaddress© r   úS/var/www/html/Ranjet/env/lib/python3.9/site-packages/tornado/test/tcpserver_test.pyÚhandle_stream   s    zTTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer.handle_streamN©Ú__name__Ú
__module__Ú__qualname__r   Ú	coroutiner   r   r   r   r   Ú
TestServer   s   r   zException in callbackÚ	localhostr   )r   r	   Ú
add_socketr   Úsocketr   r   ÚconnectÚwriteÚread_until_closer   ZmomentÚstopr   )r   r   ÚserverÚclientÚsockÚportr   r   r   Ú$test_handle_stream_coroutine_logging   s(    


(
ýz2TCPServerTest.test_handle_stream_coroutine_loggingc                 c   sp   G dd„ dt ƒ}tƒ \}}|ƒ }| |¡ tt ¡ ƒ}| d|f¡V  | ¡ V }|  |d¡ | ¡  | 	¡  d S )Nc                   @   s   e Zd Zdd„ ZdS )zETCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServerc                 Ó   s   |  d¡ | ¡  d S )Nó   data)r#   r   r   r   r   r   r   3   s    
zSTCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer.handle_streamN)r   r   r   r   r   r   r   r   r   2   s   r   r   r+   )
r   r	   r    r   r!   r"   r$   ÚassertEqualr%   r   )r   r   r(   r)   r&   r'   Úresultr   r   r   Ú#test_handle_stream_native_coroutine.   s    


z1TCPServerTest.test_handle_stream_native_coroutinec                 C   s.   t ƒ \}}tƒ }| |¡ | ¡  | ¡  d S ©N)r	   r   r    r%   )r   r(   r)   r&   r   r   r   Útest_stop_twiceA   s
    

zTCPServerTest.test_stop_twicec              	   #   sÐ   G ‡fdd„dt ƒ}tƒ \}}|ƒ ‰ˆ |¡ d|f‰d}dd„ t|ƒD ƒ}g ‰tj‡‡fdd„ƒ‰ ‡ fd	d„|D ƒV  |  tˆƒd
d¡ z,tˆƒ|kr¢|  d¡ W ˆD ]}| 	¡  q¨nˆD ]}| 	¡  q¼0 d S )Nc                       s   e Zd Zej‡ fdd„ƒZdS )z7TCPServerTest.test_stop_in_callback.<locals>.TestServerc                 3   s   ˆ   ¡  | ¡ V  d S r/   )r%   r$   r   ©r&   r   r   r   O   s    zETCPServerTest.test_stop_in_callback.<locals>.TestServer.handle_streamNr   r   r1   r   r   r   N   s   r   r   é(   c                 S   s   g | ]}t t ¡ ƒ‘qS r   )r   r!   )Ú.0Úir   r   r   Ú
<listcomp>Y   ó    z7TCPServerTest.test_stop_in_callback.<locals>.<listcomp>c                 3   s2   z|   ˆ¡V  W n ty"   Y n0 ˆ  | ¡ d S r/   )r"   ÚEnvironmentErrorÚappend)Úc)Úconnected_clientsÚserver_addrr   r   r"   \   s
    z4TCPServerTest.test_stop_in_callback.<locals>.connectc                    s   g | ]}ˆ |ƒ‘qS r   r   )r3   r9   )r"   r   r   r5   e   r6   r   zall clients failed connectingzHat least one client should fail connecting for the test to be meaningful)
r   r	   r    Úranger   r   ZassertGreaterr   ZskipTestr   )r   r   r(   r)   ÚNZclientsr9   r   )r"   r:   r&   r;   r   Útest_stop_in_callbackH   s*    

ÿÿz#TCPServerTest.test_stop_in_callbackN)r   r   r   r
   r*   r.   r0   r>   r   r   r   r   r      s   

r   c                   @   sD   e Zd Zeeeef dœdd„Zdd„ Zdd„ Zdd	„ Zd
d„ Z	dS )ÚTestMultiprocess)ÚcodeÚreturnc              
   C   st   zt jtjdgd|ddd}W nH t jyf } z.td|j› d|j› d|j› ƒ|‚W Y d }~n
d }~0 0 |j|jfS )Nz-Werror::DeprecationWarningTÚutf8)Úcapture_outputÚinputÚencodingÚcheckzProcess returned z stdout=z stderr=)	Ú
subprocessÚrunÚsysÚ
executableÚCalledProcessErrorÚRuntimeErrorÚ
returncodeÚstdoutÚstderr)r   r@   r-   Úer   r   r   Úrun_subproc~   s    û
ÿþzTestMultiprocess.run_subprocc                 C   s>   t  d¡}|  |¡\}}|  d t|ƒ¡d¡ |  |d¡ d S )Na  
            import asyncio
            from tornado.tcpserver import TCPServer

            async def main():
                server = TCPServer()
                server.listen(0, address='127.0.0.1')

            asyncio.run(main())
            print('012', end='')
        Ú Ú012©ÚtextwrapÚdedentrQ   r,   ÚjoinÚsorted©r   r@   ÚoutÚerrr   r   r   Útest_listen_single   s    ÿz#TestMultiprocess.test_listen_singlec                 C   s>   t  d¡}|  |¡\}}|  d t|ƒ¡d¡ |  |d¡ d S )NaÀ  
            import warnings

            from tornado.ioloop import IOLoop
            from tornado.process import task_id
            from tornado.tcpserver import TCPServer

            warnings.simplefilter("ignore", DeprecationWarning)

            server = TCPServer()
            server.bind(0, address='127.0.0.1')
            server.start(3)
            IOLoop.current().run_sync(lambda: None)
            print(task_id(), end='')
        rR   rS   rT   rY   r   r   r   Útest_bind_start¡   s    ÿz TestMultiprocess.test_bind_startc                 C   s>   t  d¡}|  |¡\}}|  d t|ƒ¡d¡ |  |d¡ d S )Na  
            import asyncio
            from tornado.netutil import bind_sockets
            from tornado.process import fork_processes, task_id
            from tornado.ioloop import IOLoop
            from tornado.tcpserver import TCPServer

            sockets = bind_sockets(0, address='127.0.0.1')
            fork_processes(3)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
            asyncio.run(post_fork_main())
            print(task_id(), end='')
        rR   rS   rT   rY   r   r   r   Útest_add_sockets·   s    ÿz!TestMultiprocess.test_add_socketsc                 C   s>   t  d¡}|  |¡\}}|  d t|ƒ¡d¡ |  |d¡ d S )NaÍ  
            import asyncio
            import socket
            from tornado.netutil import bind_sockets
            from tornado.process import task_id, fork_processes
            from tornado.tcpserver import TCPServer

            # Pick an unused port which we will be able to bind to multiple times.
            (sock,) = bind_sockets(0, address='127.0.0.1',
                family=socket.AF_INET, reuse_port=True)
            port = sock.getsockname()[1]

            fork_processes(3)

            async def main():
                server = TCPServer()
                server.listen(port, address='127.0.0.1', reuse_port=True)
            asyncio.run(main())
            print(task_id(), end='')
            rR   rS   rT   rY   r   r   r   Útest_listen_multi_reuse_portÍ   s    ÿz-TestMultiprocess.test_listen_multi_reuse_portN)
r   r   r   Ústrr   rQ   r\   r]   r^   r_   r   r   r   r   r?   w   s
   r?   )r!   rG   rI   rU   ZunittestZtornador   Ztornado.iostreamr   Ztornado.logr   Ztornado.tcpserverr   Ztornado.test.utilr   Ztornado.testingr   r   r	   r
   Útypingr   r   ZTestCaser?   r   r   r   r   Ú<module>   s   f