Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions iota/Subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ type AnnResponse struct {

func NewSubscriber(nodeConfig *configfile.NodeConfig, subConfig *configfile.SubConfig) Subscriber {
// Generate Transport client
transport := C.tsp_client_new_from_url(C.CString(nodeConfig.Url))
transport := C.transport_client_new_from_url(C.CString(nodeConfig.Url))

var newSub *C.subscriber_t
cerr := C.sub_new(&newSub, C.CString(subConfig.Seed), C.CString(subConfig.Encoding), PAYLOAD_LENGTH, transport)
if cerr != C.ERR_OK {
fmt.Println(errCode(cerr))
}

// Generate Subscriber instance
sub := Subscriber {
C.sub_new(C.CString(subConfig.Seed), C.CString(subConfig.Encoding), PAYLOAD_LENGTH, transport),
newSub,
nil,
http.DefaultClient,
}
Expand All @@ -47,8 +53,18 @@ func NewSubscriber(nodeConfig *configfile.NodeConfig, subConfig *configfile.SubC
C.sub_receive_announce(sub.Subscriber, address)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After incorporating changes and using the v1.1.0 .so that you and I built earlier this week, I am receiving the following error on this line.

{"timestamp":"2021-07-15T16:39:20Z","hostname":"ubuntu","application":"./alvarium-example","line-number":"publisher.go:198","log-level":"debug","message":"announcement response - { \"announcement_id\": \"32bc4e28380e08ed4923b515481b045a028662441e7f28c2da82d7b838cc784d0000000000000000:6936d4e499c982365578a582\" }"}
{"timestamp":"2021-07-15T16:39:20Z","hostname":"ubuntu","application":"./alvarium-example","line-number":"sdk.go:41","log-level":"error","message":"sub_receive_announce() returned error \nSTREAMS ERROR: Operation failed to execute properly"}

I notice in the PR there's no error checking here so I removed my error check to see if it would fall through and work but no luck.


// Fetch sub link and pk for subscription
subLink := C.sub_send_subscribe(sub.Subscriber, address)
subPk := C.sub_get_public_key(sub.Subscriber)
var subLink *C.address_t
var subPk *C.public_key_t

cerr = C.sub_send_subscribe(&subLink, sub.Subscriber, address)
if cerr != C.ERR_OK {
fmt.Println(errCode(cerr))
}

cerr = C.sub_get_public_key(&subPk, sub.Subscriber)
if cerr != C.ERR_OK {
fmt.Println(errCode(cerr))
}

subIdStr := C.get_address_id_str(subLink)
subPkStr := C.public_key_to_string(subPk)
Expand Down Expand Up @@ -76,11 +92,20 @@ func (sub *Subscriber) SendMessage(message TangleMessage) {
messageBytes := C.CBytes([]byte(message.message))
messageLen := len(message.message)

C.sub_send_signed_packet(
var messageLinks C.message_links_t
log.Println("Sending streams message... ")
cerr := C.sub_send_signed_packet(
&messageLinks,
sub.Subscriber,
*sub.Keyload,
nil, 0,
(*C.uchar) (messageBytes), C.size_t(messageLen))

if cerr != C.ERR_OK {
fmt.Println(errCode(cerr))
}
msgStr := C.get_address_id_str(messageLinks.msg_link)
log.Println("Streams message sent ", C.GoString(msgStr))
}

func (sub *Subscriber) Drop() {
Expand All @@ -92,20 +117,26 @@ func (sub *Subscriber) AwaitKeyload() {
exists := false
for exists == false {
// Gen next message ids to look for existing messages
msgIds := C.sub_gen_next_msg_ids(sub.Subscriber)
// Search for keyload message from these ids and try to process it
processed := C.sub_receive_keyload_from_ids(sub.Subscriber, msgIds)
// Free memory for c msgids object
C.drop_next_msg_ids(msgIds)
var msgIds *C.next_msg_ids_t
cerr := C.sub_gen_next_msg_ids(&msgIds, sub.Subscriber)
if cerr != C.ERR_OK {
fmt.Println(errCode(cerr))
}

if processed != nil {
// Search for keyload message from these ids and try to process it
var processed C.message_links_t
cerr = C.sub_receive_keyload_from_ids(&processed, sub.Subscriber, msgIds)
if cerr != C.ERR_OK {
fmt.Println("Keyload not found yet... Checking again in 5 seconds...")
// Loop until keyload is found and processed
time.Sleep(time.Second * 5)
} else {
// Store keyload links for attaching messages to
sub.InsertKeyload(processed)
sub.InsertKeyload(&processed)
exists = true
} else {
// Loop until keyload is found and processed
time.Sleep(time.Second)
}
// Free memory for c msgids object
C.drop_next_msg_ids(msgIds)
}
}

Expand Down Expand Up @@ -147,3 +178,13 @@ func getAnnouncementId(url string) string {
}
return annResp.AnnId
}

func errCode(err C.err_t) string {
switch err {
case C.ERR_OK: return "\nFunction completed Ok"
case C.ERR_NULL_ARGUMENT: return "\nSTREAMS ERROR: Null argument passed to function"
case C.ERR_BAD_ARGUMENT: return "\nSTREAMS ERROR: Bad argument passed to function"
case C.ERR_OPERATION_FAILED: return "\nSTREAMS ERROR: Operation failed to execute properly"
}
return "\nError code does not match any provided error options"
}
151 changes: 108 additions & 43 deletions iota/include/channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
#include <string.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>

typedef enum Err {
ERR_OK,
ERR_NULL_ARGUMENT,
ERR_BAD_ARGUMENT,
ERR_OPERATION_FAILED,
} err_t;

typedef struct Address address_t;
extern void drop_address(address_t const *);
Expand All @@ -12,12 +20,16 @@ extern address_t *address_from_string(char const *addr_str);
typedef struct ChannelAddress channel_address_t;
typedef struct MsgId msgid_t;
typedef struct PublicKey public_key_t;
typedef struct PskId psk_id_t;
typedef struct PskIds psk_ids_t;
typedef struct KePks ke_pks_t;

typedef struct NextMsgIds next_msg_ids_t;
extern void drop_next_msg_ids(next_msg_ids_t const *);

typedef struct UserState user_state_t;
extern void drop_user_state(user_state_t const *);

typedef struct UnwrappedMessage unwrapped_message_t;
extern void drop_unwrapped_message(unwrapped_message_t const *);

Expand All @@ -30,7 +42,6 @@ typedef struct MessageLinks {
} message_links_t;

extern void drop_links(message_links_t);
extern message_links_t new_message_links(address_t *msg_link, address_t *seq_link);

typedef struct Buffer {
uint8_t const *ptr;
Expand All @@ -51,86 +62,134 @@ extern void drop_payloads(packet_payloads_t);
/// Transport
////////////
typedef struct Transport transport_t;
extern transport_t *tsp_new();
extern void tsp_drop(transport_t *);
extern transport_t *transport_new();
extern void transport_drop(transport_t *);
#ifdef IOTA_STREAMS_CHANNELS_CLIENT
extern transport_t *tsp_client_new_from_url(char const *url);
extern void tsp_client_set_mwm(transport_t *tsp, uint8_t mwm);
extern transport_t *transport_client_new_from_url(char const *url);
#endif

#ifdef IOTA_STREAMS_CHANNELS_CLIENT
typedef enum LedgerInclusionState {
LIS_Conflicting = 0,
LIS_Included = 1,
LIS_NoTransaction = 2,
} ledger_inclusion_state_t;

typedef struct MessageMetadata {
char message_id[129];
char parent_message_ids[129][2];
bool is_solid;
uint32_t referenced_by_milestone_index;
uint32_t milestone_index;
ledger_inclusion_state_t ledger_inclusion_state;
uint8_t conflict_reason;
bool should_promote;
bool should_reattach;
uint32_t field_flags;
} msg_metadata_t;

typedef struct Milestone {
uint32_t milestone_index;
char message_id[129];
uint64_t timestamp;
} milestone_t;

typedef struct TransportDetails {
msg_metadata_t msg_metadata;
milestone_t milestone;
} transport_details_t;

extern err_t transport_get_link_details(transport_details_t *details, transport_t *transport, address_t const *link);
#endif

////////////
/// Author
////////////
typedef struct Author author_t;

extern author_t *auth_new(char const *seed, char const *encoding, size_t payload_length, uint8_t multi_branching, transport_t *tsp);
extern err_t auth_new(author_t **auth, char const *seed, char const *encoding, size_t payload_length, uint8_t multi_branching, transport_t *transport);
extern err_t auth_recover(author_t **auth, char const *seed, address_t const *announcement, uint8_t multi_branching, transport_t *transport);
extern void auth_drop(author_t *);

extern channel_address_t const *auth_channel_address(author_t const *user);
extern uint8_t auth_is_multi_branching(author_t const *user);
extern public_key_t const *auth_get_public_key(author_t const *user);
extern err_t auth_import(author_t **auth, buffer_t buffer, char const *password, transport_t *transport);
extern err_t auth_export(buffer_t *buf, author_t const *user, char const *password);

extern err_t auth_channel_address(channel_address_t const **addr, author_t const *user);
extern err_t auth_is_multi_branching(uint8_t *flag, author_t const *user);
extern err_t auth_get_public_key(public_key_t const **pk, author_t const *user);

// Announce
extern address_t const *auth_send_announce(author_t *author);
// Subscribe
extern void *auth_receive_subscribe(author_t *author, address_t const *address);
extern err_t auth_send_announce(address_t const **addr, author_t *author);
// Keyload
extern message_links_t auth_send_keyload(author_t *author, address_t const *link_to, psk_ids_t *psk_ids, ke_pks_t ke_pks);
extern err_t auth_send_keyload_for_everyone(message_links_t *links, author_t *author, address_t const *link_to);
extern err_t auth_send_keyload(message_links_t *links, author_t *author, address_t const *link_to, psk_ids_t *psk_ids, ke_pks_t ke_pks);

// Subscribe
extern err_t auth_receive_subscribe(author_t *author, address_t const *address);

extern message_links_t auth_send_keyload_for_everyone(author_t *author, address_t const *link_to);
// Tagged Packets
extern message_links_t auth_send_tagged_packet(author_t *author, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern packet_payloads_t auth_receive_tagged_packet(author_t *author, address_t const *address);
extern err_t auth_send_tagged_packet(message_links_t *links, author_t *author, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern err_t auth_receive_tagged_packet(packet_payloads_t *payloads, author_t *author, address_t const *address);
// Signed Packets
extern message_links_t auth_send_signed_packet(author_t *author, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern packet_payloads_t auth_receive_tagged_packet(author_t *author, address_t const *address) ;
extern err_t auth_send_signed_packet(message_links_t *links, author_t *author, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern err_t auth_receive_signed_packet(packet_payloads_t *payloads, author_t *author, address_t const *address) ;
// Sequence Message (for multi branch use)
extern address_t const *auth_receive_sequence(author_t *author, address_t const *address);
extern err_t auth_receive_sequence(address_t const **seq, author_t *author, address_t const *address);
// MsgId generation
extern next_msg_ids_t const *auth_gen_next_msg_ids(author_t *author);
extern err_t auth_gen_next_msg_ids(next_msg_ids_t const **ids, author_t *author);
// Generic Processing
extern unwrapped_message_t const *auth_receive_msg(author_t *author, address_t const *address);
extern err_t auth_receive_msg(unwrapped_message_t const **msg, author_t *author, address_t const *address);
// Fetching/Syncing
extern unwrapped_messages_t const *auth_fetch_next_msgs(author_t *author);
extern unwrapped_messages_t const *auth_sync_state(author_t *author);
extern err_t auth_fetch_next_msgs(unwrapped_messages_t const **umsgs, author_t *author);
extern err_t auth_sync_state(unwrapped_messages_t const **umsgs, author_t *author);
extern err_t auth_fetch_state(user_state_t const **state, author_t *author);
// Store Psk
extern err_t auth_store_psk(psk_id_t const **pskid, author_t *author, char const *psk);


/////////////
// Subscriber
/////////////
typedef struct Subscriber subscriber_t;
extern subscriber_t *sub_new(char const *seed, char const *encoding, size_t payload_length, transport_t *tsp);
extern err_t sub_new(subscriber_t **sub, char const *seed, char const *encoding, size_t payload_length, transport_t *transport);
extern err_t sub_recover(subscriber_t **sub, char const *seed, address_t const *announcement, transport_t *transport);
extern err_t sub_import(subscriber_t **sub, buffer_t buffer, char const *password, transport_t *transport);
extern err_t sub_export(buffer_t *buf, subscriber_t const *subscriber, char const *password);
extern void sub_drop(subscriber_t *);

extern channel_address_t const *sub_channel_address(subscriber_t const *user);
extern uint8_t sub_is_multi_branching(subscriber_t const *user);
extern public_key_t const *sub_get_public_key(subscriber_t const *user);
extern err_t sub_channel_address(channel_address_t const **addr, subscriber_t const *subscriber);
extern err_t sub_is_multi_branching(uint8_t *flag, subscriber_t const *subscriber);
extern err_t sub_get_public_key(public_key_t const **pk, subscriber_t const *subscriber);

// Registration state
extern uint8_t sub_is_registered(subscriber_t const *subscriber);
extern void sub_unregister(subscriber_t *subscriber);

// Announce
extern void sub_receive_announce(subscriber_t *subscriber, address_t const *address);
extern err_t sub_receive_announce(subscriber_t *subscriber, address_t const *address);
// Subscribe
extern address_t const *sub_send_subscribe(subscriber_t *subscriber, address_t const *announcement_link);
extern err_t sub_send_subscribe(address_t const **link, subscriber_t *subscriber, address_t const *announcement_link);
// Keyload
extern void sub_receive_keyload(subscriber_t *subscriber, address_t const *address);
extern message_links_t *sub_receive_keyload_from_ids(subscriber_t *subscriber, next_msg_ids_t const *messageLinks);
extern err_t sub_receive_keyload(subscriber_t *subscriber, address_t const *address);
extern err_t sub_receive_keyload_from_ids(message_links_t *links, subscriber_t *subscriber, next_msg_ids_t const *next_msg_ids);
// Tagged Packets
extern message_links_t sub_send_tagged_packet(subscriber_t *subscriber, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern packet_payloads_t sub_receive_tagged_packet(subscriber_t *subscriber, address_t const *address);
extern err_t sub_send_tagged_packet(message_links_t *links, subscriber_t *subscriber, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern err_t sub_receive_tagged_packet(packet_payloads_t *payloads, subscriber_t *subscriber, address_t const *address);
// Signed Packets
extern message_links_t sub_send_signed_packet(subscriber_t *subscriber, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern packet_payloads_t sub_receive_signed_packet(subscriber_t *subscriber, address_t const *address);
extern err_t sub_send_signed_packet(message_links_t *links, subscriber_t *subscriber, message_links_t link_to, uint8_t const *public_payload_ptr, size_t public_payload_size, uint8_t const *masked_payload_ptr, size_t masked_payload_size);
extern err_t sub_receive_signed_packet(packet_payloads_t *payloads, subscriber_t *subscriber, address_t const *address);
// Sequence Message (for multi branch use)
extern address_t const *sub_receive_sequence(subscriber_t *subscriber, address_t const *address);
extern err_t sub_receive_sequence(address_t const **address, subscriber_t *subscriber, address_t const *seq_address);
// MsgId Generation
extern next_msg_ids_t const *sub_gen_next_msg_ids(subscriber_t *subscriber);
extern err_t sub_gen_next_msg_ids(next_msg_ids_t const **ids, subscriber_t *subscriber);
// Generic Message Processing
extern unwrapped_message_t const *sub_receive_msg(subscriber_t *subscriber, address_t const *address);
extern err_t sub_receive_msg(unwrapped_message_t const *umsg, subscriber_t *subscriber, address_t const *address);
// Fetching/Syncing
extern unwrapped_messages_t const *sub_fetch_next_msgs(subscriber_t *subscriber);
extern unwrapped_messages_t const *sub_sync_state(subscriber_t *subscriber);
extern err_t sub_fetch_next_msgs(unwrapped_messages_t const **messages, subscriber_t *subscriber);
extern err_t sub_sync_state(unwrapped_messages_t const **messages, subscriber_t *subscriber);
extern err_t sub_fetch_state(user_state_t const **state, subscriber_t *subscriber);
// Store Psk
extern err_t sub_store_psk(psk_id_t const **pskid, subscriber_t *subscriber, char const *psk);

/////////////
/// Utility
Expand All @@ -142,12 +201,18 @@ extern char const *get_msgid_str(msgid_t const *msgid);

extern char const *get_address_inst_str(address_t const *address);
extern char const *get_address_id_str(address_t const *address);
extern char const *get_address_inst_trytes(address_t const *address);
extern char const *get_address_id_trytes(address_t const *address);

extern char const *public_key_to_string(public_key_t *pk);
extern char const *public_key_to_string(public_key_t *pubkey);

extern packet_payloads_t get_payload(unwrapped_message_t const *message);
extern size_t get_payloads_count(unwrapped_messages_t const *messages);
extern packet_payloads_t get_indexed_payload(unwrapped_messages_t const *messages, size_t index);

extern char const *get_address_index_str(address_t const *address);

extern address_t const *get_link_from_state(user_state_t const *state, public_key_t const *pub_key);

extern char const *pskid_as_str(psk_id_t const *pskid);
extern void drop_pskid(psk_id_t const *pskid);

#endif //IOTA_STREAMS_CHANNELS_H