a iz@s~dZddlZddlmZddlZddlZddlZddlm Z ddl Z ddl Z ddl m Z ddlZddlZddlZe ZdaGdddZd d Ze ed Zd ZGd ddeZGdddZddZGdddeZGdddeZGdddeZ Gddde Z!ddZ"ddZ#d-dd Z$d!d"Z%Gd#d$d$e j&Z'da(da)d%d&Z*d'd(Z+Gd)d*d*ej,Z-Gd+d,d,ej.Z/dS).z"Brian Quinlan (brian@sweetapp.com)N)_base)Queue)partialFc@s,eZdZddZddZddZddZd S) _ThreadWakeupcCsd|_tjdd\|_|_dS)NF)duplex)_closedmpPipe_reader_writerselfr2/usr/lib64/python3.9/concurrent/futures/process.py__init__Csz_ThreadWakeup.__init__cCs$|js d|_|j|jdSNT)rr closer r rrrrGs z_ThreadWakeup.closecCs|js|jddS)N)rr send_bytesr rrrwakeupMsz_ThreadWakeup.wakeupcCs |js|jr|jqdSN)rr poll recv_bytesr rrrclearQs z_ThreadWakeup.clearN)__name__ __module__ __qualname__rrrrrrrrrBsrcCs@datt}|D]\}}|q|D]\}}|q*dSr)_global_shutdownlist_threads_wakeupsitemsrjoin)r _ thread_wakeuptrrr _python_exitWs     r%=c@seZdZddZddZdS)_RemoteTracebackcCs ||_dSrtb)r r*rrrrwsz_RemoteTraceback.__init__cCs|jSrr)r rrr__str__ysz_RemoteTraceback.__str__N)rrrrr+rrrrr(vsr(c@seZdZddZddZdS)_ExceptionWithTracebackcCs8tt|||}d|}||_d|j_d||_dS)Nz """ %s""") tracebackformat_exceptiontyper!exc __traceback__r*)r r1r*rrrr}s  z _ExceptionWithTraceback.__init__cCst|j|jffSr) _rebuild_excr1r*r rrr __reduce__sz"_ExceptionWithTraceback.__reduce__N)rrrrr4rrrrr,|sr,cCst||_|Sr)r( __cause__)r1r*rrrr3s r3c@seZdZddZdS) _WorkItemcCs||_||_||_||_dSr)futurefnargskwargs)r r7r8r9r:rrrrsz_WorkItem.__init__Nrrrrrrrrr6sr6c@seZdZdddZdS) _ResultItemNcCs||_||_||_dSr)work_id exceptionresult)r r=r>r?rrrrsz_ResultItem.__init__)NNr;rrrrr<sr<c@seZdZddZdS) _CallItemcCs||_||_||_||_dSr)r=r8r9r:)r r=r8r9r:rrrrsz_CallItem.__init__Nr;rrrrr@sr@cs*eZdZdfdd ZfddZZS) _SafeQueuercs&||_||_||_tj||ddS)N)ctx)pending_work_items shutdown_lockr#superr)r max_sizerBrCrDr# __class__rrrsz_SafeQueue.__init__cst|trtt|||j}tdd||_ |j |j d}|j |jWdn1sj0Y|dur|j|nt||dS)Nz """ {}"""r-) isinstancer@r.r/r0r2r(formatr!r5rCpopr=rDr#rr7 set_exceptionrE_on_queue_feeder_error)r eobjr* work_itemrGrrrMs (z!_SafeQueue._on_queue_feeder_error)r)rrrrrM __classcell__rrrGrrAsrAcgs,t|}tt||}|s dS|VqdSr)ziptuple itertoolsislice) chunksize iterablesitchunkrrr _get_chunkss rZcsfdd|DS)Ncsg|] }|qSrr).0r9r8rr rz"_process_chunk..r)r8rYrr\r_process_chunks r^c Cs`z|t|||dWnBtyZ}z*t||j}|t||dWYd}~n d}~00dS)N)r?r>r>)putr< BaseExceptionr,r2) result_queuer=r?r>rNr1rrr_sendback_results   rcc Cs|dur:z ||Wn$ty8tjjdddYdS0|jdd}|dur`|tdSz|j|j i|j }Wn@ty}z(t ||j }t ||j|dWYd}~nd}~00t ||j|d~~q:dS)NzException in initializer:T)exc_infoblockr_)r?)rarLOGGERcriticalgetr`osgetpidr8r9r:r,r2rcr=) call_queuerb initializerinitargs call_itemrrNr1rrr_process_workers$     &rqcsleZdZfddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ZS)_ExecutorManagerThreadcsf|j|_|j|_|j|jfdd}t|||_|j|_|j |_ |j |_ |j |_|j|_tdS)NcSs<tjd||Wdn1s.0YdS)Nz?Executor collected: triggering callback for QueueManager wakeup)rutildebugr)r"r#rDrrr weakref_cbs z3_ExecutorManagerThread.__init__..weakref_cb)_executor_manager_thread_wakeupr#_shutdown_lockrDweakrefrefexecutor_reference _processes processes _call_queuerl _result_queuerb _work_idswork_ids_queue_pending_work_itemsrCrEr)r executorrurGrrrs  z_ExecutorManagerThread.__init__cCs||\}}}|r(||dS|durX||~|}|durV|j~|r||j s| dSqdSr) add_call_item_to_queuewait_result_broken_or_wakeupterminate_brokenprocess_result_itemrz_idle_worker_semaphorereleaseis_shutting_downflag_executor_shutting_downrCjoin_executor_internals)r result_item is_brokencauserrrrrun:s"   z_ExecutorManagerThread.runcCs~|jrdSz|jjdd}Wntjy6YdS0|j|}|jrn|jj t ||j |j |j ddq|j|=qqdS)NFreT)rlfullrriqueueEmptyrCr7set_running_or_notify_cancelr`r@r8r9r:)r r=rPrrrrZs"    z-_ExecutorManagerThread.add_call_item_to_queuec Cs|jj}|jj}||g}ddt|jD}tj||}d}d}d}||vrz| }d}Wqt y} z t t | | | j}WYd} ~ qd} ~ 00n ||vrd}|j|jWdn1s0Y|||fS)NcSsg|] }|jqSr)sentinelr[prrrr]{rzG_ExecutorManagerThread.wait_result_broken_or_wakeup..TF)rbr r#rr|valuesr connectionwaitrecvrar.r/r0r2rDr) r result_reader wakeup_readerreadersworker_sentinelsreadyrrrrNrrrrqs&,(z3_ExecutorManagerThread.wait_result_broken_or_wakeupcCsrt|tr2|j|}||jsn|dSn<|j|jd}|durn|jr`|j |jn|j |j dSr) rIintr|rKr!rrCr=r>r7rL set_resultr?)r rrrPrrrrs  z*_ExecutorManagerThread.process_result_itemcCs|}tp|dup|jSr)rzr_shutdown_thread)r rrrrrs z'_ExecutorManagerThread.is_shutting_downcCs|}|dur d|_d|_d}td}|durHtdd|d|_|jD]\}}|j |~qR|j |j D] }|q~|dS)NzKA child process terminated abruptly, the process pool is not usable anymoreTz^A process in the process pool was terminated abruptly while the future was running or pending.z ''' r-z''')rz_brokenrBrokenProcessPoolr(r!r5rCr r7rLrr|r terminater)r rrbper=rPrrrrrs"   z'_ExecutorManagerThread.terminate_brokencCs|}|dur|d|_|jr|i}|jD]\}}|js*|||<q*||_z|jWqLt j yrYqvYqL0qLd|_dS)NTF) rzr_cancel_pending_futuresrCr r7cancelr get_nowaitrr)r rnew_pending_work_itemsr=rPrrrrs   z2_ExecutorManagerThread.flag_executor_shutting_downc Csl|}d}||krh|dkrht||D]8}z|jd|d7}Wq,tjybYq Yq,0q,q dS)Nrr&)get_n_children_aliverangerl put_nowaitrFull)r n_children_to_stopn_sentinels_sentirrrshutdown_workerss   z'_ExecutorManagerThread.shutdown_workerscCsh||j|j|j|jWdn1sB0Y|jD] }|qVdSr) rrlr join_threadrDr#r|rr!r rrrrrs  (z._ExecutorManagerThread.join_executor_internalscCstdd|jDS)Ncss|]}|VqdSr)is_aliverrrr rz>_ExecutorManagerThread.get_n_children_alive..)sumr|rr rrrr sz+_ExecutorManagerThread.get_n_children_alive)rrrrrrrrrrrrrrrQrrrGrrrs +  & rrc Cshtrtrttdaztd}Wnttfy:YdS0|dkrHdS|dkrTdSd|attdS)NTSC_SEM_NSEMS_MAXz@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorrjsysconfAttributeError ValueError) nsems_maxrrr_check_system_limitss rccs&|D]}||r|VqqdSr)reverserK)iterableelementrrr_chain_from_iterable_of_lists,src@s eZdZdS)rN)rrrrrrrr8srcs~eZdZdddZddZddZd d Zd d Zd dZe j jj e_ dddfdd Z dddddZ e j j j e _ ZS)ProcessPoolExecutorNrcCsJt|dur6tpd|_tjdkrntt|j|_n8|dkrHtdn tjdkrh|tkrhtdt||_|dur~t }||_ |j j dddk|_ |durt|std ||_||_d|_i|_d|_t|_td|_d|_d|_i|_d|_t|_|jt }t!||j |j|j|jd |_"d |j"_#|$|_%t&'|_(dS) Nr&win32rz"max_workers must be greater than 0zmax_workers must be <= F) allow_noneforkzinitializer must be a callable)rFrBrCrDr#T))rrj cpu_count _max_workerssysplatformmin_MAX_WINDOWS_WORKERSrr get_context _mp_contextget_start_method#_safe_to_dynamically_spawn_childrencallable TypeError _initializer _initargs_executor_manager_threadr{r threadingLockrw Semaphorerr _queue_countrrrrvEXTRA_QUEUED_CALLSrAr} _ignore_epipe SimpleQueuer~rrr)r max_workers mp_contextrmrn queue_sizerrrr@sZ         zProcessPoolExecutor.__init__cCs<|jdur8|js|t||_|j|jt|j<dSr)rr_launch_processesrrstartrvrr rrr_start_executor_manager_threads   z2ProcessPoolExecutor._start_executor_manager_threadcCs2|jjddrdSt|j}||jkr.|dS)NF)blocking)racquirelenr{r_spawn_process)r process_countrrr_adjust_process_counts   z)ProcessPoolExecutor._adjust_process_countcCs$tt|j|jD] }|qdSr)rrr{rr)r r"rrrrsz%ProcessPoolExecutor._launch_processescCs8|jjt|j|j|j|jfd}|||j|j <dS)N)targetr9) rProcessrqr}r~rrrr{pidrrrrrsz"ProcessPoolExecutor._spawn_processcOs|j|jrt|j|jr&tdtr2tdt}t||||}||j |j <|j |j |j d7_ |j |jr|||WdS1s0YdS)Nz*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdownr&)rwrrr RuntimeErrorrrFuturer6rrrr`rvrrrr)r r8r9r:fwrrrsubmits"   zProcessPoolExecutor.submitr&)timeoutrVcs:|dkrtdtjtt|t|d|i|d}t|S)Nr&zchunksize must be >= 1.rV)r)rrEmaprr^rZr)r r8rrVrWresultsrGrrrs zProcessPoolExecutor.mapTF)cancel_futurescCs|j0||_d|_|jdur(|jWdn1s<0Y|jdur^|r^|jd|_d|_|jdur|r|j d|_d|_ d|_dSr) rwrrrvrrr!r}r~rr{)r rrrrrshutdowns (  zProcessPoolExecutor.shutdown)NNNr)T)rrrrrrrrrrExecutor__doc__rrrQrrrGrr?s U   r)NN)0 __author__rjconcurrent.futuresrrmultiprocessingrmultiprocessing.connectionmultiprocessing.queuesrrrx functoolsrrTrr.WeakKeyDictionaryrrrr%_register_atexitrr Exceptionr(r,r3objectr6r<r@rArZr^rcrqThreadrrrrrrBrokenExecutorrrrrrrr.sN       )