-
Notifications
You must be signed in to change notification settings - Fork 81
Open
Description
The fumarole connect method always tries to create a subscriber instead of using an existing one.
async fn connect(&self, tx: Sender<Result<SubscribeUpdate, Status>>) -> Result<(), VixenError> {
let filters = self.filters.clone();
let subscriber_name = self.config.subscriber_name.clone();
// TODO: add tasks pool concurrency limit through config
let mut tasks_set = JoinSet::new();
const MAX_PARA_DATA_STREAMS: u8 = 4; //Fumarole const
let fumarole_subscribe_config = FumaroleSubscribeConfig {
num_data_plane_tcp_connections: NonZero::new(MAX_PARA_DATA_STREAMS).unwrap(),
..Default::default()
};
let (initial_offset_policy, from_slot) = match self.config.from_slot {
Some(slot) => (InitialOffsetPolicy::FromSlot, Some(slot)),
None => (InitialOffsetPolicy::Latest, None),
};
let mut fumarole_client = FumaroleClient::connect(self.config.clone().into())
.await
.expect("failing to connect to fumarole");
// ----- THIS IS THE PROBLEM -----
let group_result = fumarole_client
.create_consumer_group(CreateConsumerGroupRequest {
consumer_group_name: subscriber_name.clone(),
initial_offset_policy: initial_offset_policy.into(),
// If the initial offset policy is "from-slot", this is the slot to start from.
// If not specified, the subscriber will start from the latest slot.
from_slot,
})
.await;
// ----- THIS IS THE PROBLEM -----Also, users claim they get ratelimit when the call connect >
I suspect the connect function gets invoked too many time in a tight loop during the building process.
Metadata
Metadata
Assignees
Labels
No labels