Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions livechat/utils/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ssl
import threading
from time import sleep
from typing import List, NoReturn, Union
from typing import List, Union

from loguru import logger
from websocket import WebSocketApp, WebSocketConnectionClosedException
Expand All @@ -19,7 +19,8 @@

def on_message(ws_client: WebSocketApp, message: str):
''' Custom WebSocketApp handler that inserts new messages in front of `self.messages` list. '''
ws_client.messages.insert(0, json.loads(message))
with ws_client._messages_lock:
ws_client.messages.insert(0, json.loads(message))


def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str):
Expand All @@ -39,6 +40,7 @@ class WebsocketClient(WebSocketApp):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.messages: List[dict] = []
self._messages_lock = threading.Lock()
self.on_message = on_message
self.on_close = on_close
self.on_error = on_error
Expand All @@ -50,7 +52,7 @@ def open(self,
ping_interval: Union[float, int] = 5,
ws_conn_timeout: Union[float, int] = 10,
keep_alive: bool = True,
response_timeout: Union[float, int] = 3) -> NoReturn:
response_timeout: Union[float, int] = 3) -> None:
''' Opens websocket connection and keep running forever.
Args:
origin (dict): Specifies origin while creating websocket connection.
Expand All @@ -77,9 +79,10 @@ def open(self,
'ping_interval': ping_interval,
}
if keep_alive:
ping_thread = threading.Thread(target=self.run_forever,
kwargs=run_forever_kwargs)
ping_thread.start()
ws_thread = threading.Thread(target=self.run_forever,
kwargs=run_forever_kwargs,
daemon=True)
ws_thread.start()
self._wait_till_sock_connected(ws_conn_timeout)
return
self.run_forever(**run_forever_kwargs)
Expand Down Expand Up @@ -107,7 +110,9 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict:

def await_message(stop_event: threading.Event) -> dict:
while not stop_event.is_set():
for item in self.messages:
with self._messages_lock:
messages_snapshot = self.messages.copy()
for item in messages_snapshot:
if item.get('request_id') == request_id and item.get(
'type') == 'response':
return item
Expand All @@ -131,7 +136,7 @@ def await_message(stop_event: threading.Event) -> dict:
return RtmResponse(response)

def _wait_till_sock_connected(self,
timeout: Union[float, int] = 10) -> NoReturn:
timeout: Union[float, int] = 10) -> None:
''' Polls until `self.sock` is connected.
Args:
timeout (float): timeout value in seconds, default 10. '''
Expand Down