Skip to content

Implemented reading in multiple threads of data from different bird tables#275

Open
stal76 wants to merge 6 commits intomainfrom
bird-import-proto
Open

Implemented reading in multiple threads of data from different bird tables#275
stal76 wants to merge 6 commits intomainfrom
bird-import-proto

Conversation

@stal76
Copy link
Collaborator

@stal76 stal76 commented Feb 11, 2025

Autotests and an application for testing the reading of route information from bird have also been added.

@stal76 stal76 force-pushed the bird-import-proto branch 3 times, most recently from e65f95a to 2416df2 Compare February 12, 2025 13:52
@ol-imorozko
Copy link
Collaborator

ol-imorozko commented Feb 24, 2025

It's better to use the imperative mood for commit messages. I suggest:

PR name:

  • Implement reading data from multiple threads across different bird tables

Commits:

  • Start bird import thread (Already good)
  • Add ability to read route information from multiple bird tables
  • Emulate working with bird in autotests
  • Add autotests that emulate reading the routing table from bird
  • Add application for testing route information reading from bird

Copy link
Collaborator

@ol-imorozko ol-imorozko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach with raw memory and decode_* functions is problematic and I want it changed.

Casting raw memory to different types violates strict aliasing rules and can lead to UB.

Instead we should use std::istream which is an abstraction over a character sequence, which is precisely what we're operating with here. It provides type-safe methods for reading data and automatically handles boundary checking.

This way, instead of

static inline bool decode_u8(uintptr_t* ppos, uintptr_t end, uint8_t* pvalue)
{
    if (*ppos + sizeof(uint8_t) > end)
        return false;
    *pvalue = *(uint8_t*)*ppos;
    *ppos += sizeof(uint8_t);
    return true;
}

we will have just

static inline bool decode_u8(std::istream& stream, uint8_t& value)
{
    return !!stream.read(reinterpret_cast<char*>(&value), sizeof(value));
}

I will propose specific changes to replace raw memory decoding with std::istream in the related places with review comments by myself, so I'm not asking you to rewrite the whole file by yourself.

Comment on lines 1 to 8
#ifndef LIBBIRD_H
#define LIBBIRD_H

#include <stdint.h>

#define NET_IP4 1
#define NET_IP6 2
#define NET_VPN4 3
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, this is a copy of some file in https://github.com/CZ-NIC/bird, right? Then I guess we need some commentary at the top explaining that this is a copy and that is should not be changed. Correct me if I'm wrong, please

#include <unistd.h>
#include <vector>

static inline bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static: not needed in a .cpp file because functions already have internal linkage
inline: not needed because there’s no risk of multiple definitions

Comment on lines +10 to +18
static inline bool
decode_u8(uintptr_t* ppos, uintptr_t end, uint8_t* pvalue)
{
if (*ppos + sizeof(uint8_t) > end)
return false;
*pvalue = *(uint8_t*)*ppos;
*ppos += sizeof(uint8_t);
return true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static inline bool
decode_u8(uintptr_t* ppos, uintptr_t end, uint8_t* pvalue)
{
if (*ppos + sizeof(uint8_t) > end)
return false;
*pvalue = *(uint8_t*)*ppos;
*ppos += sizeof(uint8_t);
return true;
}
bool decode_u8(std::istream& stream, uint8_t& value)
{
return !!(stream.read(reinterpret_cast<char*>(&value), sizeof(value)));
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::istream::read reads stream and fills value safely.
Since we want to return bool from the function on successful read, we can utilize explicit cast of std::istream object to a bool. We can do it either with a static_cast<bool>(stream.read....), or with !!. Use whichever you prefer, I personally prefer !! more since it's shorter

Comment on lines +20 to +28
static inline bool
decode_u16(uintptr_t* ppos, uintptr_t end, uint16_t* pvalue)
{
if (*ppos + sizeof(uint16_t) > end)
return false;
*pvalue = *(uint16_t*)*ppos;
*ppos += sizeof(uint16_t);
return true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will not be needed, since decoding of an array can be done explicitly with std::istream

Suggested change
static inline bool
decode_u16(uintptr_t* ppos, uintptr_t end, uint16_t* pvalue)
{
if (*ppos + sizeof(uint16_t) > end)
return false;
*pvalue = *(uint16_t*)*ppos;
*ppos += sizeof(uint16_t);
return true;
}

Comment on lines +30 to +38
static inline bool
decode_u32(uintptr_t* ppos, uintptr_t end, uint32_t* pvalue)
{
if (*ppos + sizeof(uint32_t) > end)
return false;
*pvalue = *(uint32_t*)*ppos;
*ppos += sizeof(uint32_t);
return true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static inline bool
decode_u32(uintptr_t* ppos, uintptr_t end, uint32_t* pvalue)
{
if (*ppos + sizeof(uint32_t) > end)
return false;
*pvalue = *(uint32_t*)*ppos;
*ppos += sizeof(uint32_t);
return true;
}
bool decode_u32(std::istream& stream, uint32_t& value)
{
return !!(stream.read(reinterpret_cast<char*>(&value), sizeof(value)));
}

Comment on lines +40 to +48
static inline bool
decode_chunk(uintptr_t* ppos, uintptr_t end, uintptr_t* pchunk_end)
{
uint32_t chunk_size;
if (!decode_u32(ppos, end, &chunk_size))
return false;
*pchunk_end = *ppos + chunk_size;
return *pchunk_end <= end;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static inline bool
decode_chunk(uintptr_t* ppos, uintptr_t end, uintptr_t* pchunk_end)
{
uint32_t chunk_size;
if (!decode_u32(ppos, end, &chunk_size))
return false;
*pchunk_end = *ppos + chunk_size;
return *pchunk_end <= end;
}
bool decode_chunk(std::istream& stream, std::streampos& chunk_end)
{
uint32_t chunk_size;
if (!decode_u32(stream, chunk_size))
return false;
chunk_end = stream.tellg() + std::streampos(chunk_size);
return true;
}

We don't need to compare it with the end since std::istream manages it automatically. If there's not enough data, the following read operation will simply fail

Comment on lines +345 to +354
size_t attrs_end;
if (!decode_chunk(ppos, end, &attrs_end))
return false;

if (attrs_end != end)
return false;

/* Now decode all attributes one by one. */
while (*ppos < attrs_end)
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
size_t attrs_end;
if (!decode_chunk(ppos, end, &attrs_end))
return false;
if (attrs_end != end)
return false;
/* Now decode all attributes one by one. */
while (*ppos < attrs_end)
{
std::streampos attrs_end;
if (!decode_chunk(stream, attrs_end))
{
return false;
}
/* Now decode all attributes one by one. */
while (stream.tellg() < attrs_end)
{

while (*ppos < attrs_end)
{
uint32_t attr_id;
if (!decode_u32(ppos, attrs_end, &attr_id))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!decode_u32(ppos, attrs_end, &attr_id))
if (!decode_u32(stream, attr_id))

case BA_ORIGIN:
{
uint32_t origin;
if (!decode_u32(ppos, attrs_end, &origin))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!decode_u32(ppos, attrs_end, &origin))
if (!decode_u32(stream, origin))

and the rest of function calls in a similar manner..

* - true: some kind of error has occurred and it may be worth running the function after some time interval.
* - false: data was received through pipe indicating that the thread needs to be completed
*/
bool read_bird_feed(const char* sock_name, const char* vrf, rib_update_handler handler, int pipe_close)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool read_bird_feed(const char* sock_name, const char* vrf, rib_update_handler handler, int pipe_close)
bool read_bird_feed(const char* sock_name, const std::string& vrf, rib_update_handler handler, int pipe_close)

Comment on lines +690 to +705
/* pos and end denote addresses of a memory chunk to parse. */
uintptr_t pos = (uintptr_t)read_buf + parse_pos;
uintptr_t end = (uintptr_t)read_buf + read_pos;

/* Determine boundaries of the next route update. */
uintptr_t route_end;
if (!decode_chunk(&pos, end, &route_end))
break;

common::icp::rib_update::action action;
if (!parse_route_update(&pos, route_end, vrf, &action))
break;

requests.emplace_back(action);

parse_pos = pos - (uintptr_t)read_buf;
Copy link
Collaborator

@ol-imorozko ol-imorozko Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* pos and end denote addresses of a memory chunk to parse. */
uintptr_t pos = (uintptr_t)read_buf + parse_pos;
uintptr_t end = (uintptr_t)read_buf + read_pos;
/* Determine boundaries of the next route update. */
uintptr_t route_end;
if (!decode_chunk(&pos, end, &route_end))
break;
common::icp::rib_update::action action;
if (!parse_route_update(&pos, route_end, vrf, &action))
break;
requests.emplace_back(action);
parse_pos = pos - (uintptr_t)read_buf;
/* Create a stringstream from the current buffer */
std::stringstream stream;
stream.write((char*)((uintptr_t)read_buf + parse_pos), read_pos - parse_pos);
std::streampos route_end;
if (!decode_chunk(stream, route_end))
break;
common::icp::rib_update::action action;
if (!parse_route_update(stream, vrf, action))
break;
requests.push_back(std::move(action));
parse_pos += stream.tellg();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments