Skip to content

Commit 2e61e6d

Browse files
committed
Add support for continual gathering
1 parent 1ce9ff1 commit 2e61e6d

File tree

8 files changed

+647
-4
lines changed

8 files changed

+647
-4
lines changed

agent.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ type Agent struct {
154154
enableRenomination bool
155155
nominationValueGenerator func() uint32
156156
nominationAttribute stun.AttrType
157+
158+
// Continual gathering support
159+
continualGatheringPolicy ContinualGatheringPolicy
160+
networkMonitorInterval time.Duration
161+
lastKnownInterfaces map[string]netip.Addr // map[iface+ip] for deduplication
157162
}
158163

159164
// NewAgent creates a new Agent.
@@ -244,6 +249,10 @@ func newAgentWithConfig(config *AgentConfig, opts ...AgentOption) (*Agent, error
244249
enableRenomination: false,
245250
nominationValueGenerator: nil,
246251
nominationAttribute: stun.AttrType(0x0030), // Default value
252+
253+
continualGatheringPolicy: GatherOnce, // Default to GatherOnce
254+
networkMonitorInterval: 2 * time.Second,
255+
lastKnownInterfaces: make(map[string]netip.Addr),
247256
}
248257

249258
agent.connectionStateNotifier = &handlerNotifier{
@@ -890,6 +899,19 @@ func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
890899
return res, nil
891900
}
892901

902+
// GetGatheringState returns the current gathering state of the Agent.
903+
func (a *Agent) GetGatheringState() (GatheringState, error) {
904+
var state GatheringState
905+
err := a.loop.Run(a.loop, func(_ context.Context) {
906+
state = a.gatheringState
907+
})
908+
if err != nil {
909+
return GatheringStateUnknown, err
910+
}
911+
912+
return state, nil
913+
}
914+
893915
// GetLocalUserCredentials returns the local user credentials.
894916
func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
895917
valSet := make(chan struct{})

agent_options.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package ice
55

66
import (
77
"sync/atomic"
8+
"time"
89

910
"github.com/pion/stun/v3"
1011
)
@@ -152,3 +153,73 @@ func WithEnableUseCandidateCheckPriority() AgentOption {
152153
return nil
153154
}
154155
}
156+
157+
// WithContinualGatheringPolicy sets the continual gathering policy for the agent.
158+
// When set to GatherContinually, the agent will continuously monitor network interfaces
159+
// and gather new candidates as they become available.
160+
// When set to GatherOnce (default), gathering completes after the initial phase.
161+
//
162+
// Example:
163+
//
164+
// agent, err := NewAgentWithOptions(WithContinualGatheringPolicy(GatherContinually))
165+
func WithContinualGatheringPolicy(policy ContinualGatheringPolicy) AgentOption {
166+
return func(a *Agent) error {
167+
a.continualGatheringPolicy = policy
168+
169+
return nil
170+
}
171+
}
172+
173+
// WithNetworkMonitorInterval sets the interval at which the agent checks for network interface changes
174+
// when using GatherContinually policy. This option only has effect when used with
175+
// WithContinualGatheringPolicy(GatherContinually).
176+
// Default is 2 seconds if not specified.
177+
//
178+
// Example:
179+
//
180+
// agent, err := NewAgentWithOptions(
181+
// WithContinualGatheringPolicy(GatherContinually),
182+
// WithNetworkMonitorInterval(5 * time.Second),
183+
// )
184+
func WithNetworkMonitorInterval(interval time.Duration) AgentOption {
185+
return func(a *Agent) error {
186+
if interval <= 0 {
187+
return ErrInvalidNetworkMonitorInterval
188+
}
189+
a.networkMonitorInterval = interval
190+
191+
return nil
192+
}
193+
}
194+
195+
// WithNetworkTypes sets the enabled network types for candidate gathering.
196+
// By default, all network types are enabled.
197+
//
198+
// Example:
199+
//
200+
// agent, err := NewAgentWithOptions(
201+
// WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}),
202+
// )
203+
func WithNetworkTypes(networkTypes []NetworkType) AgentOption {
204+
return func(a *Agent) error {
205+
a.networkTypes = networkTypes
206+
207+
return nil
208+
}
209+
}
210+
211+
// WithCandidateTypes sets the enabled candidate types for gathering.
212+
// By default, host, server reflexive, and relay candidates are enabled.
213+
//
214+
// Example:
215+
//
216+
// agent, err := NewAgentWithOptions(
217+
// WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeServerReflexive}),
218+
// )
219+
func WithCandidateTypes(candidateTypes []CandidateType) AgentOption {
220+
return func(a *Agent) error {
221+
a.candidateTypes = candidateTypes
222+
223+
return nil
224+
}
225+
}

errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ var (
135135
// ErrInvalidNominationValueGenerator indicates a nil nomination value generator was provided.
136136
ErrInvalidNominationValueGenerator = errors.New("nomination value generator cannot be nil")
137137

138+
// ErrInvalidNetworkMonitorInterval indicates an invalid network monitor interval was provided.
139+
ErrInvalidNetworkMonitorInterval = errors.New("network monitor interval must be greater than 0")
140+
138141
errAttributeTooShortICECandidate = errors.New("attribute not long enough to be ICE candidate")
139142
errClosingConnection = errors.New("failed to close connection")
140143
errConnectionAddrAlreadyExist = errors.New("connection with same remote address already exists")
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Continual Gathering Example
2+
3+
This example demonstrates the `ContinualGatheringPolicy` feature in Pion ICE, which allows agents to continuously discover network candidates throughout a connection's lifetime.
4+
5+
## Overview
6+
7+
Traditional ICE gathering (`GatherOnce`) collects candidates once at startup and stops. This can be problematic when:
8+
9+
- Users switch between WiFi and cellular networks
10+
- Network interfaces are added/removed
11+
- Moving between access points ("walk-out-the-door" problem)
12+
13+
With `GatherContinually`, the agent monitors for network changes and automatically discovers new candidates, enabling seamless connectivity transitions.
14+
15+
## Usage
16+
17+
```bash
18+
# Traditional gathering (stops after initial collection)
19+
go run main.go -mode once
20+
21+
# Continual gathering (monitors for network changes)
22+
go run main.go -mode continually -interval 2s
23+
```
24+
25+
## Testing
26+
27+
While running in continual mode, try:
28+
29+
- Connecting/disconnecting WiFi
30+
- Enabling/disabling network adapters
31+
- Switching between networks
32+
33+
New candidates will be discovered and reported automatically!
34+
35+
### Testing with Virtual Network Adapters (Linux)
36+
37+
You can easily test the continual gathering by creating/removing virtual network adapters:
38+
39+
```bash
40+
# Create a virtual network adapter with an IP address
41+
sudo ip link add veth0 type veth peer name veth1
42+
sudo ip addr add 10.0.0.1/24 dev veth0
43+
sudo ip link set veth0 up
44+
45+
# Wait a few seconds, then check the example output
46+
# You should see new candidates for 10.0.0.1
47+
48+
# Remove the virtual adapter
49+
sudo ip link delete veth0
50+
51+
# The removed interface will be detected and logged
52+
```
53+
54+
Alternative using dummy interface:
55+
```bash
56+
# Create a dummy interface (simpler, no peer needed)
57+
sudo ip link add dummy0 type dummy
58+
sudo ip addr add 192.168.100.1/24 dev dummy0
59+
sudo ip link set dummy0 up
60+
61+
# Remove it
62+
sudo ip link delete dummy0
63+
```
64+
65+
You can also change IP addresses on existing interfaces:
66+
```bash
67+
# Add a secondary IP to an existing interface
68+
sudo ip addr add 172.16.0.1/24 dev eth0
69+
70+
# Remove it
71+
sudo ip addr del 172.16.0.1/24 dev eth0
72+
```
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
// Package main demonstrates the ContinualGatheringPolicy feature
5+
package main
6+
7+
import (
8+
"context"
9+
"flag"
10+
"fmt"
11+
"log"
12+
"os"
13+
"os/signal"
14+
"syscall"
15+
"time"
16+
17+
"github.com/pion/ice/v4"
18+
"github.com/pion/logging"
19+
)
20+
21+
func main() { //nolint:cyclop
22+
var gatheringMode string
23+
var monitorInterval time.Duration
24+
25+
flag.StringVar(&gatheringMode, "mode", "continually", "Gathering mode: 'once' or 'continually'")
26+
flag.DurationVar(&monitorInterval, "interval", 2*time.Second, "Network monitoring interval (for continual mode)")
27+
flag.Parse()
28+
29+
// Determine gathering policy
30+
var policy ice.ContinualGatheringPolicy
31+
switch gatheringMode {
32+
case "once":
33+
policy = ice.GatherOnce
34+
fmt.Println("Using GatherOnce policy - gathering will complete after initial collection")
35+
case "continually":
36+
policy = ice.GatherContinually
37+
fmt.Printf("Using GatherContinually policy - monitoring for network changes every %v\n", monitorInterval)
38+
default:
39+
log.Fatalf("Invalid mode: %s. Use 'once' or 'continually'", gatheringMode)
40+
}
41+
42+
// Create logger
43+
loggerFactory := logging.NewDefaultLoggerFactory()
44+
loggerFactory.DefaultLogLevel = logging.LogLevelDebug
45+
46+
// Create ICE agent with the specified gathering policy using AgentOptions
47+
agent, err := ice.NewAgentWithOptions(
48+
ice.WithNetworkTypes([]ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6}),
49+
ice.WithCandidateTypes([]ice.CandidateType{ice.CandidateTypeHost}),
50+
ice.WithContinualGatheringPolicy(policy),
51+
ice.WithNetworkMonitorInterval(monitorInterval),
52+
)
53+
if err != nil {
54+
log.Fatalf("Failed to create agent: %v", err)
55+
}
56+
57+
defer func() {
58+
if closeErr := agent.Close(); closeErr != nil {
59+
log.Printf("Failed to close agent: %v", closeErr)
60+
}
61+
}()
62+
63+
// Track candidates
64+
candidateCount := 0
65+
candidateMap := make(map[string]ice.Candidate)
66+
67+
// Set up candidate handler
68+
err = agent.OnCandidate(func(candidate ice.Candidate) {
69+
if candidate == nil {
70+
if policy == ice.GatherOnce {
71+
fmt.Println("\n=== Gathering completed (no more candidates) ===")
72+
}
73+
74+
return
75+
}
76+
77+
candidateCount++
78+
candidateID := candidate.String()
79+
80+
if _, exists := candidateMap[candidateID]; !exists {
81+
candidateMap[candidateID] = candidate
82+
fmt.Printf("[%s] Candidate #%d: %s\n", time.Now().Format("15:04:05"), candidateCount, candidate)
83+
}
84+
})
85+
if err != nil {
86+
log.Fatalf("Failed to set candidate handler: %v", err) //nolint:gocritic
87+
}
88+
89+
// Start gathering
90+
fmt.Println("\n=== Starting candidate gathering ===")
91+
err = agent.GatherCandidates()
92+
if err != nil {
93+
log.Fatalf("Failed to start gathering: %v", err)
94+
}
95+
96+
// Set up signal handling for graceful shutdown
97+
sigChan := make(chan os.Signal, 1)
98+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
99+
100+
// Create a context for periodic status checks
101+
ctx, cancel := context.WithCancel(context.Background())
102+
defer cancel()
103+
104+
// Periodically check and display gathering state
105+
go func() {
106+
ticker := time.NewTicker(5 * time.Second)
107+
defer ticker.Stop()
108+
109+
for {
110+
select {
111+
case <-ctx.Done():
112+
return
113+
case <-ticker.C:
114+
state, err := agent.GetGatheringState() //nolint:contextcheck
115+
if err != nil {
116+
log.Printf("Failed to get gathering state: %v", err)
117+
118+
continue
119+
}
120+
121+
localCandidates, err := agent.GetLocalCandidates() //nolint:contextcheck
122+
if err != nil {
123+
log.Printf("Failed to get local candidates: %v", err)
124+
125+
continue
126+
}
127+
128+
fmt.Printf("\n[%s] Status: GatheringState=%s, Candidates=%d\n",
129+
time.Now().Format("15:04:05"), state, len(localCandidates))
130+
131+
if policy == ice.GatherContinually {
132+
fmt.Println("Tip: Try changing network interfaces (connect/disconnect WiFi, enable/disable network adapters)")
133+
fmt.Println(" New candidates will be discovered automatically!")
134+
}
135+
}
136+
}
137+
}()
138+
139+
// Wait for interrupt signal
140+
fmt.Println("\nPress Ctrl+C to exit...")
141+
<-sigChan
142+
143+
fmt.Println("\n=== Shutting down ===")
144+
cancel()
145+
146+
// Display final statistics
147+
state, _ := agent.GetGatheringState()
148+
localCandidates, _ := agent.GetLocalCandidates()
149+
150+
fmt.Printf("\nFinal Statistics:\n")
151+
fmt.Printf(" Gathering Policy: %s\n", policy)
152+
fmt.Printf(" Gathering State: %s\n", state)
153+
fmt.Printf(" Total Candidates Discovered: %d\n", candidateCount)
154+
fmt.Printf(" Unique Candidates: %d\n", len(candidateMap))
155+
fmt.Printf(" Current Active Candidates: %d\n", len(localCandidates))
156+
}

0 commit comments

Comments
 (0)