Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions constants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ const (
CLIENT_TCP_PORT_HELP = "Define port of mmar TCP server for client to connect to, creating a tunnel."
TUNNEL_HOST_HELP = "Define host domain of mmar server for client to connect to."

TUNNEL_MESSAGE_PROTOCOL_VERSION = 1
TUNNEL_MESSAGE_PROTOCOL_VERSION = 2
TUNNEL_MESSAGE_DATA_DELIMITER = '\n'
ID_CHARSET = "abcdefghijklmnopqrstuvwxyz0123456789"
ID_LENGTH = 6

MAX_TUNNELS_PER_IP = 5
TUNNEL_RECONNECT_TIMEOUT = 3
GRACEFUL_SHUTDOWN_TIMEOUT = 3
TUNNEL_CREATE_TIMEOUT = 3
REQ_BODY_READ_CHUNK_TIMEOUT = 3
DEST_REQUEST_TIMEOUT = 30
MAX_REQ_BODY_SIZE = 10000000 // 10mb
MAX_TUNNELS_PER_IP = 5
TUNNEL_RECONNECT_TIMEOUT = 3
GRACEFUL_SHUTDOWN_TIMEOUT = 3
TUNNEL_CREATE_TIMEOUT = 3
REQ_BODY_READ_CHUNK_TIMEOUT = 3
DEST_REQUEST_TIMEOUT = 30
HEARTBEAT_FROM_SERVER_TIMEOUT = 5
HEARTBEAT_FROM_CLIENT_TIMEOUT = 2
READ_DEADLINE = 3
MAX_REQ_BODY_SIZE = 10000000 // 10mb

CLIENT_DISCONNECT_ERR_TEXT = "Tunnel is closed, cannot connect to mmar client."
LOCALHOST_NOT_RUNNING_ERR_TEXT = "Tunneled successfully, but nothing is running on localhost."
Expand Down
35 changes: 33 additions & 2 deletions internal/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,35 @@ func (mc *MmarClient) ProcessTunnelMessages(ctx context.Context) {
case <-ctx.Done(): // Client gracefully shutdown
return
default:
// Send heartbeat if nothing has been read for a while
receiveMessageTimeout := time.AfterFunc(
constants.HEARTBEAT_FROM_CLIENT_TIMEOUT*time.Second,
func() {
heartbeatMsg := protocol.TunnelMessage{MsgType: protocol.HEARTBEAT_FROM_CLIENT}
if err := mc.SendMessage(heartbeatMsg); err != nil {
logger.Log(constants.DEFAULT_COLOR, "Failed to send heartbeat. Exiting...")
os.Exit(0)
}
// Set a read timeout, if no response to heartbeat is recieved within that period,
// attempt to reconnect to the server
readDeadline := time.Now().Add((constants.READ_DEADLINE * time.Second))
mc.Tunnel.Conn.SetReadDeadline(readDeadline)
},
)

tunnelMsg, err := mc.ReceiveMessage()
// If a message is received, stop the receiveMessageTimeout and remove the ReadTimeout
// as we do not need to send heartbeat or check connection health in this iteration
receiveMessageTimeout.Stop()
mc.Tunnel.Conn.SetReadDeadline(time.Time{})

if err != nil {
// If the context was cancelled just return
if errors.Is(ctx.Err(), context.Canceled) {
return
} else if errors.Is(err, os.ErrDeadlineExceeded) {
continue
} else if errors.Is(err, protocol.INVALID_MESSAGE_PROTOCOL_VERSION) {
logger.Log(constants.YELLOW, "The mmar message protocol has been updated, please update mmar.")
os.Exit(0)
}

logger.Log(constants.DEFAULT_COLOR, "Tunnel connection disconnected.")
Expand Down Expand Up @@ -184,6 +206,15 @@ func (mc *MmarClient) ProcessTunnelMessages(ctx context.Context) {
os.Exit(0)
case protocol.REQUEST:
go mc.handleRequestMessage(tunnelMsg)
case protocol.HEARTBEAT_ACK:
// Got a heartbeat ack, that means the connection is healthy,
// we do not need to perform any action
case protocol.HEARTBEAT_FROM_SERVER:
heartbeatAckMsg := protocol.TunnelMessage{MsgType: protocol.HEARTBEAT_ACK}
if err := mc.SendMessage(heartbeatAckMsg); err != nil {
logger.Log(constants.DEFAULT_COLOR, "Failed to send Heartbeat Ack. Exiting...")
os.Exit(0)
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/protocol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const (
CLIENT_TUNNEL_LIMIT
LOCALHOST_NOT_RUNNING
DEST_REQUEST_TIMEDOUT
HEARTBEAT_FROM_CLIENT
HEARTBEAT_FROM_SERVER
HEARTBEAT_ACK
)

var INVALID_MESSAGE_PROTOCOL_VERSION = errors.New("Invalid Message Protocol Version")
Expand All @@ -33,7 +36,7 @@ var INVALID_MESSAGE_TYPE = errors.New("Invalid Tunnel Message Type")
func isValidTunnelMessageType(mt uint8) (uint8, error) {
// Iterate through all the message type, from first to last, checking
// if the provided message type matches one of them
for msgType := REQUEST; msgType <= DEST_REQUEST_TIMEDOUT; msgType++ {
for msgType := REQUEST; msgType <= HEARTBEAT_ACK; msgType++ {
if mt == msgType {
return msgType, nil
}
Expand Down
32 changes: 32 additions & 0 deletions internal/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,29 @@ func (ms *MmarServer) processTunneledRequestsForClient(ct *ClientTunnel) {

func (ms *MmarServer) processTunnelMessages(ct *ClientTunnel) {
for {
// Send heartbeat if nothing has been read for a while
receiveMessageTimeout := time.AfterFunc(
constants.HEARTBEAT_FROM_SERVER_TIMEOUT*time.Second,
func() {
heartbeatMsg := protocol.TunnelMessage{MsgType: protocol.HEARTBEAT_FROM_SERVER}
if err := ct.SendMessage(heartbeatMsg); err != nil {
logger.Log(constants.DEFAULT_COLOR, fmt.Sprintf("Failed to send heartbeat: %v", err))
ms.closeClientTunnel(ct)
return
}
// Set a read timeout, if no response to heartbeat is recieved within that period,
// that means the client has disconnected
readDeadline := time.Now().Add((constants.READ_DEADLINE * time.Second))
ct.Tunnel.Conn.SetReadDeadline(readDeadline)
},
)

tunnelMsg, err := ct.ReceiveMessage()
// If a message is received, stop the receiveMessageTimeout and remove the ReadTimeout
// as we do not need to send heartbeat or check connection health in this iteration
receiveMessageTimeout.Stop()
ct.Tunnel.Conn.SetReadDeadline(time.Time{})

if err != nil {
logger.Log(constants.DEFAULT_COLOR, fmt.Sprintf("Receive Message from client tunnel errored: %v", err))
if utils.NetworkError(err) {
Expand Down Expand Up @@ -467,6 +489,16 @@ func (ms *MmarServer) processTunnelMessages(ct *ClientTunnel) {
case protocol.CLIENT_DISCONNECT:
ms.closeClientTunnel(ct)
return
case protocol.HEARTBEAT_FROM_CLIENT:
heartbeatAckMsg := protocol.TunnelMessage{MsgType: protocol.HEARTBEAT_ACK}
if err := ct.SendMessage(heartbeatAckMsg); err != nil {
logger.Log(constants.DEFAULT_COLOR, fmt.Sprintf("Failed to heartbeat ack to client: %v", err))
ms.closeClientTunnel(ct)
return
}
case protocol.HEARTBEAT_ACK:
// Got a heartbeat ack, that means the connection is healthy,
// we do not need to perform any action
case protocol.CLIENT_RECLAIM_SUBDOMAIN:
newAndExistingIDs := strings.Split(string(tunnelMsg.MsgData), ":")
newId := newAndExistingIDs[0]
Expand Down
3 changes: 2 additions & 1 deletion internal/utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@ func NetworkError(err error) bool {
return errors.Is(err, io.EOF) ||
errors.Is(err, io.ErrUnexpectedEOF) ||
errors.Is(err, net.ErrClosed) ||
errors.Is(err, syscall.ECONNRESET)
errors.Is(err, syscall.ECONNRESET) ||
errors.Is(err, os.ErrDeadlineExceeded)
}
Loading