Socket與WebSocket基礎
Socket 和 WebSocket 程式設計指南
目錄
Socket 基礎
什麼是 Socket?
Socket 是網路通訊的端點,它提供了程式和網路之間的介面。Socket 可以:
- 建立網路連接
- 發送和接收數據
- 管理連接狀態
Socket 類型
-
TCP Socket (SOCK_STREAM)
- 可靠的、順序的、雙向的通訊
- 需要建立連接
- 適合需要可靠傳輸的應用
-
UDP Socket (SOCK_DGRAM)
- 不可靠的、無序的通訊
- 不需要建立連接
- 適合需要快速傳輸的應用
Socket 地址家族
- AF_INET:IPv4
- AF_INET6:IPv6
- AF_UNIX:本機通訊
Socket 程式設計
TCP Server 範例
import socket
def create_tcp_server():
# 建立 Socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 綁定地址和端口
server.bind(('localhost', 8888))
# 開始監聽
server.listen(5)
print("伺服器開始監聽在 8888 端口")
while True:
# 接受連接
client, address = server.accept()
print(f"接受來自 {address} 的連接")
# 接收數據
data = client.recv(1024)
print(f"接收到數據: {data.decode()}")
# 發送回應
client.send(b"Server received your message")
# 關閉連接
client.close()
if __name__ == "__main__":
create_tcp_server()
TCP Client 範例
import socket
def create_tcp_client():
# 建立 Socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 連接伺服器
client.connect(('localhost', 8888))
# 發送數據
client.send(b"Hello, Server!")
# 接收回應
response = client.recv(1024)
print(f"伺服器回應: {response.decode()}")
# 關閉連接
client.close()
if __name__ == "__main__":
create_tcp_client()
UDP Server 範例
import socket
def create_udp_server():
# 建立 UDP Socket
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 綁定地址
server.bind(('localhost', 8888))
print("UDP 伺服器開始監聽在 8888 端口")
while True:
# 接收數據
data, addr = server.recvfrom(1024)
print(f"從 {addr} 接收到: {data.decode()}")
# 發送回應
server.sendto(b"Message received", addr)
if __name__ == "__main__":
create_udp_server()
WebSocket 基礎
什麼是 WebSocket?
WebSocket 是一個在單個 TCP 連接上提供全雙工通訊的協議:
- 持久連接
- 即時數據傳輸
- 較低的延遲
- 支援瀏覽器原生 API
WebSocket 特點
-
建立連接
- 通過 HTTP 升級機制
- 使用 ws:// 或 wss:// 協議
-
數據幀格式
- 文本幀
- 二進制幀
- 控制幀
WebSocket vs HTTP
- WebSocket 是持久連接
- 更低的開銷
- 支援服務器推送
- 適合即時應用
WebSocket 程式設計
使用 Python websockets 庫
import asyncio
import websockets
async def echo_server(websocket, path):
try:
async for message in websocket:
print(f"收到消息: {message}")
await websocket.send(f"Echo: {message}")
except websockets.exceptions.ConnectionClosed:
print("客戶端斷開連接")
async def main():
async with websockets.serve(echo_server, "localhost", 8765):
await asyncio.Future() # 持續運行
if __name__ == "__main__":
asyncio.run(main())
HTML5 WebSocket 客戶端
// 建立 WebSocket 連接
const ws = new WebSocket('ws://localhost:8765');
// 連接建立時的處理
ws.onopen = function() {
console.log('連接已建立');
ws.send('Hello Server!');
};
// 接收消息的處理
ws.onmessage = function(event) {
console.log('收到消息:', event.data);
};
// 錯誤處理
ws.onerror = function(error) {
console.error('WebSocket 錯誤:', error);
};
// 連接關閉的處理
ws.onclose = function() {
console.log('連接已關閉');
};
應用案例
1. 即時聊天室
import asyncio
import websockets
import json
connected = set()
async def chat_server(websocket, path):
# 存儲新的連接
connected.add(websocket)
try:
async for message in websocket:
# 廣播消息給所有連接的客戶端
data = json.loads(message)
for conn in connected:
await conn.send(json.dumps({
'user': data['user'],
'message': data['message']
}))
finally:
# 移除斷開的連接
connected.remove(websocket)
async def main():
async with websockets.serve(chat_server, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
2. 即時數據監控
import asyncio
import websockets
import json
import random
async def sensor_data(websocket, path):
while True:
# 模擬傳感器數據
data = {
'temperature': random.uniform(20, 30),
'humidity': random.uniform(40, 60),
'timestamp': time.time()
}
await websocket.send(json.dumps(data))
await asyncio.sleep(1)
最佳實踐
1. 錯誤處理
async def handle_connection(websocket, path):
try:
async for message in websocket:
await process_message(message)
except websockets.exceptions.ConnectionClosed:
print("連接已關閉")
except Exception as e:
print(f"發生錯誤: {e}")
finally:
# 清理資源
await cleanup()
2. 心跳機制
async def heartbeat(websocket):
while True:
try:
await websocket.ping()
await asyncio.sleep(30)
except websockets.exceptions.ConnectionClosed:
break
3. 重連機制
function connect() {
const ws = new WebSocket('ws://localhost:8765');
ws.onclose = function() {
console.log('連接斷開,嘗試重連...');
setTimeout(connect, 1000);
};
return ws;
}
4. 安全性考慮
import ssl
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain('cert.pem', 'key.pem')
async def main():
async with websockets.serve(
handler,
"localhost",
8765,
ssl=ssl_context
):
await asyncio.Future()
效能優化
1. 連接池管理
class ConnectionPool:
def __init__(self, max_size=100):
self.pool = set()
self.max_size = max_size
def add(self, connection):
if len(self.pool) < self.max_size:
self.pool.add(connection)
return True
return False
2. 消息佇列
from asyncio import Queue
message_queue = Queue()
async def message_processor():
while True:
message = await message_queue.get()
await process_message(message)
message_queue.task_done()
3. 壓縮數據
import zlib
def compress_message(message):
return zlib.compress(message.encode())
def decompress_message(compressed):
return zlib.decompress(compressed).decode()
調試技巧
1. 記錄日誌
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
async def handle_message(websocket, message):
logging.debug(f"接收消息: {message}")
# 處理消息
logging.info(f"消息處理完成")
2. 監控連接狀態
class ConnectionMonitor:
def __init__(self):
self.connections = {}
def add_connection(self, websocket, info):
self.connections[websocket] = {
'connected_at': time.time(),
'info': info
}
def get_stats(self):
return {
'total_connections': len(self.connections),
'active_since': min(c['connected_at']
for c in self.connections.values())
}