#!/usr/bin/env python import asyncio import websockets import logging import json logging.basicConfig( format="%(asctime)s %(message)s", level=logging.INFO, ) peer_list = {} class LoggerAdapter(logging.LoggerAdapter): """Add connection ID and client IP address to websockets logs.""" def process(self, msg, kwargs): try: websocket = kwargs["extra"]["websocket"] except KeyError: return msg, kwargs rip = websocket.remote_address[0] try: xff = websocket.request_headers.get("X-Forwarded-For") except: xff = "None" return f"{websocket.id} {rip} {xff}", kwargs async def handler(websocket): peer_group_id = None try: while not websocket.closed: message = await websocket.recv() msg = json.loads(message) if "peer_group_id" in msg: peer_group_id = msg["peer_group_id"] if peer_group_id not in peer_list: peer_list[peer_group_id] = set() # Create new set for all peers in peer_group_id peer_list[peer_group_id].add(websocket) # Add peer's socket to peer_group for peer in peer_list[peer_group_id]: if peer != websocket: await peer.send(message) except websockets.exceptions.ConnectionClosed as e: pass finally: peer_list[peer_group_id].remove(websocket) if len(peer_list[peer_group_id]) < 1: peer_list.pop(peer_group_id) async def main(): async with websockets.serve( handler, "", 8001, ping_timeout=30, logger=LoggerAdapter(logging.getLogger("websockets.server"), None), ): await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())