Skip to content

Commit a5d25d1

Browse files
committed
Limit and initial tests
1 parent 2f40015 commit a5d25d1

5 files changed

Lines changed: 259 additions & 1 deletion

File tree

internal/generated/gqlout/generated.go

Lines changed: 10 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schema/graphql/schema.graphqls

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ input VehiclePositionFilter {
147147
bbox: BoundingBox
148148
"Filter by feed OnestopIDs"
149149
feed_onestop_ids: [String!]
150+
"Maximum number of vehicle positions to return per update (default 1000)"
151+
limit: Int
150152
}
151153

152154
"""Result of entity delete operation"""

server/gql/subscription_resolver.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ func (r *subscriptionResolver) collectVehiclePositions(ctx context.Context, cfg
6363
feedIDs := cfg.RTFinder.GetCachedFeedIDs()
6464
log.For(ctx).Trace().Strs("feed_ids", feedIDs).Msg("subscription: collecting vehicle positions")
6565

66+
// Apply limit: use filter value if provided, otherwise default
67+
limit := RESOLVER_MAXLIMIT
68+
if where != nil && where.Limit != nil {
69+
limit = *where.Limit
70+
}
71+
if limit > RESOLVER_MAXLIMIT {
72+
limit = RESOLVER_MAXLIMIT
73+
}
74+
if limit < 0 {
75+
limit = 0
76+
}
77+
6678
var result []*model.VehiclePosition
6779
for _, feedID := range feedIDs {
6880
// Filter by feed_onestop_ids if specified
@@ -73,6 +85,9 @@ func (r *subscriptionResolver) collectVehiclePositions(ctx context.Context, cfg
7385
}
7486
vps := cfg.RTFinder.GetVehiclePositions(ctx, feedID)
7587
for _, vp := range vps {
88+
if len(result) >= limit {
89+
return result
90+
}
7691
mvp := convertVehiclePosition(vp, feedID)
7792
if mvp == nil {
7893
continue
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package gql
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/99designs/gqlgen/client"
9+
"github.com/interline-io/transitland-lib/internal/testconfig"
10+
"github.com/interline-io/transitland-lib/rt/pb"
11+
"github.com/interline-io/transitland-lib/server/auth/authn"
12+
"github.com/interline-io/transitland-lib/server/auth/mw/usercheck"
13+
"github.com/interline-io/transitland-lib/server/model"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"google.golang.org/protobuf/proto"
17+
)
18+
19+
func buildTestVehiclePositionData(t *testing.T) []byte {
20+
t.Helper()
21+
lat := float32(37.8044)
22+
lon := float32(-122.2712)
23+
bearing := float32(90.0)
24+
speed := float32(10.0)
25+
ts := uint64(time.Now().Unix())
26+
version := "2.0"
27+
incrementality := pb.FeedHeader_FULL_DATASET
28+
vehicleID := "test-vehicle-1"
29+
label := "Test Bus"
30+
entityID := "entity-1"
31+
tripID := "trip-1"
32+
routeID := "route-1"
33+
status := pb.VehiclePosition_IN_TRANSIT_TO
34+
35+
msg := &pb.FeedMessage{
36+
Header: &pb.FeedHeader{
37+
GtfsRealtimeVersion: &version,
38+
Incrementality: &incrementality,
39+
Timestamp: &ts,
40+
},
41+
Entity: []*pb.FeedEntity{
42+
{
43+
Id: &entityID,
44+
Vehicle: &pb.VehiclePosition{
45+
Vehicle: &pb.VehicleDescriptor{
46+
Id: &vehicleID,
47+
Label: &label,
48+
},
49+
Trip: &pb.TripDescriptor{
50+
TripId: &tripID,
51+
RouteId: &routeID,
52+
},
53+
Position: &pb.Position{
54+
Latitude: &lat,
55+
Longitude: &lon,
56+
Bearing: &bearing,
57+
Speed: &speed,
58+
},
59+
Timestamp: &ts,
60+
CurrentStatus: &status,
61+
},
62+
},
63+
},
64+
}
65+
data, err := proto.Marshal(msg)
66+
require.NoError(t, err)
67+
return data
68+
}
69+
70+
// newSubscriptionTestClient creates a test client and returns the config so
71+
// tests can push RT data into the cache via cfg.RTFinder.AddData.
72+
func newSubscriptionTestClient(t *testing.T) (*client.Client, model.Config) {
73+
t.Helper()
74+
cfg := testconfig.Config(t, testconfig.Options{})
75+
srv, _ := NewServer()
76+
graphqlServer := model.AddConfigAndPerms(cfg, srv)
77+
srvMiddleware := usercheck.NewUserDefaultMiddleware(func() authn.User {
78+
return authn.NewCtxUser("testuser", "", "").WithRoles("testrole")
79+
})
80+
return client.New(srvMiddleware(graphqlServer)), cfg
81+
}
82+
83+
func TestSubscriptionVehiclePositions(t *testing.T) {
84+
c, cfg := newSubscriptionTestClient(t)
85+
ctx := context.Background()
86+
87+
// Push VP data into cache before subscribing so the initial snapshot has data
88+
vpData := buildTestVehiclePositionData(t)
89+
err := cfg.RTFinder.AddData(ctx, "rtdata:test-feed:realtime_vehicle_positions", vpData)
90+
require.NoError(t, err)
91+
92+
// Subscribe
93+
sub := c.Websocket(`subscription { vehicle_positions { feed_onestop_id bearing speed position vehicle { id label } trip { trip_id route_id } } }`)
94+
defer sub.Close()
95+
96+
// Read initial snapshot
97+
var resp struct {
98+
VehiclePositions []struct {
99+
FeedOnestopID string `json:"feed_onestop_id"`
100+
Bearing *float64 `json:"bearing"`
101+
Speed *float64 `json:"speed"`
102+
Position any `json:"position"`
103+
Vehicle *struct {
104+
ID string `json:"id"`
105+
Label string `json:"label"`
106+
} `json:"vehicle"`
107+
Trip *struct {
108+
TripID string `json:"trip_id"`
109+
RouteID string `json:"route_id"`
110+
} `json:"trip"`
111+
} `json:"vehicle_positions"`
112+
}
113+
err = sub.Next(&resp)
114+
require.NoError(t, err)
115+
require.Len(t, resp.VehiclePositions, 1)
116+
117+
vp := resp.VehiclePositions[0]
118+
assert.Equal(t, "test-feed", vp.FeedOnestopID)
119+
assert.NotNil(t, vp.Bearing)
120+
assert.InDelta(t, 90.0, *vp.Bearing, 0.1)
121+
assert.NotNil(t, vp.Speed)
122+
assert.InDelta(t, 10.0, *vp.Speed, 0.1)
123+
assert.NotNil(t, vp.Position)
124+
require.NotNil(t, vp.Vehicle)
125+
assert.Equal(t, "test-vehicle-1", vp.Vehicle.ID)
126+
assert.Equal(t, "Test Bus", vp.Vehicle.Label)
127+
require.NotNil(t, vp.Trip)
128+
assert.Equal(t, "trip-1", vp.Trip.TripID)
129+
assert.Equal(t, "route-1", vp.Trip.RouteID)
130+
}
131+
132+
func TestSubscriptionVehiclePositions_FilterFeed(t *testing.T) {
133+
c, cfg := newSubscriptionTestClient(t)
134+
ctx := context.Background()
135+
136+
// Push VP data for two feeds
137+
vpData := buildTestVehiclePositionData(t)
138+
err := cfg.RTFinder.AddData(ctx, "rtdata:feed-a:realtime_vehicle_positions", vpData)
139+
require.NoError(t, err)
140+
err = cfg.RTFinder.AddData(ctx, "rtdata:feed-b:realtime_vehicle_positions", vpData)
141+
require.NoError(t, err)
142+
143+
// Subscribe with feed filter — only feed-a
144+
sub := c.Websocket(`subscription { vehicle_positions(where: {feed_onestop_ids: ["feed-a"]}) { feed_onestop_id } }`)
145+
defer sub.Close()
146+
147+
var resp struct {
148+
VehiclePositions []struct {
149+
FeedOnestopID string `json:"feed_onestop_id"`
150+
} `json:"vehicle_positions"`
151+
}
152+
err = sub.Next(&resp)
153+
require.NoError(t, err)
154+
require.Len(t, resp.VehiclePositions, 1)
155+
assert.Equal(t, "feed-a", resp.VehiclePositions[0].FeedOnestopID)
156+
}
157+
158+
func TestSubscriptionVehiclePositions_FilterBbox(t *testing.T) {
159+
c, cfg := newSubscriptionTestClient(t)
160+
ctx := context.Background()
161+
162+
// Test data is at lat=37.8044, lon=-122.2712
163+
vpData := buildTestVehiclePositionData(t)
164+
err := cfg.RTFinder.AddData(ctx, "rtdata:test-feed:realtime_vehicle_positions", vpData)
165+
require.NoError(t, err)
166+
167+
t.Run("matching bbox", func(t *testing.T) {
168+
sub := c.Websocket(`subscription { vehicle_positions(where: {bbox: {min_lon: -123, min_lat: 37, max_lon: -122, max_lat: 38}}) { feed_onestop_id } }`)
169+
defer sub.Close()
170+
171+
var resp struct {
172+
VehiclePositions []struct {
173+
FeedOnestopID string `json:"feed_onestop_id"`
174+
} `json:"vehicle_positions"`
175+
}
176+
err := sub.Next(&resp)
177+
require.NoError(t, err)
178+
assert.Len(t, resp.VehiclePositions, 1)
179+
})
180+
181+
t.Run("non-matching bbox", func(t *testing.T) {
182+
sub := c.Websocket(`subscription { vehicle_positions(where: {bbox: {min_lon: 0, min_lat: 0, max_lon: 1, max_lat: 1}}) { feed_onestop_id } }`)
183+
defer sub.Close()
184+
185+
var resp struct {
186+
VehiclePositions []struct {
187+
FeedOnestopID string `json:"feed_onestop_id"`
188+
} `json:"vehicle_positions"`
189+
}
190+
err := sub.Next(&resp)
191+
require.NoError(t, err)
192+
assert.Len(t, resp.VehiclePositions, 0)
193+
})
194+
}
195+
196+
func TestSubscriptionVehiclePositions_LiveUpdate(t *testing.T) {
197+
c, cfg := newSubscriptionTestClient(t)
198+
ctx := context.Background()
199+
200+
// Subscribe first with no data — initial snapshot should be empty
201+
sub := c.Websocket(`subscription { vehicle_positions { feed_onestop_id vehicle { id } } }`)
202+
defer sub.Close()
203+
204+
var resp struct {
205+
VehiclePositions []struct {
206+
FeedOnestopID string `json:"feed_onestop_id"`
207+
Vehicle *struct {
208+
ID string `json:"id"`
209+
} `json:"vehicle"`
210+
} `json:"vehicle_positions"`
211+
}
212+
213+
// Initial snapshot: empty
214+
err := sub.Next(&resp)
215+
require.NoError(t, err)
216+
assert.Len(t, resp.VehiclePositions, 0)
217+
218+
// Push data — should trigger a live update
219+
vpData := buildTestVehiclePositionData(t)
220+
err = cfg.RTFinder.AddData(ctx, "rtdata:test-feed:realtime_vehicle_positions", vpData)
221+
require.NoError(t, err)
222+
223+
// Read the live update
224+
err = sub.Next(&resp)
225+
require.NoError(t, err)
226+
require.Len(t, resp.VehiclePositions, 1)
227+
assert.Equal(t, "test-feed", resp.VehiclePositions[0].FeedOnestopID)
228+
require.NotNil(t, resp.VehiclePositions[0].Vehicle)
229+
assert.Equal(t, "test-vehicle-1", resp.VehiclePositions[0].Vehicle.ID)
230+
}

server/model/models_gen.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)