Skip to content

chore: sync sdk code with DeepLearning repo #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion assemblyai/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Previous version string (the line this diff removes).
__version__ = "0.40.2"
# New version string (the line this diff adds); "b1" marks a pre-release.
__version__ = "0.41.0b1"
27 changes: 27 additions & 0 deletions assemblyai/streaming/v3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from .client import StreamingClient
from .models import (
BeginEvent,
EventMessage,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
StreamingSessionParameters,
TerminationEvent,
TurnEvent,
Word,
)

# Public names of this package: the client plus the model types imported
# above. This list controls what `from <package> import *` exposes.
__all__ = [
"BeginEvent",
"EventMessage",
"StreamingClient",
"StreamingClientOptions",
"StreamingError",
"StreamingEvents",
"StreamingParameters",
"StreamingSessionParameters",
"TerminationEvent",
"TurnEvent",
"Word",
]
255 changes: 255 additions & 0 deletions assemblyai/streaming/v3/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import json
import logging
import queue
import sys
import threading
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Union
from urllib.parse import urlencode

import httpx
import websockets
from pydantic import BaseModel
from websockets.sync.client import connect as websocket_connect

from assemblyai import __version__

from .models import (
BeginEvent,
ErrorEvent,
EventMessage,
OperationMessage,
StreamingClientOptions,
StreamingError,
StreamingErrorCodes,
StreamingEvents,
StreamingParameters,
StreamingSessionParameters,
TerminateSession,
TerminationEvent,
TurnEvent,
UpdateConfiguration,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def _user_agent() -> str:
    """Return the User-Agent header value for this SDK.

    The value carries the SDK version and the running interpreter's
    ``major.minor.micro`` version.
    """
    runtime = ".".join(str(part) for part in sys.version_info[:3])
    sdk = f"Python/{__version__}"
    return f"AssemblyAI/1.0 (sdk={sdk} runtime_env=Python/{runtime})"


class StreamingClient:
    """Client for a v3 streaming session over a synchronous WebSocket.

    ``connect`` opens the socket and starts two worker threads: a writer that
    drains an outgoing queue (raw audio bytes and control messages) and a
    reader that parses server messages and dispatches them to the handlers
    registered with ``on``. Both workers loop until the stop event is set,
    which happens when a termination event is received or an error is
    handled.
    """

    def __init__(self, options: StreamingClientOptions):
        """Store *options* and prepare handlers, queue, threads and stop flag.

        No network activity happens here; call ``connect`` to open the socket.
        """
        self._options = options

        # One (initially empty) handler list per known event.
        self._handlers: Dict[StreamingEvents, List[Callable]] = {
            event: [] for event in StreamingEvents.__members__.values()
        }

        # Set by ``connect``; defined up front so that ``disconnect`` and the
        # workers can test it safely before a connection exists.
        self._websocket: Optional[Any] = None

        self._write_queue: queue.Queue[OperationMessage] = queue.Queue()
        self._write_thread = threading.Thread(target=self._write_message)
        self._read_thread = threading.Thread(target=self._read_message)
        self._stop_event = threading.Event()

    def connect(self, params: StreamingParameters) -> None:
        """Open the WebSocket connection and start the worker threads.

        Args:
            params: Session parameters; every field that is not ``None`` is
                sent as a URL query parameter.

        If opening the socket raises ``ConnectionClosed``, the error handlers
        are notified and the method returns without starting the workers.
        Any other exception raised while connecting propagates to the caller.
        """
        params_dict = params.model_dump(exclude_none=True)
        params_encoded = urlencode(params_dict)

        uri = f"wss://{self._options.api_host}/v3/ws?{params_encoded}"
        headers = {
            "Authorization": self._options.api_key,
            "User-Agent": _user_agent(),
            "AssemblyAI-Version": "2025-05-12",
        }

        try:
            self._websocket = websocket_connect(
                uri,
                additional_headers=headers,
                open_timeout=15,
            )
        except websockets.exceptions.ConnectionClosed as exc:
            self._handle_error(exc)
            return

        # A failed earlier attempt goes through ``_handle_error``, which sets
        # the stop event; clear it so the workers do not exit immediately.
        self._stop_event.clear()

        self._write_thread.start()
        self._read_thread.start()

        logger.debug("Connected to WebSocket server")

    def disconnect(self, terminate: bool = False) -> None:
        """Wait for the session to finish and close the socket.

        Args:
            terminate: When true and the session has not already stopped,
                queue a ``TerminateSession`` message for the writer first.

        Called from any thread other than the two workers, this joins the
        workers that are running and then closes the socket. The workers only
        stop after a termination event or an error, so without
        ``terminate=True`` the call blocks until the session ends on its own.

        Called from a worker thread (for example from inside an event
        handler, or from ``_handle_error``), nothing is joined: a thread
        cannot join itself, and waiting on the sibling could deadlock if it
        is shutting down too. In that case the socket is closed only when the
        session has already been stopped; otherwise the queued termination is
        left to run its course.
        """
        if terminate and not self._stop_event.is_set():
            self._write_queue.put(TerminateSession())

        workers = (self._read_thread, self._write_thread)

        if threading.current_thread() in workers:
            if not self._stop_event.is_set():
                return
        else:
            for worker in workers:
                # Threads that were never started (or already finished) must
                # not be joined; joining an unstarted thread raises.
                if worker.is_alive():
                    worker.join()

        if self._websocket:
            try:
                self._websocket.close()
            except Exception:
                # Closing is best effort, but leave a trace for debugging.
                logger.debug("Error while closing WebSocket", exc_info=True)

    def stream(
        self, data: Union[bytes, Generator[bytes, None, None], Iterable[bytes]]
    ) -> None:
        """Queue audio for sending.

        Args:
            data: A single ``bytes`` chunk, or an iterable/generator whose
                items are queued one by one in iteration order. Iterating a
                generator happens in the calling thread, so this returns only
                once the iterable is exhausted.
        """
        if isinstance(data, bytes):
            self._write_queue.put(data)
            return

        for chunk in data:
            self._write_queue.put(chunk)

    def set_params(self, params: StreamingSessionParameters):
        """Queue an ``UpdateConfiguration`` message built from *params*."""
        message = UpdateConfiguration(**params.model_dump())
        self._write_queue.put(message)

    def on(self, event: StreamingEvents, handler: Callable) -> None:
        """Register *handler* for *event*.

        Handlers are invoked as ``handler(client, payload)``. The registration
        is silently ignored when *event* is not a ``StreamingEvents`` member
        or *handler* is not callable.
        """
        if event in StreamingEvents.__members__.values() and callable(handler):
            self._handlers[event].append(handler)

    def _write_message(self) -> None:
        """Writer thread body: send queued items until the stop event is set.

        Raises:
            ValueError: If there is no socket, or a queued item is neither
                ``bytes`` nor a pydantic model.
        """
        while not self._stop_event.is_set():
            if not self._websocket:
                raise ValueError("Not connected to the WebSocket server")

            try:
                # The timeout lets the loop re-check the stop event regularly.
                data = self._write_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                if isinstance(data, bytes):
                    self._websocket.send(data)
                elif isinstance(data, BaseModel):
                    message = data.model_dump_json(exclude_none=True)
                    self._websocket.send(message)
                else:
                    raise ValueError(f"Attempted to send invalid message: {type(data)}")
            except websockets.exceptions.ConnectionClosed as exc:
                # A closure seen after shutdown was requested is the expected
                # result of that shutdown, not a new error to report.
                if not self._stop_event.is_set():
                    self._handle_error(exc)
                return

    def _read_message(self) -> None:
        """Reader thread body: receive, parse and dispatch server messages.

        Raises:
            ValueError: If there is no socket.
        """
        while not self._stop_event.is_set():
            if not self._websocket:
                raise ValueError("Not connected to the WebSocket server")

            try:
                # The timeout lets the loop re-check the stop event regularly.
                message_data = self._websocket.recv(timeout=1)
            except TimeoutError:
                continue
            except websockets.exceptions.ConnectionClosed as exc:
                # See ``_write_message``: do not report a closure that follows
                # a shutdown which is already in progress.
                if not self._stop_event.is_set():
                    self._handle_error(exc)
                return

            try:
                message_json = json.loads(message_data)
            except json.JSONDecodeError as exc:
                logger.warning(f"Failed to decode message: {exc}")
                continue

            # Valid JSON is not necessarily an object; anything else cannot be
            # an event and would break the key lookups below.
            if not isinstance(message_json, dict):
                logger.warning(
                    f"Unsupported message format: {type(message_json).__name__}"
                )
                continue

            message = self._parse_message(message_json)

            if isinstance(message, ErrorEvent):
                self._handle_error(message)
            elif message:
                self._handle_message(message)
            else:
                # ``.get`` because the message may carry no "type" key at all.
                logger.warning(f"Unsupported event type: {message_json.get('type')}")

    def _handle_message(self, message: EventMessage) -> None:
        """Dispatch *message* to the handlers registered for its event type.

        A ``TerminationEvent`` sets the stop event before the handlers run, so
        both workers exit after this message.
        """
        if isinstance(message, TerminationEvent):
            self._stop_event.set()

        event_type = StreamingEvents[message.type]

        for handler in self._handlers[event_type]:
            handler(self, message)

    def _parse_message(self, data: Dict[str, Any]) -> Optional[EventMessage]:
        """Turn a decoded JSON object into an event model.

        Returns:
            The validated event for a known ``"type"``, an ``ErrorEvent`` when
            there is no ``"type"`` but an ``"error"`` key, otherwise ``None``.
        """
        if "type" in data:
            message_type = data.get("type")

            event_type = self._parse_event_type(message_type)

            if event_type == StreamingEvents.Begin:
                return BeginEvent.model_validate(data)
            elif event_type == StreamingEvents.Termination:
                return TerminationEvent.model_validate(data)
            elif event_type == StreamingEvents.Turn:
                return TurnEvent.model_validate(data)
            else:
                return None
        elif "error" in data:
            return ErrorEvent.model_validate(data)

        return None

    @staticmethod
    def _parse_event_type(message_type: Optional[Any]) -> Optional[StreamingEvents]:
        """Look up a ``StreamingEvents`` member by name.

        Returns ``None`` when *message_type* is not a string or names no
        member.
        """
        if not isinstance(message_type, str):
            return None

        try:
            return StreamingEvents[message_type]
        except KeyError:
            return None

    def _handle_error(
        self,
        error: Union[
            ErrorEvent,
            websockets.exceptions.ConnectionClosed,
        ],
    ):
        """Stop the session, notify the error handlers and disconnect.

        The stop event is set before the handlers run so that the other
        worker exits even if a handler raises. ``disconnect`` then closes the
        socket without joining when this runs on a worker thread.
        """
        parsed_error = self._parse_error(error)

        self._stop_event.set()

        for handler in self._handlers[StreamingEvents.Error]:
            handler(self, parsed_error)

        self.disconnect()

    def _parse_error(
        self,
        error: Union[
            ErrorEvent,
            websockets.exceptions.ConnectionClosed,
        ],
    ) -> StreamingError:
        """Convert a server error event or a connection closure to ``StreamingError``.

        For a closure, a close code in 4000-4999 that appears in
        ``StreamingErrorCodes`` uses the message stored there; other codes use
        the close reason. A closure with code 1000, or any other input,
        produces an ``Unknown error`` message without a code.
        """
        if isinstance(error, ErrorEvent):
            return StreamingError(
                message=error.error,
            )
        elif isinstance(error, websockets.exceptions.ConnectionClosed):
            if 4000 <= error.code <= 4999 and error.code in StreamingErrorCodes:
                error_message = StreamingErrorCodes[error.code]
            else:
                error_message = error.reason

            if error.code != 1000:
                return StreamingError(message=error_message, code=error.code)

        return StreamingError(
            message=f"Unknown error: {error}",
        )


class HTTPClient:
    """Wrapper holding an ``httpx.Client`` configured from streaming options."""

    def __init__(self, options: StreamingClientOptions):
        """Create the underlying client.

        The client sends the API key as the ``Authorization`` header together
        with the SDK ``User-Agent``, targets ``https://<api_host>`` and uses a
        30 second timeout.
        """
        self._http_client = httpx.Client(
            headers={
                "Authorization": options.api_key,
                "User-Agent": _user_agent(),
            },
            base_url=f"https://{options.api_host}",
            timeout=30,
        )
Loading
Loading