Skip to content
7 changes: 7 additions & 0 deletions share/shwap/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

libshare "github.com/celestiaorg/go-square/v3/share"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
)
Expand All @@ -13,5 +14,11 @@ import (
type Accessor interface {
AxisRoots(context.Context) (*share.AxisRoots, error)
RowNamespaceData(context.Context, libshare.Namespace, int) (RowNamespaceData, error)
Sample(ctx context.Context, idx SampleCoords) (Sample, error)
AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (AxisHalf, error)
Reader() (io.Reader, error)
RangeNamespaceData(
ctx context.Context,
from, to int,
) (RangeNamespaceData, error)
}
4 changes: 4 additions & 0 deletions share/shwap/namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ func (nd NamespaceData) WriteTo(writer io.Writer) (int64, error) {
}
return n, nil
}

func (nd NamespaceData) IsEmpty() bool {
return len(nd) == 0
}
33 changes: 20 additions & 13 deletions share/shwap/p2p/shrex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ func (c *Client) Get(
"peer", peer.String(),
)
requestTime := time.Now()
status, err := c.doRequest(ctx, logger, req, resp, peer)
n, status, err := c.doRequest(ctx, logger, req, resp, peer)
if err != nil {
logger.Warnw("requesting data from peer failed", "error", err)
}
c.metrics.observeRequest(ctx, req.Name(), status, time.Since(requestTime))
logger.Debugw("requested data", "status", status, "duration", time.Since(requestTime))
logger.Debugw("requested data",
"status", status, "duration",
time.Since(requestTime), "total bytes received", n,
)
return err
}

Expand All @@ -75,13 +78,13 @@ func (c *Client) doRequest(
req request,
resp response,
peer peer.ID,
) (status, error) {
) (int64, status, error) {
streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ReadTimeout)
defer cancel()

stream, err := c.host.NewStream(streamOpenCtx, peer, ProtocolID(c.params.NetworkID(), req.Name()))
if err != nil {
return statusOpenStreamErr, fmt.Errorf("open stream: %w", err)
return 0, statusOpenStreamErr, fmt.Errorf("open stream: %w", err)
}
defer func() {
utils.CloseAndLog(log, "shrex/client stream", stream)
Expand All @@ -91,7 +94,7 @@ func (c *Client) doRequest(

_, err = req.WriteTo(stream)
if err != nil {
return statusSendReqErr, fmt.Errorf("writing request: %w", err)
return 0, statusSendReqErr, fmt.Errorf("writing request: %w", err)
}

err = stream.CloseWrite()
Expand All @@ -100,26 +103,30 @@ func (c *Client) doRequest(
}

var statusResp shrexpb.Response
_, err = serde.Read(stream, &statusResp)
statusLength, err := serde.Read(stream, &statusResp)
if err != nil {
return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err)
return int64(statusLength),
statusReadStatusErr,
fmt.Errorf("unexpected error during reading the status from stream: %w", err)
}

switch statusResp.Status {
case shrexpb.Status_OK:
case shrexpb.Status_NOT_FOUND:
return statusNotFound, ErrNotFound
return int64(statusLength), statusNotFound, ErrNotFound
case shrexpb.Status_INTERNAL:
return statusInternalErr, ErrInternalServer
return int64(statusLength), statusInternalErr, ErrInternalServer
default:
return statusReadRespErr, ErrInvalidResponse
return int64(statusLength), statusReadRespErr, ErrInvalidResponse
}

_, err = resp.ReadFrom(stream)
dataLength, err := resp.ReadFrom(stream)
st := statusSuccess
if err != nil {
return statusReadRespErr, fmt.Errorf("%w: %w", ErrInvalidResponse, err)
err = fmt.Errorf("%w: %w", ErrInvalidResponse, err)
st = statusReadRespErr
}
return statusSuccess, nil
return int64(statusLength) + dataLength, st, err
}

func (c *Client) setStreamDeadlines(ctx context.Context, logger *zap.SugaredLogger, stream network.Stream) {
Expand Down
1 change: 0 additions & 1 deletion share/shwap/p2p/shrex/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func InitClientMetrics() (*Metrics, error) {
if err != nil {
return nil, err
}

return &Metrics{
requestDuration: requestDuration,
totalRequestCounter: totalRequestCounter,
Expand Down
9 changes: 9 additions & 0 deletions share/shwap/p2p/shrex/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ var registry = []newRequestID{
func() request {
return &shwap.EdsID{}
},
func() request {
return &shwap.SampleID{}
},
func() request {
return &shwap.RowID{}
},
func() request {
return &shwap.RangeNamespaceDataID{}
},
}

// request represents compatible generalised interface for requests.
Expand Down
42 changes: 42 additions & 0 deletions share/shwap/p2p/shrex/shrex_getter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package shrex_getter

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/libs/utils"
)

type metrics struct {
requestAttempt metric.Int64Histogram
}

func (sg *Getter) WithMetrics() error {
requestAttempt, err := meter.Int64Histogram(
"getters_shrex_attempts_per_request",
metric.WithDescription("Number of attempts per shrex request"),
)
if err != nil {
return err
}

sg.metrics = &metrics{
requestAttempt: requestAttempt,
}
return nil
}

func (m *metrics) recordAttempts(ctx context.Context, requestName string, attempt int, success bool) {
if m == nil {
return
}
ctx = utils.ResetContextOnError(ctx)

m.requestAttempt.Record(ctx, int64(attempt),
metric.WithAttributes(
attribute.String("request_type", requestName),
attribute.Bool("success", success)),
)
}
Loading
Loading