f8&dZddlmZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl m Z ddl mZmZmZmZmZmZddlmZmZmZmZddlmZmZddlmZmZmZm Z m!Z!m"Z"m#Z#m$Z$dd l%m&Z&dd l'm(Z(dd l)m*Z*m+Z+m,Z,dd l-m.Z.dd l/m0Z0m1Z1ddl2m3Z3ddl4m5Z5ddl6m7Z7m8Z8m9Z9m:Z:m;Z;ddlZ>m?Z?m@Z@mAZAerddlBmCZCddlDmEZEddlFmGZGmHZHeIe eJjZLddZMGddZNGddZO ddZPddZQ d d!dZRy)"z>c2eZdZdZd.dZd/dZd0dZ d1 d2dZ d3dZ d4 d5dZ d4 d5d Z d6 d7d Z d8 d9d Z d8 d9d Z d:d Zd:dZd;dZddZd?dZd?dZd@dZdAdZdAdZdBdCdZdDdZd/dZd/dZedEdZdFdZdGdZ dHdZ!dId Z"d/d!Z#dJd"Z$dKd#Z%dKd$Z&d/d%Z'd/d&Z(dLd'Z)dLd(Z*dMd)Z+dNd*Z,dOd+Z-dPd,Z.dQd-Z/y)RTopologyz*Monitor a topology of one or more servers.c|j|_|jj|_|jduxr|jj|_|jduxr|jj |_d|_d|_ |j s |jrtjd|_|jrJ|jJ|jj|jj|jff||_t|j!|j#|j$dd|}||_|jr~|jJtt(j*iddd|j}|jj|jj,||j&|jff|j.D]Z}|j s|jJ|jj|jj0||jff\t3|j5|_d|_d|_t=|_|jjA|j>|_!i|_"d|_#d|_$tK|_&|j s |jr|jJdfd }tOjPtRjTtRjV|d}tYjZ|j|j\||_ |j_d|_0|jjb3|jjdstg||j|_0yyy)Nd)maxsizeFctSN)r;)weaksr:targetz!Topology.__init__..targets+D11pymongo_events_thread)interval min_intervalrDname)returnbool)4 _topology_id _pool_options_event_listeners _listenersenabled_for_server_publish_serverenabled_for_topology _publish_tp_events_Topology__events_executorr3Queueputpublish_topology_opened _settingsr*get_topology_typeget_server_descriptionsreplica_set_name _descriptionr)Unknown$publish_topology_description_changedseedspublish_server_openedlistserver_descriptions_seed_addresses_opened_closedr_lockcondition_class _condition_servers_pid_max_cluster_timer _session_poolrPeriodicExecutorr EVENTS_QUEUE_FREQUENCYMIN_HEARTBEAT_INTERVALweakrefrefcloseopen _srv_monitorfqdn load_balancedr)selftopology_settingstopology_description initial_tdseedrDexecutorrCs @r:__init__zTopology.__init__as-::+99JJ#d:at?a?a??$6_4??;_;_ &*   4#3#3 ;;s3DL   <<+ ++ LL  dooEEHYHYG[\ ]*2  / / 1  5 5 7  . .     1   <<+ ++,%%r4tT^^J LL  OOHH!2!2D4E4EF &++D##||///   $//"G"G$PTPaPaIb!cd, $$8$L$L$NO  !^ ..88D02 #' 8</1   4#3#3<<+ ++ 2)9966#::, H;;t||X^^>   *4>>3O3O *4 @D 4P *rEc*tj}|j||_n||jk7r||_tjdddk\r dt fi}nddi}t j di||j5|jjD]}|j|jjddd|j5|jdddy#1swY/xYw#1swYyxYw)aStart monitoring, or restart after a fork. No effect if called multiple times. .. warning:: Topology is shared among multiple threads and is protected by mutual exclusion. Using Topology from a process other than the one that initialized it will emit a warning and may result in deadlock. To prevent this from happening, MongoClient must be created after any forking. N) skip_file_prefixes stacklevel)zMongoClient opened before fork. May not be entirely fork-safe, proceed with caution. See PyMongo's documentation for details: https://pymongo.readthedocs.io/en/stable/faq.html#is-pymongo-fork-safe)osgetpidrksys version_info _pymongo_dirwarningswarnrgrjvaluesrsrmreset_ensure_opened)rxpidkwargsservers r:rtz Topology.opensiik 99 DI DII DI#w..@&* MM'     +"mm224FLLN5""((*  +ZZ "    ! " " + + " "sA C=#D =D Dc^tj}||jjS|SrB)r remainingrYserver_selection_timeout)rxtimeouts r:get_server_selection_timeoutz%Topology.get_server_selection_timeouts(//# ?>>:: :rENc ||j}n|}|j5|j|||||}|Dcgc]+}tt|j |j -c}cdddScc}w#1swYyxYw)aReturn a list of Servers matching selector, or time out. :param selector: function that takes a list of Servers and returns a subset of them. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param address: optional server address to select. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. N)rrg_select_servers_loopr r!get_server_by_addressaddress) rxselector operationrr operation_idserver_timeoutrcsds r:select_serverszTopology.select_serverss. $ +!>>@N5N ZZ "&";";.)\7#  PcIKVT77 CD       sA=0A8,A=8A==BcZtj}||z}d}tjtj rLt ttj||||j|jjj|jj|||jj} | s|dk(s||kDrtjtj r\t ttj ||||j|jjj|j#|t%|j#|d|d|j|smt ttj&||||j|jjjt)|tjz d }|j+|j-|j.j1t2j4|jj7tj}|jj|||jj} | s|jj7| S) z7select_servers() guts. Hold the lock when calling this.F)messagerr operationIdtopologyDescriptionclientId)custom_selectorr)rrrrrrfailurez , Timeout: zs, Topology Description: )rrrrrrremainingTimeMST)time monotonicr isEnabledForloggingDEBUGrrSTARTED description_topology_settingsrLr]apply_selectorrYserver_selectorFAILED_error_messagerWAITINGintr_request_check_allriwaitr rpcheck_compatible) rxrrrrrnowend_timelogged_waitingrcs r:rzTopology._select_servers_loops9nn= # 0 0 ? (5==!#($($4$4))<<II #//>> gt~~/M/M? &!|sX~+88G0 = D D!)"+$0,0,<,>3C(C$D "&    !  # # % OO !>!> ?    . . 0.."C"&"3"3"B"B'4>>3Q3Q#C# S&Z **,""rEc|j|||||}t||}t|dk(r|dStj|d\}} |j j | j j kr|S| S)Nrr)r_filter_serverslenrandomsamplepooloperation_count) rxrrrrdeprioritized_serversrserversserver1server2s r:_select_serverzTopology._select_serverQs%% i!97L "'+@A w<1 1: !==!4 << ' '7<<+G+G GNNrEc |j||||||}tjr)tj|jj t jtjr|tt tj||||j|jjj|jjd|jjd |S)zALike select_servers, but choose a random server if several match.rrr)rrrrrr serverHost serverPort)rr get_timeoutset_rttrmin_round_trip_timerrrrrr SUCCEEDEDrrLr)rxrrrrrrrs r: select_serverzTopology.select_serverfs$$   $  !% %      MM&,,@@ A # 0 0 ? (5??!#($($4$4))<<II!--55a8!--55a8  rEc6|jt||||S)a=Return a Server for "address", reconnecting if necessary. If the server's type is not known, request an immediate check of all servers. Time out after "server_selection_timeout" if the server cannot be reached. :param address: A (host, port) pair. :param operation: The name of the operation that the server is being selected for. :param server_selection_timeout: maximum seconds to wait. If not provided, the default value common.SERVER_SELECTION_TIMEOUT is used. :param operation_id: The unique id of the current operation being performed. Defaults to None if not provided. Calls self.open() if needed. Raises exc:`ServerSelectionTimeoutError` after `server_selection_timeout` if no matching servers are found. r)rr$)rxrrrrs r:select_server_by_addressz!Topology.select_server_by_addresss+2!!   $ % "  rEc4|j}|j|j}t||ryt |j|}|j s)|j r^|jtjk(rA|jj|j}|r|jj|jxs |jxr||k(}|jrY|sW|j J|j j#|j$j&|||j|j(ff||_|j+|j-|j.|jrX|sV|j J|j j#|j$j0||j|j(ff|j2rS|jtj4k(r6|jjt6vr|j2j9|rC|jj|j}|r|jj;||j<j?y)ziProcess a new ServerDescription on an opened topology. Hold the lock when calling this. N)interrupt_connections) r]_server_descriptionsr_is_stale_server_descriptionr, is_readableis_server_type_known topology_typer)SinglerjgetrreadyrQrSrTrWrO"publish_server_description_changedrL_update_servers_receive_cluster_time_no_lock cluster_timer_rur^r(rsrri notify_all) rxserver_description reset_poolrtd_oldsd_oldnew_tdrsuppress_events r:_process_changezTopology._process_changes&"",,-?-G-GH '0B C -d.?.?AST  ) )  3 38L8LP]PdPd8d]]&&'9'A'ABF !!#..B$2B2BdRdHd   <<+ ++ LL  OOFF/1C1K1KTM^M^_ #  **+=+J+JK   N<<+ ++ LL  OOHHT..0A0AB      M$9$9 9!!//7MM    # # % ]]&&'9'A'ABF !!8M!N ""$rEc|j5|jr8|jj|jr|j |||dddy#1swYyxYw)z>Process a new ServerDescription after an hello call completes.N)rgrer] has_serverrr)rxrrrs r: on_changezTopology.on_changes[ZZ \|| 1 1 < <=O=W=W X$$%7EZ[ \ \ \s AAA$c^|j}|jtvryt|j||_|j |j rW|j J|j j|jj||j|jffyy)z_Process a new seedlist on an opened topology. Hold the lock when calling this. N) r]rr(r+rrSrTrWrOr_rL)rxseedlistrs r:_process_srv_updatezTopology._process_srv_updates""   '= = EdFWFWYab    <<+ ++ LL  OOHHT..0A0AB  rEc|j5|jr|j|dddy#1swYyxYw)z?Process a new list of nodes obtained from scanning SRV records.N)rgrer)rxrs r: on_srv_updatezTopology.on_srv_updates5ZZ 3||((2 3 3 3s4=c8|jj|S)aJGet a Server or None. Returns the current version of the server immediately, even if it's Unknown or absent from the topology. Only use this in unittests. In driver code, use select_server_by_address, since then you're assured a recent view of the server's type and wire protocol version. )rjrrxrs r:rzTopology.get_server_by_addresss}}  ))rEc||jvSrB)rjrs r:rzTopology.has_server$s$--''rEc|j5|jj}|tjk7r dddyt |j djcdddS#1swYyxYw)z!Return primary's address or None.Nr)rgr]rr)ReplicaSetWithPrimaryr'_new_selectionr)rxrs r: get_primaryzTopology.get_primary'spZZ N --;;M C CC N N ,D,?,?,AB1EMM  N N Ns+A0%A00A9cT|j5|jj}|tjtj fvrt cdddSt||jDchc]}|jc}cdddScc}w#1swYyxYw)z+Return set of replica set member addresses.N) rgr]rr)rReplicaSetNoPrimarysetiterrr)rxrrrs r:_get_replica_set_membersz!Topology._get_replica_set_members1sZZ P --;;M3311%u  P P*.ht7J7J7L.M)NO2BJJO P PP P Ps$AB!B:B BBB'c,|jtS)z"Return set of secondary addresses.)rr&rxs r:get_secondarieszTopology.get_secondaries>s,,-FGGrEc,|jtS)z Return set of arbiter addresses.)rr%rs r: get_arbiterszTopology.get_arbitersBs,,-DEErEc|jS)z1Return a document, the highest seen $clusterTime.rlrs r:max_cluster_timezTopology.max_cluster_timeFs%%%rEc\|r*|jr|d|jdkDr||_yyy)N clusterTimerrxrs r:rz&Topology._receive_cluster_time_no_lockJs= ** .1G1G 1VV)5&W rEch|j5|j|dddy#1swYyxYwrB)rgrr s r:receive_cluster_timezTopology.receive_cluster_timeYs, ZZ =  . .| < = = =s(1c|j5|j|jj|dddy#1swYyxYw)z=Wake all monitors, wait for at least one to check its server.N)rgrrir)rx wait_times r:request_check_allzTopology.request_check_all]s< ZZ ,  # # % OO  + , , ,s ,AA c|jjtjk(r|jjS|jj S)z~Return a list of all data-bearing servers. This includes any server that might be selected for an operation. )r]rr)r known_serversreadable_serversrs r:data_bearing_serverszTopology.data_bearing_serverscsB    * *m.B.B B$$22 2  111rEc g}|j5|jD]P}|j|j}|j ||j j jfR ddd|D]!\}} |j j|#y#1swY0xYw#t$r;}t|d|dd}|j|jj|d}~wwxYw)NrF) rgrrjrappendrgen get_overallremove_stale_socketsr _ErrorContext handle_errorr)rxrrr generationexcctxs r: update_poolzTopology.update_poolls ZZ H//1rzz2 (C(C(EFG2 H #* FJ  00<#*  H H  #CJtD!!&"4"4"<"c|jSrB)r]rs r:rzTopology.descriptions   rEc6|jjS)z"Pop all session ids from the pool.)rmpop_allrs r:pop_all_sessionszTopology.pop_all_sessionss!!))++rEc8|jj|S)z>Start or resume a server session, or raise ConfigurationError.)rmget_server_session)rxsession_timeout_minutess r:r'zTopology.get_server_sessions!!445LMMrEc:|jj|yrB)rmreturn_server_session)rxserver_sessions r:r*zTopology.return_server_sessions 00@rEc@tj|jS)zmA Selection object, initially including all known servers. Hold the lock when calling this. )r#from_topology_descriptionr]rs r:rzTopology._new_selections 2243D3DEErEc h|jr td|jsd|_|j|js |j r|j j|jr6|jjtvr|jj|jjr?|jt|j dt#d|j$dd|j&j)D]}|jy)z[Start monitors, or restart after a fork. Hold the lock when calling this. z"Cannot use MongoClient after closeTrr )ok serviceIdmaxWireVersionN)rfrrerrSrQrUrtrurrr(rYrwrr"rdrrLrjrrxrs r:rzTopology._ensure_openeds <<"#GH H||DL  "4#7#7&&++-  d&6&6&D&DH^&^!!&&(~~++$$%,,Q/QT5F5FZ\]^mm**,F KKM-rEc|jj|}|y|jj|j|j ry|j j}|j}d}|rAt|dr5t|jtr|jjd}t||S)NTdetailstopologyVersion)rjr_poolstale_generationsock_generation service_idrtopology_versionerrorhasattr isinstancer5dict _is_stale_error_topology_version)rxrerr_ctxrcur_tvr<error_tvs r:_is_stale_errorzTopology._is_stale_errors""7+ > << ( ()@)@'BTBT U##44  WUI.%--. ==,,->?/AArEcn|j||ry|j|}|j}|j}|jj r|s |j syt|tr |j ryt|tryt|ttfr#t|dr |j}n0t|trdnd}|jjd|}|t j"vrw|t j$v}|jj s|j't)|||s|j*dkr|j-||j/y|j sD|jj s|j't)|||j-|yyt|t0r^|jj s|j't)|||j-||j2j5yy)Ncodei{'r<)rDrjr<r:rYrwcompleted_handshaker>rrrrr=rFr5rr _NOT_PRIMARY_CODES_SHUTDOWN_CODESrr"max_wire_versionr request_checkr_monitor cancel_check) rxrrArr<r:err_codedefaultis_shutting_downs r: _handle_errorzTopology._handle_errors    1 w' '' >> ' ' 7C^C^  e^ ,1L1L  z *  1AB Cuf% ::$.e_#E%4 ==,,VW=7555#+w/F/F#F ~~33(():7%)PQ#(@(@A(ELL,$$&00~~33(():7%)PQ Z( 1 0 1>>//$$%6we%LM LL $ OO ( ( *2rEcj|j5|j||dddy#1swYyxYw)zHandle an application error. May reset the server to Unknown, clear the pool, and request an immediate check depending on the error and the context. N)rgrS)rxrrAs r:rzTopology.handle_error's0 ZZ 1   w 0 1 1 1s)2cb|jjD]}|jy)z3Wake all monitors. Hold the lock when calling this.N)rjrrMr3s r:rzTopology._request_check_all0s%mm**,F  "-rEc |jjjD]S\}}||jvr|jj |||j ||j}d}|jr+|jtj|j}t||j|||j|j|}||j|<|j|j|j j"}||j|_||j"k7s"|j|j$j'|j"Vt)|jjD]L\}}|jj+|r"|j-|jj/|Ny)zrSync our Servers from TopologyDescription.server_descriptions. Hold the lock while calling this. )rtopologyrryN)rrmonitor topology_id listenersevents)r]rcr rjrY monitor_class_create_pool_for_monitorrQrTrqrrr!_create_pool_for_serverrLrOrtr is_writablerupdate_is_writablerbrrspop)rxrrrXrCr was_writables r:rzTopology._update_servers5s  ,,@@BHHJKGRdmm+..66')!66w?&*nn 7''DLL,D";;t||4D')55g># $ 1 1"oo *0 g&  $}}W5AAMM 57 g&22>>1MM'*//BB2>>R=K@ $DMM$7$7$9:OGV$$//8  !!'* ;rEcz|jj||jj|jS)N) client_id)rY pool_class pool_optionsrLrs r:r^z Topology._create_pool_for_server_s5~~(( T^^00D>)) ~~(( )UdFWFW)  rEc0|jjtjtjfv}|rd}n,|jjtj k(rd}nd}|jj r|tur|ryd|zSd|d|dSt|jj}t|jjj}|s-|r&d j||jjSd |zS|d jtfd |d dD}|r=d|zS|r)t!|j#|j$sd|zSt'Sdj)d|DS)zeFormat an error message if server selection fails. Hold the lock when calling this. zreplica set membersmongosesrzNo primary available for writeszNo %s available for writeszNo z match selector ""z)No {} available for replica set name "{}"zNo %s availablerc3<K|]}|jk(ywrBrG).0rr<s r: z*Topology._error_message..sGv||u,GsrNzNo %s found yetz\Could not reach any servers in %s. Replica set is configured with internal hostnames or IPs?,c3`K|]&}|jst|j(ywrB)r<str)ryrs r:rzz*Topology._error_message..sXf6<<FLL 1Xs..)r]rr)rrShardedrr'rbrcrformatrYr\r<allr intersectionrdr}join)rxris_replica_set server_plural addressesrsamer<s @r:rzTopology._error_messagezs **88  / /  - -=   1M    , , 0E0E E&M%M    * *33!<7-GG]O+)re __class____name__r])rxmsgs r:__repr__zTopology.__repr__s>||C4>>**+1SE$2C2C1FaHHrEc|j}tt|j|j|j |j fS)z?The properties to use for MongoClient/Topology equality checks.)rYtuplesortedr`r\rvsrv_service_name)rxtss r:eq_propszTopology.eq_propss8 ^^fRXX&')<)rrNotImplemented)rxothers r:__eq__zTopology.__eq__s. eT^^ ,==?enn&66 6rEc4t|jSrB)hashrrs r:__hash__zTopology.__hash__sDMMO$$rE)ryr.)rJNone)rJfloat)NNN) r Callable[[Selection], Selection]rr}rOptional[float]rOptional[_Address]r Optional[int]rJ list[Server]) rrrrrr}rrrrrJlist[ServerDescription])NNNN)rrrr}rrrrrOptional[list[Server]]rrrJr!)NN) rr0rr}rrrrrJr!)FF)rr"rrKrrKrJr)rzlist[tuple[str, Any]]rJr)rr0rJzOptional[Server])rr0rJrK)rJr)rrrJ set[_Address])rJr)rJzOptional[ClusterTime])rOptional[Mapping[str, Any]]rJr))rrrJr)rJr)rJr*)rJzlist[_ServerSession])r(rrJr)r+rrJr)rJr#)rr0rArrJrK)rr0rArrJr)rr0rJr)rrrJr})rJr})rJz>tuple[tuple[_Address, ...], Optional[str], Optional[str], str])robjectrJrK)rJr)0r __module__ __qualname____doc__r~rtrrrrrrrrrrrrrrrrrrr rrrrspropertyrr%r'r*rrrDrSrrrr^r]rrrrrrEr:r=r=^s4NA`&"P59&*&* #2###2 # $ # $ # #JJ#2J#J# J# $ J# $ J# !J#`59&*8<&*2#2  $  6 $ 259&*8<&* 2  #2  $  6 $   L37&*    #0  $    H!&+ @%-@%@% $ @%  @%J!&+ \-\\ $ \  \((3*(N PHF& 6=, 2"+:!!,NAFBB(@+D1# (+T  ,8YtI \  %rEr=c,eZdZdZ ddZy)rz.An error with context for SDAM error handling.cJ||_||_||_||_||_yrB)r<rLr9rIr:)rxr<rLr9rIr:s r:r~z_ErrorContext.__init__s* 0.#6 $rEN) r< BaseExceptionrLrr9rrIrKr:zOptional[ObjectId])rrrrr~rrEr:rrs:8 % % % % " % ' %rErc:||y|d|dk7ry|d|dk\S)z9Return True if the error's topologyVersion is <= current.F processIdcounterr) current_tvrCs r:r@r@s<X-+(;"77 i HY$7 77rEcj|j|j}}||y|d|dk7ry|d|dkDS)z4Return True if the new topologyVersion is < current.Frr)r;) current_sdnew_sdrnew_tvs r:rrsN#44f6M6MJV^+&"55 i 6)#4 44rEcH|s|S|Dcgc] }||vs| }}|xs|Scc}w)zBFilter out deprioritized servers from a list of server candidates.r) candidatesrrfiltereds r:rrs; !%/W66AV3VWHW  !z!Xs )r5z"weakref.ReferenceType[queue.Queue]rJrK)rrrCrrJrK)rr"rr"rJrKrB)rrrrrJr)Sr __future__rrrr3rrrrrqpathlibrtypingrrrrr r pymongor r r rpymongo.client_sessionrrpymongo.errorsrrrrrrrr pymongo.hellor pymongo.lockrpymongo.loggerrrrpymongo.monitorr pymongo.poolrr pymongo.serverr!pymongo.server_descriptionr"pymongo.server_selectorsr#r$r%r&r'pymongo.topology_descriptionr(r)r*r+r,bsonr-pymongo.settingsr.pymongo.typingsr/r0r}__file__parentrr;r=rr@rrrrEr:rsC" HH==E    % '*!8154>(() "g %g %T%%$8+87R8 85OS " "5K " "rE