
    Z jN                        S r SSKrSSKrSSKrSSKrSSKrSSKJrJr  SSK	J
r
  SSKJr  SSKJr  SSKJr  SSKJr  \(       a2  SSKrSSKrSSKrSS	KJrJrJrJrJr  SS
KJr  SSKJr  SSK J!r!  SSK"J#r#  \RH                  " \%5      r&Sr' " S S\RP                  5      r) " S S5      r* " S S\+5      r, " S S\-5      r.SSSSSSSS.S .S!.0r/S"S#S$\0S-  4S% jr1S&\0S$\04S' jr2S(\0S$\3\0   S-  4S) jr4 " S* S+5      r5S,\
S-\6S$\74S. jr8 " S/ S05      r9 " S1 S25      r:S3\;S$S4S4 jr<SBS5 jr= " S6 S75      r> " S8 S9\5      r? " S: S;\?5      r@ " S< S=\?5      rA " S> S?5      rB " S@ SA5      rCg)Cz?
Shared types, constants, and utilities for the serving layer.
    N)ABCabstractmethod)Callable)Future)Queue)TYPE_CHECKING)logging)ContinuousBatchingConfigGenerationConfigPreTrainedModelPreTrainedTokenizerFastProcessorMixin)ContinuousBatchingManager)GenerationOutput)	Scheduler   )ModelManagerzx-request-idc                   (    \ rS rSrSrSrSrSrSrSr	g)	Modality9   LLMVLM
MULTIMODALSTTTTS N)
__name__
__module____qualname____firstlineno__r   r   r   r   r   __static_attributes__r       o/root/GenerationalWealth/GenerationalWealth/venv/lib/python3.13/site-packages/transformers/cli/serving/utils.pyr   r   9   s    
C
CJ
C
Cr"   r   c                   &    \ rS rSrSrS\4S jrSrg)_StreamErrorA   z5Sentinel to signal an error from the generate thread.msgc                     Xl         g Nr'   )selfr'   s     r#   __init___StreamError.__init__D   s    r"   r*   N)r   r   r   r    __doc__strr,   r!   r   r"   r#   r%   r%   A   s    ?C r"   r%   c                       \ rS rSrSrSrg)_GenerationCancelledH   zERaised inside ``DirectStreamer.put()`` to abort ``model.generate()``.r   Nr   r   r   r    r.   r!   r   r"   r#   r1   r1   H   s    Or"   r1   c                       \ rS rSrSrSrg)CBWorkerDeadErrorL   zRaised when a request is submitted to a CB worker that has died.

Surfaced as 503 by the FastAPI exception handler. Carries the original error message
that killed the worker so the client knows why the server is in this state.
r   Nr3   r   r"   r#   r5   r5   L   s    r"   r5   qwenz<tool_call>z</tool_call>z<tool_call>(.*?)</tool_call>arrayobjectjson)typezx-parser)zx-regex-iteratorr;   items)stcetcschemamodelr   returnc                 j  ^ [        U SU 5      n[        USS5      n[        USS5      n[        USS5      nU(       a  U(       a  U(       a	  US   S   nO:[        U4S j[        R                  5        5       S5      nUc  gUS	   US
   US   pdnUR	                  U5      nUR	                  U5      n	XhU	S.$ )aR  Return tool call config for the model, or ``None`` if tool calls are not supported.

Returns a dict with:
    - ``schema`` (`dict`): Schema to pass to ``tokenizer.parse_response(block, schema)``.
    - ``stc_id`` (`int`): Token ID of the start-of-tool-call delimiter.
    - ``etc_id`` (`int`): Token ID of the end-of-tool-call delimiter.
	tokenizer	stc_tokenN	etc_tokenresponse_schema
properties
tool_callsc              3   d   >#    U  H%  u  pUTR                   R                  ;   d  M!  Uv   M'     g 7fr)   )config
model_type).0kvr@   s      r#   	<genexpr>'get_tool_call_config.<locals>.<genexpr>w   s(     d'CtqqELLLcLcGc'Cs    0	0r=   r>   r?   )r?   stc_idetc_id)getattrnext_TOOL_CALL_FALLBACKSr<   convert_tokens_to_ids)
	processorr@   rC   r=   r>   rF   r?   fallbackrQ   rR   s
    `        r#   get_tool_call_configrY   e   s     	;	:I
)[$
/C
)[$
/Ci):DAO s .|< d';'A'A'Cdfjk#E?HUOXh=O&,,S1F,,S1F&AAr"   	tool_callc                     U R                  SU 5      nUR                  S0 5      nUS   [        U[        5      (       d  [        R                  " U5      S.$ US.$ )a  Normalize a parsed tool call to ``{"name": str, "arguments": str}``.

Different models return different structures from ``parse_response``:
- Gemma: ``{"function": {"name": ..., "arguments": {...}}}`` (nested, arguments as dict)
- Qwen:  ``{"name": ..., "arguments": {...}}`` (flat, arguments as dict)

The OpenAI API expects ``arguments`` as a JSON **string**, so we ``json.dumps`` it.
function	argumentsname)r^   r]   )get
isinstancer/   r:   dumps)rZ   r\   r]   s      r#   _normalize_tool_callrb      s[     }}Z3H["-I 2<Y2L2LTZZ	* R[ r"   r?   c                     U R                  X5      nU(       d  g[        U[        5      (       d  U/nU Vs/ s H  n[        U5      PM     nnU(       a  U$ S$ s  snf )a  Parse tool calls from generated token IDs using ``tokenizer.parse_response``.

Args:
    processor: The processor or tokenizer.
    generated_ids: Token IDs from generation. Passed directly to ``parse_response``
        which decodes them internally, preserving special tokens that
        ``skip_special_tokens=True`` would strip (e.g. Gemma's ``<|tool_call>``).
    schema: The tool call schema (from ``response_schema`` or ``_TOOL_CALL_FALLBACKS``).

Returns a list of ``{"name": str, "arguments": str}`` dicts, or ``None`` if none found.
N)parse_responser`   listrb   )rW   generated_idsr?   parsedrZ   rH   s         r#   parse_tool_callsrh      s[     %%m<Ffd##CIJ6i&y16JJ#:-- Ks   Ac                       \ rS rSrSrS\S\4S jrS\S\S-  S	S4S
 jr	S\S\S\S-  S	S4S jr
S\S	S4S jrSS jrSrg)DownloadAggregator   zAggregates byte-progress across multiple concurrent download tqdm bars.

huggingface_hub opens one tqdm bar per file shard. This class tracks them all and emits
a single aggregate ``{"stage": "download", "progress": {...}}`` event whenever any updates.
enqueuemodel_idc                 8    Xl         X l        0 U l        S U l        g r)   )rl   r@   barslast_emitted_current)r+   rl   rm   s      r#   r,   DownloadAggregator.__init__   s    
79	04!r"   bar_idtotalNrA   c                 F    SU4U R                   U'   U R                  5         g)z6Register a new download bar with its total byte count.r   Nro   _emit)r+   rr   rs   s      r#   registerDownloadAggregator.register   s    J		&

r"   currentc                 D    X#4U R                   U'   U R                  5         g)z>Update a bar's current byte count and emit aggregate progress.Nru   )r+   rr   ry   rs   s       r#   updateDownloadAggregator.update   s    $,		&

r"   c                     g r)   r   )r+   rr   s     r#   closeDownloadAggregator.close       r"   c                 b   [        S U R                  R                  5        5       5      nXR                  :X  a  g Xl        U R                  R                  5        VVs/ s H  u  p#Uc  M
  UPM     nnnU(       a  [        U5      OS nU R	                  SU R
                  SXS.S.5        g s  snnf )Nc              3   *   #    U  H	  u  pUv   M     g 7fr)   r   )rL   c_s      r#   rO   +DownloadAggregator._emit.<locals>.<genexpr>   s     ;(:!(:   loadingdownloadry   rs   statusr@   stageprogress)sumro   valuesrp   rl   r@   )r+   agg_currentr   ttotals	agg_totals         r#   rv   DownloadAggregator._emit   s    ;		(8(8(:;;333$/! $		 0 0 2D 2a! 2D#)CKt	##(3H		
 Es   	B++B+)ro   rl   rp   r@   rA   N)r   r   r   r    r.   r   r/   r,   intrw   r{   r~   rv   r!   r   r"   r#   rj   rj      su    5 5C 5s 3: $ 
S 3 sTz d 
C D 
r"   rj   callbackrm   c                 P   ^ ^^ SSK Jn  [        T T5      m " U UU4S jSU5      nU$ )u  Create a tqdm subclass that routes progress to a callback.

Bars with ``unit="B"`` are download bars — aggregated via ``DownloadAggregator``.
Other bars (e.g. "Loading weights") emit ``weights`` stage events.

Args:
    callback (`callable`): Called with a dict payload
        ``{"status": "loading", "model": ..., "stage": ..., "progress": ...}``.
    model_id (`str`): The model ID (included in progress payloads).

Returns:
    A tqdm subclass that forwards progress to *callback*.
r   )tqdmc                   ^   >^  \ rS rSrU U4S jrSUUU4S jjrUUU4S jrU U4S jrSrU =r	$ ).make_progress_tqdm_class.<locals>.ProgressTqdm   c                   > UR                  S5      =(       d    SU l        SUS'   [        TU ]  " U0 UD6  SU l        SU l        U R                  S:X  a7  [        U 5      U l        TR                  U R                  U R                  5        g g )NunititTdisabler   B)
r_   sse_unitsuperr,   nlast_emittedid_bar_idrw   rs   )r+   argskwargs	__class__download_aggregators      r#   r,   7make_progress_tqdm_class.<locals>.ProgressTqdm.__init__   sw    "JJv.6$DM $F9Gd-f-DF "D}}#!$x#,,T\\4::F $r"   c                 ^  > Uc  SnU =R                   U-  sl         U R                  S:X  a2  TR                  U R                  U R                   U R                  5        g U R                   U R
                  :w  a6  U R                   U l        T" STSU R                   U R                  S.S.5        g g Nr   r   r   weightsr   r   )r   r   r{   r   rs   r   )r+   r   r   r   rm   s     r#   r{   5make_progress_tqdm_class.<locals>.ProgressTqdm.update   s    yFFaKF}}##**4<<L4,,,$(FF!"+!)!*04$L	 -r"   c           	   3     >#    U R                    H  nU =R                  S-  sl        U R                  S:X  a2  TR                  U R                  U R                  U R
                  5        OOU R                  U R                  :w  a5  U R                  U l        T" STSU R                  U R
                  S.S.5        Uv   M     g 7fr   )iterabler   r   r{   r   rs   r   )r+   itemr   r   rm   s     r#   __iter__7make_progress_tqdm_class.<locals>.ProgressTqdm.__iter__   s     !==C''..t||TVVTZZPVVt000(,D%&/%-%.48FFTZZ(P	 
 &s   B?Cc                 x   > U R                   S:X  a  TR                  U R                  5        [        TU ]  5         g )Nr   )r   r~   r   r   )r+   r   r   s    r#   r~   4make_progress_tqdm_class.<locals>.ProgressTqdm.close  s*    }}##))$,,7GMOr"   )r   r   r   r   )r   )
r   r   r   r    r,   r{   r   r~   r!   __classcell__)r   r   r   rm   s   @r#   ProgressTqdmr      s"    	G	 	"	"	 	r"   r   )	tqdm.autor   rj   )r   rm   	base_tqdmr   r   s   ``  @r#   make_progress_tqdm_classr      s/     ,,Xx@0 0y 0d r"   c                       \ rS rSrSr  SSSS\R                  S\R                  S\S	\	S-  4
S
 jjr
SS jrSS jrSS jrSrg)DirectStreameri  ar  Streamer for ``model.generate()`` (used by :class:`GenerateManager`).

Implements the ``put``/``end`` protocol that ``model.generate()`` expects:
generate calls ``put(token_tensor)`` after each decode step, and ``end()``
when generation is complete. Tokens are decoded incrementally via the Rust
``DecodeStream`` (O(1) per token) and pushed as text to an asyncio.Queue.
NrC   tokenizers.Tokenizerloopqueueskip_special_tokenstool_configc                    SSK Jn  Xl        X l        X0l        U" / U5      U l        U(       a  US   OSU l        U(       a  US   OSU l        SU l        SU l	        [        R                  " 5       U l        SU l        / U l        g)aK  
Args:
    tokenizer: The Rust tokenizer (``tokenizer._tokenizer``).
    loop (`asyncio.AbstractEventLoop`): The event loop to push decoded text to.
    queue (`asyncio.Queue`): The queue that receives decoded text chunks.
    skip_special_tokens (`bool`, *optional*, defaults to `True`):
        Whether to strip special tokens during decoding.
    tool_config (`dict`, *optional*): Tool call config from ``get_tool_call_config``.
        When set, tokens between stc/etc delimiters (inclusive) are suppressed
        from the queue so tool call markup is never streamed to the client.
r   DecodeStreamrQ   NrR   FT)tokenizers.decodersr   
_tokenizer_loop_queue_decode_stream_stc_id_etc_id_inside_tool_call_first	threadingEvent
_cancelledtotal_tokensgenerated_token_ids)r+   rC   r   r   r   r   r   s          r#   r,   DirectStreamer.__init__"  ss    & 	5#
*2/BC0;{8,0;{8,!&#//+.0 r"   c                 n   U R                   R                  5       (       a
  [        5       eU R                  (       a  SU l        gUR	                  5        H  nU =R
                  S-  sl        U R                  R                  U5        X R                  :X  a  SU l	        OX R                  :X  a  SU l	        U R                  R                  U R                  U5      nUc  M  U R                  (       a  M  X R                  :w  d  M  U R                  R                  U R                   R"                  U5        M     g)zHCalled by ``model.generate()`` after each decode step with new token(s).FNr   T)r   is_setr1   r   tolistr   r   appendr   r   r   r   stepr   r   call_soon_threadsafer   
put_nowait)r+   valuetoken_idtexts       r#   putDirectStreamer.putC  s    ??!!##&((;;DKH"$$++H5<<')-&\\)).&&&++DOOXFD(>(>(>8||C[

//0F0FM 'r"   c                 d    U R                   R                  U R                  R                  S5        g)z;Called by ``model.generate()`` when generation is complete.N)r   r   r   r   r+   s    r#   endDirectStreamer.endX  s     

''(>(>Er"   c                 8    U R                   R                  5         g)zWSignal cancellation. The next ``put()`` call will raise and abort ``model.generate()``.N)r   setr   s    r#   cancelDirectStreamer.cancel\  s    r"   )r   r   r   r   r   r   r   r   r   r   r   )TN)r   torch.TensorrA   Nr   )r   r   r   r    r.   asyncioAbstractEventLoopr   booldictr,   r   r   r   r!   r   r"   r#   r   r     sd     %)#'1)1 ''1 }}	1
 "1 D[1BN*Fr"   r   c                       \ rS rSrSr SSSS\SSS	\R                  S
\R                  S\	S-  4S jjr
SS jrSS jrSS jrSrg)
CBStreameria  ap  Streamer for continuous batching (used by :class:`CBGenerateManager`).

Same ``put``/``end`` protocol as :class:`DirectStreamer`, but called manually
by :class:`CBGenerateManager` instead of by ``model.generate()``:
``put(output)`` receives a CB ``GenerationOutput``, decodes new tokens, and
pushes text to the asyncio.Queue. ``end()`` signals the stream is complete.
N
cb_managerr   
request_idrC   r   r   r   r   c                     SSK Jn  Xl        X l        X@l        XPl        X0l        U" / S5      U l        U(       a  US   OSU l        U(       a  US   OSU l	        SU l
        SU l        SU l        / U l        g)a  
Args:
    cb_manager (`ContinuousBatchingManager`): The CB manager instance.
    request_id (`str`): The request ID to track in the CB scheduler.
    tokenizer: The Rust tokenizer (``tokenizer._tokenizer``).
    loop (`asyncio.AbstractEventLoop`): The event loop to push decoded text to.
    queue (`asyncio.Queue`): The queue that receives decoded text chunks.
    tool_config (`dict`, *optional*): Tool call config (see ``DirectStreamer``).
r   r   TrQ   NrR   F)r   r   _cb_request_idr   r   r   r   r   r   r   	_prev_lenr   r   )r+   r   r   rC   r   r   r   r   s           r#   r,   CBStreamer.__init__j  so    $ 	5%
#*2t40;{8,0;{8,!&.0 r"   c                 
   UR                   U R                  S n[        UR                   5      U l        U H  nU =R                  S-  sl        U R                  R                  U5        X0R                  :X  a  SU l        OX0R                  :X  a  SU l        U R                  R                  U R                  U5      nUc  M  U R                  (       a  M  X0R                  :w  d  M  U R                  R                  U5        M     g)zLDecode new tokens from a CB ``GenerationOutput`` and push text to the queue.Nr   TF)generated_tokensr   lenr   r   r   r   r   r   r   r   r   r   r   )r+   output
new_tokensr   r   s        r#   r   CBStreamer.put  s    ,,T^^-=>
V445"H"$$++H5<<')-&\\)).&&&++DOOXFD(>(>(>8||C[&&t, #r"   c                 :    U R                   R                  S5        g)zSignal end of stream.N)r   r   r   s    r#   r   CBStreamer.end  s    t$r"   c                 N    U R                   R                  U R                  5        g)zCancel the CB request.N)r   cancel_requestr   r   s    r#   r   CBStreamer.cancel  s     0 01r"   )r   r   r   r   r   r   r   r   r   r   r   r   r)   )r   r   rA   Nr   )r   r   r   r    r.   r/   r   r   r   r   r,   r   r   r   r!   r   r"   r#   r   r   a  si     $(1/1 1 *	1
 ''1 }}1 D[1B-"%2r"   r   seedc                 0    SSK nUR                  " U 5        g)z8Set the PyTorch random seed for reproducible generation.r   N)torchmanual_seed)r  r  s     r#   set_torch_seedr    s    	dr"   c                      SSK n U R                  R                  5       (       a  U R                  R                  5         gg)z+Empty the CUDA cache if a GPU is available.r   N)r  cudais_availableempty_cache)r  s    r#   reset_torch_cacher    s-    zz  

  !r"   c                   X    \ rS rSrSrS rS
S jrS\4S jrS\	R                  4S jr
S	rg)InferenceThreadi  zPersistent thread for ``model.generate()`` calls.

``torch.compile`` with CUDA graphs stores state in thread-local storage.
All inference must run on the same thread to avoid corrupted graph state.
c                     [        5       U l        [        R                  " U R                  SS9U l        U R
                  R                  5         g )NT)targetdaemon)r   r   r   Thread_run_threadstartr   s    r#   r,   InferenceThread.__init__  s3    "W ''tyyFr"   rA   Nc                 R    U R                   R                  5       u  pp4n U" U0 UD6nUb  UR                  UR                  U5        OUR                  U5         M[  ! [         a?  nUb!  UR                  UR
                  U5         S nAN0UR                  U5         S nANFS nAff = fr)   )r   r_   r   
set_result	Exceptionset_exception)r+   fnr   r   futurer   resultes           r#   r  InferenceThread._run  s    -1[[__->*Bfd
,T,V,#--f.?.?H%%f-   ,#--f.B.BAFF((++	,s#   'A 	A 
B&'B!B!!B&c                 V    [        5       nU R                  R                  XX4S45        U$ )ESubmit a callable to the inference thread. Returns a blocking Future.N)r   r   r   )r+   r  r   r   r  s        r#   submitInferenceThread.submit  s%    6489r"   c                     [         R                  " 5       nUR                  5       nU R                  R	                  XX5U45        U$ zOSubmit a callable to the inference thread. Returns an awaitable asyncio.Future.)r   get_running_loopcreate_futurer   r   )r+   r  r   r   r   r  s         r#   async_submitInferenceThread.async_submit  s:    '')##%6489r"   )r   r  r   )r   r   r   r    r.   r,   r  r   r#  r   r)  r!   r   r"   r#   r  r    s-    
,V 7>> r"   r  c                       \ rS rSrSrSS	 jr\ SSSS
SS\SSS\S\S-  S\	\
R                  S4   4S jj5       r\SSS
SS\SSS\S\	\\\\   4   4S j5       r\SS j5       rSrg)BaseGenerateManageri  u   Base class for generation managers.

Subclasses:
- :class:`GenerateManager` — sequential ``model.generate()`` on a persistent thread.
- :class:`CBGenerateManager` — continuous batching with paged attention.
r@   r   
gen_configr   rA   Nc                     g)z:Initialize continuous batching. No-op for non-CB managers.Nr   r+   r@   r-  s      r#   init_cbBaseGenerateManager.init_cb      r"   rW   (ProcessorMixin | PreTrainedTokenizerFastinputsr   r   zDirectStreamer | CBStreamerc                     g)a  Start streaming generation.

Args:
    model (`PreTrainedModel`): The loaded model.
    processor: The processor or tokenizer for decoding.
    inputs (`dict`): Tokenized inputs (tensors for sequential, lists for CB).
    gen_config (`GenerationConfig`): Generation parameters.
    request_id (`str`): Unique request identifier.
    tool_config (`dict`, *optional*): Tool call config from ``get_tool_call_config``.
        When set, tool call tokens (between stc/etc) are suppressed from output.

Returns:
    `tuple[asyncio.Queue, DirectStreamer | CBStreamer]`: A ``(queue, streamer)`` pair
    where *queue* yields ``str | _StreamError | None`` and *streamer* exposes
    ``.total_tokens`` and ``.cancel()``.
Nr   )r+   r@   rW   r4  r-  r   r   s          r#   generate_streaming&BaseGenerateManager.generate_streaming  r2  r"   c                    #    g7f)a  Run generation to completion.

Args:
    model (`PreTrainedModel`): The loaded model.
    processor: The processor or tokenizer for decoding.
    inputs (`dict`): Tokenized inputs (tensors for sequential, lists for CB).
    gen_config (`GenerationConfig`): Generation parameters.
    request_id (`str`): Unique request identifier.

Returns:
    `tuple[str, int, list[int]]`: ``(text, input_len, generated_ids)``.
Nr   )r+   r@   rW   r4  r-  r   s         r#   generate_non_streaming*BaseGenerateManager.generate_non_streaming  s     s   c                     g)z/Stop the generation manager and free resources.Nr   r   s    r#   stopBaseGenerateManager.stop  r2  r"   r   r@   r   r-  r   rA   Nr)   r   )r   r   r   r    r.   r0  r   r   r/   tupler   r   r6  r   re   r9  r<  r!   r   r"   r#   r,  r,    s    I  $(  > 	
 '  D[ 
w}};;	< 4   > 	
 '  
sCc"	# * > >r"   r,  c                       \ rS rSrSrS r SSSSSS	\S
SS\S\S-  S\\	R                  \4   4S jjrSSSSS	\S
SS\S\\\S4   4S jrS\S\4S jrS\S\	R                   4S jrSS jrSrg)GenerateManageri  zFSequential generation via ``model.generate()`` on a persistent thread.c                 "    [        5       U l        g r)   )r  r  r   s    r#   r,   GenerateManager.__init__!  s    &(r"   Nr@   r   rW   r3  r4  r-  r   r   r   rA   c                 "  ^^
^^ [         R                  " 5       m[         R                  " 5       m[        USU5      R                  n[        UTTUS9n0 UEXUS.Em
[        TS5      (       a  ST
S'   SU
UUU4S jjn	U R                  U	5        TU4$ )	zLStart streaming generation via ``model.generate()`` on the inference thread.rC   r   )streamergeneration_configrC   
has_talkerr   generation_modec            	        >  TR                   " S0 TD6  g ! [         a    TR                  TR                  S 5         g [         a8  n TR                  TR                  [        [        U 5      5      5         S n A g S n A ff = f)Nr   )generater1   r   r   r  r%   r/   )r  
gen_kwargsr   r@   r   s    r#   r  0GenerateManager.generate_streaming.<locals>._run7  sk    R,,' B))%*:*:DA R))%*:*:LQ<PQQRs    &A?	A?.A::A?r   )r   r'  r   rS   r   r   hasattrr#  )r+   r@   rW   r4  r-  r   r   rust_tokenizerrF  r  rL  r   r   s    `        @@@r#   r6  "GenerateManager.generate_streaming$  s     '')&}} KCNN!.$;WnnHdmn
5,'',2J()	R 	R 	Dhr"   r   c                    #    0 UEXBS.En[        US5      (       a  SUS'   U R                  " UR                  40 UD6I Sh  vN nUS   R                  S   nUSUS24   n	UR	                  U	S	S
9n
XU	4$  N37f)zNRun generation to completion via ``model.generate()`` on the inference thread.)rG  rC   rH  r   rI  N	input_idsr   r   Tr   )rN  r)  rK  shapedecode)r+   r@   rW   r4  r-  r   generate_kwargs	sequences	input_lenrf   r   s              r#   r9  &GenerateManager.generate_non_streamingB  s      ^V]*]5,''17O-.++ENNNoNN	;'--b1	!!YZ-04H--	 Os   ?A7A54A7r  c                 B    U R                   R                  " U/UQ70 UD6$ )r"  )r  r#  r+   r  r   r   s       r#   r#  GenerateManager.submitV  s!    ||""27777r"   c                 B    U R                   R                  " U/UQ70 UD6$ r&  )r  r)  r[  s       r#   r)  GenerateManager.async_submitZ  s!    ||((=d=f==r"   c                     g r)   r   r   s    r#   r<  GenerateManager.stop^  r   r"   )r  r)   r   )r   r   r   r    r.   r,   r   r/   r?  r   r   r   r6  r   r9  r   r   r#  r)  r<  r!   r   r"   r#   rA  rA    s    P) $(  > 	
 '  D[ 
w}}n,	-<. . >. 	.
 '. . 
sC'	(.(8 8v 8>x >W^^ >r"   rA  c                       \ rS rSrSrSSS jjrSS
 jrS	\4S jrS\	S	S4S jr
 SSSSSS\SSS\	S\S-  S	\\R                  \4   4S jjrSSSSS\SSS\	S	\\	\\\   4   4S jr\SS j5       rSS jrSrg)CBGenerateManagerib  aQ  Continuous batching generation via paged attention.

Translates between the handler's text-level asyncio.Queue and CB's
token-level interface. Per-request: ``max_new_tokens``, ``eos_token_id``.

The CB manager is initialized lazily on the first request via
:meth:`ensure_initialized`, using that request's ``gen_config`` for shared
sampling params (temperature, top_p, do_sample).

.. todo:: Remove :meth:`init_cb` when CB supports per-request
   generation config. At that point, ``gen_config`` can be passed directly
   to ``add_request`` and the CB manager no longer needs a shared config.
Nc                     S U l         Xl        g r)   r   
_cb_config)r+   	cb_configs     r#   r,   CBGenerateManager.__init__q  s    #r"   r@   r   r-  r   rA   c                     U R                   b  gUR                  X R                  S9U l         U R                   R                  5         g)aL  Initialize the CB manager on first call with the request's generation config.

.. todo:: Remove when CB supports per-request generation config.

Args:
    model (`PreTrainedModel`): The loaded model (must support ``init_continuous_batching``).
    gen_config (`GenerationConfig`): Generation config used for shared sampling params.
N)rG  continuous_batching_config)r   init_continuous_batchingre  r  r/  s      r#   r0  CBGenerateManager.init_cbu  s?     8811(__ 2 
 	r"   c                 \    U R                   SL =(       d    U R                   R                  SL $ )zJWhether the CB worker is healthy. ``True`` before ``init_cb()`` is called.N)r   fatal_errorr   s    r#   is_aliveCBGenerateManager.is_alive  s%    xx4?488#7#74#??r"   r   c                     U R                   b=  U R                   R                  b%  [        SU SU R                   R                   35      egg)u   Raise :class:`CBWorkerDeadError` if the CB worker has died.

Called at request entry to fail fast — submitting to a dead worker would otherwise
enqueue the request into a void where it never gets processed.
Nz,CB worker is dead and cannot accept request : )r   rm  r5   )r+   r   s     r#   _check_aliveCBGenerateManager._check_alive  sN     88DHH$8$8$D#>zl"TXXMaMaLbc  %Er"   rW   r3  r4  r   c           	        ^^ U R                   nUc  [        S5      eU R                  U5        [        R                  " 5       n[        R
                  " 5       mUS   n	UR                  U	USUR                  UR                  S9n[        USU5      R                  n
[        U R                   XZUTUS9mUU4S jnUR                  X[5        TT4$ )zFStart streaming CB generation. Registers a per-request output handler.3CB manager not initialized. Call `init_cb()` first.rR  T)r   	streamingmax_new_tokenseos_token_idrC   rE  c                 n  >  TR                  U 5        U R                  b5  TR                  [        U R                  5      5        TR	                  5         g U R                  5       (       a  TR	                  5         g g ! [         a-  nTR                  [        [        U5      5      5         S nAg S nAff = fr)   )r   errorr   r%   r   is_finishedr  r/   )r   r  rF  
text_queues     r#   
_on_output8CBGenerateManager.generate_streaming.<locals>._on_output  s    <V$ <<+)),v||*DELLN''))LLN * <%%l3q6&:;;<s   AA= %A= =
B4#B//B4)r   RuntimeErrorrr  r   r'  r   add_requestrw  rx  rS   r   r   register_result_handler)r+   r@   rW   r4  r-  r   r   cbr   rR  rO  r}  rF  r|  s               @@r#   r6  $CBGenerateManager.generate_streaming  s     XX:TUU*%'')$+MMO
;'	^^!%44#00 $ 

 !KCNNdhh
D*bmn	< 	""::8##r"   c                 @  ^#    U R                   nUc  [        S5      eU R                  U5        US   n[        U5      n[        R
                  " 5       n	U	R                  5       mU4S jn
UR                  XZ5        UR                  UUUR                  SUR                  S9  TI Sh  vN nUR                  bC  UR                  b  [        SU SUR                   35      e[        S	U SUR                   35      eUR                  nUR                  US
S9nXU4$  Nt7f)zcRun non-streaming CB generation. Registers a handler that resolves an asyncio.Future on completion.Nru  rR  c                 T   > TR                  5       (       d  TR                  U 5        g g r)   )doner  )r  r  s    r#   
_on_result<CBGenerateManager.generate_non_streaming.<locals>._on_result  s!    ;;==!!&) !r"   F)r   rw  rv  rx  zCB worker died during request rq  zCB generation failed for TrS  )r   r  rr  r   r   r'  r(  r  r  rw  rx  rz  rm  r5   r   rU  )r+   r@   rW   r4  r-  r   r  rR  rX  r   r  r  rf   r   r  s                 @r#   r9  (CBGenerateManager.generate_non_streaming  s0     XX:TUU*%;'		N	 '')##%	* 	""::
!%44#00 	 	
  <<#~~)'*HTVW]WcWcVd(eff!::,bWXX//4H-- s   B$D'D(A5Dc                 r    U R                   c  [        S5      eU R                   R                  R                  $ )z*The CB scheduler (for testing/monitoring).zCB manager not initialized.)r   r  batch_processor	schedulerr   s    r#   r  CBGenerateManager.scheduler  s0     88<==xx''111r"   c                 T    U R                   b  U R                   R                  SSS9  g g )NT   )blocktimeout)r   r<  r   s    r#   r<  CBGenerateManager.stop  s%    88HHMMaM0  r"   rd  r)   )rf  ContinuousBatchingConfig | Noner>  )rA   r   r   )r   r   r   r    r.   r,   r0  r   rn  r/   rr  r   r?  r   r   r   r6  r   re   r9  propertyr  r<  r!   r   r"   r#   rb  rb  b  s    $"@$ @	s 	t 	$ $(/$ /$ >/$ 	/$
 '/$ /$ D[/$ 
w}}j(	)/$b/. /. >/. 	/.
 '/. /. 
sCc"	#/.b 2 21r"   rb  c                       \ rS rSrSr   SS\S\SS4S jjrS	S
S\S\4S jrSS\	S\S\
4S jjrSS jrS\4S jrSrg)GenerationStatei  a  Shared generation state across all handlers.

Manages per-model :class:`GenerateManager` instances (each with its own
:class:`InferenceThread` so different models can run concurrently while
``torch.compile`` / CUDA graphs require same-model-same-thread) and a
single :class:`CBGenerateManager` for continuous batching.

Args:
    continuous_batching (`bool`, *optional*, defaults to `False`):
        Whether to use continuous batching with paged attention instead of
        sequential ``model.generate()`` calls.
Ncontinuous_batchingcompilerf  r  c                 R    Xl         X l        X0l        0 U l        S U l        S U l        g r)   )_continuous_batching_compilere  _generate_managers_cb_manager_cb_model_id)r+   r  r  rf  s       r#   r,   GenerationState.__init__  s,     %8!#>@59(,r"   r@   r   modalityrA   c                     U R                   (       d  g[        US5      =(       a    U[        R                  :H  nU(       d,  [        R                  UR                  R                   S35        U$ )a'  Check if continuous batching can be used for this model and modality.

Args:
    model (`PreTrainedModel`): The loaded model.
    modality (`Modality`): The detected model modality (LLM, VLM, etc.).

Returns:
    `bool`: ``True`` if CB is enabled and the model supports it, ``False`` otherwise.
Frj  zM does not support continuous batching. Falling back to sequential generation.)r  rN  r   r   loggerwarning_oncer   r   )r+   r@   r  cans       r#   use_continuous_batching'GenerationState.use_continuous_batching  s]     ((e78UX=U??++, -9 9 
r"   rm   use_cbc                 d   U(       au  U R                   U:w  a.  U R                  b!  U R                  R                  5         SU l        U R                  c  [        U R                  S9U l        Xl         U R                  $ XR
                  ;  a  [        5       U R
                  U'   U R
                  U   $ )a6  Return a per-model generation manager, lazily created on first request.

Args:
    model_id (`str`): The model ID in ``'model_id@revision'`` format.
    use_cb (`bool`): Whether to return a CB manager or a sequential one.

Returns:
    `BaseGenerateManager`: Either a `GenerateManager` or `CBGenerateManager`.
N)rf  )r  r  r<  rb  re  r  rA  )r+   rm   r  s      r#   get_managerGenerationState.get_manager2  s       H,##/$$))+'+D$'#4t#O $,!###2220?0AD##H-&&x00r"   c                 b    U R                   b"  U R                   R                  5         SU l         gg)z$Stop any active generation managers.N)r  r<  r   s    r#   shutdownGenerationState.shutdownI  s-    '!!##D (r"   c                 `    U R                   SL =(       d    U R                   R                  5       $ )zTWhether the CB worker is healthy. ``True`` if CB is disabled or not yet initialized.N)r  rn  r   s    r#   is_cb_aliveGenerationState.is_cb_aliveO  s'    4'F4+;+;+D+D+FFr"   )re  r  r  r  r  r  )FFNFr   )r   r   r   r    r.   r   r,   r   r  r/   r,  r  r  r  r!   r   r"   r#   r  r    s}     %*7;	-!- - 5	--> ( W[ (1C 1 1BU 1.$GT Gr"   r  c            	           \ rS rSr% SrSr\S-  \S'   \" 5       r	\\
   \S'   SSS\4S	 jrS
\SS4S jr\SSS\
4S j5       rS
\S\\
SS4   4S jr SS
\SSS\SS4S jjr\S\\   S\S\\   4S j5       rSrg)BaseHandleriT  a  Shared logic for chat completion and responses handlers.

Provides model resolution, generation config building, and SSE formatting.
Generation is delegated to the shared :class:`GenerationState`.

Args:
    model_manager (`ModelManager`):
        Handles model loading, caching, and lifecycle.
    generation_state (`GenerationState`):
        Shared state managing per-model generation managers.
N_valid_params_class_unused_fieldsmodel_managerr   generation_statec                     Xl         X l        g r)   )r  r  )r+   r  r  s      r#   r,   BaseHandler.__init__d  s    
 + 0r"   bodyrA   c                    SSK Jn  [        UR                  5       5      nU R                  b3  U[        U R                  S[        5       5      -
  nU(       a
  U" SSU 3S9eX0R                  -  nU(       a  [        R                  SU 35        gg)	zMValidate request fields against the handler's params class and unused fields.r   HTTPExceptionN__mutable_keys__i  z"Unexpected fields in the request: status_codedetailz,Ignoring unsupported fields in the request: )	fastapir  r   keysr  rS   r  r  r  )r+   r  r  
input_keys
unexpectedunuseds         r#   _validate_requestBaseHandler._validate_requestl  s    )%
##/#gd.F.FHZ\_\a&bbJ#>`ak`l<mnn111"Nvh WX r"   chunkzstr | pydantic.BaseModelc                     [        U [        5      (       a  U R                  S5      (       a  U $ SU  S3$ SU R                  SS9 S3$ )z;Format a pydantic model or string as an SSE ``data:`` line.zdata: z

T)exclude_none)r`   r/   
startswithmodel_dump_json)r  s    r#   chunk_to_sseBaseHandler.chunk_to_ssey  sS     eS!!!,,X665PfUG4<PP--4-@AFFr"   r   r3  c                    SSK Jn  U R                  R                  bh  UR	                  S5      nUb;  X0R                  R                  :w  a"  U" SSU R                  R                   SU S3S9eU R                  R                  US'   U R                  R                  US   5      nU R                  R                  U5      u  pVXEU4$ )	zVApply force_model, load model + processor.

Returns ``(model_id, model, processor)``.
r   r  r@   i  zServer is pinned to 'z'; requested 'z'.r  )r  r  r  force_modelr_   process_model_nameload_model_and_processor)r+   r  r  	requestedrm   r@   rW   s          r#   _resolve_modelBaseHandler._resolve_model  s    
 	*))5)I$6H6H6T6T)T# #3D4F4F4R4R3SSabkallno  !..::DM%%88gG--FFxP	))r"   model_generation_configr   r  c                 j   SSK Jn  UR                  S5      b   U" S0 [        R                  " US   5      D6nO:[
        R                  " U5      nUR                  b  UR                  S:  a  SUl        UR                  S5      b,  [        US   5      Ul	        [        US   5      S:X  a  SUl
        UR                  S5      b  [        US   5      Ul        UR                  S	5      b  [        US	   5        U R                  R                  (       a  UR                  c  S
Ul        U(       a  SUl        U$ )a  Build a GenerationConfig from shared params (temperature, top_p, seed, generation_config JSON).

Subclasses should call ``super()._build_generation_config(...)`` then apply
endpoint-specific params (``max_tokens``, ``max_output_tokens``, etc.).

Args:
    body (`dict`):
        The raw request body.
    model_generation_config (`GenerationConfig`):
        The model's default generation config (will be deep-copied).
    use_cb (`bool`, *optional*, defaults to `False`):
        Whether continuous batching is active. If ``True``, disables the model's
        internal KV cache (CB manages its own paged cache).

Returns:
    `GenerationConfig`: A new config with request-specific overrides applied.
r   )r   rG  i   temperatureg        Ftop_pr  staticr   )transformersr   r_   r:   loadscopydeepcopyrw  floatr  	do_sampler  r  r  r  cache_implementation	use_cache)r+   r  r  r  r   rG  s         r#   _build_generation_config$BaseHandler._build_generation_config  s   ( 	288'(4 0 Y4::dCV>W3X Y $.E F //7;L;[;[^b;b37!088M".,1$}2E,F)T-()S0.3!+88G(&+DM&:#88F'4<(   )).?.T.T.\5=2 */' ! r"   messagesr  c           	      v   / nU  GH/  nUS   / S.nSU;   a  US   US'   SU;   a  US   US'   SU;   a  / OUR                  S5      =(       d    / n[        U[        5      (       a  SUS./nU GH  nUS   nUS	;   a  US   R                  SUS   S.5        M+  US
;   a\  U[        R
                  [        R                  4;   a8  US   n[        U[        5      (       a  US   nUS   R                  SUS.5        M  US:X  af  U[        R                  :X  aR  US   n	[        U	[        5      (       a  U	R                  SS5      OSn
U	S   nUS   R                  SSU
 SU 3S.5        M  US:X  aD  U[        R
                  [        R                  4;   a   US   R                  SUS   S   S.5        GMC  US:X  d  GML  U[        R                  :X  d  GMc  US   R                  SUS   S   S.5        GM     U[        R                  :X  a  SR                  S US    5       5      US'   UR                  U5        GM2     U$ )a  Convert OpenAI-format messages to the format expected by HF processors.

All modalities extract text. VLM additionally handles ``image_url`` and ``video_url``.
MULTIMODAL handles all of the above plus ``input_audio`` and ``audio_url``.
For LLMs, the content parts are collapsed into a plain text string.

Args:
    messages (`list[dict]`): OpenAI-format chat messages.
    modality (`Modality`): The model modality (LLM, VLM, or MULTIMODAL).

Returns:
    `list[dict]`: Processor-compatible messages.
role)r  contentrH   tool_call_idr  r   )r;   r   r;   )r   
input_textoutput_text)	image_urlinput_imager  urlimage)r;   r  input_audioformatwavdataaudiozdata:audio/z;base64,	video_urlvideo	audio_url c              3   *   #    U  H	  oS    v   M     g7f)r   Nr   )rL   r   s     r#   rO   ABaseHandler.get_processor_inputs_from_messages.<locals>.<genexpr>  s     ,R@Q1vY@Qr   )
r_   r`   r/   r   r   r   r   r   r   join)r  r  processor_inputsmessagerg   raw_contentr  content_typer  r  fmt	audio_b64s               r#   "get_processor_inputs_from_messages.BaseHandler.get_processor_inputs_from_messages  sO    G%fo"=F w&'.|'<|$()0)@~& !- 7"gkk)>T>ZXZK+s++(.DE&&v#HH9%,,fgfo-VW!%AAhS[S_S_aiatatRuFu!+.C!#t,,!%j9%,,gc-JK!]2x8CVCV7V")-"8K>HVZ>[>[+//(E:afC +F 3I9%,,gTWSXX`aj`kFl-mn![0X(,,PXPcPcAd5d9%,,ggkFZ[`Fa-bc![0XATAT5T9%,,ggkFZ[`Fa-bc- '2 8<<'$'HH,Ry@Q,R$Ry!##F+W  X  r"   )r  r  r  )r   r   r   r    r.   r  r;   __annotations__r   r  r/   r  r,   r   r  staticmethodr  r?  r  r   r  re   r   r  r!   r   r"   r#   r  r  T  s    
 (,+"uNCH$1%1 *1Yd Yt Y G6 G3 G G*4 *E#7HJt2t,u *, W\0!0!3E0!OS0!	0!d < T$Z < 8 < X\]aXb <  < r"   r  r   )Dr.   r   r  enumr:   r   abcr   r   collections.abcr   concurrent.futuresr   r   r   typingr   transformers.utilsr	   pydantic
tokenizersr  r  r
   r   r   r   r   :transformers.generation.continuous_batching.continuous_apir   4transformers.generation.continuous_batching.requestsr   5transformers.generation.continuous_batching.schedulerr   r  r   
get_loggerr   r  X_REQUEST_IDEnumr   r%   r  r1   r  r5   rU   r   rY   rb   re   rh   rj   r/   r;   r   r   r   r   r  r  r  r,  rA  rb  r  r  r   r"   r#   <module>r     s        # $ %    &   eUO+ 
		H	% tyy  P9 P   ?&F;

 B+< B B8D T ".t .T
T@Q .*(
 (
VDx D3 D4 DNE EPA2 A2H  !& &R>># >>BA) AH^1+ ^1BNG NGbp  p r"   