Golang-websocket 案例學習
演示websocket在應用上的使用
golangWebsocket
處理html靜態檔案和websocket路由
package main
import (
"log"
"net/http"
)
func main() {
setupApi()
log.Fatalln(http.ListenAndServe(":8082", nil))
}
func setupApi() {
manger := Manager{}
http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
http.HandleFunc("/ws", manger.serveWs)
}
//go run *.go 運行所有文件
新增一個簡單的ws服務
// http status 101 代表正在切換協議
package main
// manager.go // 管理websocket
import (
"github.com/gorilla/websocket"
"log"
"net/http"
)
// 管理websocket
var (
websocketUpgrade = websocket.Upgrader{
//確保用戶不會發送巨大封包
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)
type Manager struct {
}
// 工廠模式
func NewManager() *Manager {
return &Manager{}
}
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
log.Println("new connection")
//upgrade regular http connection into websocket
conn, err := websocketUpgrade.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
conn.Close()
}
新增基本客戶端
package main
import "github.com/gorilla/websocket"
//客戶端列表
type ClientList map[*Client]bool
//處理所有單用戶相關的內容
type Client struct {
connection *websocket.Conn
manager *Manager
}
func NewClient(conn *websocket.Conn,manager *Manager) *Client {
return &Client{
connection: conn,
//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
manager: manager,
}
}
修改Manager讓Manager能夠維護Client
type Manager struct {
clients ClientList
//會有很多人同時連接到API,使用互斥鎖保護
sync.RWMutex
}
新增和移除
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
log.Println("new connection")
//upgrade regular http connection into websocket
conn, err := websocketUpgrade.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn,m)
m.addClient(client)
//conn.Close()
}
//向管理起添加或刪除客戶端
func (m *Manager)addClient(client *Client) {
m.Lock()
//鎖上後當兩個人同時連接就不會同時修該Client list,Client list本質為map
defer m.Unlock()
m.clients[client] = true
}
func (m *Manager) removeClient(client *Client){
m.Lock()
defer m.Unlock()
if _,ok := m.clients[client]; ok{
client.connection.Close()
delete(m.clients, client)
}
}
讀取訊息
package main
import (
"github.com/gorilla/websocket"
"log"
)
// 客戶端列表,上線狀態
type ClientList map[*Client]bool
// 處理所有單用戶相關的內容
type Client struct {
connection *websocket.Conn
manager *Manager
}
func NewClient(conn *websocket.Conn, manager *Manager) *Client {
return &Client{
connection: conn,
//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
manager: manager,
}
}
// 使用一個無緩衝的通道來防止連接同時獲得過多的請求
func (c *Client) readMessages() {
//這邊使用defer是因為跳出for迴圈後再執行
defer func() {
//cleanup connection
c.manager.removeClient(c)
}()
for {
//payload為負載
messageType, payload, err := c.connection.ReadMessage()
//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
if err != nil {
//連接意外關閉返回錯誤
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
log.Printf("error reading message %v", err)
break
}
}
log.Println(messageType)
log.Println(payload)
}
}
function sendMessage(){
var newMessage = document.getElementById("message");
if(newMessage !== null ){
// console.log(newMessage);
conn.send(newMessage.value);
}
return false;
}
使用Event判斷消息類型並取代原先的發送和接受
<script>
let selectedChat = "general";
class Event {
//更好的控制用戶發送的訊息
constructor(type ,payload) {
this.type = type;
this.payload = payload;
}
}
function routeEvent(event){
if(event.type === undefined){
alert("no type field in the event");
}
switch (event.type){
case "new_message":
console.log("new Message");
break;
default:
alert("not supported this type");
break;
}
}
function sendEvent(eventName,payload){
const event = new Event(eventName,payload);
conn.send(JSON.stringify(event))
}
function changeChatRoom (){
let newChat = document.getElementById("chatroom");
if(newChat !== null && newChat.value !== selectedChat){
console.log(newChat);
}
return false;
}
function sendMessage(){
let newMessage = document.getElementById("message");
if(newMessage !== null ){
// console.log(newMessage);
// conn.send(newMessage.value);
sendEvent("send_message", newMessage)
}
return false;
}
window.onload = function(){
document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
document.getElementById("chatroom-message").onsubmit = sendMessage;
if(window["WebSocket"]){
console.log("support websocket")
//connect to websocket
conn = new WebSocket("ws://"+document.location.host+"/ws")
conn.onmessage = function(e){
// console.log(e)
const eventData = JSON.parse(e.data);
const event = Object.assign(new Event, eventData);
routeEvent(event)
}
}else{
alert("not support websocket")
}
}
</script>
// event.go
package main
import "encoding/json"
type Event struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
//使用RawMessage(原始格式)是因為我們希望使用者可以發送任何類型
}
type EventHandler func(event Event, c *Client) error
const (
EventSendMessage = "send_message"
)
type SendMessageEvent struct {
Message string `json:"message"`
From string `json:"from"`
}
//將這些儲存在管理器,確保管理器方便處理
type Manager struct {
clients ClientList
//會有很多人同時連接到API,使用互斥鎖保護
sync.RWMutex
//將type當作key並允許我們獲取事件處理程序
handlers map[string]EventHandler
}
// 工廠模式
func NewManager() *Manager {
m := &Manager{
clients: make(ClientList),
handlers: make(map[string]EventHandler),
}
m.setupEventHandlers()
return m
}
func (m *Manager) setupEventHandlers() {
m.handlers[EventSendMessage] = SendMessage
}
func SendMessage(event Event, c *Client) error {
fmt.Println(event)
return nil
}
func (m *Manager) routeEvent(event Event, c *Client) error {
//檢查事件類型是否是處理程序的一部分,處理程序是一個使用事件類型作為key的map
//因此每當我們收到類型設置為發送消息,都會觸發發送消息
if handler, ok := m.handlers[event.Type]; ok {
if err := handler(event, c); err != nil {
return err
}
return nil
} else {
return errors.New("there is no such event type")
}
}
func (c *Client) readMessages() {
//這邊使用defer是因為跳出for迴圈後再執行
defer func() {
//cleanup connection from client List,幫助我們清理未使用的客戶端
c.manager.removeClient(c)
}()
for {
//payload為負載,類行為byte
_, payload, err := c.connection.ReadMessage()
//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
if err != nil {
//連接意外關閉返回錯誤
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
log.Printf("error reading message %v", err)
break
}
}
//測試寫入egress,每read一個message都會發送給其他所有客戶
//for wsclient := range c.manager.clients {
// wsclient.egress <- payload
//}
//
//log.Println(messageType)
//log.Println(string(payload)) ->使用Event取代
var request Event
if err := json.Unmarshal(payload, &request); err != nil {
log.Printf("error unmarshalling :%v", err)
break
}
if err := c.manager.routeEvent(request, c); err != nil {
log.Println("error handling message", err)
}
}
}
func (c *Client) writeMessages() {
defer func() {
c.manager.removeClient(c)
}()
for {
select {
case message, ok := <-c.egress:
if !ok {
if err := c.connection.WriteMessage(websocket.CloseMessage, nil); err != nil {
log.Println("connection closed :", err)
}
return //return 後會觸發破壞迴圈觸發defer
}
data, err := json.Marshal(message)
if err != nil {
log.Println(err)
break
}
if err := c.connection.WriteMessage(websocket.TextMessage, data); err != nil {
log.Printf("failed to send message %v:", err)
}
log.Println("message sent")
}
}
}
使用event原因
WebSocket是一種基於TCP的全雙工通信協議,它提供雙向實時、持久的連接,使得客戶端和服務器可以通過WebSocket進行雙向通信。
在WebSocket中,消息的發送和接收是異步的,這就需要在服務器端使用事件來區分不同類型的消息。
通常情況下,WebSocket 會定義一些預定的消息類型,比如聊天消息、命令消息等。當服務器接收到消息時,通過事件判斷消息的類型,然後根據不同的類型執行相應的邏輯。這可以是發送聊天消息消息給其他連接的客戶端,執行一些特定的操作,或者觸發一些事件。
作用地方
在編寫不同的聊天室時,通常會涉及多種類型的消息
例如用戶發送的聊天消息、系統通知消息、用戶加入或離開聊天室的消息等。這些消息可能具有不同的格式和含義,因此需要使用事件來判斷消息的類型,並根據類型執行相應的邏輯。
心跳機制
- 會向服務端發送ping,目的是為了確保另一端的連接存在
- 因為websocket依舊走在http協議上,如果閒置過久的話會被中斷,因此會使用ping/pong保持空連接
- ping/pong屬於客戶端
- ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發,現在瀏覽器都會默認為自動
var (
pongWait = 10 * time.Second //發送ping後pong的最多等待時間
pingInterval = (pongWait * 9) / 10 //ping每次發送的煎個,如果滿足條件,該職必須低於Pong wait
)
//發送PING
ticker := time.NewTicker(pingInterval)
case <-ticker.C:
log.Println("ping")
//Send ping to client
//必須為指定類型,否則前端無法處理
if err := c.connection.WriteMessage(websocket.PingMessage, []byte(``)); err != nil {
log.Println("write message error", err)
return
}
//ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發
}
//接受PONG
//當我們接受到pong以前能夠等待的時間
if err := c.connection.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Println(err)
return
}
//觸發pong時處理的handler,每收到pong就會觸發
c.connection.SetPongHandler(c.pongHandler)
func (c *Client) pongHandler(pongMessage string) error {
log.Println("pong")
//接受到pong以後要重置的時間
return c.connection.SetReadDeadline(time.Now().Add(pongWait))
}
巨型幀(Jumbo frames)
- 設置read message的最大大小
//Jumbo frames
c.connection.SetReadLimit(512)
Cross Origin
//Cross Origin
func checkOrigin(r *http.Request)bool{
//true將會連接,false關閉連接
origin := r.Header.Get("Origin")
switch origin {
case "http://localhost:8082":
return true
default:
return false
}
}
// 管理websocket
var (
websocketUpgrade = websocket.Upgrader{
//cross
CheckOrigin: checkOrigin,
//確保用戶不會發送巨大封包
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)
Auth
- 流程:用戶使用HTTP連接,返回ODP或一次性密碼,然後將其返回給URL中的websocket端點,如果有效就連接
function login(){
let formData = {
username:document.getElementById("username").value,
password:document.getElementById("password").value,
};
fetch("login",{
method:"POST",
body:JSON.stringify(formData),
mode:"cors"
}).then((response) => {
if(response.ok){
return response.json();
}else{
throw 'unauthorized';
}
}).then((data)=>{
//通過auth
connectWebsocket(data.otp);
}).catch((e)=>{alert(e)});
return false;
}
function connectWebsocket(otp){
if(window["WebSocket"]){
console.log("support websocket");
//connect to websocket
let conn = new WebSocket("ws://"+document.location.host+"/ws?otp="+otp);
//連接到websocket時觸發
conn.onopen = function (e){
document.getElementById("connection-header").innerText = "Connect to websocket : True";
}
conn.onclose = function (e){
document.getElementById("connection-header").innerText = "Connect to websocket : True";
//reconnection
}
conn.onmessage = function(e){
// console.log(e)
const eventData = JSON.parse(e.data);
const event = Object.assign(new Event, eventData);
routeEvent(event);
}
}else{
alert("not support websocket");
}
}
window.onload = function(){
document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
document.getElementById("chatroom-message").onsubmit = sendMessage;
document.getElementById("login-form").onsubmit = login;
}
//otp.go
package main
import (
"context"
"github.com/google/uuid"
"time"
)
type OTP struct {
Key string `json:"key"`
Created time.Time `json:"created"`
}
// 這個map將包含一次性密碼,刪除太舊的一次性密碼
type RetentionMap map[string]OTP
// 接受一個上下文和保留時間
func NewRetentionMap(ctx context.Context, retentiontime time.Duration) RetentionMap {
rm := make(RetentionMap)
go rm.Retention(ctx, retentiontime) //開一個協程不斷檢查
return rm
}
func (rm RetentionMap) NewOTP() OTP {
o := OTP{
Key: uuid.NewString(),
Created: time.Now(),
}
rm[o.Key] = o
return o
}
func (rm RetentionMap) VerifyOTP(otp string) bool {
if _, ok := rm[otp]; !ok {
return false // otp is not valid
}
delete(rm, otp) //刪除一次性密碼
return true
}
func (rm RetentionMap) Retention(ctx context.Context, retentionPeriod time.Duration) {
//每次重新檢查的頻率
ticker := time.NewTicker(400 * time.Millisecond)
for {
select {
case <-ticker.C:
for _, otp := range rm {
//過期時間比現在早,這密碼無效
if otp.Created.Add(retentionPeriod).Before(time.Now()) {
delete(rm, otp.Key)
}
}
//上下文關閉
case <-ctx.Done():
return
}
}
}
//manager.go
type Manager struct {
clients ClientList
//會有很多人同時連接到API,使用互斥鎖保護
sync.RWMutex
otps RetentionMap
//將type當作key並允許我們獲取事件處理程序
handlers map[string]EventHandler
}
// 工廠模式
func NewManager(ctx context.Context) *Manager {
m := &Manager{
clients: make(ClientList),
otps: NewRetentionMap(ctx, 5*time.Second),
handlers: make(map[string]EventHandler),
}
m.setupEventHandlers()
return m
}
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
//驗證OTP是否有效
otp := r.URL.Query().Get("otp")
if otp == "" {
w.WriteHeader(http.StatusUnauthorized)
return
}
if !m.otps.VerifyOTP(otp) {
w.WriteHeader(http.StatusUnauthorized)
return
}
log.Println("new connection")
//upgrade regular http connection into websocket
conn, err := websocketUpgrade.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
// Start client processing
go client.readMessages()
go client.writeMessages()
}
func (m *Manager) loginHandler(w http.ResponseWriter, r *http.Request) {
type userLoginRequest struct {
Username string `json:"username"`
Password string `json:"password"`
}
var req userLoginRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
//帳密驗證須依照自身所需進行更改,這邊只是事例
if req.Username == "test" && req.Password == "test" {
type Response struct {
OTP string `json:"otp"`
}
otp := m.otps.NewOTP()
resp := Response{
OTP: otp.Key,
}
data, err := json.Marshal(resp)
if err != nil {
log.Println(err)
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
return
}
w.WriteHeader(http.StatusUnauthorized)
}
main.go
func setupApi() {
ctx := context.Background()
manger := NewManager(ctx)
http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
http.HandleFunc("/ws", manger.serveWs)
http.HandleFunc("/login", manger.loginHandler)
}
新增廣播和群組聊天->最終完成品
front/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Go_Websocket</title>
</head>
<body>
<div class="center">
<h1>Chat</h1>
<h3 id="connection-header">Connect to websocket: false</h3>
<form id="chatroom-selection">
<label for="chatroom" id="chat-header">ChatRoom:</label>
<input type="text" id="chatroom" name="chatroom"><br><br>
<input type="submit" value="Change Chatroom">
</form>
<br><br>
<!-- record message from websovket-->
<textarea class="messagearea" id="chatmessage" readonly name="chatmessages" cols="30" rows="10" placeholder="welocome to chatroom"></textarea>
<br>
<form id="chatroom-message">
<label for="chatmessage">MESSAGE</label>
<input type="text" id="message" name="message">
<br><br>
<input type="submit" value="sendmessage">
</form>
<div style="border: 3px solid black;margin-top: 30px">
<form id="login-form">
<label for="username">Username</label>
<input type="text" name="username" id="username">
<label for="password">Password</label>
<input type="password" name="password" id="password">
<br><br>
<input type="submit" value="Login">
</form>
</div>
<script>
let selectedChat = "general";
class Event {
//更好的控制用戶發送的訊息
constructor(type ,payload) {
this.type = type;
this.payload = payload;
}
}
class SendMessageEvent {
constructor(message ,from) {
this.message = message;
this.from = from;
}
}
class NewMessageEvent {
constructor(message ,from,sent) {
this.message = message;
this.from = from;
this.sent = sent;
}
}
class ChangeChatRoomEvent {
constructor(name) {
this.name = name;
}
}
function changeChatRoom(){
var newchat = document.getElementById("chatroom")
//不希望垃圾郵件更改到同一個聊天室,只能到新的
if(newchat !== null && newchat.value !== selectedChat){
selectedChat = newchat.value;
header = document.getElementById("chat-header").innerHTML = "Currently in chatroom :" + selectedChat;
let changeEvent = new ChangeChatRoomEvent(selectedChat)
sendEvent("change_room",changeEvent)
textarea = document.getElementById("chatmessage");
textarea.innerHTML = `You changed new chatroom ${selectedChat}`;
}
//如果沒有return false將會重定向
return false;
}
function routeEvent(event){
if(event.type === undefined){
alert("no type field in the event");
}
switch (event.type){
case "new_message":
// console.log("new Message");
const messageEvent = Object.assign(new NewMessageEvent,event.payload)
appendChatMessage(messageEvent)
break;
default:
alert("not supported this type");
break;
}
}
function appendChatMessage(messageEvent){
let date = new Date(messageEvent.sent);
const formattedMessage = `${date.toLocaleString()} : ${messageEvent.message}`;
textarea = document.getElementById("chatmessage");
textarea.innerHTML = textarea.innerHTML + "\n" + formattedMessage;
textarea.scrollTop = textarea.scrollHeight;
}
function sendEvent(eventName,payload){
const event = new Event(eventName,payload);
conn.send(JSON.stringify(event))
}
function sendMessage(){
let newMessage = document.getElementById("message");
if(newMessage !== null ){
// console.log(newMessage);
// conn.send(newMessage.value);
//peter到時候需案自行更改
let outgoingEvent = new SendMessageEvent(newMessage.value,"test")
sendEvent("send_message",outgoingEvent);
}
return false;
}
function login(){
let formData = {
username:document.getElementById("username").value,
password:document.getElementById("password").value,
};
fetch("login",{
method:"POST",
body:JSON.stringify(formData),
mode:"cors"
}).then((response) => {
if(response.ok){
return response.json();
}else{
throw 'unauthorized';
}
}).then((data)=>{
//通過auth
connectWebsocket(data.otp);
}).catch((e)=>{alert(e)});
return false;
}
function connectWebsocket(otp){
if(window["WebSocket"]){
console.log("support websocket");
//connect to websocket
conn = new WebSocket("ws://"+document.location.host+"/ws?otp="+otp);
//連接到websocket時觸發
conn.onopen = function (e){
document.getElementById("connection-header").innerText = "Connect to websocket : True";
}
conn.onclose = function (e){
document.getElementById("connection-header").innerText = "Connect to websocket : True";
//reconnection
}
conn.onmessage = function(e){
// console.log(e)
const eventData = JSON.parse(e.data);
const event = Object.assign(new Event, eventData);
routeEvent(event);
}
}else{
alert("not support websocket");
}
}
window.onload = function(){
document.getElementById("chatroom-selection").onsubmit = changeChatRoom;
document.getElementById("chatroom-message").onsubmit = sendMessage;
document.getElementById("login-form").onsubmit = login;
}
</script>
</div>
</body>
</html>
client.go
package main
import (
"encoding/json"
"github.com/gorilla/websocket"
"log"
"time"
)
var (
pongWait = 10 * time.Second //發送ping後pong的最多等待時間
pingInterval = (pongWait * 9) / 10 //ping每次發送的煎個,如果滿足條件,該職必須低於Pong wait
)
// ClientList 客戶端列表,上線狀態
type ClientList map[*Client]bool
// Client 處理所有單用戶相關的內容
type Client struct {
connection *websocket.Conn
manager *Manager
chatroom string
//egress 避免客戶端併發權限,使用一個無緩衝的通道來防止連接同時獲得過多的請求
//egress chan []byte
egress chan Event
}
func NewClient(conn *websocket.Conn, manager *Manager) *Client {
return &Client{
connection: conn,
//之所以會使用manager是因為會將一些事情引導到manager進行處理,例如像其他用戶廣播
manager: manager,
egress: make(chan Event),
}
}
func (c *Client) readMessages() {
//這邊使用defer是因為跳出for迴圈後再執行
defer func() {
//cleanup connection from client List,幫助我們清理未使用的客戶端
c.manager.removeClient(c)
}()
//Jumbo frames
c.connection.SetReadLimit(512)
//當我們接受到pong以前能夠等待的時間
if err := c.connection.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Println(err)
return
}
//觸發pong時處理的handler,每收到pong就會觸發
c.connection.SetPongHandler(c.pongHandler)
for {
//payload為負載,類行為byte
_, payload, err := c.connection.ReadMessage()
//messageType在RFC中定義有幾種不同的消息類型讓你對數據,二進制進行ping/pong
if err != nil {
//連接意外關閉返回錯誤
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
//檢查異常是因為不希望正常斷開的時候也被當成錯誤紀錄
log.Printf("error reading message %v", err)
break
}
}
//測試寫入egress,每read一個message都會發送給其他所有客戶
//for wsclient := range c.manager.clients {
// wsclient.egress <- payload
//}
//
//log.Println(messageType)
//log.Println(string(payload)) ->使用Event取代
var request Event
if err := json.Unmarshal(payload, &request); err != nil {
log.Printf("error unmarshalling :%v", err)
break
}
if err := c.manager.routeEvent(request, c); err != nil {
log.Println("error handling message", err)
}
}
}
func (c *Client) writeMessages() {
defer func() {
c.manager.removeClient(c)
}()
//計時器
ticker := time.NewTicker(pingInterval)
for {
select {
case message, ok := <-c.egress:
if !ok {
if err := c.connection.WriteMessage(websocket.CloseMessage, nil); err != nil {
log.Println("connection closed :", err)
}
return //return 後會觸發破壞迴圈觸發defer
}
data, err := json.Marshal(message)
if err != nil {
log.Println(err)
break
}
if err := c.connection.WriteMessage(websocket.TextMessage, data); err != nil {
log.Printf("failed to send message %v:", err)
}
log.Println("message sent")
case <-ticker.C:
log.Println("ping")
//Send ping to client
//必須為指定類型,否則前端無法處理
if err := c.connection.WriteMessage(websocket.PingMessage, []byte(``)); err != nil {
log.Println("write message error", err)
return
}
//ping給服務端後,前端要pong回應,因為RFC告訴我們ping和pong應該自動觸發
}
}
}
func (c *Client) pongHandler(pongMessage string) error {
log.Println("pong")
//接受到pong以後要重置的時間
return c.connection.SetReadDeadline(time.Now().Add(pongWait))
}
event.go
package main
import (
"encoding/json"
"fmt"
"time"
)
type Event struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
//使用RawMessage(原始格式)是因為我們希望使用者可以發送任何類型
}
type EventHandler func(event Event, c *Client) error
const (
EventSendMessage = "send_message"
EventNewMessage = "new_message"
EventChangeRoom = "change_room"
)
type SendMessageEvent struct {
Message string `json:"message"`
From string `json:"from"`
}
type NewMessageEvent struct {
SendMessageEvent
Sent time.Time `json:"sent"`
}
type ChangeRoomEvent struct {
Name string `json:"name"`
}
func ChangeRoomHandler(event Event, c *Client) error {
var chatRoomEvent ChangeRoomEvent
if err := json.Unmarshal(event.Payload, &chatRoomEvent); err != nil {
return fmt.Errorf("bad payload in request : %v", err)
}
c.chatroom = chatRoomEvent.Name
return nil
}
func SendMessage(event Event, c *Client) error {
//fmt.Println(event)
var chatevent SendMessageEvent
if err := json.Unmarshal(event.Payload, &chatevent); err != nil {
return fmt.Errorf("bad payload in request : %v", err)
}
var broadMessage NewMessageEvent
broadMessage.Sent = time.Now()
broadMessage.Message = chatevent.Message
broadMessage.From = chatevent.From
data, err := json.Marshal(broadMessage)
if err != nil {
return fmt.Errorf("failed to marshal broadcast message : %v", err)
}
var outgoing Event
outgoing.Type = EventNewMessage
outgoing.Payload = data
for clients := range c.manager.clients {
if clients.chatroom == c.chatroom {
clients.egress <- outgoing
}
}
return nil
}
//將這些儲存在管理器,確保管理器方便處理
manager.go
// http status 101 代表正在切換協議
package main
// manager.go // 管理websocket
import (
"context"
"encoding/json"
"errors"
"github.com/gorilla/websocket"
"log"
"net/http"
"sync"
"time"
)
// 管理websocket
var (
websocketUpgrade = websocket.Upgrader{
//cross
CheckOrigin: checkOrigin,
//確保用戶不會發送巨大封包
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)
type Manager struct {
clients ClientList
//會有很多人同時連接到API,使用互斥鎖保護
sync.RWMutex
otps RetentionMap
//將type當作key並允許我們獲取事件處理程序
handlers map[string]EventHandler
}
// 工廠模式
func NewManager(ctx context.Context) *Manager {
m := &Manager{
clients: make(ClientList),
otps: NewRetentionMap(ctx, 5*time.Second),
handlers: make(map[string]EventHandler),
}
m.setupEventHandlers()
return m
}
func (m *Manager) setupEventHandlers() {
m.handlers[EventSendMessage] = SendMessage
m.handlers[EventChangeRoom] = ChangeRoomHandler
}
func (m *Manager) routeEvent(event Event, c *Client) error {
//檢查事件類型是否是處理程序的一部分,處理程序是一個使用事件類型作為key的map
//因此每當我們收到類型設置為發送消息,都會觸發發送消息
if handler, ok := m.handlers[event.Type]; ok {
if err := handler(event, c); err != nil {
return err
}
return nil
} else {
return errors.New("there is no such event type")
}
}
func (m *Manager) serveWs(w http.ResponseWriter, r *http.Request) {
//驗證OTP是否有效
otp := r.URL.Query().Get("otp")
if otp == "" {
w.WriteHeader(http.StatusUnauthorized)
return
}
if !m.otps.VerifyOTP(otp) {
w.WriteHeader(http.StatusUnauthorized)
return
}
log.Println("new connection")
//upgrade regular http connection into websocket
conn, err := websocketUpgrade.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
// Start client processing
go client.readMessages()
go client.writeMessages()
}
func (m *Manager) loginHandler(w http.ResponseWriter, r *http.Request) {
type userLoginRequest struct {
Username string `json:"username"`
Password string `json:"password"`
}
var req userLoginRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
//form
if req.Username == "test" && req.Password == "test" {
type Response struct {
OTP string `json:"otp"`
}
otp := m.otps.NewOTP()
resp := Response{
OTP: otp.Key,
}
data, err := json.Marshal(resp)
if err != nil {
log.Println(err)
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
return
}
w.WriteHeader(http.StatusUnauthorized)
}
// 向管理起添加或刪除客戶端
func (m *Manager) addClient(client *Client) {
m.Lock()
//鎖上後當兩個人同時連接就不會同時修該Client list,Client list本質為map
defer m.Unlock()
m.clients[client] = true
}
func (m *Manager) removeClient(client *Client) {
m.Lock()
defer m.Unlock()
if _, ok := m.clients[client]; ok {
client.connection.Close()
delete(m.clients, client)
}
}
// Cross Origin
func checkOrigin(r *http.Request) bool {
//true將會連接,false關閉連接
origin := r.Header.Get("Origin")
switch origin {
case "http://localhost:8082":
return true
default:
return false
}
}
otp.go
package main
import (
"context"
"github.com/google/uuid"
"time"
)
type OTP struct {
Key string `json:"key"`
Created time.Time `json:"created"`
}
// 這個map將包含一次性密碼,刪除太舊的一次性密碼
type RetentionMap map[string]OTP
// 接受一個上下文和保留時間
func NewRetentionMap(ctx context.Context, retentiontime time.Duration) RetentionMap {
rm := make(RetentionMap)
go rm.Retention(ctx, retentiontime) //開一個協程不斷檢查
return rm
}
func (rm RetentionMap) NewOTP() OTP {
o := OTP{
Key: uuid.NewString(),
Created: time.Now(),
}
rm[o.Key] = o
return o
}
func (rm RetentionMap) VerifyOTP(otp string) bool {
if _, ok := rm[otp]; !ok {
return false // otp is not valid
}
delete(rm, otp) //刪除一次性密碼
return true
}
func (rm RetentionMap) Retention(ctx context.Context, retentionPeriod time.Duration) {
//每次重新檢查的頻率
ticker := time.NewTicker(400 * time.Millisecond)
for {
select {
case <-ticker.C:
for _, otp := range rm {
//過期時間比現在早,這密碼無效
if otp.Created.Add(retentionPeriod).Before(time.Now()) {
delete(rm, otp.Key)
}
}
//上下文關閉
case <-ctx.Done():
return
}
}
}
main.go
package main
import (
"context"
"log"
"net/http"
)
func main() {
setupApi()
log.Fatalln(http.ListenAndServe(":8082", nil))
}
func setupApi() {
ctx := context.Background()
manger := NewManager(ctx)
http.Handle("/", http.FileServer(http.Dir("./front"))) //加載前端
http.HandleFunc("/ws", manger.serveWs)
http.HandleFunc("/login", manger.loginHandler)
}
//go run *.go 運行所有文件