HEX
Server: Apache
System: Linux opal14.opalstack.com 3.10.0-1160.108.1.el7.x86_64 #1 SMP Thu Jan 25 16:17:31 UTC 2024 x86_64
User: curbgloabal_opal (1234)
PHP: 8.1.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib64/python3.6/test/__pycache__/test_timeout.cpython-36.pyc
3


 \o,�@s�dZddlZddlZddlmZejd�ZddlZddlZddl	Z	ej
�dd��ZGdd�dej�Z
Gd	d
�d
ej�ZGdd�de�ZGd
d�de�Zdd�Zedkr�e�dS)z&Unit tests for socket timeout feature.�N)�support�networkcCs2tj|��tj||tjtj�ddSQRXdS)z�Resolve an (host, port) to an address.

    We must perform name resolution before timeout tests, otherwise it will be
    performed by connect().
    r�N)r�transient_internet�socketZgetaddrinfo�AF_INET�SOCK_STREAM)�hostZport�r
�$/usr/lib64/python3.6/test_timeout.py�resolve_addresssrc@sXeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�ZdS)�CreationTestCasez9Test case for socket.gettimeout() and socket.settimeout()cCstjtjtj�|_dS)N)rrr�sock)�selfr
r
r�setUpszCreationTestCase.setUpcCs|jj�dS)N)r�close)rr
r
r�tearDown!szCreationTestCase.tearDowncCs|j|jj�dd�dS)Nztimeout not disabled by default)�assertEqualr�
gettimeout)rr
r
r�testObjectCreation$sz#CreationTestCase.testObjectCreationcCs^|jjd�|j|jj�d�|jjd�|j|jj�d�|jjd�|j|jj�d�dS)Ng�z�Ga@�)r�
settimeoutrr)rr
r
r�testFloatReturnValue)sz%CreationTestCase.testFloatReturnValuecCsP|jjd�|jt|jj��td��|jjd�|jt|jj��td��dS)N�g�?g333333@)rrr�typer)rr
r
r�testReturnType4szCreationTestCase.testReturnTypecCs�|jjd�|jjd�|jjd�|jjd�|jt|jjd�|jt|jjd�|jt|jjf�|jt|jjg�|jt|jji�|jt|jjd�dS)Nrg�y)rr�assertRaises�	TypeError)rr
r
r�
testTypeCheck<szCreationTestCase.testTypeCheckcCs:|jt|jjd�|jt|jjd�|jt|jjd�dS)Nrg�?���r g�)r�
ValueErrorrr)rr
r
r�testRangeCheckIszCreationTestCase.testRangeCheckcCs�|jjd�|jjd�|j|jj�d�|jjd�|j|jj�d�|jjd�|jjd�|j|jj�d�|jjd�|j|jj�d�dS)N�
rrg)rr�setblockingrr)rr
r
r�testTimeoutThenBlockingOsz(CreationTestCase.testTimeoutThenBlockingcCsX|jjd�|jjd�|j|jj�d�|jjd�|jjd�|j|jj�d�dS)Nrr)rr$rrr)rr
r
r�testBlockingThenTimeout]sz(CreationTestCase.testBlockingThenTimeoutN)
�__name__�
__module__�__qualname__�__doc__rrrrrrr"r%r&r
r
r
rr
s
r
c@s*eZdZdZejZdd�ZeZdd�Z	dS)�TimeoutTestCaseg@cCs
t��dS)N)�NotImplementedError)rr
r
rrsszTimeoutTestCase.setUpc	Gs�|jj|�t|j|�}xbt|�D]L}tj�}y||�Wq"tjk
rl}ztj�|}PWYdd}~Xq"Xq"W|jd�|j|||j	�|j
||d�dS)z�
        Test the specified socket method.

        The method is run at most `count` times and must raise a socket.timeout
        within `timeout` + self.fuzz seconds.
        Nzsocket.timeout was not raisedg�?)rr�getattr�range�timer�timeoutZfailZ
assertLess�fuzzZ
assertGreater)	r�countr0�method�args�iZt1�eZdeltar
r
r�_sock_operationxs
zTimeoutTestCase._sock_operationN)
r'r(r)r1rZHOST�	localhostrrr7r
r
r
rr+hs
r+c@sPeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�ZdS)�TCPTimeoutTestCasez3TCP test case for socket.socket() timeout functionscCs"tjtjtj�|_tdd�|_dS)Nzwww.python.org.�P)rrrrr�addr_remote)rr
r
rr�szTCPTimeoutTestCase.setUpcCs|jj�dS)N)rr)rr
r
rr�szTCPTimeoutTestCase.tearDownc Cstdd�}tdd�}d}tjtjtj�}d}|j|�zZy|j|�WnFtjk
r\Yn2tk
r�}z|jtj	kr|d}WYdd}~XnXWd|j
�~X|r�|jdj|d	|d
||d	|d
��||_
tj|j
d	��|jd
dd|j
�WdQRXdS)
Nzblackhole.snakebite.netiZ�zwhitehole.snakebite.neti[�TrFz�We didn't receive a connection reset (RST) packet from {}:{} within {} seconds, so we're unable to test connect timeout against the corresponding {}:{} (which is configured to silently drop packets).rrg����MbP?�connect)rrrrrr<r0�OSError�errnoZECONNREFUSEDrZskipTest�formatr;rrr7)rZ	blackholeZ	whitehole�skiprr0�errr
r
r�testConnectTimeout�s4


z%TCPTimeoutTestCase.testConnectTimeoutcCs>tj|jd��$|jj|j�|jdddd�WdQRXdS)Nrrg�?Zrecvi)rrr;rr<r7)rr
r
r�testRecvTimeout�sz"TCPTimeoutTestCase.testRecvTimeoutcCs,tj|j|j�|jj�|jddd�dS)Nrg�?Zaccept)r�	bind_portrr8�listenr7)rr
r
r�testAcceptTimeout�s
z$TCPTimeoutTestCase.testAcceptTimeoutc
CsZtjtjtj��@}tj||j�|j�|jj|j	��|j
ddddd�WdQRXdS)N�dg�?�send�Xi@
)rrrrrDr8rErr<�getsocknamer7)r�servr
r
r�testSend�s
zTCPTimeoutTestCase.testSendc
Cs`tjtjtj��F}tj||j�|j�|jj|j	��|j
ddddd|j	��WdQRXdS)NrGg�?ZsendtorIi@
)rrrrrDr8rErr<rJr7)rrKr
r
r�
testSendtoszTCPTimeoutTestCase.testSendtoc
CsZtjtjtj��@}tj||j�|j�|jj|j	��|j
ddddd�WdQRXdS)NrGg�?ZsendallrIi@
)rrrrrDr8rErr<rJr7)rrKr
r
r�testSendalls
zTCPTimeoutTestCase.testSendallN)r'r(r)r*rrrBrCrFrLrMrNr
r
r
rr9�sT	
r9c@s(eZdZdZdd�Zdd�Zdd�ZdS)	�UDPTimeoutTestCasez3UDP test case for socket.socket() timeout functionscCstjtjtj�|_dS)N)rrZ
SOCK_DGRAMr)rr
r
rrszUDPTimeoutTestCase.setUpcCs|jj�dS)N)rr)rr
r
rrszUDPTimeoutTestCase.tearDowncCs$tj|j|j�|jdddd�dS)Nrg�?Zrecvfromi)rrDrr8r7)rr
r
r�testRecvfromTimeoutsz&UDPTimeoutTestCase.testRecvfromTimeoutN)r'r(r)r*rrrPr
r
r
rrOsrOcCstjd�tjttt�dS)Nr)rZrequiresZrun_unittestr
r9rOr
r
r
r�	test_main&s

rQ�__main__)r*�	functoolsZunittestZtestrZis_resource_enabledZ
skip_expectedr/r>r�	lru_cacherZTestCaser
r+r9rOrQr'r
r
r
r�<module>s M'