HEX
Server: Apache
System: Linux opal14.opalstack.com 3.10.0-1160.108.1.el7.x86_64 #1 SMP Thu Jan 25 16:17:31 UTC 2024 x86_64
User: curbgloabal_opal (1234)
PHP: 8.1.29
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib64/python3.6/test/__pycache__/test_threadsignals.cpython-36.pyc
3


 \Q(�@s�dZddlZddlZddlZddlZddlmZejd�Zddl	Z	ej
dd�dkrdejdej
��ej�Z
ej�Zejjdko�ejjd	kZd
d�Zdd
�Zdd�ZGdd�dej�Zdd�Zedkr�e�dS)z6PyUnit testing that threads honor our signal semantics�N)�support�_thread��winzCan't test signal on %sZpthreadz
mutex+condcCs4tjtj|�}tjtj|�}tjtj|�}|||fS)N)�signal�SIGUSR1�SIGUSR2�SIGALRM)Zfor_usr1Zfor_usr2Zfor_alrmZusr1Zusr2Zalrm�r
�*/usr/lib64/python3.6/test_threadsignals.py�registerSignalssrcCs(t|dd7<tj�t|d<dS)N�tripped��
tripped_by)�signal_blackboard�thread�	get_ident)�sig�framer
r
r�handle_signalssrcCs(tjttj�tjttj�tj�dS)N)�os�kill�process_pidrrr�
signalled_all�releaser
r
r
r�send_signals"src@s�eZdZdd�Zdd�Zdd�Zejed�eje	j
jd�oBe	jj
d	�eje	j
jd
�d�dd
����Zejed�eje	j
jd�o�e	jj
d	�eje	j
jd
�d�dd����Zdd�Zdd�Zdd�Zdd�ZdS)�
ThreadSignalscCs�tj��tj�|j�tj�WdQRXttjddksPttjddkrtztj	d�tj
�Wdtj	d�X|jttjdd�|jttjdtj
��|jttjdd�|jttjdtj
��tj�dS)Nr
rrr)r�wait_threads_exitr�acquire�spawnSignallingThreadrrrr�alarm�pauseZassertEqualrrr)�selfr
r
r�test_signals)s"



zThreadSignals.test_signalscCstjtf�dS)N)r�start_new_threadr)r"r
r
rrIsz#ThreadSignals.spawnSignallingThreadcCst�dS)N)�KeyboardInterrupt)r"rrr
r
r�alarm_interruptLszThreadSignals.alarm_interruptz/POSIX condition variables cannot be interrupted�linuxzBIssue 34004: musl does not allow interruption of locks by signals.Zopenbsdz%lock cannot be interrupted on OpenBSDcCs�tjtj|j�}zPtj�}|j�tjd�tj�}|jt	|jdd�tj�|}|j
|d�Wdtjd�tjtj|�XdS)Nr�)�timeoutg@r)rr	r&r�
allocate_lockrr �time�assertRaisesr%�
assertLess)r"�oldalrm�lock�t1�dtr
r
r�test_lock_acquire_interruptionOs

z,ThreadSignals.test_lock_acquire_interruptioncs�tjtj|j�}z�tj���fdd�}tj��rtj|f�x"�jdd�r^�j	�t
jd�q>Wtjd�t
j
�}|j
t�jdd�t
j
�|}|j|d	�WdQRXWdtjd
�tjtj|�XdS)Ncs�j�dS)N)rr
)�rlockr
r�other_thread�szCThreadSignals.test_rlock_acquire_interruption.<locals>.other_threadF)�blockingg{�G�z�?rr()r)g@r)rr	r&r�RLockrrr$rrr+�sleepr r,r%r-)r"r.r4r0r1r
)r3r�test_rlock_acquire_interruptionns 


z-ThreadSignals.test_rlock_acquire_interruptioncs�d�_�fdd�}tjtj|�}zr�fdd�}tj��Ttj|f�x"�jdd�rf�j�t	j
d�qFW�j�}�j�j��j|�WdQRXWdtjtj|�XdS)NFcs
d�_dS)NT)�	sig_recvd)rr)r"r
r�
my_handler�sz9ThreadSignals.acquire_retries_on_intr.<locals>.my_handlercs6�j�tjd�tjttj�tjd��j�dS)Ng�?)	rr+r7rrrrrrr
)r/r
rr4�s


z;ThreadSignals.acquire_retries_on_intr.<locals>.other_thread)r5g{�G�z�?)r9rrrrrr$rrr+r7Z
assertTrue)r"r/r:�old_handlerr4�resultr
)r/r"r�acquire_retries_on_intr�s
z%ThreadSignals.acquire_retries_on_intrcCs|jtj��dS)N)r=rr*)r"r
r
r�!test_lock_acquire_retries_on_intr�sz/ThreadSignals.test_lock_acquire_retries_on_intrcCs|jtj��dS)N)r=rr6)r"r
r
r�"test_rlock_acquire_retries_on_intr�sz0ThreadSignals.test_rlock_acquire_retries_on_intrcs�d�_d�_d�_tj���j�tj���j��fdd�}tjtj|�}z���fdd�}�fdd�}tj	��Vtj
|f�|��j��j�j�jd��j�j�jd	��j�jd�WdQRXWdtjtj|�XdS)
Nrcs�jd7_dS)Nr)�
sigs_recvd)Zsignumr)r"r
rr:�sz@ThreadSignals.test_interrupted_timed_acquire.<locals>.my_handlercs$tj��_�jdd�tj��_dS)Ng�?)r))r+�startr�endr
)r/r"r
r�
timed_acquire�s
zCThreadSignals.test_interrupted_timed_acquire.<locals>.timed_acquirecs6x(td�D]}tjd�tjttj�q
W�j�dS)N�(g{�G�z�?)	�ranger+r7rrrrrr)�_)�doner
rr�s
zBThreadSignals.test_interrupted_timed_acquire.<locals>.send_signalsg@g333333�?)
rArBr@rr*rrrrrr$r-Z
assertGreater)r"r:r;rCrr
)rGr/r"r�test_interrupted_timed_acquire�s(
z,ThreadSignals.test_interrupted_timed_acquireN)�__name__�
__module__�__qualname__r#rr&�unittestZskipIf�USING_PTHREAD_COND�sys�platform�
startswith�thread_info�versionr2r8r=r>r?rHr
r
r
rr's*  rcCsRtjddd�tjddd�tjddd�iatttt�}ztjt	�Wdt|�XdS)Nr)r
r)
rrrr	rrrrZrun_unittestr)Zoldsigsr
r
r�	test_main�srS�__main__)�__doc__rLrrrNZtestr�
import_modulerr+rOZSkipTest�getpidrr*rrQ�namer/rMrrrZTestCaserrSrIr
r
r
r�<module>s*
	C