!*K  SSKrSSKrSSKrSSKrSSKrSSKrSSKrSSKrSSKrSSK r SSK r SSK r SSK r SSK r SSKJr SSKJrJrJrJr SSKJrJr SSKr"SS\ R45r\ R8"\ R:S:HS5"S S \ R455r\ R>"\ R:S:HS 5"S S \ R455r "SS\ R45r!\ R8"\ R:S:HS5"SS\ R455r"\ R>"\#"\S5S5"SS\ R455r$\ R8"\ R:S:HS5\ R>"\#"\S5S5\RJ"5\ R>"\#"\S5S5"SS\ R455555r&\ R8"\ R:S:HS5\ R>"\#"\S5=(a \#"\S5S5"SS \ R4555r'"S!S"\ R45r("S#S$\ R45r)"S%S&\ R45r*"S'S(\ R45r+S)r,\-S*:Xa\ R\"5 gg!\a SrGNjf=f)+N)support)is_appleis_apple_mobile os_helperthreading_helper)assert_python_ok spawn_pythonc \rSrSrSrSrSrg) GenericTestsc,[[5GH n[[U5nUS;a"URU[R5 M<US;a"URU[R 5 MdUR S5(a8UR S5(d"URU[R5 MUR S5(dMURU[R5 UR[RS5 GM [R"[RSSS [S 9n[R"U[R5 [R"[RS SS [S 9n[R"U[R5 [[S S5nUbB[R"[RS SS[S 9n[R"Xe5 gg)N>SIG_DFLSIG_IGN> SIG_BLOCK SIG_SETMASK SIG_UNBLOCKSIGSIG_CTRL_win32SignalssignalcUR5=(a. URS5=(a URS5(+=(d URS5$)Nrrr)isupper startswithnames 7/opt/alt/python313/lib64/python3.13/test/test_signal.py)GenericTests.test_enums..+sDLLNQ/O8O4O0w/0)sourceHandlersc US;$)N)rrrs rrr 5s T%;;r!Sigmasksc US;$)N)rrrr%rs rrr >s )T!Tr!)dirrgetattrassertIsInstancer#r&rr assertEqualsysplatformenum _old_convert_IntEnum_test_simple_enum)selfrsigCheckedSignalsCheckedHandlersr&CheckedSigmaskss r test_enumsGenericTests.test_enumss|KD&$'C--%%c6??;DD%%c6??;''0G0G%%c6>>:))%%c6>>:  w7 ++ i0  ~v~~>,, j(; @6:t4  "00LL*hT!O  " "? = r!c[[5Hin[[U5n[R"U5(dM0[R "U5(aMMUR URS5 Mk g)Nr)r(rr)inspect isroutine isbuiltinr+ __module__)r2rvalues rtest_functions_module_attr'GenericTests.test_functions_module_attrCsUKDFD)E  ''0A0A%0H0H  !1!18< r!r%N)__name__r= __qualname____firstlineno__r7r?__static_attributes__r%r!rr r s %>N=r!r rzNot valid on WindowscP\rSrSrSrSrSrSrSrSr \ R"\ RRS5S 5S 5rS r\ R""\"\S 5S 5S5r\ R""\ R*S5\R."5S55rSrg) PosixTestsLcgNr%r2argss rtrivial_signal_handler!PosixTests.trivial_signal_handlerNs r!cD[R"URU5$rI) functoolspartialrL)r2arguments rcreate_handler_with_partial&PosixTests.create_handler_with_partialQs  !r!c\"SS5nU"5nURSUR5 URU5n[R"[RU5nUR U[R 5 UR[R"[R5U5 [R"[RU5 UR[R"[R5U5 URSUR5 g)Nc.^\rSrSrSrU4SjrSrU=r$)GPosixTests.test_no_repr_is_called_on_signal_handler..MyArgumentkcSUlgNr repr_countrYs r__init__PPosixTests.test_no_repr_is_called_on_signal_handler..MyArgument.__init__ls "#r!cJ>U=RS- sl[TU] 5$N)rmsuper__repr__)r2 __class__s rrtPPosixTests.test_no_repr_is_called_on_signal_handler..MyArgument.__repr__os1$w'))r!rl)rAr=rBrCrnrtrD __classcell__)rus@r MyArgumentrhks $ * *r!rxr)r+rmrRrrbr*r#rW)r2rxrQhandlerrcs r(test_no_repr_is_called_on_signal_handler3PosixTests.test_no_repr_is_called_on_signal_handlerhs * *< H//0228<mmFMM73 c6??3 ))&--8'B fmmS) ))&--8#> H//0r!netbsdz/gh-124083: strsignal is not supported on NetBSDc<URS[R"[R55 URS[R"[R55 URS[R"[R 55 g)N Interrupt TerminatedHangup)assertInrrXSIGINTSIGTERMrbrYs rtest_strsignalPosixTests.test_strsignal~s[ k6#3#3FMM#BC lF$4$4V^^$DE h 0 0 ?@r!c[RR[5n[RR US5n[ U5 g)Nzsignalinterproctester.py)ospathdirname__file__joinr)r2rscripts rtest_interprocess_signal#PosixTests.test_interprocess_signals1''//(+g'AB r! valid_signalszrequires signal.valid_signalsc[R"5nURU[5 UR [R R U5 UR [R RU5 URSU5 UR[RU5 UR[U5[R5 [[5H}nURS5(dMUS;aM#URUS9 [[U5nUR!US5 URU[R5 SSS5 M g!,(df  M=f)Nrr>rrr)rrr*setrrrSIGALRM assertNotInNSIG assertLesslenr(rsubTestr)assertGreaterEqual)r2srsignums rtest_valid_signalsPosixTests.test_valid_signalss  " a% fnn++Q/ fnn,,a0 A a( A ,KD??5))--4( .''2 4)(  )(s AE,, E; sys.executable required.c[R"[RSS/[RS9nUR SUR 5 URUR[R*5 g)N-czaimport os, signal, time os.kill(os.getpid(), signal.SIGINT) for _ in range(999): time.sleep(0.01)stderrKeyboardInterrupt) subprocessrunr, executablePIPErrr+ returncoderr)r2processs r!test_keyboard_interrupt_exit_code,PosixTests.test_keyboard_interrupt_exit_codes_..9:" ( *GNN; ++fmm^0?1,__S\\,,X6FHAHA ! ('5 5,)CD   " =#E =r!rFzWindows specificc\rSrSrSrSr\R"\RS5\ R"5S55r Sr g)WindowsSignalTestsc[R"5nURU[5 UR [ U5S5 UR [RRU5 URSU5 UR[RU5 UR[ U5[R5 g)Nr) rrr*rrrrrrrrrr2rs rr%WindowsSignalTests.test_valid_signalss  " a% A* fnn++Q/ A a( A ,r!cSn[5n[R[R[R[R [R [R[R4HXn[R"U5cM[R"U[R"X155 URU5 MZ URU5 UR[5 [R"SU5 SSS5 UR[5 [R"SU5 SSS5 g!,(df  ND=f!,(df  g=f)NcgrIr%)xys rr3WindowsSignalTests.test_issue9324..str!)rrSIGABRTSIGBREAKSIGFPESIGILLrSIGSEGVrrWadd assertTruerUrV)r2rycheckedr3s rtest_issue9324!WindowsSignalTests.test_issue9324s#%NNFOOV]]MM6==&..NN$C $0 c6==#>? C $    z * MM"g &+  z * MM!W %+ *+ *+ *s;E0E" E" E0rc[R"[RSS/[RS9nUR SUR 5 SnURURU5 g)Nrzraise KeyboardInterruptrrl:) rrr,rrrrr+r)r2rSTATUS_CONTROL_C_EXITs rr4WindowsSignalTests.test_keyboard_interrupt_exit_codesZ..'@A!( *GNN; * ++-BCr!r%N)rAr=rBrCrrrrr,rrrrrDr%r!rrrsF-&*)CD   " D#E Dr!rc2\rSrSrSrSr\R"\RS5S5r \R"\RS5\R"\ "\S5S5S 55r\R"\RS5\R"\RS5S 55r\R"\R$S :HS 5\R"\RS5\R"\ "\S5S5S 555rSrg) WakeupFDTestscNUR[5 [R"[RS9 SSS5 UR[5 [R"[RS5 SSS5 g!,(df  NR=f!,(df  g=f)N)rF)rUr]r set_wakeup_fdrrYs rtest_invalid_callWakeupFDTests.test_invalid_callse   y )   6*  y )   6* ) * )* )s#B&B B B$c[R"5nUR[[4[ R U5 grI)r make_bad_fdrUrVOSErrorrr)r2fds rtest_invalid_fdWakeupFDTests.test_invalid_fds0  " " $ :w/ .. 4r!zneeds working sockets.c[R"5nUR5nUR5 UR[[ 4[ RU5 grI)socketfilenocloserUrVrrr)r2sockrs rtest_invalid_socket!WakeupFDTests.test_invalid_socketsA}} [[]  :w/ .. 4r!zEmscripten cannot fstat pipes.piperequires os.pipe()c[R"5upUR[RU5 UR[RU5 [R"5up4UR[RU5 UR[RU5 [ [S5(a.[R "US5 [R "US5 [ R"U5 UR[ R"U5U5 UR[ R"S5U5 UR[ R"S5S5 g)N set_blockingFr) rr addCleanuprrrrrr+)r2r1w1r2w2s rtest_set_wakeup_fd_result'WakeupFDTests.test_set_wakeup_fd_results "% "% "% "% 2~ & & OOB & OOB &R  --b126 --b126 --b126r!cX[R"5nURUR5 URS5 UR 5n[R"5nURUR5 URS5 UR 5n[ R "U5 UR[ R "U5U5 UR[ R "S5U5 UR[ R "S5S5 g)NFr)rrr setblockingrrrr+)r2sock1fd1sock2fd2s r test_set_wakeup_fd_socket_result.WakeupFDTests.test_set_wakeup_fd_socket_results   $ % lln   $ % llnS! --c2C8 --b137 --b126r!rztests specific to POSIXc@[R"5upUR[RU5 UR[RU5 [R"US5 UR [ 5n[R"U5 SSS5 UR[WR5SU-5 [R"US5 [R"U5 [R"S5 g!,(df  Nz=f)NTz&the fd %s must be in non-blocking modeFr) rrrrrrUrVrrr+str exception)r2rfdwfdcms rtest_set_wakeup_fd_blocking)WakeupFDTests.test_set_wakeup_fd_blocking)s779 #& #& T"   z *b   %+ R\\*ACG I U#S!R + *s D Dr%N)rAr=rBrCrrrrrhas_socket_supportrr is_emscriptenrrrrr,r-rrDr%r!rrrs!74 335MN4O4__W**,LM V,.BC7DN7"__W**,LM 335MN7ON7$__S\\W,.GH __W**,LM V,.BC!DNI!r!rc\rSrSr\R "\SLS5SS.Sj5r\R "\SLS5\R"\ "\ S5S5S 55r S r S r S r\R"\ "\S 5S5S5rSrg)WakeupSignalTestsi>Nneed _testcapiTorderedcnSR[[[U55X!5n[ SU5 g)Naif 1: import _testcapi import os import signal import struct signals = {!r} def handler(signum, frame): pass def check_signum(signals): data = os.read(read, len(signals)+1) raised = struct.unpack('%uB' % len(data), data) if not {!r}: raised = set(raised) signals = set(signals) if raised != signals: raise Exception("%r != %r" % (raised, signals)) {} signal.signal(signal.SIGALRM, handler) read, write = os.pipe() os.set_blocking(write, False) signal.set_wakeup_fd(write) test() check_signum(signals) os.close(read) os.close(write) r)formattuplemapintr)r2 test_bodyrsignalscodes r check_wakeupWakeupSignalTests.check_wakeup@s5  @ F5S'*+W @A D t$r!rrczSn[R"5up#[R"US5 URS5 [R "U5 [R "U5 [ SU5 g![a NEf=f![R "U5 [R "U5 f=f)Na&if 1: import _testcapi import errno import os import signal import sys from test.support import captured_stderr def handler(signum, frame): 1/0 signal.signal(signal.SIGALRM, handler) r, w = os.pipe() os.set_blocking(r, False) # Set wakeup_fd a read-only file descriptor to trigger the error signal.set_wakeup_fd(r) try: with captured_stderr() as err: signal.raise_signal(signal.SIGALRM) except ZeroDivisionError: # An ignored exception should have been printed out on stderr err = err.getvalue() if ('Exception ignored when trying to write to the signal wakeup fd' not in err): raise AssertionError(err) if ('OSError: [Errno %d]' % errno.EBADF) not in err: raise AssertionError(err) else: raise AssertionError("ZeroDivisionError not raised") os.close(r) os.close(w) xz9OS doesn't report write() error on the read end of a piper)rrwriteskipTestrrr)r2rrws rtest_wakeup_write_error)WakeupSignalTests.test_wakeup_write_errorgs ! Dwwy  HHQ  MMU V HHQK HHQKt$    HHQK HHQKs(A<B < B B B  B .B:cDURS[R5 g)Nadef test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) # We attempt to get a signal during the sleep, # before select is called try: select.select([], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") before_time = time.monotonic() select.select([read], [], [], TIMEOUT_FULL) after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) r rrrYs rtest_wakeup_fd_early&WakeupSignalTests.test_wakeup_fd_earlys  >^^? r!cDURS[R5 g)Na`def test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) before_time = time.monotonic() # We attempt to get a signal during the select call try: select.select([read], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) rrYs rtest_wakeup_fd_during'WakeupSignalTests.test_wakeup_fd_durings  6^^7 r!cbURS[R[R5 g)Nzdef test(): signal.signal(signal.SIGUSR1, handler) signal.raise_signal(signal.SIGUSR1) signal.raise_signal(signal.SIGALRM) )r rr^rrYs r test_signumWakeupSignalTests.test_signums$  ^^V^^  -r!pthread_sigmaskneed signal.pthread_sigmask()c`URS[R[RSS9 g)Nadef test(): signum1 = signal.SIGUSR1 signum2 = signal.SIGUSR2 signal.signal(signum1, handler) signal.signal(signum2, handler) signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) signal.raise_signal(signum1) signal.raise_signal(signum2) # Unblocking the 2 signals calls the C signal handler twice signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) Fr)r rr^SIGUSR2rYs r test_pendingWakeupSignalTests.test_pendings-   nnfnne  =r!r%)rAr=rBrCrr _testcapir rrrrrrrrr!rDr%r!rrr>s __Y$&(898<$%:$%L__Y$&(89 V,.BC1%D:1%f D<-):;8: =: =r!r socketpairzneed socket.socketpairc\rSrSr\R "\SLS5S5r\R "\SLS5S5r\R "\SLS5S5r Sr g)WakeupSocketSignalTestsiNrc Sn[SU5 g)Naif 1: import signal import socket import struct import _testcapi signum = signal.SIGINT signals = (signum,) def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() write.setblocking(False) signal.set_wakeup_fd(write.fileno()) signal.raise_signal(signum) data = read.recv(1) if not data: raise Exception("no signum written") raised = struct.unpack('B', data) if raised != signals: raise Exception("%r != %r" % (raised, signals)) read.close() write.close() rrr2rs r test_socket#WakeupSocketSignalTests.test_sockets > t$r!cl[RS:XaSnOSnSRUS9n[SU5 g)Nntsendr a+if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() read.setblocking(False) write.setblocking(False) signal.set_wakeup_fd(write.fileno()) # Close sockets: send() will fail read.close() write.close() with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if ('Exception ignored when trying to {action} to the signal wakeup fd' not in err): raise AssertionError(err) actionrrrrrr2r0rs rtest_send_error'WakeupSocketSignalTests.test_send_errors@ 77d?FF! B F&F !C D t$r!cl[RS:XaSnOSnSRUS9n[SU5 g)Nr-r.r aJ if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT # This handler will be called, but we intentionally won't read from # the wakeup fd. def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() # Fill the socketpair buffer if sys.platform == 'win32': # bpo-34130: On Windows, sometimes non-blocking send fails to fill # the full socketpair buffer, so use a timeout of 50 ms instead. write.settimeout(0.050) else: write.setblocking(False) written = 0 if sys.platform == "vxworks": CHUNK_SIZES = (1,) else: # Start with large chunk size to reduce the # number of send needed to fill the buffer. CHUNK_SIZES = (2 ** 16, 2 ** 8, 1) for chunk_size in CHUNK_SIZES: chunk = b"x" * chunk_size try: while True: write.send(chunk) written += chunk_size except (BlockingIOError, TimeoutError): pass print(f"%s bytes written into the socketpair" % written, flush=True) write.setblocking(False) try: write.send(b"x") except BlockingIOError: # The socketpair buffer seems full pass else: raise AssertionError("%s bytes failed to fill the socketpair " "buffer" % written) # By default, we get a warning when a signal arrives msg = ('Exception ignored when trying to {action} ' 'to the signal wakeup fd') signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("first set_wakeup_fd() test failed, " "stderr: %r" % err) # And also if warn_on_full_buffer=True signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " "test failed, stderr: %r" % err) # But not if warn_on_full_buffer=False signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if err != "": raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " "test failed, stderr: %r" % err) # And then check the default again, to make sure warn_on_full_buffer # settings don't leak across calls. signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("second set_wakeup_fd() test failed, " "stderr: %r" % err) r/rr1r2s rtest_warn_on_full_buffer0WakeupSocketSignalTests.test_warn_on_full_bufferGsA 77d?FFg N F&F !O P t$r!r%) rAr=rBrCrrr#r*r3r6rDr%r!rr&r&su__Y$&(89!%:!%F__Y$&(89(%:(%T__Y$&(89n%:n%r!r& siginterruptzneeds signal.siginterrupt()rrcp\rSrSr\R 4SjrSrSr\R"S5S5r Sr g) SiginterruptTestic~SU<S3n[SU5nURR5nURUS9upgXV-nUR 5nUS;a[ SU<SU<35eUS:HsSSS5 $![ Ra UR5 SSS5 g f=f!,(df  g=f) Nzif 1: import errno import os import signal import sys interrupt = aq r, w = os.pipe() def handler(signum, frame): 1 / 0 signal.signal(signal.SIGALRM, handler) if interrupt is not None: signal.siginterrupt(signal.SIGALRM, interrupt) print("ready") sys.stdout.flush() # run the test twice try: for loop in range(2): # send a SIGALRM in a second (during the read) signal.alarm(1) try: # blocking call: read from a pipe without data os.read(r, 1) except ZeroDivisionError: pass else: sys.exit(2) sys.exit(3) finally: os.close(r) os.close(w) rtimeout)Child error (exit code ): r?F) r stdoutreadline communicatewait ExceptionrTimeoutExpiredkill) r2 interruptr=rr first_linerBrexitcodes rreadpipe_interrupted%SiginterruptTest.readpipe_interruptedsTG#H$ % '$^^446 !(!4!4W!4!E $,"<<>6)#'/%9:: A & % ,,  & %   & %s.B.+A<1B.<$B+ B.*B++B.. B<cHURS5nURU5 grIrLrr2 interrupteds rtest_without_siginterrupt*SiginterruptTest.test_without_siginterrupt //5   $r!cHURS5nURU5 gNTrOrPs rtest_siginterrupt_on%SiginterruptTest.test_siginterrupt_onrTr!walltimecFURSSS9nURU5 g)NFr>r<)rL assertFalserPs rtest_siginterrupt_off&SiginterruptTest.test_siginterrupt_off s' //q/A  %r!r%N) rAr=rBrCr SHORT_TIMEOUTrLrRrWrequires_resourcer\rDr%r!rr:r:s< 7>6K6K:'x%% z*&+&r!r: getitimer setitimerz/needs signal.getitimer() and signal.setitimer()c\rSrSrSrSrSrSrSrSr Sr \ R"\ RS ;=(d \S 5S 5rS rS rSrg) ItimerTesticSUlSUlSUl[R"[RUR 5Ulg)NFr) hndl_called hndl_countitimerrrsig_alrm old_alarmrYs rsetUpItimerTest.setUps2  v~~t}}Er!c[R"[RUR5 URb"[R"URS5 ggrk)rrrirgrarYs rtearDownItimerTest.tearDowns; fnndnn5 ;; "   T[[! , #r!cSUlgrV)rerJs rrhItimerTest.sig_alrm"s r!cSUlURS:a[R"S5eURS:Xa%[R"[R S5 U=RS- slg)NTr?z.setitimer didn't disable ITIMER_VIRTUAL timer.rrr)rerfr ItimerErrorraITIMER_VIRTUALrJs r sig_vtalrmItimerTest.sig_vtalrm%s^ ??Q $$&  __ !   V22A 6 1r!c\SUl[R"[RS5 g)NTr)rerra ITIMER_PROFrJs rsig_profItimerTest.sig_prof2s ++Q/r!cdUR[R[RSS5 g)Nrr)rUrrrra ITIMER_REALrYs rtest_itimer_excItimerTest.test_itimer_exc6s' &,,f.>.>AF r!c[RUl[R"URS5 [R"5 UR UR S5 g)Ng?T)rr{rgrapauser+rerYs rtest_itimer_realItimerTest.test_itimer_real?sB(( c*  ))40r!)netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c@[RUl[R"[RUR5 [R "URSS5 [ R"[ R5HDn[S[S555n[R"UR5S:XdMD O UR[R"UR5S5 URURS5 g)NMbP?c3*# UH oU-v M g7frIr%.0is r 1ItimerTest.test_itimer_virtual..O0.`rrrrT)rrwrgSIGPROFrxrarrrrrr`r+rers rtest_itimer_profItimerTest.test_itimer_profYs((  fnndmm4c3/##G$8$89A05<00A , : : ))$++6 C ))40r!c[RUl[R"URS5 [R "S5 UR URS5 g)Nư>rrT)rr{rgratimesleepr+rerYs rtest_setitimer_tinyItimerTest.test_setitimer_tinyjsF(( d+ 1  ))40r!)rerfrgriN)rAr=rBrCrjrmrhrtrxr|rrrr,r-rrrrrDr%r!rrcrcscF -   0H1__S\\\1D_NP1P1"1"1r!rcc\rSrSr\R "\"\S5S5S5r\R "\"\S5S5\R "\"\S5S5S55r \R "\"\S5S 5\ R"5S 55r \R "\"\S5S5S 5r \R "\"\S 5S 5S5r\R "\"\S5S5S5r\R "\"\S5S5S5r\R "\"\S5S5S5r\R "\"\S5S5S5r\R "\"\S5S5S5r\R "\"\S 5S 5\R "\"\S5S5\ R"5S555r\R "\"\S5S5S5r\R "\"\S5S5S5r\R "\"\S5S5\ R"5S55r\R "\"\S5S 5\ R"5S55rSrg)PendingSignalsTestsit sigpendingzneed signal.sigpending()c^UR[R"5[55 grI)r+rrrrYs rtest_sigpending_empty)PendingSignalsTests.test_sigpending_emptyys **,ce4r!rrc Sn[SU5 g)Na if 1: import os import signal def handler(signum, frame): 1/0 signum = signal.SIGUSR1 signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() for sig in pending: assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rr(r)s rtest_sigpending#PendingSignalsTests.test_sigpending~s  0 t$r! pthread_killzneed signal.pthread_kill()c Sn[SU5 g)Naif 1: import signal import threading import sys signum = signal.SIGUSR1 def handler(signum, frame): 1/0 signal.signal(signum, handler) tid = threading.get_ident() try: signal.pthread_kill(tid, signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rr(r)s rtest_pthread_kill%PendingSignalsTests.test_pthread_kills ( t$r!cNSUR5<SU<S3n[SU5 g)Nzif 1: import signal import sys from signal import Signals def handler(signum, frame): 1/0 z blocked = a signum = signal.SIGALRM # child: block and wait the signal try: signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) # Do the tests test(signum) # The handler must not be called on unblock try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) except ZeroDivisionError: print("the signal handler has been called", file=sys.stderr) sys.exit(1) except BaseException as err: print("error: {}".format(err), file=sys.stderr) sys.stderr.flush() sys.exit(1) r)stripr)r2blockedtestrs r wait_helperPendingSignalsTests.wait_helpers%Nzz|WA &J t$r!sigwaitzneed signal.sigwait()cDUR[RS5 g)Na  def test(signum): signal.alarm(1) received = signal.sigwait([signum]) assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) rrrrYs r test_sigwait PendingSignalsTests.test_sigwaits *  r! sigwaitinfozneed signal.sigwaitinfo()cDUR[RS5 g)Nz def test(signum): signal.alarm(1) info = signal.sigwaitinfo([signum]) if info.si_signo != signum: raise Exception("info.si_signo != %s" % signum) rrYs rtest_sigwaitinfo$PendingSignalsTests.test_sigwaitinfo *  r! sigtimedwaitzneed signal.sigtimedwait()cDUR[RS5 g)Nz def test(signum): signal.alarm(1) info = signal.sigtimedwait([signum], 10.1000) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rrYs rtest_sigtimedwait%PendingSignalsTests.test_sigtimedwaitrr!cDUR[RS5 g)Nz def test(signum): import os os.kill(os.getpid(), signum) info = signal.sigtimedwait([signum], 0) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rrYs rtest_sigtimedwait_poll*PendingSignalsTests.test_sigtimedwait_polls *  r!cDUR[RS5 g)Nz def test(signum): received = signal.sigtimedwait([signum], 1.0) if received is not None: raise Exception("received=%r" % (received,)) rrYs rtest_sigtimedwait_timeout-PendingSignalsTests.test_sigtimedwait_timeouts *  r!cr[RnUR[[RU/S5 g)Ng)rrrUrVr)r2rs r"test_sigtimedwait_negative_timeout6PendingSignalsTests.test_sigtimedwait_negative_timeouts) *f&9&9F8TJr!c[SS5 g)Nraif True: import os, threading, sys, time, signal # the default handler terminates the process signum = signal.SIGUSR1 def kill_later(): # wait until the main thread is waiting in sigwait() time.sleep(1) os.kill(os.getpid(), signum) # the signal must be blocked by all the threads signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) killer = threading.Thread(target=kill_later) killer.start() received = signal.sigwait([signum]) if received != signum: print("sigwait() received %s, not %s" % (received, signum), file=sys.stderr) sys.exit(1) killer.join() # unblock the signal, which should have been cleared by sigwait() signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) r(rYs rtest_sigwait_thread'PendingSignalsTests.test_sigwait_thread$s   r!cPUR[[R5 UR[[RS5 UR[[RSSS5 UR[[RS/5 UR[ 5 [R"[R [R/5 SSS5 UR[ 5 [R"[R S/5 SSS5 UR[ 5 [R"[R SS-/5 SSS5 g!,(df  N=f!,(df  Ng=f!,(df  g=f)Nrrr>r?iri)rUr]rrrrVrrrYs rtest_pthread_sigmask_arguments2PendingSignalsTests.test_pthread_sigmask_argumentsGs  )V%;%;< )V%;%;Q? )V%;%;Q1E '6#9#94D   z *  " "6#3#3fkk] C+   z *  " "6#3#3aS 9+   z *  " "6#3#3agY ?+ * + * * * * *s$,5E5>'F*F5 F F F%c[R"[R[R"55nUR [R[R U5 [R"[R [R"55nURU[R"55 grI)rrrrrrrassertLessEqualrs r"test_pthread_sigmask_valid_signals6PendingSignalsTests.test_pthread_sigmask_valid_signalsUs{  " "6#3#3V5I5I5K L ..0B0BAF  " "6#5#5v7K7K7M N Q 4 4 67r!c Sn[SU5 g)Na- if 1: import signal import os; import threading def handler(signum, frame): 1/0 def kill(signum): os.kill(os.getpid(), signum) def check_mask(mask): for sig in mask: assert isinstance(sig, signal.Signals), repr(sig) def read_sigmask(): sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) check_mask(sigmask) return sigmask signum = signal.SIGUSR1 # Install our signal handler old_handler = signal.signal(signum, handler) # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) check_mask(old_mask) try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) # Unblock SIGUSR1 try: # unblock the pending signal calls immediately the signal handler signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Check the new mask unblocked = read_sigmask() if signum in unblocked: raise Exception("%s in %s" % (signum, unblocked)) if blocked ^ unblocked != {signum}: raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) if old_mask != unblocked: raise Exception("%s != %s" % (old_mask, unblocked)) rr(r)s rtest_pthread_sigmask(PendingSignalsTests.test_pthread_sigmask^sG P t$r!cSn[SU5nUR5up4UR5nUS:wa[SU<SU<35eSSS5 g!,(df  g=f)Na7if True: import threading import signal import sys def handler(signum, frame): sys.exit(3) signal.signal(signal.SIGUSR1, handler) signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) sys.exit(2) rr?r@rA)r rDrErF)r2rrrBrrKs rtest_pthread_kill_main_thread1PendingSignalsTests.test_pthread_kill_main_threadsa  $ %$002NF||~H1}!)6!344& % %s [T5T:aJTR[R"55 [R "[R S5 gg)Nr)rappendr perf_counterrrar{)rframeNtimess rry5StressTest.measure_itimer_resolution..handlers@5zA~ T..01  !3!3T: r!rrrrz,detected median itimer() resolution: %.6f s.NN)rrrar{rrrrrr statisticsmedianrverboseprint)r2ryr durationsmedrrs @@rmeasure_itimer_resolution$StressTest.measure_itimer_resolutions  ; ; ((&*<* 10 ms.) on this platform (or system too busy))rr)r2resos rdecide_itimer_countStressTest.decide_itimer_countsB--/ 4< T\ MMM!G$ %r!raztest needs setitimer()c8^UR5n/mSnSU4SjjnUR[RU5 UR[RU5 UR[R U5 Sn[ R"5[R-nXA:GaR[R"[R"5[R5 US- n[T5U:aY[ R"5U:a@[ R"S5 [T5U:a[ R"5U:aM@[R"[R"5[R5 US- n[T5U:aY[ R"5U:a@[ R"S5 [T5U:a[ R"5U:aM@XA:aGMRUR[T5US5 g)Nc[R"[RS[R"5S--5 g)Nrh㈵>)rrar{random)rrs r first_handler@StressTest.test_stress_delivery_dependent..first_handlers*   V// $8N1N Or!c(>TRU5 grIrrrsigss rsecond_handlerAStressTest.test_stress_delivery_dependent..second_handler  KK r!rrrrSome signals were lostr)rrrrr^rr monotonicrr^rrHgetpidrrr+)r2rrr expected_sigsdeadliners @rtest_stress_delivery_dependent)StressTest.test_stress_delivery_dependentsh  $ $ & P FNNM2 FNNM2 FNNN3 >>#g&;&;; GGBIIK 0 Q Md)m+0@80K 4 d)m+0@80K GGBIIK 0 Q Md)m+0@80K 4 d)m+0@80K TA'?@r!cx^UR5n/mU4SjnUR[RU5 UR[RU5 SnX1:a[R "[R S[R"5S--5 [R"[R"5[R5 US- n[R"[R5Hn[T5U:dM O X1:aMUR[T5US5 g)Nc(>TRU5 grIrrs rry=StressTest.test_stress_delivery_simultaneous..handler/r r!rrrr>r )rrrr rrar{rrrHr rsleeping_retryr^rr+)r2rryr rrs @r!test_stress_delivery_simultaneous,StressTest.test_stress_delivery_simultaneous&s  $ $ & FNNG, FNNG,    V// $8N1N O GGBIIK 0 Q M++G,A,ABt9 -C TA'?@r!z&crashes due to system bug (FB13453490)r^ztest needs SIGUSR1c8^^^ ^ ^ [Rm Sm Sm SmU 4SjmUU U 4SjnUU U U 4Sjn[R"T T5nUR[RT U5 [R"US9nSn[ R "5nUR5 U"5 SmUR5 URb`URURR[5 URST S S 3[URR55 SnSSS5 U(dURT S5 UR!T T 5 SmUR5 g!,(df  NL=f!SmUR5 f=f) NrFc>TS- mgrqr%)rrnum_received_signalss rcustom_handlerAStressTest.test_stress_modifying_handlers..custom_handlerTs A % r!c^>T(d%[R"T5 TS- mT(dM$ggrq)r raise_signal)do_stopnum_sent_signalsrsrset_interruptsAStressTest.test_stress_modifying_handlers..set_interruptsXs&##F+ A% gr!c>TS:dTS:aS[S5H3nT[R4Hn[R"TU5 M M5 TS:aMJTS:aMRgg)Nrrri N)rrr)rryrrrrs rcycle_handlersAStressTest.test_stress_modifying_handlers..cycle_handlers^sU"S(,@1,DuA$2FNN#C fg6$D&#S(,@1,Dr!)targetTzSignal dz ignored due to race condition)rr^r threadingThreadrcatch_unraisable_exceptionstartr unraisabler* exc_valuerrr assertGreaterr) r2r r#rtignoredrrrrrrs @@@@@rtest_stress_modifying_handlers)StressTest.test_stress_modifying_handlersIsM   & &  7 7mmFN;   v{;   N 3 G335  ==,))"--*A*A7KMM!&+IJBMM3346#G6$""#7;  !57G HG FFH165.G FFHs%FBE4.3F4 F>FFr%N)rAr=rBrCrrrrrrrrrrrrrr0rDr%r!rrrs<0 %513*A3*AX513A3AB__XGH 3-/00263/I6r!rcr\rSrSrSr\R "\RS:gS5S5r Sr Sr Sr g ) RaiseSignalTesticUR[5 [R"[R5 SSS5 g!,(df  g=frI)rUKeyboardInterruptrrrrYs r test_sigintRaiseSignalTest.test_sigints/   0 1    .2 1 1s %A ArzWindows specific testcSn[R"U5 URS5 g![a)nUR[R :XaSnAgeSnAff=f)Nrrz#OSError (Invalid argument) expected)rrfailrerrnoEINVAL)r2rbes rtest_invalid_argument%RaiseSignalTest.test_invalid_argumentsM F    ' II; < ww%,,&  s), AAAAc(^SmU4Sjn[R"[RU5nUR[R[RU5 [R"[R5 UR T5 g)NFc >SmgrVr%)abis_oks rry-RaiseSignalTest.test_handler..handlersEr!)rrrrr)r2ry old_signalrCs @r test_handlerRaiseSignalTest.test_handlersY ]]6==':   v}}jAFMM* r!cJSn[SU5up#nURSU5 g)Nzif 1: import _thread class Foo(): def __del__(self): _thread.interrupt_main() x = Foo() rs/OSError: Signal 2 ignored due to race condition)rr)r2rrcouterrs rtest__thread_interrupt_main+RaiseSignalTest.test__thread_interrupt_mains* (d3  H#Nr!r%N) rAr=rBrCr6rrr,r-r=rFrLrDr%r!rr3r3s</__S\\W,.EF G   Or!r3cT\rSrSr\R "\"\S5S5S5rSr g)PidfdSignalTestipidfd_send_signalzpidfd support not built incUR[5n[R"S[R5 SSS5 WR R [ R:XaURS5 O9UR R [ R:XaURS5 URUR R [ R5 [R"S[R"53[R5nUR![R"U5 UR%[&S5 [R"U[R[)5S5 SSS5 UR[*5 [R"U[R5 SSS5 g!,(df  GN=f!,(df  Nd=f!,(df  g=f)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)rUrrrPrrr:ENOSYSrEPERMr+EBADFropenr  O_DIRECTORYrrassertRaisesRegexr]objectr5)r2rmy_pidfds rtest_pidfd_send_signal&PidfdSignalTest.test_pidfd_send_signals:   w '2  $ $Q 6( <<   - MM: ; \\  5;; . MM> ? ++U[[977VBIIK=12>>B (+  # #I/G H  $ $Xv}}fh JI   0 1  $ $Xv}} =2 1( 'I H 1 1s#&G0G*)&G; G'* G8; H r%N) rAr=rBrCrrrrrZrDr%r!rrOrOs- +,$ >  >r!rOc.[R"5 grI)r reap_childrenr%r!rtearDownModuler^s r!__main__)/r.r:rOr:rrrrrrr,r'rrrr test.supportrrrrtest.support.script_helperrr r# ImportErrorTestCaser rr-rFrrrrrr&rr:rcrrr3rOr^rAmainr%r!rres    F /=8$$/=d (*@Ac=""c=Bc=T S\\W,.@A-D**-DB-D`S!H%%S!l (*@As=))s=Bs=l WV\24LM@%h//@%N@%F (*@A WV^46ST WR(*>?R&x((R&@UBR&j (*@A WV[1Rgfk6RJL\1""\1LB\1~P4(++P4f |""|~)Oh'')OZ>h''>* z MMOk-IsJ77KK