Skip to content

Commit 85e76fa

Browse files
authored
Merge pull request #44 from moov-io/add-message-reader-writer
Add message reader and writer
2 parents e6c8a89 + dff4373 commit 85e76fa

File tree

5 files changed

+392
-104
lines changed

5 files changed

+392
-104
lines changed

connection.go

+127-79
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package connection
22

33
import (
44
"bufio"
5-
"bytes"
65
"crypto/tls"
76
"errors"
87
"fmt"
@@ -40,18 +39,30 @@ const (
4039
StatusUnknown Status = ""
4140
)
4241

43-
// ErrUnpack returns error with possibility to access RawMessage when
42+
// UnpackError returns error with possibility to access RawMessage when
4443
// connection failed to unpack message
45-
type ErrUnpack struct {
44+
type UnpackError struct {
4645
Err error
4746
RawMessage []byte
4847
}
4948

50-
func (e *ErrUnpack) Error() string {
49+
func (e *UnpackError) Error() string {
5150
return e.Err.Error()
5251
}
5352

54-
func (e *ErrUnpack) Unwrap() error {
53+
func (e *UnpackError) Unwrap() error {
54+
return e.Err
55+
}
56+
57+
type PackError struct {
58+
Err error
59+
}
60+
61+
func (e *PackError) Error() string {
62+
return e.Err.Error()
63+
}
64+
65+
func (e *PackError) Unwrap() error {
5566
return e.Err
5667
}
5768

@@ -62,7 +73,7 @@ type Connection struct {
6273
Opts Options
6374
conn io.ReadWriteCloser
6475
requestsCh chan request
65-
readResponseCh chan []byte
76+
readResponseCh chan *iso8583.Message
6677
done chan struct{}
6778

6879
// spec that will be used to unpack received messages
@@ -105,7 +116,7 @@ func New(addr string, spec *iso8583.MessageSpec, mlReader MessageLengthReader, m
105116
addr: addr,
106117
Opts: opts,
107118
requestsCh: make(chan request),
108-
readResponseCh: make(chan []byte),
119+
readResponseCh: make(chan *iso8583.Message),
109120
done: make(chan struct{}),
110121
respMap: make(map[string]response),
111122
spec: spec,
@@ -296,8 +307,8 @@ func (c *Connection) Done() <-chan struct{} {
296307

297308
// request represents request to the ISO 8583 server
298309
type request struct {
299-
// includes length header and message itself
300-
rawMessage []byte
310+
// message to send
311+
message *iso8583.Message
301312

302313
// ID of the request (based on STAN, RRN, etc.)
303314
requestID string
@@ -330,34 +341,17 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) {
330341
c.mutex.Unlock()
331342
defer c.wg.Done()
332343

333-
var buf bytes.Buffer
334-
packed, err := message.Pack()
335-
if err != nil {
336-
return nil, fmt.Errorf("packing message: %w", err)
337-
}
338-
339-
// create header
340-
_, err = c.writeMessageLength(&buf, len(packed))
341-
if err != nil {
342-
return nil, fmt.Errorf("writing message header to buffer: %w", err)
343-
}
344-
345-
_, err = buf.Write(packed)
346-
if err != nil {
347-
return nil, fmt.Errorf("writing packed message to buffer: %w", err)
348-
}
349-
350344
// prepare request
351345
reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message)
352346
if err != nil {
353347
return nil, fmt.Errorf("creating request ID: %w", err)
354348
}
355349

356350
req := request{
357-
rawMessage: buf.Bytes(),
358-
requestID: reqID,
359-
replyCh: make(chan *iso8583.Message),
360-
errCh: make(chan error),
351+
message: message,
352+
requestID: reqID,
353+
replyCh: make(chan *iso8583.Message),
354+
errCh: make(chan error),
361355
}
362356

363357
var resp *iso8583.Message
@@ -399,49 +393,58 @@ func (c *Connection) Send(message *iso8583.Message) (*iso8583.Message, error) {
399393
return resp, err
400394
}
401395

402-
// Reply sends the message and does not wait for a reply to be received.
403-
// Any reply received for message send using Reply will be handled with
404-
// unmatchedMessageHandler
405-
func (c *Connection) Reply(message *iso8583.Message) error {
406-
c.mutex.Lock()
407-
if c.closing {
408-
c.mutex.Unlock()
409-
return ErrConnectionClosed
396+
func (c *Connection) writeMessage(w io.Writer, message *iso8583.Message) error {
397+
if c.Opts.MessageWriter != nil {
398+
return c.Opts.MessageWriter.WriteMessage(w, message)
410399
}
411-
// calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method
412-
// otherwise we will have data race issue
413-
c.wg.Add(1)
414-
c.mutex.Unlock()
415-
defer c.wg.Done()
416400

417-
// prepare message for sending
418-
var buf bytes.Buffer
401+
// default message writer
419402
packed, err := message.Pack()
420403
if err != nil {
421-
return fmt.Errorf("packing message: %w", err)
404+
return utils.NewSafeError(&PackError{err}, "failed to pack message")
422405
}
423406

424407
// create header
425-
_, err = c.writeMessageLength(&buf, len(packed))
408+
_, err = c.writeMessageLength(w, len(packed))
426409
if err != nil {
427410
return fmt.Errorf("writing message header to buffer: %w", err)
428411
}
429412

430-
_, err = buf.Write(packed)
413+
_, err = w.Write(packed)
431414
if err != nil {
432415
return fmt.Errorf("writing packed message to buffer: %w", err)
433416
}
434417

418+
return nil
419+
}
420+
421+
// Reply sends the message and does not wait for a reply to be received.
422+
// Any reply received for message send using Reply will be handled with
423+
// unmatchedMessageHandler
424+
func (c *Connection) Reply(message *iso8583.Message) error {
425+
c.mutex.Lock()
426+
if c.closing {
427+
c.mutex.Unlock()
428+
return ErrConnectionClosed
429+
}
430+
// calling wg.Add(1) within mutex guarantees that it does not pass the wg.Wait() call in the Close method
431+
// otherwise we will have data race issue
432+
c.wg.Add(1)
433+
c.mutex.Unlock()
434+
defer c.wg.Done()
435+
435436
req := request{
436-
rawMessage: buf.Bytes(),
437-
errCh: make(chan error),
437+
message: message,
438+
errCh: make(chan error),
438439
}
439440

440441
c.requestsCh <- req
441442

442443
sendTimeoutTimer := time.NewTimer(c.Opts.SendTimeout)
443444
defer sendTimeoutTimer.Stop()
444445

446+
var err error
447+
445448
select {
446449
case err = <-req.errCh:
447450
case <-sendTimeoutTimer.C:
@@ -507,9 +510,25 @@ func (c *Connection) writeLoop() {
507510
c.pendingRequestsMu.Unlock()
508511
}
509512

510-
_, err = c.conn.Write([]byte(req.rawMessage))
513+
err = c.writeMessage(c.conn, req.message)
511514
if err != nil {
512-
c.handleError(utils.NewSafeError(err, "failed to write message into connection"))
515+
c.handleError(fmt.Errorf("writing message: %w", err))
516+
517+
var packErr *PackError
518+
if errors.As(err, &packErr) {
519+
// let caller know that his message was not not sent
520+
// because of pack error. We don't set all type of errors to errCh
521+
// as this case is handled by handleConnectionError(err)
522+
// which sends the same error to all pending requests, including
523+
// this one
524+
req.errCh <- err
525+
526+
err = nil
527+
528+
// we can continue to write other messages
529+
continue
530+
}
531+
513532
break
514533
}
515534

@@ -539,29 +558,70 @@ func (c *Connection) writeLoop() {
539558
// readLoop reads data from the socket (message length header and raw message)
540559
// and runs a goroutine to handle the message
541560
func (c *Connection) readLoop() {
542-
var err error
543-
var messageLength int
561+
var outErr error
544562

545563
r := bufio.NewReader(c.conn)
546564
for {
547-
messageLength, err = c.readMessageLength(r)
565+
message, err := c.readMessage(r)
548566
if err != nil {
549-
c.handleError(utils.NewSafeError(err, "failed to read message length"))
567+
c.handleError(utils.NewSafeError(err, "failed to read message from connection"))
568+
569+
// if err is ErrUnpack, we can still continue reading
570+
// from the connection
571+
var unpackErr *UnpackError
572+
if errors.As(err, &unpackErr) {
573+
continue
574+
}
575+
576+
outErr = err
550577
break
551578
}
552579

553-
// read the packed message
554-
rawMessage := make([]byte, messageLength)
555-
_, err = io.ReadFull(r, rawMessage)
556-
if err != nil {
557-
c.handleError(utils.NewSafeError(err, "failed to read message from connection"))
558-
break
580+
// if readMessage returns nil message, it means that
581+
// it was a ping message or something else, not a regular
582+
// iso8583 message and we can continue reading
583+
if message == nil {
584+
continue
559585
}
560586

561-
c.readResponseCh <- rawMessage
587+
c.readResponseCh <- message
562588
}
563589

564-
c.handleConnectionError(err)
590+
c.handleConnectionError(outErr)
591+
}
592+
593+
// readMessage reads message length header and raw message from the connection
594+
// and returns iso8583.Message and error if any
595+
func (c *Connection) readMessage(r io.Reader) (*iso8583.Message, error) {
596+
if c.Opts.MessageReader != nil {
597+
return c.Opts.MessageReader.ReadMessage(r)
598+
}
599+
600+
// default message reader
601+
messageLength, err := c.readMessageLength(r)
602+
if err != nil {
603+
return nil, fmt.Errorf("failed to read message length: %w", err)
604+
}
605+
606+
// read the packed message
607+
rawMessage := make([]byte, messageLength)
608+
_, err = io.ReadFull(r, rawMessage)
609+
if err != nil {
610+
return nil, fmt.Errorf("failed to read message from connection: %w", err)
611+
}
612+
613+
// unpack the message
614+
message := iso8583.NewMessage(c.spec)
615+
err = message.Unpack(rawMessage)
616+
if err != nil {
617+
unpackErr := &UnpackError{
618+
Err: err,
619+
RawMessage: rawMessage,
620+
}
621+
return nil, fmt.Errorf("unpacking message: %w", unpackErr)
622+
}
623+
624+
return message, nil
565625
}
566626

567627
func (c *Connection) readResponseLoop() {
@@ -584,21 +644,9 @@ func (c *Connection) readResponseLoop() {
584644
}
585645
}
586646

587-
// handleResponse unpacks the message and then sends it to the reply channel
588-
// that corresponds to the message ID (request ID)
589-
func (c *Connection) handleResponse(rawMessage []byte) {
590-
// create message
591-
message := iso8583.NewMessage(c.spec)
592-
err := message.Unpack(rawMessage)
593-
if err != nil {
594-
unpackErr := &ErrUnpack{
595-
Err: err,
596-
RawMessage: rawMessage,
597-
}
598-
c.handleError(utils.NewSafeError(unpackErr, "failed to unpack message"))
599-
return
600-
}
601-
647+
// handleResponse sends message to the reply channel that corresponds to the
648+
// message ID (request ID)
649+
func (c *Connection) handleResponse(message *iso8583.Message) {
602650
if isResponse(message) {
603651
reqID, err := c.Opts.RequestIDGenerator.GenerateRequestID(message)
604652
if err != nil {

0 commit comments

Comments
 (0)