Skip to content

Amqp source add #831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft

Amqp source add #831

wants to merge 19 commits into from

Conversation

stan-dot
Copy link

@stan-dot stan-dot commented Feb 4, 2025

Fixes #806

My first contribution to the repo, I assume it has a couple of mistakes at this point, so draft for now.

I'd welcome an initial review.

@stan-dot
Copy link
Author

stan-dot commented Feb 6, 2025

looking for help with this error:

error[E0277]: the trait bound `AmqpState: arroyo_types::Data` is not satisfied
   --> crates/arroyo-connectors/src/amqp/operator.rs:163:55
    |
163 | ...                   let s = ctx.table_manager.get_global_keyed_state("k").await
    |                                                 ^^^^^^^^^^^^^^^^^^^^^^ the trait `bincode::Encode` is not implemented for `AmqpState`
    |
    = help: the following other types implement trait `bincode::Encode`:
              &'a CStr
              &'a T
              &std::path::Path
              ()
              (A, B)
              (A, B, C)
              (A, B, C, D)
              (A, B, C, D, E)
            and 141 others
    = note: required for `AmqpState` to implement `arroyo_types::Data`
note: required by a bound in `TableManager::get_global_keyed_state`
   --> /home/runner/work/arroyo/arroyo/crates/arroyo-state/src/tables/table_manager.rs:387:52
    |
387 |     pub async fn get_global_keyed_state<K: Key, V: Data>(
    |                                                    ^^^^ required by this bound in `TableManager::get_global_keyed_state`

@stan-dot
Copy link
Author

stan-dot commented Feb 7, 2025

@ecarrara , @mwylde I saw your commits in the arguably similar standard Rabbit MQ connector (https://github.com/ArroyoSystems/arroyo/blob/master/crates/arroyo-connectors/src/rabbitmq/source.rs), I'm a bit confused about the offsets and GlobalKeyedView, could you please take a quick look?

@mwylde
Copy link
Member

mwylde commented Mar 12, 2025

@stan-dot — I'm sorry this has taken so long.

There are a few issues here:

The error you posted about the trait bound AmqpState: arroyo_types::Data is not satisfied means that in order for a struct to be stored in state, it needs to satisfy the arroyo_types::Data trait:

pub trait Data: Debug + Clone + Encode + Decode + Send + 'static {}

so it needs to implement all of those traits (Debug + Clone + Encode + Decode + Send), but currently it jus implements Debug and Clone. You can fix this by deriving Encode and Decode (see an example here:
#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
pub struct KafkaState {
partition: i32,
offset: i64,
}
)

There also appear to be some problems with results and ?. There are various places you're propagating an error with ? in a method that returns lapin::Result but the error you're propagating cannot be converted into a lapin::Error. This will likely work as you expect if you instead use anyhow::Result.

@stan-dot
Copy link
Author

thanks for response @mwylde this has been very helpful.

I think I addressed the build errors in operator.rs now I notice I didn't expect the table.json to be necessary to define and then import inside mod.rs.

How to structure mod.rs and how to arrive at the right schemas for profile.json and table.json?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add STOMP message bus as source connector
2 participants