
    R?g(                         S SK r S SKrS SKrS SKrS SKrS SKJr  S SKJr  S SK	J
r
  S SKJr  S SKJr  S SKJrJrJrJr  S SKJr   " S	 S
\5      r\ " S S\R.                  5      5       rg)    N)gen)IOStream)app_log)	TCPServer)skipIfNonUnix)AsyncTestCase	ExpectLogbind_unused_portgen_test)Tuplec                   J    \ rS rSr\S 5       r\S 5       rS r\S 5       rSr	g)TCPServerTest   c              #   H  #     " S S[         5      nS =p# [        5       u  pEU" 5       nUR                  U5        [        [        R                  " 5       5      n[        [        S5         UR                  SU45      v   UR                  S5      v   UR                  5       v   [        R                  v   S S S 5        Ub  UR                  5         Ub  UR                  5         g g ! , (       d  f       N6= f! Ub  UR                  5         Ub  UR                  5         f f = f7f)Nc                   8    \ rS rSr\R
                  S 5       rSrg)FTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer   c              3   n   #    UR                  [        S5      5      v   UR                  5         SS-    g 7f)N   hello   r   )
read_byteslencloseselfstreamaddresss      =/usr/lib/python3/dist-packages/tornado/test/tcpserver_test.pyhandle_streamTTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer.handle_stream   s*     ''H66As   35 N__name__
__module____qualname____firstlineno__r   	coroutiner   __static_attributes__r!       r   
TestServerr      s    ]] r)   r*   zException in callback	localhostr   )r   r
   
add_socketr   socketr	   r   connectwriteread_until_closer   momentstopr   )r   r*   serverclientsockports         r   $test_handle_stream_coroutine_logging2TCPServerTest.test_handle_stream_coroutine_logging   s     	 	 	)+JD\Fd#fmmo.F7$;<nnk4%899ll8,,--//jj 	 = !! " =< !! "s6   D"AC6 (AC%5C6 =(D"%
C3/C6 6)DD"c              #   `  #     " S S[         5      n[        5       u  p#U" 5       nUR                  U5        [        [        R                  " 5       5      nUR                  SU45      v   UR                  5       v nU R                  US5        UR                  5         UR                  5         g 7f)Nc                       \ rS rSrS rSrg)ETCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer2   c                 N   #    UR                  S5        UR                  5         g 7f)N   data)r/   r   r   s      r   r   STCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer.handle_stream3   s     W%s   #%r!   N)r#   r$   r%   r&   r   r(   r!   r)   r   r*   r;   2   s    r)   r*   r+   r>   )
r   r
   r,   r   r-   r.   r0   assertEqualr2   r   )r   r*   r5   r6   r3   r4   results          r   #test_handle_stream_native_coroutine1TCPServerTest.test_handle_stream_native_coroutine.   s     	 	
 &'
$&--/*nnk4011..00)s   B,B.c                     [        5       u  p[        5       nUR                  U5        UR                  5         UR                  5         g N)r
   r   r,   r2   )r   r5   r6   r3   s       r   test_stop_twiceTCPServerTest.test_stop_twiceA   s3    %'
$r)   c              #   `  ^	^
^#     " U
4S jS[         5      n[        5       u  p#U" 5       m
T
R                  U5        SU4mSn[        U5       Vs/ s H!  n[	        [
        R
                  " 5       5      PM#     nn/ m	[        R                  U	U4S j5       nU Vs/ s H
  o" U5      PM     snv   U R                  [        T	5      SS5         [        T	5      U:X  a  U R                  S5        T	 H  nUR                  5         M     g s  snf s  snf ! T	 H  nUR                  5         M     f = f7f)	Nc                   @   > \ rS rSr\R
                  U 4S j5       rSrg)7TCPServerTest.test_stop_in_callback.<locals>.TestServerN   c              3   R   >#    TR                  5         UR                  5       v   g 7frE   )r2   r0   )r   r   r   r3   s      r   r   ETCPServerTest.test_stop_in_callback.<locals>.TestServer.handle_streamO   s     --//s   $'r!   Nr"   )r3   s   r   r*   rJ   N   s    ]]0 0r)   r*   r+   (   c              3   x   >#     U R                  T5      v   TR                  U 5        g ! [         a     g f = f7frE   )r.   appendEnvironmentError)cconnected_clientsserver_addrs    r   r.   4TCPServerTest.test_stop_in_callback.<locals>.connect\   s=     ,ii,, "((+ $ s   :* :
7:7:r   zall clients failed connectingzHat least one client should fail connecting for the test to be meaningful)r   r
   r,   ranger   r-   r   r'   assertGreaterr   skipTestr   )r   r*   r5   r6   Niclientsr.   rR   rS   r3   rT   s            @@@r   test_stop_in_callback#TCPServerTest.test_stop_in_callbackH   s    	0 	0 &'
$"D)6;Ah?h8FMMO,h?		, 
	, $++7awqz7++30116UV
	$%* 4
 '	 '1 @ , '	 's6   A
D.(D7"D.D*!D. D ,$D.D++D.r!   N)
r#   r$   r%   r&   r   r7   rB   rF   r\   r(   r!   r)   r   r   r      sA     6  $ ) )r)   r   c                   H    \ rS rSrS\S\\\4   4S jrS rS rS r	S r
S	rg
)TestMultiprocessw   codereturnc           	      "    [         R                  " [        R                  S/SUSSS9nUR                  UR                  4$ ! [         R                   a8  n[        SUR                   SUR                   SUR                   35      UeS nAff = f)Nz-Werror::DeprecationWarningTutf8)capture_outputinputencodingcheckzProcess returned z stdout=z stderr=)	
subprocessrunsys
executableCalledProcessErrorRuntimeError
returncodestdoutstderr)r   ra   rA   es       r   run_subprocTestMultiprocess.run_subproc~   s    	^^!>?#F }}fmm++	 ,, 	#ALL>!((8AHH:V	s   (A B3B		Bc                     [         R                  " S5      nU R                  U5      u  p#U R                  SR	                  [        U5      5      S5        U R                  US5        g )Na  
            import asyncio
            from tornado.tcpserver import TCPServer

            async def main():
                server = TCPServer()
                server.listen(0, address='127.0.0.1')

            asyncio.run(main())
            print('012', end='')
         012textwrapdedentrs   r@   joinsortedr   ra   outerrs       r   test_listen_single#TestMultiprocess.test_listen_single   sX     

 ##D)-u5b!r)   c                     [         R                  " S5      nU R                  U5      u  p#U R                  SR	                  [        U5      5      S5        U R                  US5        g )Na  
            import warnings

            from tornado.ioloop import IOLoop
            from tornado.process import task_id
            from tornado.tcpserver import TCPServer

            warnings.simplefilter("ignore", DeprecationWarning)

            server = TCPServer()
            server.bind(0, address='127.0.0.1')
            server.start(3)
            IOLoop.current().run_sync(lambda: None)
            print(task_id(), end='')
        rv   rw   rx   r}   s       r   test_bind_start TestMultiprocess.test_bind_start   V    
" ##D)-u5b!r)   c                     [         R                  " S5      nU R                  U5      u  p#U R                  SR	                  [        U5      5      S5        U R                  US5        g )Na  
            import asyncio
            from tornado.netutil import bind_sockets
            from tornado.process import fork_processes, task_id
            from tornado.ioloop import IOLoop
            from tornado.tcpserver import TCPServer

            sockets = bind_sockets(0, address='127.0.0.1')
            fork_processes(3)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
            asyncio.run(post_fork_main())
            print(task_id(), end='')
        rv   rw   rx   r}   s       r   test_add_sockets!TestMultiprocess.test_add_sockets   r   r)   c                     [         R                  " S5      nU R                  U5      u  p#U R                  SR	                  [        U5      5      S5        U R                  US5        g )Na  
            import asyncio
            import socket
            from tornado.netutil import bind_sockets
            from tornado.process import task_id, fork_processes
            from tornado.tcpserver import TCPServer

            # Pick an unused port which we will be able to bind to multiple times.
            (sock,) = bind_sockets(0, address='127.0.0.1',
                family=socket.AF_INET, reuse_port=True)
            port = sock.getsockname()[1]

            fork_processes(3)

            async def main():
                server = TCPServer()
                server.listen(port, address='127.0.0.1', reuse_port=True)
            asyncio.run(main())
            print(task_id(), end='')
            rv   rw   rx   r}   s       r   test_listen_multi_reuse_port-TestMultiprocess.test_listen_multi_reuse_port   sV    
, ##D)-u5b!r)   r!   N)r#   r$   r%   r&   strr   rs   r   r   r   r   r(   r!   r)   r   r_   r_   w   s1    , ,c3h ,"(",","r)   r_   )r-   ri   rk   ry   unittesttornador   tornado.iostreamr   tornado.logr   tornado.tcpserverr   tornado.test.utilr   tornado.testingr   r	   r
   r   typingr   r   TestCaser_   r!   r)   r   <module>r      s_      
    %  ' + P P aM aL n"x(( n" n"r)   