Skip to content

Commit

Permalink
Enable accessing underlying Message in TableView
Browse files Browse the repository at this point in the history
  • Loading branch information
petermnhull committed Feb 10, 2024
1 parent f476814 commit cb51dc9
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 11 deletions.
10 changes: 8 additions & 2 deletions pulsar/table_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ type TableView interface {
// ContainsKey returns true if this TableView contains a mapping for the specified key.
ContainsKey(key string) bool

// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key or the Message cannot be encoded to the SchemaValueType
Get(key string) interface{}

// Entries returns a map view of the mappings contained in this TableView.
// Message returns the Message to which the specified key is mapped, or nil if this map contains no mapping for the key.
Message(key string) Message

// Entries returns a map view of the mappings contained in this TableView, with values encoded into SchemaValueType.
Entries() map[string]interface{}

// Messages returns a map view of the Message mappings contained in this TableView.
Messages() map[string]Message

// Keys returns a slice of the keys contained in this TableView.
Keys() []string

Expand Down
57 changes: 48 additions & 9 deletions pulsar/table_view_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type TableViewImpl struct {
options TableViewOptions

dataMu sync.Mutex
data map[string]interface{}
data map[string]Message

readersMu sync.Mutex
cancelRaders map[string]cancelReader
Expand Down Expand Up @@ -75,7 +75,7 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
tv := TableViewImpl{
client: client,
options: options,
data: make(map[string]interface{}),
data: make(map[string]Message),
cancelRaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
Expand Down Expand Up @@ -178,6 +178,23 @@ func (tv *TableViewImpl) ContainsKey(key string) bool {
}

func (tv *TableViewImpl) Get(key string) interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
msg, ok := tv.data[key]
if !ok {
return nil
}

v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("getting value for message, %w; msg is %v", err, msg)
return nil
}

return v
}

func (tv *TableViewImpl) Message(key string) Message {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return tv.data[key]
Expand All @@ -186,10 +203,23 @@ func (tv *TableViewImpl) Get(key string) interface{} {
func (tv *TableViewImpl) Entries() map[string]interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()

data := make(map[string]interface{}, len(tv.data))
for k, v := range tv.data {
for k, msg := range tv.data {
v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("getting value for message, %w; msg is %v", len(tv.listeners), err, msg)
continue
}
data[k] = v
}

return data
}

func (tv *TableViewImpl) Messages() map[string]Message {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return tv.data
}

Expand Down Expand Up @@ -245,23 +275,32 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()

payload := reflect.New(tv.options.SchemaValueType)
if len(msg.Payload()) == 0 {
delete(tv.data, msg.Key())
} else {
if err := msg.GetSchemaValue(payload.Interface()); err != nil {
tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
}
tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
tv.data[msg.Key()] = msg
}

v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("will not action %d listeners, getting value for message, %w; msg is %v", len(tv.listeners), err, msg)
return
}
for _, listener := range tv.listeners {
if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
if err := listener(msg.Key(), v); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
}
}
}

func (tv *TableViewImpl) schemaValueFromMessage(msg Message) (interface{}, error) {
payload := reflect.New(tv.options.SchemaValueType)
if err := msg.GetSchemaValue(payload.Interface()); err != nil {
return nil, fmt.Errorf("msg.GetSchemaValue() failed: %w", err)
}
return reflect.Indirect(payload).Interface(), nil
}

func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) {
for {
msg, err := reader.Next(ctx)
Expand Down
66 changes: 66 additions & 0 deletions pulsar/table_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,72 @@ func TestTableView(t *testing.T) {
}
}

func TestTableView_Message(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.NoError(t, err)
defer client.Close()

topic := newTopicName()
schema := NewStringSchema(nil)

// Create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: schema,
})
assert.NoError(t, err)
defer producer.Close()

numMsg := 10
valuePrefix := "hello table view: "
publicationTimeForKey := map[string]time.Time{}
keys := make([]string, 0, numMsg)

for i := 0; i < numMsg; i++ {
key := fmt.Sprintf("%d", i)
keys = append(keys, key)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = producer.Send(ctx, &ProducerMessage{
Key: key,
Value: fmt.Sprintf(valuePrefix + key),
})
assert.NoError(t, err)

publicationTimeForKey[key] = time.Now()
}

// Create table view
v := ""
tv, err := client.CreateTableView(TableViewOptions{
Topic: topic,
Schema: schema,
SchemaValueType: reflect.TypeOf(&v),
})
assert.NoError(t, err)
defer tv.Close()

// Wait until table view receives all messages
for tv.Size() < numMsg {
time.Sleep(time.Second * 500)
t.Logf("TableView number of elements: %d", tv.Size())
}

for _, k := range keys {
msg := tv.Message(k)

// Check that the payload can be accessed as bytes
assert.Equal(t, []byte(fmt.Sprintf("%s%s", valuePrefix, k)), msg.Payload())

// Check publication times can be accessed and are close to the recorded times above
assert.WithinDuration(t, publicationTimeForKey[k], msg.PublishTime(), time.Millisecond*10)
}
}

func TestTableViewSchemas(t *testing.T) {
var tests = []struct {
name string
Expand Down

0 comments on commit cb51dc9

Please sign in to comment.