
    Z j	                        S SK r S SKrS SKrS SKrS SKJr  S SKJrJr  S SK	J
r
Jr  S SKJr  S SKJr  S SKJr  S SKrS SKJr  S S	KJr  S S
KJr  SSKJr  SSKJrJr  SSKJr  SSKJr  SSK J!r!  SSK"J#r#J$r$J%r%  SSK&J'r'  SSK(J)r)  SSK*J+r+  SSK,J-r-J.r.  SSK/J0r0  SSK1J2r2J3r3J4r4J5r5  SSK6J7r7J8r8J9r9  SSK:J;r;J<r<J=r=J>r>J?r?    " S S\R                  5      rA " S S5      rB\$" 5        " S  S!5      5       rC\$" 5        " S" S#5      5       rD " S$ S%5      rEg)&    N)abstractmethod)Callable	Generator)contextmanagernullcontext)ceil)perf_counter)Any)nn)tqdm)logging_redirect_tqdm   )PretrainedConfig)ContinuousBatchingConfigGenerationConfig)!lazy_import_paged_flash_attention)is_flash_attention_requested)logging)ContinuousBatchProcessorMetricsattach_tracertraced   )LogitsProcessorList   )PagedAttentionCache)%ContinuousBatchingLogitsProcessorList)ContinuousBatchingAsyncIOsContinuousBatchingIOs)OffloadingManager)GenerationOutputRequestStateRequestStatuslogger)SCHEDULER_MAPPINGFIFOScheduler	Scheduler)WorkloadHintsattn_mask_is_neededcreate_warmup_future_statespad_to_intervalpad_to_pow2c                       \ rS rSr% \\S'   \R                  \S'   \R                  \S'   \	S\
SS4S j5       r\	S	\S\4S
 j5       rSrg)ProtoPretrainedModelF   configdtypedeviceattn_implementationreturnNc                     g N )selfr2   s     ڋ/root/GenerationalWealth/GenerationalWealth/venv/lib/python3.13/site-packages/transformers/generation/continuous_batching/continuous_api.pyset_attn_implementation,ProtoPretrainedModel.set_attn_implementationK           generation_configc                     g r5   r6   )r7   r=   s     r8   _get_logits_processor*ProtoPretrainedModel._get_logits_processorO   r;   r<   r6   )__name__
__module____qualname____firstlineno__r   __annotations__torchr0   r1   r   strr9   r   r   r?   __static_attributes__r6   r<   r8   r-   r-   F   s`    ;;LL3 4   7G L_  r<   r-   c                   L    \ rS rSrSrSS jrS\SS4S jrS\\   SS4S	 jr	S
r
g)OutputRouterT   a  Dedicated object for routing generation outputs to the right destination.

When an async handler is registered for a request, the output is forwarded
to that handler via ``call_soon_threadsafe``. Otherwise the output is placed
on the shared ``output_queue``.
r3   Nc                 z    [         R                  " 5       U l        0 U l        [        R
                  " 5       U l        g r5   )queueQueueoutput_queueresult_handlers	threadingLock_lockr7   s    r8   __init__OutputRouter.__init__\   s&    !KKMVX^^%
r<   outputc                    U R                      U R                  R                  UR                  5      nSSS5        Wb  Uu  p4UR	                  X15        gU R
                  R                  U5        g! , (       d  f       NC= f)zDRoute a single output to its registered handler or the output_queue.N)rS   rP   get
request_idcall_soon_threadsaferO   put)r7   rW   entrycallbackloops        r8   deliverOutputRouter.delivera   sa    ZZ((,,V->->?E "NH%%h7!!&) Zs   &A00
A>outputsc                 f   / nSnU R                      U H^  nU R                  R                  UR                  5      nUb  Uu  pcUR	                  Xd45        MC  U R
                  R                  U5        M`     SSS5        U(       a  Ub  U4S jnUR                  U5        ggg! , (       d  f       N2= f)zRoute a batch of outputs, using a single ``call_soon_threadsafe`` to minimize cross-thread overhead.

Outputs without a registered handler fall back to the shared ``output_queue``.
Nc                 *    U  H  u  pU" U5        M     g r5   r6   )batchcbouts      r8   
_run_batch.OutputRouter.deliver_batch.<locals>._run_batch|   s    $GBsG  %r<   )rS   rP   rY   rZ   appendrO   r\   r[   )r7   rb   	callbacksr_   rW   r]   r^   rh   s           r8   deliver_batchOutputRouter.deliver_batchk   s    
 >@	ZZ!,,001B1BC$%*NH$$h%78%%))&1 "  )!*  %%j1 *9 Zs   A%B""
B0)rS   rO   rP   r3   N)rA   rB   rC   rD   __doc__rU   r    r`   listrl   rH   r6   r<   r8   rJ   rJ   T   s9    &
*. *4 *2T*:%; 2 2r<   rJ   c                      \ rS rSr% \\-  \S'   \\S'   S\S\	S\
S\S\S	\R                  S
\S\R"                  S\R&                  S\R(                  S\SS4S jrS\4S jrS8S jrS8S jrS8S jr\S8S j5       r\S\S\SS4S j5       rS\ S\ S\!S\"\ \ 4   4S jr#\S\!4S j5       r$\S8S j5       r%\S\!4S j5       r&\S  5       r'\S\SS4S! j5       r(\\RR                  " 5       S"\*RV                  SS4S# j5       5       r,S$\-S%\R\                  R^                  SS4S& jr0\S"\*RV                  S'\1S(\Rd                  S)\Rd                  S*\Rd                  SS4S+ j5       r3\" S,S-9S"\*RV                  S'\1S\Rd                  4S. j5       r4\" S/S-9S'\1S0\Rd                  S\Rd                  4S1 j5       r5\" S2S-9S3\Rd                  S4\Rd                  S*\Rd                  SS4S5 j5       r6\Rn                  " 5       S"\*RV                  SS4S6 j5       r8S7r9g)9ContinuousBatchProcessor   inputs_and_outputs	schedulercacher/   r=   continuous_batching_configlogit_processorinput_queueoutput_router
stop_eventmodel_devicemodel_dtyper3   Nc                    Xl         X l        X@l        XPl        X`l        Xpl        Xl        Xl        Xl        Xl	        [        USS5      U l        UR                  U l        [        USS5      c  SOUR                  U l        U R                  R                  U l        U R                  R                  U l        U R                  R!                  5       u  U l        U l        UR&                  U l        [)        UR&                  5      U l        U R-                  5         U R                  R/                  [        USS5      [1        US9U R                   R2                  S:  S	9  U R                  R4                  U R                  R6                  pSU l        Ub4  [:        R<                  " U R>                  40 URA                  5       D6U l        SU l!        Ub4  [:        R<                  " U R>                  40 URA                  5       D6U l!        U R"                  =(       d    U R$                  nU=(       d    USL=(       d    USLU l"        U(       a  [:        RF                  RI                  5       OSU l%        UUU	U
U R                  U R                  U R"                  S
.nU R                  RL                  U l&        U RL                  (       a6  [O        U R                  RP                  S-  5      US'   [S        S0 UD6U l*        O)U R                  RP                  US'   [W        S0 UD6U l*        [Y        UUURZ                  UR\                  U RT                  R^                  S9U l0        g)a  Initialize the continuous batch processor.

Args:
    cache: A [`PagedAttentionCache`] object
    config: The model configuration
    generation_config: The generation configuration
    continuous_batching_config: The continuous batching configuration
    logit_processor: The [`ContinuousBatchingLogitsProcessorList`] object used to process the logits.
    input_queue: Queue for incoming requests
    output_router: An [`OutputRouter`] object that routes outputs to handlers or the output queue.
    stop_event: Event to signal processing should stop
    model_device: Device for model inputs/outputs
    model_dtype: Data type for model inputs/outputs
    scheduler: The [`Scheduler`] to use
	do_sampleTsliding_windowNr   compile_config)r/   r   )fallback_compile_configis_flash_attndecode_fast_path_available)rv   r/   r1   r}   return_logprobsrx   use_cuda_graph_varlenr   
max_graphs)rv   ru   cpu_offload_space_gibsafety_thresholdcompute_streamr6   )1rv   r/   	cb_configrx   ry   rz   r{   r|   r}   ru   getattrr   r   r   q_padding_interval_sizekv_padding_interval_sizeget_cuda_graph_booleansr   use_cuda_graph_decodemax_batch_tokensr   metrics%_ensure_decode_fast_path_is_availableresolve_compile_configsr   max_blocks_per_requestvarlen_compile_configdecode_compile_config_compiled_varlenrF   compile_forward_process_and_sampleto_dict_compiled_decode_pad_inputscudagraph_pool_handle
graph_pooluse_async_batchingr   max_cached_graphsr   rt   r   r   cpu_offload_space"cpu_offload_space_safety_thresholdr   offloading_manager)r7   rv   r/   r=   rw   rx   ry   rz   r{   r|   r}   ru   varlen_configdecode_configuse_cuda_graphs	io_kwargss                   r8   rU   !ContinuousBatchProcessor.__init__   s   : 
3.&*$(&" !!2KF9II $+63CT#J#RaX^XmXm'+~~'M'M$(,(O(O%AEAgAgAi>"D$> !& 6 66u7M7MN 	224 	..$+,=?OQU$V6fE'+zz'H'H1'L 	/ 	

 (,~~'K'KT^^MqMq} !%$$)MM$2R2R$nVcVkVkVm$nD! !%$$)MM$2R2R$nVcVkVkVm$nD! 44R8R8R*f}D/H/eMaeLe<K%**668QU "&#33#33%)%?%?
	 #'.."C"C""&*4>>+K+Ka+O&PIl#&@&M9&MD#&*nn&F&FIl#&;&Hi&HD# #4"<"N"N7ZZ22AA#
r<   c                     SU R                    SU R                  R                   SU R                  R                   S3U R                  R                  5       R                  5       -   $ )Nz%ContinuousBatchProcessor(input_queue=z, active_requests=z, waiting_requests=))ry   ru   active_requestswaiting_requestsrt   get_model_kwargs__repr__rT   s    r8   r   !ContinuousBatchProcessor.__repr__   sj    3D4D4D3E F#~~==>>QRVR`R`RqRqQrrsu%%668AACD	
r<   c                     S U l         [        R                  " 5         [        R                  R                  5       (       a  [        R                  R                  5         g g r5   )rt   gccollectrF   r   is_availableempty_cacherT   s    r8   __del__ ContinuousBatchProcessor.__del__   s;    "&


::""$$JJ""$ %r<   c                    U R                   R                  SLnU(       d)  U R                   R                  U R                  l        S nO[        R
                  nU R                  R                  S:w  Ga  [        U R                  SS9(       a  [        U R                  R                  5      S   nU R                  R                  S:H  [        R                  R                  5       USL/n[        U5      (       d6  U" SU R                  R                  < SU S	35        SU R                  l        ggU" SU R                  R                  < S
U R                  R                  < S	35        SU R                  l        gg)zEnsures the decode fast path is available. If it is not, set the max blocks per request to 0. If it is
available, and no user-provided max blocks per request, set it to the fallback default.Nc                     U $ r5   r6   xs    r8   <lambda>PContinuousBatchProcessor._ensure_decode_fast_path_is_available.<locals>.<lambda>  s    qr<   r   r   )versionr   z-Although self.cache.max_blocks_per_request = zN, the decode fast path is not available because the one condition is not met: .z, the decode fast path is not available because the attention implementation is not FA3. Got self.config._attn_implementation = )r   r   fallback_max_blocks_per_requestrv   r#   warningr   r/   r   _attn_implementationnum_sliding_attention_groupsrF   r   r   all)r7   user_requestedlogger_warningflash_attn_with_kvcache
conditionss        r8   r   >ContinuousBatchProcessor._ensure_decode_fast_path_is_available  sH    >>dJ040^0^DJJ-(N#^^N ::,,1+DKKC*KDKKLlLl*mno*p'JJ;;q@JJ++-+47
 :"HDJJ$E$E#I JAAKAO 9:DJJ5 ' D

 A AE FpLPKKLlLlKppqs 56

1- 2r<   c                    U R                   R                  5         U R                  R                  5         U R                  R                  5         U R                  R                  5         [        U R                  R                  5      U l        g)z4Reset the batch processor for a new generation loop.N)	r   resetru   rt   rv   free_all_requestsr   r   r   rT   s    r8   r   ContinuousBatchProcessor.reset)  s\    %%'%%'

$$&6tzz7R7RSr<   c                 (   U R                   R                  5       (       d   U R                   R                  5       nUc  M?  U R                  R	                  UR
                  5        U R                  R                  U5        U R                   R                  5       (       d  M  gg! [        R                   a     g[         aO  n[        R                  " SU 3SS9  [        5       R                  S5      nUb  U R                  X!5         SnANSnAff = f)z?Pull new requests from the input queue and add to waiting list.NzError processing new request: Texc_infostate)ry   empty
get_nowaitrx   check_kwargslogit_processor_kwargsru   add_waiting_requestrM   Empty	Exceptionr#   errorlocalsrY   _handle_request_error)r7   r   es      r8   _get_new_requests*ContinuousBatchProcessor._get_new_requests1  s     ""((**9((335=$$11%2N2NO2259 ""((** ;;  9=aSADQ&,hll7&;$..q8	9s%   B#  A B# #D9	DADDr   r   c                    [         R                  Ul        [        U5      Ul        [        UR                  [        5      (       a+  U R                  R                  UR                  5      Ul	        O/ Ul	        U R                  R                  UR                  UR                  5        U R                  R                  UR                  5       5        g)z(Handle general request processing error.N)r"   FAILEDstatusrG   r   
isinstancerZ   ru   !get_active_request_static_outputsgenerated_tokensr   record_request_completioncreated_timerz   r`   to_generation_output)r7   r   r   s      r8   r   .ContinuousBatchProcessor._handle_request_errorD  s     %++%j e&&,,%)^^%U%UV[VfVf%gE"%'E"..u/A/A5CSCST""5#=#=#?@r<   num_q_tokensmax_kv_readuse_decode_fast_pathc                     U R                   (       ak  U(       dM  [        XR                  U R                  5      n[        X R                  U R
                  R                  5      nX4$ [        XR                  5      nSnX4$ )z[Pads the inputs sizes for the next batch if it is needed. Often it is, for max performance.r   )r   r*   r   r   r   rv   	num_pagesr+   )r7   r   r   r   s       r8   maybe_pad_inputs)ContinuousBatchProcessor.maybe_pad_inputsS  sq    '.|=Y=Y[_[p[pq-k;X;XZ^ZdZdZnZno
 ((  +<9N9NO((r<   c                    U R                  5         U R                  R                  5       nU H  nU R                  R	                  U5        M      U R                  R                  5       (       d  gU R                  R                  [        U R                  R                  5      [        U R                  R                  5      5        U R                  R                  U R                  U R                  R                  5      u  p4pVUcI  [        U R                  R                  5      S:  a  U R                  R                  5         g[!        S5      eU(       d  gU R                  R#                  U5        U R                  R%                  U5        [&        R(                  " S[        U5       S[        U R                  R                  5       S[        U R                  R                  5       SU SU S	U R                  R+                  5        35        U R-                  XVU5      u  pVU R.                  R1                  X0R2                  XEU5        U R                  R5                  U R                  5        g
)z}Prepare tensors and metadata for the next model forward pass. Returns True if there are requests to process,
False otherwise.Fr   z=No requests can be scheduled and no request can be offloaded.zScheduled: z, Waiting: z
, Active: z	. cum Q: z
. cum KV: z, free blocks: T)r   ru   clear_cancelled_requestsr   free_request_cpu_cachehas_pending_requestsr   record_queue_metricslenr   r   schedule_batchr   rv   r   offload_one_requestRuntimeErrorrestore_scheduled_requestsrecord_batch_metricsr#   debugget_num_free_blocksr   rt   prepare_batch_tensorsrx   record_kv_cache_memory_metrics)r7   cancelled_statesr   requests_in_batchr   r   r   s          r8   prepare_next_batch+ContinuousBatchProcessor.prepare_next_batch`  s    	 >>BBD%E##::5A &~~2244))#dnn.L.L*MsSWSaSaSrSrOst NR^^MjMj!!4::#7#7N
J $4>>112Q6'';;="#bcc  	::;LM 	))*;<#/01SA`A`=a<b c4>>99:;9\N S"m?4::3Q3Q3S2TV	
 %)$9$9,Ui$j!55335IYd	
 	33DJJ?r<   c                 t   U R                   R                  5       u  pnSn/ nU GHQ  nUR                  nUR                  [        R
                  [        R                  4;   aY  U R                  (       a  UR                  (       a  US-  nMg  [        SUR                  R                   SUR                   S35      eUR                  (       Gab  UR                  5       S:X  aE  U R                  R                  UR                  UR                  5        [        R                   Ul        X$   nUb  X4   OSn	US-  nUR#                  X5      n
U R$                  R'                  XvR(                  5        U
(       af  U R                  R+                  UR                  UR                  5        U R,                  R/                  UR                  5        SU R,                  l        UR2                  (       d  UR                  [        R
                  :X  a"  UR5                  UR7                  5       5        GM  GM  UR                  [        R8                  :X  d  GM,  U R$                  R'                  XvR(                  5        GMT     U(       a  U R:                  R=                  U5        / / pU R,                  R>                  (       a  U R,                  R>                  RA                  5       nURB                  nSUl!        [E        U5       Vs/ s H  oR                   SU 3PM     nnU H+  nURG                  U5      U R,                  RH                  U'   M-     U R$                  RK                  UR                  U5      u  nnURM                  U5        URM                  U5        U R,                  R>                  (       a  M  U(       ai  U R                   RN                  nUb  [P        RR                  RU                  U5      O	[W        5       nU   U R$                  RY                  X5        SSS5        ggs  snf ! , (       d  f       g= f)	z0Update request states based on generated tokens.r   r   zTried to update z	 request z in sync mode.NFz__child#)-rt   prepare_batch_updater   r   r"   FINISHEDPENDINGr   has_new_tokenr   namerZ   generated_lenr   record_ttft_metricr   DECODINGupdate_and_check_completionrv   !mark_shareable_blocks_as_completecomplete_blocksr   ru   finish_requestblock_new_requests	streamingrj   r   
PREFILLINGrz   rl   _requests_to_forkpopnum_childrenrangeforkr   fork_requestextendr   rF   r   streamr   
copy_cache)r7   r  
new_tokenslogprobscurrent_logits_indexpending_outputsfuture_stater   tokenlogprobis_finishedcopy_sourcecopy_destinationstate_to_forkr  inew_request_idsnew_request_idcopy_srccopy_dstr   maybe_streams                         r8   update_batch%ContinuousBatchProcessor.update_batch  sT    372I2I2^2^2`/x -L &&E|| 6 68M8MNN**#11,1,"%5ell6G6G5H	RWRbRbQccq#rss)))&&(A-LL33E4F4FHXHXY#0#9#9EL"8<D<P(8VZ$)$ $??O

<<UD`D`aLL::5;M;MuO_O_`NN11%2B2BC8=DNN5??ellm6L6L&L#**5+E+E+GH 'M !9!99

<<UD`D`aC .F ,,_= )+B%nn.. NN<<@@BM(55L)*M&QVWcQdeQdA":":!;8A3GQdOe"1ANASASTbAc..~> #2 "&!8!89Q9QSb!cHhx(##H- nn...   "44CCN@N@Z5::,,^<`k`mL

%%kD   f s   >P$>P))
P7c                 6    U R                   R                  5       $ )z2Check if there are any active or waiting requests.)ru   r   rT   s    r8   r   -ContinuousBatchProcessor.has_pending_requests  s     ~~2244r<   c                     U R                   R                  5       S   nU HM  nU R                  XR                  5        U R                  R                  UR                  R                  5        MO     g)z&Handle errors during batch processing.r   N)rt   r  r   r   ru   r  rZ   )r7   r   failed_future_statesr!  s       r8   handle_batch_error+ContinuousBatchProcessor.handle_batch_error  sZ      $66KKMaP0L&&u.@.@ANN)),*<*<*G*GH 1r<   c                 ,   [        U R                  R                  R                  5       5      nU H9  nU R	                  X5        U R                  R                  UR                  5        M;     U R                  R                  5         [        U R                  R                  R                  5       5       H9  nU R                  R                  R                  U5      nU R	                  X5        M;     U R                  R                  R                  5         g)z.Fail all active requests with the given error.N)rp   ru   r   valuesr   r  rZ   r   free_all_waiting_cpu_cachesr   keysr  waiting_requests_orderclear)r7   r   requestsr   req_ids        r8   fail_all_requests*ContinuousBatchProcessor.fail_all_requests  s     66==?@E&&u4NN))%*:*:; 
 	;;=4>>::??ABFNN3377?E&&u4 C
 	--335r<   modelc                    U R                   R                  U R                  S9nU R                   R                  5       u  p4nU R                   R                  nU R                   R
                  (       a2  U R                  c  U R                  OU R                  nU R                  nO1U R                  c  U R                  OU R                  nU R                  nU(       dB  Ub  [        R                  R                  U5      O	[        5       n	U	   U" XX4U5        SSS5        OnU R                   R                  5       n
U
b9  [        R                  R                  U5         U
R!                  5         SSS5        OXX4U4nU R"                  " Xv/UQ76   U R                   R%                  5         g! , (       d  f       N)= f! , (       d  f       N:= f)z!Perform a single generation step.use_paddingN)rt   r   r   get_cb_kwargsr   use_block_tabler   r   r   r   r   rF   r   r  r   	get_graphreplaycapture_graphretrieve_device_outputs)r7   r@  
batch_datacarry_over_idsprev_output_ids
output_idsr   
forward_fnuse_cuda_graphr-  graphargss               r8   _generation_step)ContinuousBatchProcessor._generation_step  sq    ,,==$JZJZ=[
6:6M6M6[6[6]300?? ""22=A=R=R=Z99`d`u`uJ!77N=A=R=R=Z99`d`u`uJ!77N @N@Z5::,,^<`k`mL5nzZ 
 ++557E ZZ&&~6LLN 76 >JW"":EE 	779!  76s   
F'F8'
F58
GrN  r   c                    [         R                  R                  U5         U" U6   S S S 5        [         R                  R                  5       n[         R                  R	                  XBU R
                  SS9   U" U6   S S S 5        U R                  R                  U5        g ! , (       d  f       N= f! , (       d  f       N;= f)Nthread_local)r  poolcapture_error_mode)rF   r   r  	CUDAGraphrP  r   rt   	set_graph)r7   rN  r   rQ  rP  s        r8   rH  &ContinuousBatchProcessor.capture_graph  s    ZZ~. / 

$$& ZZeest u 	))%0 /. uts   B5B0
B-0
B>rJ  rK  rL  rM  c                    U R                   R                  US   X45        U R                  X5      R                  5       nU R                  R
                  (       a  U R                  X&5      OUnU R                  XrS   U5        g)zThis function performs the forward pass, logits processing, and sampling; which are broken down into smaller
function to be easier to trace with OpenTelemetry.	input_idslogits_indicesN)rt   carry_over_tokens_model_forwardfloatrx   do_processing_process_logit_sample)r7   r@  rJ  rK  rL  rM  logitsscoress           r8   r   4ContinuousBatchProcessor._forward_process_and_sample,  sm     	11*[2I>k$$U7==?<@<P<P<^<^$$Z8djV(89:Fr<   model_forward	span_namec                 &    U" S0 UD6R                   $ )Nr6   )rd  )r7   r@  rJ  s      r8   r_  'ContinuousBatchProcessor._model_forward<  s    "z")))r<   logit_processingrd  c                     UR                   u  p4nUR                  X4-  U5      nUS   R                  X4-  5      nU R                  XvUS   5      nUR                  X4U5      $ )Nr\  logits_processor_args)shapeviewrx   )	r7   rJ  rd  
batch_sizeseq_len
vocab_size	logits_2dinput_ids_2dprocessed_logits_2ds	            r8   rb  'ContinuousBatchProcessor._process_logit@  si     +1,,'
ZKK
 4jA	!+.33J4HI"22<JWnLop"''
ZHHr<   samplingre  r]  c                    U R                   (       d  U R                  (       a"  [        R                  R	                  US   SS9nOUR                  S5      nU R                   (       a  [        R                  " USS9nO[        R                  " USSS9nU R                  (       a/  UR                  SUS9R                  S5      nUR                  5       nUR                  S5      nUR                  S5      nUS U n	XY   nUSS U24   R                  U5        U R                  (       a9  WU	   nUSS U24   R                  UR                  [        R                  S	95        g g )
Nr   )dimr   )num_samplesT)r{  keepdim)r{  index)r0   )r   r   r   
functionalsoftmaxsqueezerF   multinomialargmaxgatherlogsizecopy_rp  int32)
r7   re  r]  rM  probsnext_tokensper_token_probsr  tokensindicess
             r8   rc   ContinuousBatchProcessor._sampleL  s8    >>T11MM))&))<E NN1%E >>++EqAK,,u"dCK #llqlDLLRPO&**,H!))"- !!!$ &)!*1gvg:$$[1(H q'6'z"((U[[)IJ  r<   c           	      L   U R                   (       d  [        R                  " S5        gU R                  nU R                  R
                  U R                  R                  -  nX2-
  nU R                  R                  nU R                  (       a  SOSn[        U5       GH  nU R                  (       a-  XpR                  l        [        R                  " SUS-    S35        U R                  UXB-   SS9u  p[        R                  " S	U S
U	 S35        [        S[        R                  X$U R                  5      n
 [!        5       nU R                  R#                  XR$                  SXU-
  5        U R                  R'                  SS9nU R                  R)                  5       u  pnU R*                  =(       d    U R,                  nXXU4nU R.                  (       a  U R0                  " UU/UQ76   O-[2        R4                  R7                  U5         U" U6   SSS5        [        R                  " S[!        5       U-
  S S35        U
 H2  nU R                  R=                  UR>                  R@                  5        M4     U R                  RB                  S:X  a  GM  [        R                  " S5        Sn[!        5       nSn [        U[        RD                  SU R                  R                  U R                  5      n
U
(       d  GOT U R                  USSS9u  nnU R                  R#                  XR$                  SUS5        U R                  R'                  SS9nU R                  R)                  5       u  pnU RF                  =(       d    U R,                  nXXU4nU RH                  (       a  U R0                  " UU/UQ76   O-[2        R4                  R7                  U5         U" U6   SSS5        US-  nU
 H2  nU R                  R=                  UR>                  R@                  5        M4     UU R                  :  a  O[K        SU-  U R                  5      nGM  [        R                  " SU S[!        5       U-
  S S35        GM     U R                  (       a  SU R                  l        gg! , (       d  f       GN= f! [8         a%  n[        R:                  " SU S35         SnAGNSnAff = f! U
 H2  nU R                  R=                  UR>                  R@                  5        M4     f = f! , (       d  f       GNL= f! [8         a'  n[        R:                  " SU SU 35         SnAGNvSnAff = f! U
 H2  nU R                  R=                  UR>                  R@                  5        M4     f = f)a7  Pre-capture CUDA graphs (or trigger compile warmup) for varlen and decode paths. In async mode, both IO
pairs are warmed up since each has its own graph buffer and static tensors. The varlen path is warmed up at
the largest possible `(q, kv)` sizes so subsequent captures fit inside it without growing the pool.z6CUDA graphs and compile are disabled, skipping warmup.Nr   r   zWarming up IO pair z/2...F)r   r   r   zWarming up varlen path (z Q tokens, z KV tokens)...TrB  zVarlen warmup completed in .2fszFailed to warm up varlen path: z-. Graph pool may fragment and OOM under load.r   zWarming up decode fast path...z"Failed to warm up decode path for z requests: zDecode warmup completed (z graphs) in s.)&r   r#   infor   rv   
num_blocks
block_sizert   r   r   r  current_pairr   r)   r"   r  r	   r   rx   r   rD  r   r   r   rH  rF   r   r  r   r   free_blocksr   rZ   r   r  r   r   min)r7   r@  num_query_tokensr   num_cache_tokensr   num_io_pairspair_idxpadded_q	padded_kvfuture_statesstartrJ  rK  rL  rM  rN  forward_fn_argsr   fsdecode_graphsnum_requests_s                          r8   warmupContinuousBatchProcessor.warmupp  s    KKPQ00JJ))DJJ,A,AA	$700?? !33ql+H&&7?''41(Q,uEF #'"7"7-,?%* #8 #H
 KK28*K	{R`ab7=++-=QUQ[Q[M@$''==!#7#7V^J^ "44EERVEW
>B>U>U>c>c>e;!22Vd6V6V
#(nWa"b--&&z>TOT**>:"O4 ;9,.5:PQT9UUVWX (BJJ**288+>+>? ( zz00A5 KK89M NEL ; -"8"8!TZZ=R=RTXT^T^! %D"&"7"7%1qW[ #8 #KHa ++AA%';';T8Q "&!8!8!I!IVZ!I![JBFBYBYBgBgBi?NZ!%!6!6!Z$:Z:ZJ',.[e&fO11**:~XX"ZZ..~>&8 ?!Q&M ,

..rxx/B/BC ,4#8#88"1|#3T5J5JK= > KK3M?,|~`eOefiNjjlmn] ,b ""34D##0 #k ;:  s!@Cpqrrs (BJJ**288+>+>? (B ?> ! fNN%G~U`ab`c#deef ,

..rxx/B/BC ,s   7CR4
R".R4=CT5T#T5"
R1	,R44
S#>SS&S##S&&:T #
T2	-T55
U&?U!U)!U&&U)):V#)r   r   r   rv   r   r/   r   r   ry   rt   r   rx   r   r   r|   r}   r   rz   r   r   ru   r   r{   r   r   r   rn   ):rA   rB   rC   rD   r   r   rE   r&   r   r   r   r   r   rM   rN   rJ   rQ   EventrF   r1   r0   rU   rG   r   r   r   r   r   r   r   r!   r   intbooltupler   r  r.  r   r4  r>  no_gradr   ModulerR  r
   r   StreamrH  dictTensorr   r_  rb  rc  inference_moder  rH   r6   r<   r8   rr   rr      st   -0JJJm
"m
 !m
 ,	m

 %=m
 ?m
 [[m
 $m
 OOm
 llm
 [[m
 m
 
m
^
# 
%"6HT 9 9$ A9 A\ Ad A A)S )s )Z^ )chilnqiqcr ) /D / /b CE CEJ 5d 5 5 I I 6y 6T 6 6" 
]]_#:bii #:D #:  #:J1 1UZZ=N=N 1Z^ 1 GyyG G 	G
 G LLG 
G G o&*BII *4 *ELL * '* ()	I 	Iu|| 	I 	I *	I j!!Kell !KELL !KV[VbVb !Kgk !K "!KF c5BII c5$ c5 c5r<   rr   c                   L   \ rS rSrSrS\S\S\SS4S jr\	S,S	 j5       r
S\4S
 jrS,S jrS-S\S\S-  S\SS4S jjrS.S\S\S-  SS4S jjr     S/S\\   S\S-  S\S-  S\S\S\\\   -  S-  S\S\4S jjr   S0S\\\      S\S-  S\S\S\SS4S jjrS\SS4S jrS1S\S-  S\S-  S\S-  4S jjrS rS\S\\   4S jrS\S \SS4S! jr\	S,S" j5       rS\ 4S# jr!\"RF                  " 5       S,S$ j5       r$\	" S%S&9S'\ SS4S( j5       r%\	S)\&S'\ S-  SS4S* j5       r'S+r(g)2ContinuousBatchingManageri  a  Manager for handling continuous batching of generation requests. It provides a user interface for submitting
generation requests, retrieving results, and managing the background generation thread. This class should not be
created directly, but through one of the following entry points (all methods of the `ContinuousMixin` mixin):
- `init_continuous_batching`
- `continuous_batching_context_manager`
- `generate_batch`
r@  r=   rw   r3   Nc                    SUR                   R                  ;  a(  UR                  SUR                   R                   35        UR                  5       U l        X l        X0l        SU l        U R                  R                  U l	        [        R                  " U R                  R                  S9U l        [        R                  " 5       U l        [#        5       U l        [        R                  " 5       U l        SU l        SU l        SU l        [        R.                  " 5       U l        [3        USS5      nUb  UOSU l        [7        U R                  R9                  U5      U R                  R:                  U R                  R<                  S9U l        [A        U R                  R                   5      nU R                  RC                  [3        US	S5      US
9  U R                  RE                  U5      U l#        U R                  RI                  5         U R                  RJ                  U l%        U R                  RL                  U l&        U R                  RN                  U l'        g)zInitialize the continuous batching manager.

Args:
    model: The language model for generation
    generation_config: Configuration for generation parameters
    continuous_batching_config: Configuration for continuous batching parameters
zpaged|F)maxsizeNr   num_return_sequencesr   )logits_processorper_request_processorsdrop_unsupported_processorsr   )r   is_attn_mask_needed)(r/   r   r9   evalr@  r=   rw   	warmed_upallow_block_sharing_use_prefix_sharingrM   rN   max_queue_sizery   rQ   r  _has_new_requestsrJ   rz   r{   batch_processor_generation_thread_request_counterrR   _request_lockr   r  r   r?   r  r  rx   r(   decide_use_cuda_graphsdecide_use_async_batchingr   resolve_sentinel_valuesr   r   r   )r7   r@  r=   rw   r  r  s         r8   rU   "ContinuousBatchingManager.__init__  s    5<<<<<))F5<<3T3T2U*VW ZZ\
!2*D'#'#B#B#V#V  ;;t/N/N/]/]^!*!2)^#//+@D"& !&^^-  ''8:PRVW<P<\$8bc!D!ZZ==>OP#'#B#B#Y#Y(,(G(G(c(c 
 2$**2C2CD''>>"#46FM 3 	? 	

 #'"A"A"["[\o"p 	''??A'+'F'F'^'^$(,(G(G(`(`%!%!@!@!R!Rr<   c                 8   U R                   b6  U R                   R                  5       (       a  [        R                  " S5        gU R                  R                  5         [        R                  " U R                  S9U l         U R                   R                  5         g)z'Start the background generation thread.Nz"Manager thread is already running.)target)
r  is_aliver#   r   r{   r;  rQ   Thread_run_generation_loopr  rT   s    r8   r  ContinuousBatchingManager.start  so     "".43J3J3S3S3U3UNN?@"+"2"2$:S:S"T%%'r<   c                 `    U R                   SL=(       a    U R                   R                  5       $ )z5Check if the background generation thread is running.N)r  r  rT   s    r8   
is_running$ContinuousBatchingManager.is_running(  s'    &&d2Yt7N7N7W7W7YYr<   c                     U R                   c  U R                  5       U l         U R                   R                  U R                  5        SU l        g)zPre-capture CUDA graphs for varlen and decode paths by running dummy batches. Initializes the batch
processor if not already done.NT)r  _create_batch_processorr  r@  r  rT   s    r8   r   ContinuousBatchingManager.warmup,  s@     '#'#?#?#AD ##DJJ/r<   blocktimeoutkeep_for_next_sessionc                 b   U R                   c  [        R                  " S5        O\U R                   R                  R                  (       a7  [        R
                  " SU R                   R                  R                   35        U R                  c%  U(       a  SOSn[        R                  " SU-   5        g[        5       nU R                  R                  5       (       d0  U R                  R                  5         [        R
                  " S5        U(       a  U R                  XR5        U(       d  SU l         O&[        R
                  " S5        X R                  l        [        R                   " 5         ["        R$                  R'                  5       (       a  ["        R$                  R)                  5         gg)	zSignal the background thread to stop.

Args:
    block: Whether to wait for the thread to stop
    timeout: Maximum time to wait for the thread to stop
    keep_for_next_session: Whether to cache this on the model for future use
Nz%
Batch processor was not initialized.z-
Prefix sharing was on. Total prefix length: z? Hence the unstarted manager will not be kept for next session. zManager not started.z'Stopping continuous batching manager...z:Continuous batching manager will be kept for next session.)r  r#   r   rv   use_prefix_sharingr  _total_prefix_lengthr  r	   r{   is_setsetjoinr@  #_cached_continuous_batching_managerr   r   rF   r   r   r   )r7   r  r  r  suffixstop_trigger_times         r8   stopContinuousBatchingManager.stop5  s(    'NNCD!!''::KK@AUAUA[A[ApAp@qr ""*ZoVuwFNN1F:;(N%%''OO!KKABII'1 %#'D  KKTU=AJJ:


::""$$JJ""$ %r<   r  c                 "   U R                   b  U R                   R                  US9  U R                   R                  5       (       a  [        R                  " SU S35        g[        5       n[        R                  " SX1-
  S S35        SU l         gg)zjWait for the background thread to finish.

Args:
    timeout: Maximum time to wait for the thread to stop
Nr  z3Generation thread did not exit after join timeout (z).z*Continuous Batching Manager stopped after r  r  )r  r  r  r#   r   r	   r  )r7   r  r  ends       r8   r  ContinuousBatchingManager.join]  s     "".##(((9&&//11!TU\T]]_`a"nHI`adHeeghi*.' /r<   r\  rZ   max_new_tokensr  record_timestampseos_token_idr   c                    Uc9  U R                      SU R                   3nU =R                  S-  sl        SSS5        Uc  U R                  R                  OUnUc  U R                  R                  OUn[        U[        U5      U R                  S-
  UUUUUS9nU R                  R                  USSS9  U R                  R                  5         U$ ! , (       d  f       N= f)a  Add a new generation request to the queue.

Args:
    input_ids: Input token IDs to use as prompt
    request_id: Optional custom request ID (auto-generated if None)
    max_new_tokens: Maximum number of new tokens to generate
    streaming: Whether to stream tokens as they're generated
    record_timestamps: Whether to record timestamps for each generated token
    eos_token_id: End-of-sequence token ID(s)
    logit_processor_kwargs: Keyword arguments for the logits processor.

Returns:
    str: The request ID
Nreq_r   )rZ   initial_tokensr  r  r  r  r  r   T
   r  r  )r  r  r=   r  r  r!   rp   r  ry   r\   r  r  )	r7   r\  rZ   r  r  r  r  r   r   s	            r8   add_request%ContinuousBatchingManager.add_requestl  s    0 ###D$9$9#:;
%%*% $ CQBX//>>^l>J>Rt--::Xd !	?22Q6/)%#9	
 	U$;""$- $#s   %C
Cinputsc                 D   U R                      [        U R                  U R                  [        U5      -   5       Vs/ s H  nSU 3PM
     nnU =R                  [        U5      -  sl        S S S 5        [	        [        WU5      5      nU R                  (       a  [        US SS9nU R                  R                  n	U	c   U R                  R                  R                  OU	n	U	c  SOU	n	U H  u  pU R                  " SUU
UUUU	S.UD6  M      g s  snf ! , (       d  f       N= f)Nr  c                     U S   $ )Nr   r6   r   s    r8   r   8ContinuousBatchingManager.add_requests.<locals>.<lambda>  s    !A$r<   T)keyreverserz  )r\  rZ   r  r  r  r  r6   )r  r  r  r   rp   zipr  sortedr=   r  r@  r/   r  )r7   r  r  r  r  r   r(  request_idsids_and_inputsr  rZ   r\  s               r8   add_requests&ContinuousBatchingManager.add_requests  s    /4T5J5JDLaLadghndoLo/pq/p!T!:/pKq!!S[0!   c+v67###NPTUN --::9E9Mtzz((55S_)1r|%3!J #%-#"3) ) &4 r  s   /DD DD
Dc                 j    U R                   b&  U R                   R                  R                  U5        gg)zSCancel a request by its ID.

Args:
    request_id: The ID of the request to cancel
N)r  ru   set_request_cancellation)r7   rZ   s     r8   cancel_request(ContinuousBatchingManager.cancel_request  s/     +  **CCJO ,r<   c                 d   U R                   c*  U R                  R                  R                  5       (       a  g U R                  R                  R	                  SUS9nUb6  UR
                  U:w  a&  U R                  R                  R                  U5        gU$ ! [        R                   a     gf = f)a  Retrieve one result from the output queue.

Args:
    request_id: If set, only return results matching this ID (others are requeued).
    timeout: Maximum time to wait for a result.

Returns:
    Optional[GenerationOutput]: The result data or None if timeout.
NTr  )	r  rz   rO   r   rY   rZ   r\   rM   r   )r7   rZ   r  results       r8   
get_result$ContinuousBatchingManager.get_result  s     ""*t/A/A/N/N/T/T/V/V	''4488tW8UF%&*;*;z*I""//33F;M{{ 		s   AB B B/.B/c              #      #    U R                   bf  U R                   R                  5       (       aF  U R                  SS9nUb  Uv   U R                   b"  U R                   R                  5       (       a  MD  gggg7f)z.Iterate over results as they become available.N皙?r  )r  r  r  )r7   r  s     r8   __iter__"ContinuousBatchingManager.__iter__  sk     %%1d6M6M6V6V6X6X__S_1F! %%1d6M6M6V6V6X6X16X1s   A/A73A7c              #      #    U R                   b}  U R                   R                  5       (       a]  U R                  USS9nUb  Uv   UR                  5       (       a  gU R                   b"  U R                   R                  5       (       a  M[  gggg7f)zIterate over results matching a specific request id (blocking).

Uses the shared output queue with requeue. For high-concurrency serving,
use :meth:`register_result_handler` instead.
Nr  )rZ   r  )r  r  r  r$  )r7   rZ   r  s      r8   request_id_iter)ContinuousBatchingManager.request_id_iter  s      %%1d6M6M6V6V6X6X__
C_HF!%%'' %%1d6M6M6V6V6X6X16X1s   BB
Br^   c                    ^ ^^ [         R                  " 5       nUUU 4S jnT R                  R                     XC4T R                  R                  T'   SSS5        g! , (       d  f       g= f)a  Register a callback for result delivery (streaming or non-streaming).

The callback is invoked on the event loop via ``call_soon_threadsafe``
each time a result is produced for this request. For streaming requests,
this happens on every token; for non-streaming, only on completion.

The handler is automatically cleaned up when the request finishes.

Args:
    request_id (`str`): The request ID to receive outputs for.
    callback (`callable`): Called with a ``GenerationOutput`` for each result.
c                    > T" U 5        U R                  5       (       aF  TR                  R                     TR                  R                  R	                  TS 5        S S S 5        g g ! , (       d  f       g = fr5   )r$  rz   rS   rP   r  )r  r^   rZ   r7   s    r8   _auto_cleanupHContinuousBatchingManager.register_result_handler.<locals>._auto_cleanup   sY    V!!##''--&&66:::tL .- $--s   'A&&
A4N)asyncioget_running_looprz   rS   rP   )r7   rZ   r^   r_   r  s   ```  r8   register_result_handler1ContinuousBatchingManager.register_result_handler  sO     '')	M %%>K=RD..z: &%%s   A
A)c                 ~    U R                   c  [        S5      eU R                   R                  U R                  5        g)z=Perform a single generation step. This is mostly cuda graphedNzNTried to perform a generation step before the batch processor was initialized.)r  r   rR  r@  rT   s    r8   rR  *ContinuousBatchingManager._generation_step	  s4     'opp--djj9r<   c                    U R                   R                  U R                  R                  5        [	        U R
                  R                  U R                   U R
                  R                  U R
                  R                  [        U R
                  SS 5      S9nUR                  U l        U R                   R                  n[        R                  " US 5      nUc   [        R                   " SU S35        ["        n[%        UU R
                  R                  U R&                  U R                   U R                  U R(                  U R*                  U R,                  U R
                  R                  U R
                  R                  U" U5      S9nU$ )N_tp_size)tp_sizezScheduler 'z ' not found. Defaulting to FIFO.)rv   r/   r=   rw   rx   ry   rz   r{   r|   r}   ru   )rw   resolve_max_memory_percentrx   ra  r   r@  r/   r1   r0   r   r  r  scheduler_typer$   rY   r#   r   r%   rr   r=   ry   rz   r{   )r7   paged_attention_cacher  ru   r  s        r8   r  1ContinuousBatchingManager._create_batch_processor  s?   ''BB4CWCWCeCef 3JJ++JJJJDJJ
D9!
 $9#K#K  88GG%)).$?	NN[(88XYZ%I 3'::$$"44'+'F'F 00((,,**

(( 56
 r<   c                     [        U SS5      n[        U[        5      (       a  UR                  5         OU R	                  5       nXl        SU l        UR                  (       aE  UR                  5       (       d  [        S5      eU R                  5         U =R                  S-  sl        U R                  R                  5       (       a  UR                  5       (       a^  U R                  U5        U =R                  S-  sl        U R                  R                  5       (       d  MG  UR                  5       (       a  M^  [        UR                  [         5      (       a8  SUR                  R"                  -
  UR                  l        UR%                  5         [(        R.                  " S	5        g! [&         a4  n[(        R*                  " SU 3SS9  U R-                  UW5         SnANPSnAff = f! [(        R.                  " S	5        f = f)
z6Main processing loop running in the background thread.r  Nr   z$Failed to bootstrap the first batch.r   zError in generation loop: Tr   zGeneration loop finished.)r   r   rr   r   r  r  current_batchr   r  r   rR  r{   r  r   _inner_generation_looprt   r   r  r.  r   r#   r   _handle_critical_errorr  )r7   r  r   s      r8   r  .ContinuousBatchingManager._run_generation_loop4  s   #	5%d,=tDO/+CDD%%' #'">">"@ $3 !"D 11&99;;&'MNN%%'""a'"--//O4X4X4Z4Z++O<""a'" --//O4X4X4Z4Z /<<>XYYBCoFhFhFuFuBu22?,,. KK34	  	<LL5aS9DI''?;;	< KK34s7   DF( #F( :AF( (
G&2*G!G) !G&&G) )Hgeneration_looprh  r  c                     UR                  5       (       d4  U R                  R                  SS9  U R                  R                  5         g U R	                  5         UR                  5         g )Nr  r  )r  r  waitr;  rR  r.  )r7   r  s     r8   r   0ContinuousBatchingManager._inner_generation_loop\  sW     1133""'''4""((*$$&r<   r   c                     U R                   R                  5           U R                  R                  5       nUb  UR	                  X5        M0  ! [
        R                   a     Of = fUb  UR                  U5        gg)z:Handle critical errors that terminate the generation loop.N)r{   r  ry   r   r   rM   r   r>  )r7   r   r  req_datas       r8   r!  0ContinuousBatchingManager._handle_critical_errorg  s|     		++668".#99%J  {{ 		 &--e4 's   1A A$#A$)r  r  r  r  r  r  rw   r  r=   ry   r   rx   r   r@  r  rz   r   r{   r   r  rn   )TNFr5   )NNFFN)NFF)NN))rA   rB   rC   rD   ro   r-   r   r   rU   r   r  r  r  r  r`  r  r  rp   r  rG   r
   r  r  r   r    r  r  r   r  r   r  rR  rr   r  rF   r  r  r   r   r!  rH   r6   r<   r8   r  r    s   :S#:S ,:S %=	:S
 
:Sx ( (ZD Z&%$ &% &%\` &%mq &%P/e /edl /d /$ "&%)"'/3/9/ $J/ d
	/
 /  / DIo,/ #&/ 
/h &*"'T#Y d
 	
   #& 
BP P PS4Z  YilpYp *# )<L2M S# S Sd S0 : :")A "H %5 %5N '('6N 'SW ' )' 5I 5H`cgHg 5lp 5 5r<   r  c                      \ rS rSr% Sr\\S'   \R                  " 5          SS\S-  S\	S-  S\
S-  S\4S jj5       rSS	 jr\\R                  " 5              SS\S-  S
\S\S-  S\	S-  S\S\S\
S-  S\\   4S jj5       5       r\\R                  " 5             SS\\\      S\S-  S\	S-  S\S\S\S\S\\\4   4S jj5       5       rSrg)ContinuousMixini{  a  Mixin class for models to add continuous batching capabilities. Continuous batching has three entry points:
- `init_continuous_batching`, which is the actual entry point for continuous batching
- `continuous_batching_context_manager`, which itself is a wrapper around `init_continuous_batching`
- `generate_batch`, which is really a wrapper around `continuous_batching_context_manager`

They are defined in this order. Any change made to any of those three entry points should be reflected in the other
two.
r=   Nrw   workload_hintsr3   c                 X   [        U S5      (       a"  [        U S5      (       a  [        U S5      (       d  [        S5      e[        U SS5      n[        U[        5      (       a  [
        R                  " S5        U$ Ub  UOU R                  nUc  [        S5      eUR                  c  [
        R                  " S	5        S
Ul	        Uc7  [        [        USS5      [        5      (       a  UR                  nO
[        5       nUR                  " S0 UD6  Ub  UR                  U5        [	        XUS9$ )a  Initialize a manager for continuous batching inference.

Args:
    generation_config: An optional generation configuration, which may contain a CompileConfig object
    continuous_batching_config: An optional continuous batching configuration
    workload_hints: Optional WorkloadHints to help the continuous batching manager make better decisions for
        default values
    **deprecated_kwargs: Deprecated arguments that are now passed in the continuous_batching_config. Those are:
        max_queue_size, q_padding_interval_size, kv_padding_interval_size, allow_block_sharing,
        use_async_batching, max_cached_graphs
Returns:
    `ContinuousBatchingManager`: The manager instance to add requests and retrieve results.
r/   r1   r0   z;Model must have 'config', 'device', and 'dtype' attributes.r  NzCached continuous batching manager found: it will be re-used instead of creating a new one. If you want to create a new manager, you should call `destroy_cached_continuous_batching_manager` first.z8A GenerationConfig must be provided or set in the model.zE`eos_token_id` not set in GenerationConfig. Setting to -1 (disabled).rz  rw   )r@  r=   rw   r6   )hasattrAttributeErrorr   r   r  r#   r  r=   
ValueErrorr  r   r   rw   #account_for_cb_deprecated_argumentsresolve_using_hints)r7   r=   rw   r,  deprecated_kwargscached_manager
gen_configs          r8   init_continuous_batching(ContinuousMixin.init_continuous_batching  s)   , tX&&gdH.E.EWUY[bMcMc !^__ !'LdSn&?@@KKu "! +<*G&TMcMc
WXX""*NNbc&(J# &-'*.JDQSkll-7-R-R*-E-G*"FF[IZ[%../IJ )Qk
 	
r<   c                     [        U SS5      n[        U[        5      (       a  UR                  SSSS9  [	        U S5        gg)zFDestroy the cached continuous batching manager and free GPU resources.r  NTFr  r  r  )r   r   r  r  delattr)r7   r4  s     r8   *destroy_cached_continuous_batching_manager:ContinuousMixin.destroy_cached_continuous_batching_manager  sF     'LdSn&?@@dDPUVD?@ Ar<   r  r  persistent_managerr  c              +     #    U R                   " S	UUUS.UD6n	U(       ag  U	R                  (       dV  [        R                  " S5        [	        5       n
U	R                  5         [        R                  " S[	        5       U
-
  S S35        U	R                  5          U	v   [        R                  " S5        U	R                  X#US9  g! [        R                  " S5        U	R                  X#US9  f = f7f)
a  A context manager to safely use the continuous batching manager. Arguments are similar to the ones of
`init_continuous_batching`, except for:
    - block: whether to block the thread when stopping the manager. Default is True.
    - timeout: maximum time to wait for the thread to stop. Default is None (no timeout).
    - warmup: whether to pre-capture CUDA graphs at the largest sizes before running. Default is True.
)r=   rw   r,  z%Warming up for continuous batching...zWarming up completed in r  r  z!Continuous batching loop finishedr9  Nr6   )	r6  r  r#   r   r	   r  r  r   r  )r7   r=   r  r  rw   r=  r  r,  r3  managerr  s              r8   #continuous_batching_context_manager3ContinuousMixin.continuous_batching_context_manager  s     & // 
/'A)
  	
 '++NNBC NENNNN5lnu6LS5QQSTU	aM LL<=LLuM_L` LL<=LLuM_L`s   BC.C 'C.(C++C.r  r  progress_barc                 n   U(       d  0 $ [         R                  " 5       [        R                  ::  a  [         R                  " S5        Sn0 n	/ SQn
U
 H  nX;   d  M
  UR                  U5      X'   M     Uc  U R                  OUnUR                  b  UR                  OSn[        U5      U-  nUR                  SS5      nUc  UR                  OUn[        [        S U 5       5      Ub  UOSS	9nU R                  " SUUS
SUUUS.U	D6n[        [         /5      n[        UU(       + SU S3SS9n0 nSnU nU   U n UR                  XUS9  UU:  a  UR!                  SS9nU(       a=  UR"                  nUR%                  5       (       a  UUU'   US-  nUR'                  S5        O7UR)                  5       (       d"  [         R*                  " S5        [-        S5        OUU:  a  M  SSS5        SSS5        SSS5        0 n[1        [        U5      5       H>  nUR3                  SU 35      nUb
  UUSU 3'   M$  [         R*                  " SU S35        M@     U$ ! [.         a"  n[         R*                  " SU 3S
S9   SnANSnAff = f! , (       d  f       N= f! , (       d  f       N= f! , (       d  f       N= f)a  Generate sequences for a batch of prompts using continuous batching.

Args:
    inputs: List of input token sequences (prompts)
    generation_config: Optional generation configuration
    continuous_batching_config: Optional continuous batching configuration
    record_timestamps: If set to true, the requests will have a timestamp for each token generated
    progress_bar: If set to true, a progress bar will be displayed
    persistent_manager: whether to persist the manager after the generation is finished. Default is False.
    warmup: whether to pre-capture CUDA graphs before processing requests. Default is True.
    **kwargs: Additional generation parameters. Only max_new_tokens is used, but other deprecated arguments
        are extracted and passed to the continuous_batching_config object.
Returns:
    `dict[str, GenerationOutput]`: a dictionary of request ids to GenerationOutput objects
z=Progress bar is disabled when logger level is less than DEBUGF)r   r   r  r   r   r  Nr   r  c              3   8   #    U  H  n[        U5      v   M     g 7fr5   )r   ).0r\  s     r8   	<genexpr>1ContinuousMixin.generate_batch.<locals>.<genexpr>/  s     !I&Y#i..&s   r   )max_prompt_lengthmax_generated_lengthT   )r=   rw   r  r  r=  r  r,  zSolving z	 requestsrequest)totaldisabledescunit)r  r  r  r  z*Generation thread terminated unexpectedly.zCReturning results of generate_batch despite unexpected termination.zError during batch generation: r   r  zRequest req_z not found in results.r6   )r#   getEffectiveLevelr   DEBUGr   r  r=   r  r   r  r'   maxr@  r   r   r  r  rZ   r$  updater  r   printr   r  rY   )r7   r  r=   rw   r  rB  r=  r  kwargsr3  deprecated_keysdepr_keygen_cfgr  r  r  r,  
manager_cm
logging_cmpbar_cmresultsfinished_countr?  pbarr  r=  r   reordered_resultsr(  s                                r8   generate_batchContinuousMixin.generate_batch  s   : I ##%6NNZ[ L 
 (H!.4jj.B!+ (
 ->,E$((K\?F?[?[?gw;;mn6{%99  $4d;3A3I//~ '!!I&!II3A3MST
 == 	
/'A1)	
  	

 +F84
%%L>3	
 7J4S$$Fev$w$|3$///:F!'!2!2!--//.4GFO*a/N KKN$//11%QRcd %|3 18JZ* s6{#A[[4s,F!06!D*-|A3.DEF $ !   S>qcBTRS# 18JJZZsm   3J&6J9J;BIJI#J+J&
J	I<	7J<J	J
JJ
J#	J&&
J4r6   )NNNrn   )NTNNFTN)NNFTFT)rA   rB   rC   rD   ro   r   rE   rF   r  r   r'   r  r6  r;  r   r  r`  r   r@  r   rp   r  r  rG   r    r`  rH   r6   r<   r8   r+  r+  {  s    ('
 6:FJ/3	7
+d27
 %=t$C7
 &,	7
 
#7
 7
rA 
 6: $FJ#(/3#a+d2#a #a 	#a
 %=t$C#a !#a #a &,#a 
,	-#a  #aL 
 6:FJ"'!#(t!T#Yt! ,d2t! %=t$C	t!
  t! t! !t! t! 
c##	$t!  t!r<   r+  )Fr  r   rM   rQ   abcr   collections.abcr   r   
contextlibr   r   mathr   timer	   typingr
   rF   r   r   tqdm.contrib.loggingr   configuration_utilsr   generation.configuration_utilsr   r   modeling_flash_attention_utilsr   utils.genericr   utils.loggingr   utils.metricsr   r   r   logits_processr   rv   r   cb_logits_processorsr   input_outputsr   r   r   r   r<  r    r!   r"   r#   ru   r$   r%   r&   utilsr'   r(   r)   r*   r+   r  r-   rJ   rr   r  r+  r6   r<   r8   <module>rs     s     	    / 2       6 3 X O 9 $ S S 0 & G L 1 K K B B p p.299 ,2 ,2` O	5 O	5 O	5f _5 _5 _5Dk! k!r<   