Skip to content

fix: MRT extended timestamp #2961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 46 additions & 24 deletions pkg/packet/mrt/mrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ const (
OSPFv3_ET MRTType = 49
)

func (t MRTType) HasExtendedTimestamp() bool {
switch t {
case BGP4MP_ET, ISIS_ET, OSPFv3_ET:
return true
default:
return false
}
}

type MRTSubTyper interface {
ToUint16() uint16
}
Expand Down Expand Up @@ -111,7 +120,7 @@ const (
ESTABLISHED BGPState = 6
)

func packValues(values []interface{}) ([]byte, error) {
func packValues(values ...any) ([]byte, error) {
b := new(bytes.Buffer)
for _, v := range values {
err := binary.Write(b, binary.BigEndian, v)
Expand All @@ -123,10 +132,11 @@ func packValues(values []interface{}) ([]byte, error) {
}

type MRTHeader struct {
Timestamp uint32
Type MRTType
SubType uint16
Len uint32
Timestamp uint32
Type MRTType
SubType uint16
Len uint32
ExtendedTimestampMicroseconds uint32
}

func (h *MRTHeader) DecodeFromBytes(data []byte) error {
Expand All @@ -137,25 +147,38 @@ func (h *MRTHeader) DecodeFromBytes(data []byte) error {
h.Type = MRTType(binary.BigEndian.Uint16(data[4:6]))
h.SubType = binary.BigEndian.Uint16(data[6:8])
h.Len = binary.BigEndian.Uint32(data[8:12])
if h.Type.HasExtendedTimestamp() {
h.ExtendedTimestampMicroseconds = binary.BigEndian.Uint32(data[12:16])
}
return nil
}

func (h *MRTHeader) Serialize() ([]byte, error) {
return packValues([]interface{}{h.Timestamp, h.Type, h.SubType, h.Len})
fields := []any{h.Timestamp, h.Type, h.SubType, h.Len}
if h.Type.HasExtendedTimestamp() {
fields = append(fields, h.ExtendedTimestampMicroseconds)
}
return packValues(fields...)
}

func NewMRTHeader(timestamp uint32, t MRTType, subtype MRTSubTyper, l uint32) (*MRTHeader, error) {
func NewMRTHeader(timestamp time.Time, t MRTType, subtype MRTSubTyper, l uint32) (*MRTHeader, error) {
ms := uint32(0)
if t.HasExtendedTimestamp() {
ms = uint32(timestamp.UnixMicro() - timestamp.Unix()*1000000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timestamp.UnixMicro() % 1_000_000
more intuitive?

}
return &MRTHeader{
Timestamp: timestamp,
Type: t,
SubType: subtype.ToUint16(),
Len: l,
Timestamp: uint32(timestamp.Unix()),
Type: t,
SubType: subtype.ToUint16(),
Len: l,
ExtendedTimestampMicroseconds: ms,
}, nil
}

func (h *MRTHeader) GetTime() time.Time {
t := int64(h.Timestamp)
return time.Unix(t, 0)
ms := int64(h.ExtendedTimestampMicroseconds)
return time.Unix(t, ms*1000)
}

type MRTMessage struct {
Expand All @@ -176,7 +199,7 @@ func (m *MRTMessage) Serialize() ([]byte, error) {
return append(bbuf, buf...), nil
}

func NewMRTMessage(timestamp uint32, t MRTType, subtype MRTSubTyper, body Body) (*MRTMessage, error) {
func NewMRTMessage(timestamp time.Time, t MRTType, subtype MRTSubTyper, body Body) (*MRTMessage, error) {
header, err := NewMRTHeader(timestamp, t, subtype, 0)
if err != nil {
return nil, err
Expand Down Expand Up @@ -252,12 +275,12 @@ func (p *Peer) Serialize() ([]byte, error) {
buf = append(buf, p.IpAddress.To4()...)
}
if p.Type&(1<<1) > 0 {
bbuf, err = packValues([]interface{}{p.AS})
bbuf, err = packValues(p.AS)
} else {
if p.AS > uint32(math.MaxUint16) {
return nil, fmt.Errorf("AS number is beyond 2 octet. %d > %d", p.AS, math.MaxUint16)
}
bbuf, err = packValues([]interface{}{uint16(p.AS)})
bbuf, err = packValues(uint16(p.AS))
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -461,7 +484,6 @@ func (e *RibEntry) String() string {
} else {
return fmt.Sprintf("RIB_ENTRY: PeerIndex [%d] OriginatedTime [%d] PathAttributes [%v]", e.PeerIndex, e.OriginatedTime, e.PathAttributes)
}

}

type Rib struct {
Expand Down Expand Up @@ -527,7 +549,7 @@ func (u *Rib) Serialize() ([]byte, error) {
return nil, err
}
buf = append(buf, bbuf...)
bbuf, err = packValues([]interface{}{uint16(len(u.Entries))})
bbuf, err = packValues(uint16(len(u.Entries)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -716,13 +738,13 @@ func (m *BGP4MPHeader) decodeFromBytes(data []byte) ([]byte, error) {
}

func (m *BGP4MPHeader) serialize() ([]byte, error) {
var values []interface{}
var values []any
if m.isAS4 {
values = []interface{}{m.PeerAS, m.LocalAS, m.InterfaceIndex, m.AddressFamily}
values = []any{m.PeerAS, m.LocalAS, m.InterfaceIndex, m.AddressFamily}
} else {
values = []interface{}{uint16(m.PeerAS), uint16(m.LocalAS), m.InterfaceIndex, m.AddressFamily}
values = []any{uint16(m.PeerAS), uint16(m.LocalAS), m.InterfaceIndex, m.AddressFamily}
}
buf, err := packValues(values)
buf, err := packValues(values...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -792,7 +814,7 @@ func (m *BGP4MPStateChange) Serialize() ([]byte, error) {
if err != nil {
return nil, err
}
bbuf, err := packValues([]interface{}{m.OldState, m.NewState})
bbuf, err := packValues(m.OldState, m.NewState)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -907,14 +929,14 @@ func SplitMrt(data []byte, atEOF bool) (advance int, token []byte, err error) {
if cap(data) < MRT_COMMON_HEADER_LEN { // read more
return 0, nil, nil
}
//this reads the data
// this reads the data
hdr := &MRTHeader{}
errh := hdr.DecodeFromBytes(data[:MRT_COMMON_HEADER_LEN])
if errh != nil {
return 0, nil, errh
}
totlen := int(hdr.Len + MRT_COMMON_HEADER_LEN)
if len(data) < totlen { //need to read more
if len(data) < totlen { // need to read more
return 0, nil, nil
}
return totlen, data[0:totlen], nil
Expand Down
25 changes: 16 additions & 9 deletions pkg/packet/mrt/mrt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestMrtHdr(t *testing.T) {
h1, err := NewMRTHeader(10, TABLE_DUMPv2, RIB_IPV4_MULTICAST, 20)
h1, err := NewMRTHeader(time.Unix(10, 0), TABLE_DUMPv2, RIB_IPV4_MULTICAST, 20)
if err != nil {
t.Fatal(err)
}
Expand All @@ -44,14 +44,23 @@ func TestMrtHdr(t *testing.T) {
}

func TestMrtHdrTime(t *testing.T) {
h1, err := NewMRTHeader(10, TABLE_DUMPv2, RIB_IPV4_MULTICAST, 20)
ttime1 := time.Unix(10, 0)
h1, err := NewMRTHeader(ttime1, TABLE_DUMPv2, RIB_IPV4_MULTICAST, 20)
if err != nil {
t.Fatal(err)
}
ttime := time.Unix(10, 0)
htime := h1.GetTime()
t.Logf("this timestamp should be 10s after epoch:%v", htime)
assert.Equal(t, h1.GetTime(), ttime)
h1time := h1.GetTime()
t.Logf("this timestamp should be 10s after epoch:%v", h1time)
assert.Equal(t, h1time, ttime1)

ttime2 := time.Unix(20, 123000)
h2, err := NewMRTHeader(ttime2, BGP4MP_ET, STATE_CHANGE, 20)
if err != nil {
t.Fatal(err)
}
h2time := h2.GetTime()
t.Logf("this timestamp should be 20s and 123ms after epoch:%v", h2time)
assert.Equal(t, h2time, ttime2)
}

func testPeer(t *testing.T, p1 *Peer) {
Expand Down Expand Up @@ -283,7 +292,7 @@ func TestMrtSplit(t *testing.T) {
for i := 0; i < numwrite; i++ {
msg := bgp.NewBGPKeepAliveMessage()
m1 := NewBGP4MPMessage(65000, 65001, 1, "192.168.0.1", "192.168.0.2", false, msg)
mm, _ := NewMRTMessage(1234, BGP4MP, MESSAGE, m1)
mm, _ := NewMRTMessage(time.Unix(1234, 0), BGP4MP, MESSAGE, m1)
b1, err := mm.Serialize()
if err != nil {
t.Fatal(err)
Expand All @@ -302,15 +311,13 @@ func TestMrtSplit(t *testing.T) {
}

func FuzzMRT(f *testing.F) {

f.Fuzz(func(t *testing.T, data []byte) {
if len(data) < 16 {
return
}

hdr := &MRTHeader{}
err := hdr.DecodeFromBytes(data[:MRT_COMMON_HEADER_LEN])

if err != nil {
return
}
Expand Down
42 changes: 27 additions & 15 deletions pkg/server/mrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,19 @@ func (m *mrtWriter) loop() error {
case e.FourBytesAs:
subtype = mrt.MESSAGE_AS4
}
if bm, err := mrt.NewMRTMessage(uint32(e.Timestamp.Unix()), mrt.BGP4MP, subtype, mp); err != nil {
if bm, err := mrt.NewMRTMessage(e.Timestamp, mrt.BGP4MP, subtype, mp); err != nil {
m.s.logger.Warn("Failed to create MRT BGP4MP message",
log.Fields{
"Topic": "mrt",
"Data": e,
"Error": err,
"Subtype": subtype})
"Subtype": subtype,
})
} else {
msg = append(msg, bm)
}
case *watchEventTable:
t := uint32(time.Now().Unix())
t := time.Now()

peers := make([]*mrt.Peer, 1, len(e.Neighbor)+1)
// Adding dummy Peer record for locally generated routes
Expand All @@ -131,7 +132,8 @@ func (m *mrtWriter) loop() error {
"Topic": "mrt",
"Data": e,
"Error": err,
"Subtype": mrt.PEER_INDEX_TABLE})
"Subtype": mrt.PEER_INDEX_TABLE,
})
break
} else {
msg = append(msg, bm)
Expand Down Expand Up @@ -174,7 +176,8 @@ func (m *mrtWriter) loop() error {
"Topic": "mrt",
"Data": e,
"Error": err,
"Subtype": st})
"Subtype": st,
})
} else {
msg = append(msg, bm)
seq++
Expand Down Expand Up @@ -224,7 +227,8 @@ func (m *mrtWriter) loop() error {
m.s.logger.Warn("Can't write to destination MRT file",
log.Fields{
"Topic": "mrt",
"Error": err})
"Error": err,
})
}
}

Expand All @@ -236,7 +240,8 @@ func (m *mrtWriter) loop() error {
log.Fields{
"Topic": "mrt",
"Data": e,
"Error": err})
"Error": err,
})
} else {
b.Write(buf)
if b.Len() > 1*1000*1000 {
Expand All @@ -259,7 +264,8 @@ func (m *mrtWriter) loop() error {
m.s.logger.Warn("can't rotate MRT file",
log.Fields{
"Topic": "mrt",
"Error": err})
"Error": err,
})
}
}

Expand Down Expand Up @@ -293,7 +299,8 @@ func mrtFileOpen(logger log.Logger, filename string, rInterval uint64) (*os.File
log.Fields{
"Topic": "mrt",
"Filename": realname,
"RotationInterval": rInterval})
"RotationInterval": rInterval,
})

i := len(realname)
for i > 0 && os.IsPathSeparator(realname[i-1]) {
Expand All @@ -311,7 +318,8 @@ func mrtFileOpen(logger log.Logger, filename string, rInterval uint64) (*os.File
logger.Warn("can't create MRT destination directory",
log.Fields{
"Topic": "mrt",
"Error": err})
"Error": err,
})
return nil, err
}
}
Expand All @@ -321,7 +329,8 @@ func mrtFileOpen(logger log.Logger, filename string, rInterval uint64) (*os.File
logger.Warn("can't create MRT destination file",
log.Fields{
"Topic": "mrt",
"Error": err})
"Error": err,
})
}
return file, err
}
Expand Down Expand Up @@ -360,26 +369,29 @@ func (m *mrtManager) enable(c *oc.MrtConfig) error {
m.bgpServer.logger.Info("use minimum mrt rotation interval",
log.Fields{
"Topic": "mrt",
"Interval": minRotationInterval})
"Interval": minRotationInterval,
})
rInterval = minRotationInterval
}
}

if c.DumpType == oc.MRT_TYPE_TABLE {
switch c.DumpType {
case oc.MRT_TYPE_TABLE:
if rInterval == 0 {
if dInterval < minDumpInterval {
m.bgpServer.logger.Info("use minimum mrt dump interval",
log.Fields{
"Topic": "mrt",
"Interval": minDumpInterval})
"Interval": minDumpInterval,
})
dInterval = minDumpInterval
}
} else if dInterval == 0 {
setRotationMin()
} else {
return fmt.Errorf("can't specify both intervals in the table dump type")
}
} else if c.DumpType == oc.MRT_TYPE_UPDATES {
case oc.MRT_TYPE_UPDATES:
// ignore the dump interval
dInterval = 0
if len(c.TableName) > 0 {
Expand Down
Loading