Skip to content

Commit fd995d0

Browse files
committed
Implement oracle related log types for event consumer
1 parent a5a88bf commit fd995d0

11 files changed

Lines changed: 573 additions & 2 deletions

log-events-consumer/consume/consume_batch_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestConsumeBatch_Integration(t *testing.T) {
6464
}
6565

6666
m := metrics.NewMetrics(fmt.Sprintf("test_integration_%d", i))
67-
consumer := NewConsumer(mockKafka, mockElastic, &tickstore.NoOpStore{}, m, map[uint64][]int16{0: {0, 1, 2, 3, 4, 5, 6, 8, 11, 12, 13, 255}})
67+
consumer := NewConsumer(mockKafka, mockElastic, &tickstore.NoOpStore{}, m, map[uint64][]int16{0: {0, 1, 2, 3, 4, 5, 6, 8, 11, 12, 13, 14, 15, 255}})
6868

6969
records, docs, err := consumer.consumeBatch(context.Background())
7070
require.NoError(t, err)

log-events-consumer/domain/kafka_to_elastic_serialization_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ func TestLogEvent_Integration(t *testing.T) {
5757
filename: "testdata/kafka/contract-reserve-deduction-with-transaction.json",
5858
expected: "testdata/elastic/contract-reserve-deduction-with-transaction.json",
5959
},
60+
{
61+
filename: "testdata/kafka/oracle-query-status-change.json",
62+
expected: "testdata/elastic/oracle-query-status-change.json",
63+
},
64+
{
65+
filename: "testdata/kafka/oracle-subscriber-log-message.json",
66+
expected: "testdata/elastic/oracle-subscriber-log-message.json",
67+
},
6068
{
6169
filename: "testdata/kafka/custom-message-start-dividends.json",
6270
expected: "testdata/elastic/custom-message-start-dividends.json",

log-events-consumer/domain/log_event.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"slices"
77
"strconv"
88
"strings"
9+
"time"
910

1011
"github.com/qubic/go-node-connector/types"
1112
)
@@ -149,6 +150,18 @@ func (le *LogEvent) ToLogEventElastic() (LogEventElastic, error) {
149150
return LogEventElastic{}, fmt.Errorf("handling reserve deduction: %w", err)
150151
}
151152

153+
case 14:
154+
err = handleOracleQueryStatusChange(&lee, le.Body)
155+
if err != nil {
156+
return LogEventElastic{}, fmt.Errorf("handling oracle query status change: %w", err)
157+
}
158+
159+
case 15:
160+
err = handleOracleSubscriberLogMessage(&lee, le.Body)
161+
if err != nil {
162+
return LogEventElastic{}, fmt.Errorf("handling oracle subscriber log message: %w", err)
163+
}
164+
152165
case 255:
153166
err = handleCustomMessage(&lee, le.Body)
154167
if err != nil {
@@ -481,6 +494,107 @@ func handleReserveDeduction(lee *LogEventElastic, body map[string]any) error {
481494
return nil
482495
}
483496

497+
func handleOracleQueryStatusChange(lee *LogEventElastic, body map[string]any) error {
498+
var err error
499+
500+
qe, ok := body["queryingEntity"].(string)
501+
if !ok {
502+
return fmt.Errorf("missing or invalid querying entity")
503+
}
504+
lee.QueryingEntity = qe
505+
506+
qi, ok := body["queryId"].(float64)
507+
if !ok {
508+
return fmt.Errorf("missing or invalid query id")
509+
}
510+
lee.QueryID, err = toUint64(qi)
511+
if err != nil {
512+
return fmt.Errorf("converting query id: %w", err)
513+
}
514+
515+
ii, ok := body["interfaceIndex"].(float64)
516+
if !ok {
517+
return fmt.Errorf("missing or invalid interface index")
518+
}
519+
lee.InterfaceIndex, err = toUint64(ii)
520+
if err != nil {
521+
return fmt.Errorf("converting interface index: %w", err)
522+
}
523+
524+
qt, ok := body["type"].(float64)
525+
if !ok {
526+
return fmt.Errorf("missing or invalid query type")
527+
}
528+
lee.QueryType, err = toInt16(qt)
529+
if err != nil {
530+
return fmt.Errorf("converting query type: %w", err)
531+
}
532+
533+
qs, ok := body["status"].(string)
534+
if !ok {
535+
return fmt.Errorf("missing or invalid query status")
536+
}
537+
qsv := getOracleQueryStatusValue(qs)
538+
if qsv == 0 {
539+
return fmt.Errorf("invalid query status: %s(%d)", qs, qsv)
540+
}
541+
lee.QueryStatus = &qsv
542+
543+
return nil
544+
}
545+
546+
func handleOracleSubscriberLogMessage(lee *LogEventElastic, body map[string]any) error {
547+
var err error
548+
549+
sid, ok := body["subscriptionId"].(float64)
550+
if !ok {
551+
return fmt.Errorf("missing or invalid subscription id")
552+
}
553+
lee.SubscriptionID, err = toUint32(sid)
554+
if err != nil {
555+
return fmt.Errorf("converting subscription id: %w", err)
556+
}
557+
558+
ii, ok := body["interfaceIndex"].(float64)
559+
if !ok {
560+
return fmt.Errorf("missing or invalid interface index")
561+
}
562+
lee.InterfaceIndex, err = toUint64(ii)
563+
if err != nil {
564+
return fmt.Errorf("converting interface index: %w", err)
565+
}
566+
567+
ci, ok := body["contractIndex"].(float64)
568+
if !ok {
569+
return fmt.Errorf("missing or invalid contract index")
570+
}
571+
lee.ContractIndex, err = toUint64(ci)
572+
if err != nil {
573+
return fmt.Errorf("converting contract index: %w", err)
574+
}
575+
576+
pim, ok := body["periodInMilliseconds"].(float64)
577+
if !ok {
578+
return fmt.Errorf("missing or invalid period in milliseconds")
579+
}
580+
lee.PeriodMillis, err = toUint64(pim)
581+
if err != nil {
582+
return fmt.Errorf("converting period in milliseconds: %w", err)
583+
}
584+
585+
fqdt, ok := body["firstQueryDateAndTime"].(string)
586+
if !ok {
587+
return fmt.Errorf("missing or invalid first query date and time")
588+
}
589+
fqdtv, err := stringToDateAndTime(fqdt)
590+
if err != nil {
591+
return fmt.Errorf("converting first query date and time: %w", err)
592+
}
593+
lee.FirstQueryTimestamp = &fqdtv
594+
595+
return nil
596+
}
597+
484598
func handleCustomMessage(lee *LogEventElastic, body map[string]any) error {
485599
var err error
486600

@@ -497,6 +611,38 @@ func handleCustomMessage(lee *LogEventElastic, body map[string]any) error {
497611
return nil
498612
}
499613

614+
func getOracleQueryStatusValue(status string) int16 {
615+
switch status {
616+
case "pending":
617+
return 1
618+
case "committed":
619+
return 2
620+
case "success":
621+
return 3
622+
case "timeout":
623+
return 4
624+
case "unresolvable":
625+
return 5
626+
default:
627+
return 0
628+
}
629+
}
630+
631+
func stringToDateAndTime(s string) (uint64, error) {
632+
var year, month, day, hour, minute, second, millisecond, microsecond int
633+
_, err := fmt.Sscanf(s, "%d-%02d-%02d %02d:%02d:%02d.%03d'%03d",
634+
&year, &month, &day, &hour, &minute, &second, &millisecond, &microsecond)
635+
if err != nil {
636+
return 0, fmt.Errorf("parsing date and time format: %w", err)
637+
}
638+
639+
t := time.Date(year, time.Month(month), day, hour, minute, second,
640+
(millisecond*1_000+microsecond)*1_000, // nanoseconds
641+
time.UTC)
642+
643+
return uint64(t.UnixMilli()), nil
644+
}
645+
500646
func toUnitOfMeasurement(unitOfMeasurement string) ([]byte, error) {
501647
if len(unitOfMeasurement) != 7 {
502648
err := fmt.Errorf("expected 7 characters but got [%s]", unitOfMeasurement)
@@ -546,6 +692,22 @@ func toInt64(num float64) (*int64, error) {
546692
return &converted, nil
547693
}
548694

695+
func toUint32(num float64) (*uint32, error) {
696+
converted := uint32(num)
697+
if num < 0 || num != float64(converted) {
698+
return nil, fmt.Errorf("cannot convert to uint32: %f", num)
699+
}
700+
return &converted, nil
701+
}
702+
703+
func toInt16(num float64) (*int16, error) {
704+
converted := int16(num)
705+
if num < 0 || num != float64(converted) {
706+
return nil, fmt.Errorf("cannot convert to int16: %f", num)
707+
}
708+
return &converted, nil
709+
}
710+
549711
func toByte(num float64) (*byte, error) {
550712
if num < 0 || num > 255 {
551713
return nil, fmt.Errorf("cannot convert to byte: %f", num)

log-events-consumer/domain/log_event_elastic.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ type LogEventElastic struct {
5252
DestinationContractIndex *uint64 `json:"destinationContractIndex,omitempty"`
5353
CustomMessage *uint64 `json:"customMessage,omitempty"`
5454
RawPayload []byte `json:"rawPayload,omitempty"`
55+
QueryingEntity string `json:"queryingEntity,omitempty"`
56+
QueryID *uint64 `json:"queryId,omitempty"`
57+
QueryType *int16 `json:"queryType,omitempty"`
58+
QueryStatus *int16 `json:"queryStatus,omitempty"`
59+
SubscriptionID *uint32 `json:"subscriptionId,omitempty"`
60+
InterfaceIndex *uint64 `json:"interfaceIndex,omitempty"`
61+
PeriodMillis *uint64 `json:"periodMillis,omitempty"`
62+
FirstQueryTimestamp *uint64 `json:"firstQueryTimestamp,omitempty"`
5563
}
5664

5765
func (lee *LogEventElastic) IsSupported() bool {

0 commit comments

Comments
 (0)