Adding callback patterns for generator client #993

Merged
merged 6 commits into vllm-project:main from lexu/client-callback-pattern on Jun 25, 2025

Conversation

happyandslow
Collaborator

Pull Request Description

This PR removes await from the worker thread for async IO, moving the generator client to a callback pattern.
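
For illustration, here is a minimal sketch of the callback pattern this PR moves toward: a worker thread submits a coroutine to the event loop and registers a completion callback instead of awaiting the result. All names below are illustrative, not the actual aibrix client API.

import asyncio
import threading

async def send_request(payload):
    # Stand-in for the async HTTP IO performed by the generator client.
    await asyncio.sleep(0.01)
    return f"response for {payload}"

def on_done(future):
    # Runs when the IO completes; the worker thread never blocked on it.
    print(future.result())

def worker(loop):
    # Worker thread: schedule coroutines on the loop without awaiting them.
    for i in range(3):
        fut = asyncio.run_coroutine_threadsafe(send_request(f"req-{i}"), loop)
        fut.add_done_callback(on_done)  # callback instead of await

async def main():
    loop = asyncio.get_running_loop()
    t = threading.Thread(target=worker, args=(loop,))
    t.start()
    t.join()
    await asyncio.sleep(0.1)  # let the scheduled tasks and callbacks finish

asyncio.run(main())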

Related Issues

#903

@happyandslow happyandslow marked this pull request as draft April 22, 2025 21:57
@happyandslow happyandslow force-pushed the lexu/client-callback-pattern branch from 624af76 to d095c3c on May 2, 2025 00:39
@happyandslow happyandslow force-pushed the lexu/client-callback-pattern branch 2 times, most recently from cb6f060 to aa9776b on June 2, 2025 23:17
@happyandslow happyandslow force-pushed the lexu/client-callback-pattern branch from aa9776b to d1d3cc9 on June 9, 2025 21:32
@happyandslow happyandslow force-pushed the lexu/client-callback-pattern branch 2 times, most recently from 56c9d16 to 835d078 on June 24, 2025 00:30
@Jeffwan
Collaborator

Jeffwan commented Jun 24, 2025

@happyandslow if this is ready for review, you can change it to ready status.

@happyandslow
Collaborator Author

The current implementation of the client should be able to sustain a QPS beyond 1000. Here is a simple benchmark to roughly test its performance (run on my local MacBook with an Apple M3 Pro processor). To reproduce the benchmark (I'm using batch mode):

  1. Set up the mock app environment using the instructions here
  2. Generate the workload:
    python benchmark.py --stage workload --config config.yaml --override duration_ms=300000 --override prompt_type="synthetic_shared"
  3. Run the mock test:
for SCALE in 1 0.1 0.01 0.001 0.0001 
do
    OUTPUT_DIR="./mock-${SCALE}"
    mkdir -p ${OUTPUT_DIR}
    python benchmark.py --stage client --config config.yaml  --override time_scale=$SCALE --override endpoint="http://localhost:8000" --override target_model="llama2-7b" --override api_key="test-key-1234567890" --override client_output=${OUTPUT_DIR} --override streaming_enabled=false > ${OUTPUT_DIR}/client.log 2>&1
    python benchmark.py --stage analysis --config config.yaml --override client_output=${OUTPUT_DIR} > ${OUTPUT_DIR}/analysis.log 2>&1
done


for SCALE in 1 0.1 0.01 0.001 0.0001 
do
    OUTPUT_DIR="./mock-${SCALE}"
    python extract_waiting_times_simple.py $OUTPUT_DIR/client.log --list  
done

You can see most statistics under ${OUTPUT_DIR}. ${OUTPUT_DIR}/client.log contains all delays the client thread waited before dispatching a message -- a negative delay means the client wasn't able to dispatch the request in time (a sketch of how such a delay can be measured follows the results below). The example workload we use does not contain session information, and the mock app should minimize the overhead of data transfer/IO as well as engine compute. The delays I observed while scaling time_scale were:

Analyzing log file: ./mock-1/client.log
Found 286 waiting time entries
============================================================
WAITING TIME DISTRIBUTION ANALYSIS
============================================================
Total requests analyzed: 286

Basic Statistics:
  Mean:     +149.9653 seconds
  Median:   +142.9559 seconds
  Std Dev:  89.5933 seconds
  Min:      +1.4964 seconds
  Max:      +299.8116 seconds

Percentiles:
  25th:     +70.7576 seconds
  50th:     +142.9559 seconds
  75th:     +233.1188 seconds
  90th:     +273.7975 seconds
  95th:     +288.7080 seconds
  99th:     +297.9426 seconds
============================================================
Analyzing log file: ./mock-0.1/client.log
Found 286 waiting time entries
============================================================
WAITING TIME DISTRIBUTION ANALYSIS
============================================================
Total requests analyzed: 286

Basic Statistics:
  Mean:     +14.9879 seconds
  Median:   +14.2869 seconds
  Std Dev:  8.9581 seconds
  Min:      +0.1432 seconds
  Max:      +29.9707 seconds

Percentiles:
  25th:     +7.0680 seconds
  50th:     +14.2869 seconds
  75th:     +23.3023 seconds
  90th:     +27.3692 seconds
  95th:     +28.8602 seconds
  99th:     +29.7834 seconds
============================================================
Analyzing log file: ./mock-0.01/client.log
Found 286 waiting time entries
============================================================
WAITING TIME DISTRIBUTION ANALYSIS
============================================================
Total requests analyzed: 286

Basic Statistics:
  Mean:     +1.4904 seconds
  Median:   +1.4202 seconds
  Std Dev:  0.8948 seconds
  Min:      +0.0083 seconds
  Max:      +2.9872 seconds

Percentiles:
  25th:     +0.6995 seconds
  50th:     +1.4202 seconds
  75th:     +2.3210 seconds
  90th:     +2.7271 seconds
  95th:     +2.8759 seconds
  99th:     +2.9679 seconds
============================================================
Analyzing log file: ./mock-0.001/client.log
Found 286 waiting time entries
============================================================
WAITING TIME DISTRIBUTION ANALYSIS
============================================================
Total requests analyzed: 286
Negative waiting times: 92 (32.2%)

Basic Statistics:
  Mean:     +0.0679 seconds
  Median:   +0.0481 seconds
  Std Dev:  0.0730 seconds
  Min:      -0.0130 seconds
  Max:      +0.2027 seconds

Percentiles:
  25th:     -0.0057 seconds
  50th:     +0.0481 seconds
  75th:     +0.1373 seconds
  90th:     +0.1775 seconds
  95th:     +0.1919 seconds
  99th:     +0.2009 seconds
============================================================
Analyzing log file: ./mock-0.0001/client.log
Found 286 waiting time entries
============================================================
WAITING TIME DISTRIBUTION ANALYSIS
============================================================
Total requests analyzed: 286
Negative waiting times: 286 (100.0%)

Basic Statistics:
  Mean:     -0.1333 seconds
  Median:   -0.1201 seconds
  Std Dev:  0.0808 seconds
  Min:      -0.2780 seconds
  Max:      -0.0084 seconds

Percentiles:
  25th:     -0.2249 seconds
  50th:     -0.1201 seconds
  75th:     -0.0662 seconds
  90th:     -0.0336 seconds
  95th:     -0.0233 seconds
  99th:     -0.0144 seconds
============================================================

The delays remain mostly positive down to time_scale = 0.001. The original workload has a target QPS of 1, so the client should be fine up to a target QPS of 1/0.001 = 1000.
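
For reference, the reported delay can be thought of as the slack between the scheduled dispatch time and the current time; a minimal sketch of that bookkeeping (illustrative only; the client's actual implementation may differ):

import time

def wait_for_dispatch(target_time, start):
    # Slack between the scheduled dispatch time (seconds since `start`)
    # and now: positive means on schedule, negative means already late.
    delay = target_time - (time.monotonic() - start)
    if delay > 0:
        time.sleep(delay)  # only sleep when we're ahead of schedule
    return delay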

The analysis script is attached below:

#!/usr/bin/env python3
"""
Simple script to extract waiting time distribution from client logs.
Extracts waiting times from log entries like:
"WARNING:root:send_request_batch: Prepare to launch task after 51.39828181266785 target_time ..."
"WARNING:root:send_request_batch: Prepare to launch task after -2.5 target_time ..."
"""

import re
import argparse
import json
from pathlib import Path
from statistics import mean, median, stdev

def extract_waiting_times(log_file_path):
    """
    Extract waiting times from log file.
    
    Args:
        log_file_path (str): Path to the log file
        
    Returns:
        list: List of waiting times (float values, can be negative)
    """
    waiting_times = []
    
    # Regex pattern to match the waiting time (including negative values)
    # Matches: "Prepare to launch task after <number> target_time"
    # -?[\d.]+ allows an optional leading minus sign
    pattern = r'Prepare to launch task after (-?[\d.]+) target_time'
    
    with open(log_file_path, 'r') as f:
        for line_num, line in enumerate(f, 1):
            match = re.search(pattern, line)
            if match:
                try:
                    waiting_time = float(match.group(1))
                    waiting_times.append(waiting_time)
                except ValueError:
                    print(f"Warning: Could not parse waiting time on line {line_num}: {match.group(1)}")
                    continue
    
    return waiting_times

def calculate_percentile(data, percentile):
    """
    Calculate percentile without numpy dependency.
    
    Args:
        data (list): List of numbers
        percentile (float): Percentile to calculate (0-100)
        
    Returns:
        float: The percentile value
    """
    if not data:
        return None
    
    sorted_data = sorted(data)
    index = (percentile / 100) * (len(sorted_data) - 1)
    
    if index.is_integer():
        return sorted_data[int(index)]
    else:
        lower_index = int(index)
        upper_index = lower_index + 1
        weight = index - lower_index
        return sorted_data[lower_index] * (1 - weight) + sorted_data[upper_index] * weight

def analyze_distribution(waiting_times):
    """
    Analyze the distribution of waiting times.
    
    Args:
        waiting_times (list): List of waiting times
        
    Returns:
        dict: Statistics about the distribution
    """
    if not waiting_times:
        return {}
    
    stats = {
        'waiting_times': waiting_times,  # Include the raw data for analysis
        'count': len(waiting_times),
        'mean': mean(waiting_times),
        'median': median(waiting_times),
        'std': stdev(waiting_times) if len(waiting_times) > 1 else 0,
        'min': min(waiting_times),
        'max': max(waiting_times),
        'percentiles': {
            '25th': calculate_percentile(waiting_times, 25),
            '50th': calculate_percentile(waiting_times, 50),
            '75th': calculate_percentile(waiting_times, 75),
            '90th': calculate_percentile(waiting_times, 90),
            '95th': calculate_percentile(waiting_times, 95),
            '99th': calculate_percentile(waiting_times, 99)
        }
    }
    
    return stats

def print_statistics(stats):
    """
    Print formatted statistics.
    
    Args:
        stats (dict): Statistics dictionary
    """
    if not stats:
        print("No statistics available")
        return
    
    # Count negative values
    negative_count = sum(1 for time in stats.get('waiting_times', []) if time < 0)
    total_count = stats['count']
    
    print("=" * 60)
    print("WAITING TIME DISTRIBUTION ANALYSIS")
    print("=" * 60)
    print(f"Total requests analyzed: {total_count}")
    if negative_count > 0:
        print(f"Negative waiting times: {negative_count} ({negative_count/total_count*100:.1f}%)")
    print()
    print("Basic Statistics:")
    print(f"  Mean:     {stats['mean']:+.4f} seconds")
    print(f"  Median:   {stats['median']:+.4f} seconds")
    print(f"  Std Dev:  {stats['std']:.4f} seconds")
    print(f"  Min:      {stats['min']:+.4f} seconds")
    print(f"  Max:      {stats['max']:+.4f} seconds")
    print()
    print("Percentiles:")
    print(f"  25th:     {stats['percentiles']['25th']:+.4f} seconds")
    print(f"  50th:     {stats['percentiles']['50th']:+.4f} seconds")
    print(f"  75th:     {stats['percentiles']['75th']:+.4f} seconds")
    print(f"  90th:     {stats['percentiles']['90th']:+.4f} seconds")
    print(f"  95th:     {stats['percentiles']['95th']:+.4f} seconds")
    print(f"  99th:     {stats['percentiles']['99th']:+.4f} seconds")
    print("=" * 60)

def save_results(waiting_times, stats, output_file):
    """
    Save results to a JSON file.
    
    Args:
        waiting_times (list): List of waiting times
        stats (dict): Statistics dictionary
        output_file (str): Output file path
    """
    results = {
        'waiting_times': waiting_times,
        'statistics': stats
    }
    
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"Results saved to {output_file}")

def main():
    parser = argparse.ArgumentParser(description='Extract and analyze waiting time distribution from client logs')
    parser.add_argument('log_file', help='Path to the client log file')
    parser.add_argument('--output', '-o', help='Output file for results (JSON)')
    parser.add_argument('--list', '-l', action='store_true', help='List all waiting times')
    
    args = parser.parse_args()
    
    # Check if log file exists
    if not Path(args.log_file).exists():
        print(f"Error: Log file '{args.log_file}' not found")
        return 1
    
    print(f"Analyzing log file: {args.log_file}")
    
    # Extract waiting times
    waiting_times = extract_waiting_times(args.log_file)
    
    if not waiting_times:
        print("No waiting times found in the log file")
        return 1
    
    print(f"Found {len(waiting_times)} waiting time entries")
    
    # Optionally list every extracted waiting time (--list / -l)
    if args.list:
        for i, waiting_time in enumerate(waiting_times, 1):
            print(f"  {i}: {waiting_time:+.4f} s")
    
    # Analyze distribution
    stats = analyze_distribution(waiting_times)
    
    # Print statistics
    print_statistics(stats)
    
    # Save results
    if args.output:
        save_results(waiting_times, stats, args.output)
    
    return 0

if __name__ == "__main__":
    exit(main()) 
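
For example, to analyze one of the runs above, list the raw values, and save the statistics to JSON:

python extract_waiting_times_simple.py ./mock-0.001/client.log --list --output ./mock-0.001/waiting_times.json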

@happyandslow happyandslow marked this pull request as ready for review June 24, 2025 23:50
@Jeffwan Jeffwan force-pushed the lexu/client-callback-pattern branch from 63f36dd to 63c120a on June 25, 2025 00:03
@Jeffwan Jeffwan merged commit 1b0ec4c into vllm-project:main Jun 25, 2025
2 checks passed
ModiCodeCraftsman pushed a commit to ModiCodeCraftsman/aibrix that referenced this pull request Jun 25, 2025
* adding futures wip
* merge fix
* update client async function
* roll back to async io design
* Remove client pool from parameters
* analysis script update and timing bug fix

---------

Signed-off-by: Le Xu <[email protected]>
Co-authored-by: Le Xu <[email protected]>
Signed-off-by: Modi Tamam <[email protected]>
Yaegaki1Erika pushed a commit to Yaegaki1Erika/aibrix that referenced this pull request Jul 23, 2025
* adding futures wip
* merge fix
* update client async function
* roll back to async io design
* Remove client pool from parameters
* analysis script update and timing bug fix

---------

Signed-off-by: Le Xu <[email protected]>
Co-authored-by: Le Xu <[email protected]>