Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ type Agent struct {
enableRenomination bool
nominationValueGenerator func() uint32
nominationAttribute stun.AttrType

// Continual gathering support
continualGatheringPolicy ContinualGatheringPolicy
networkMonitorInterval time.Duration
lastKnownInterfaces map[string]netip.Addr // map[iface+ip] for deduplication
}

// NewAgent creates a new Agent.
Expand Down Expand Up @@ -244,6 +249,10 @@ func newAgentWithConfig(config *AgentConfig, opts ...AgentOption) (*Agent, error
enableRenomination: false,
nominationValueGenerator: nil,
nominationAttribute: stun.AttrType(0x0030), // Default value

continualGatheringPolicy: GatherOnce, // Default to GatherOnce
networkMonitorInterval: 2 * time.Second,
lastKnownInterfaces: make(map[string]netip.Addr),
}

agent.connectionStateNotifier = &handlerNotifier{
Expand Down Expand Up @@ -890,6 +899,19 @@ func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
return res, nil
}

// GetGatheringState returns the current gathering state of the Agent.
func (a *Agent) GetGatheringState() (GatheringState, error) {
var state GatheringState
err := a.loop.Run(a.loop, func(_ context.Context) {
state = a.gatheringState
})
if err != nil {
return GatheringStateUnknown, err
}

return state, nil
}

// GetLocalUserCredentials returns the local user credentials.
func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
valSet := make(chan struct{})
Expand Down
71 changes: 71 additions & 0 deletions agent_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ice

import (
"sync/atomic"
"time"

"github.com/pion/stun/v3"
)
Expand Down Expand Up @@ -152,3 +153,73 @@ func WithEnableUseCandidateCheckPriority() AgentOption {
return nil
}
}

// WithContinualGatheringPolicy sets the continual gathering policy for the agent.
// When set to GatherContinually, the agent will continuously monitor network interfaces
// and gather new candidates as they become available.
// When set to GatherOnce (default), gathering completes after the initial phase.
//
// Example:
//
// agent, err := NewAgentWithOptions(WithContinualGatheringPolicy(GatherContinually))
func WithContinualGatheringPolicy(policy ContinualGatheringPolicy) AgentOption {
return func(a *Agent) error {
a.continualGatheringPolicy = policy

return nil
}
}

// WithNetworkMonitorInterval sets the interval at which the agent checks for network interface changes
// when using GatherContinually policy. This option only has effect when used with
// WithContinualGatheringPolicy(GatherContinually).
// Default is 2 seconds if not specified.
//
// Example:
//
// agent, err := NewAgentWithOptions(
// WithContinualGatheringPolicy(GatherContinually),
// WithNetworkMonitorInterval(5 * time.Second),
// )
func WithNetworkMonitorInterval(interval time.Duration) AgentOption {
return func(a *Agent) error {
if interval <= 0 {
return ErrInvalidNetworkMonitorInterval
}
a.networkMonitorInterval = interval

return nil
}
}

// WithNetworkTypes sets the enabled network types for candidate gathering.
// By default, all network types are enabled.
//
// Example:
//
// agent, err := NewAgentWithOptions(
// WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}),
// )
func WithNetworkTypes(networkTypes []NetworkType) AgentOption {
return func(a *Agent) error {
a.networkTypes = networkTypes

return nil
}
}

// WithCandidateTypes sets the enabled candidate types for gathering.
// By default, host, server reflexive, and relay candidates are enabled.
//
// Example:
//
// agent, err := NewAgentWithOptions(
// WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeServerReflexive}),
// )
func WithCandidateTypes(candidateTypes []CandidateType) AgentOption {
return func(a *Agent) error {
a.candidateTypes = candidateTypes

return nil
}
}
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ var (
// ErrInvalidNominationValueGenerator indicates a nil nomination value generator was provided.
ErrInvalidNominationValueGenerator = errors.New("nomination value generator cannot be nil")

// ErrInvalidNetworkMonitorInterval indicates an invalid network monitor interval was provided.
ErrInvalidNetworkMonitorInterval = errors.New("network monitor interval must be greater than 0")

errAttributeTooShortICECandidate = errors.New("attribute not long enough to be ICE candidate")
errClosingConnection = errors.New("failed to close connection")
errConnectionAddrAlreadyExist = errors.New("connection with same remote address already exists")
Expand Down
72 changes: 72 additions & 0 deletions examples/continual-gathering/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Continual Gathering Example

This example demonstrates the `ContinualGatheringPolicy` feature in Pion ICE, which allows agents to continuously discover network candidates throughout a connection's lifetime.

## Overview

Traditional ICE gathering (`GatherOnce`) collects candidates once at startup and stops. This can be problematic when:

- Users switch between WiFi and cellular networks
- Network interfaces are added/removed
- Moving between access points ("walk-out-the-door" problem)

With `GatherContinually`, the agent monitors for network changes and automatically discovers new candidates, enabling seamless connectivity transitions.

## Usage

```bash
# Traditional gathering (stops after initial collection)
go run main.go -mode once

# Continual gathering (monitors for network changes)
go run main.go -mode continually -interval 2s
```

## Testing

While running in continual mode, try:

- Connecting/disconnecting WiFi
- Enabling/disabling network adapters
- Switching between networks

New candidates will be discovered and reported automatically!

### Testing with Virtual Network Adapters (Linux)

You can easily test the continual gathering by creating/removing virtual network adapters:

```bash
# Create a virtual network adapter with an IP address
sudo ip link add veth0 type veth peer name veth1
sudo ip addr add 10.0.0.1/24 dev veth0
sudo ip link set veth0 up

# Wait a few seconds, then check the example output
# You should see new candidates for 10.0.0.1

# Remove the virtual adapter
sudo ip link delete veth0

# The removed interface will be detected and logged
```

Alternative using dummy interface:
```bash
# Create a dummy interface (simpler, no peer needed)
sudo ip link add dummy0 type dummy
sudo ip addr add 192.168.100.1/24 dev dummy0
sudo ip link set dummy0 up

# Remove it
sudo ip link delete dummy0
```

You can also change IP addresses on existing interfaces:
```bash
# Add a secondary IP to an existing interface
sudo ip addr add 172.16.0.1/24 dev eth0

# Remove it
sudo ip addr del 172.16.0.1/24 dev eth0
```
156 changes: 156 additions & 0 deletions examples/continual-gathering/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package main demonstrates the ContinualGatheringPolicy feature
package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/pion/ice/v4"
"github.com/pion/logging"
)

func main() { //nolint:cyclop
var gatheringMode string
var monitorInterval time.Duration

flag.StringVar(&gatheringMode, "mode", "continually", "Gathering mode: 'once' or 'continually'")
flag.DurationVar(&monitorInterval, "interval", 2*time.Second, "Network monitoring interval (for continual mode)")
flag.Parse()

// Determine gathering policy
var policy ice.ContinualGatheringPolicy
switch gatheringMode {
case "once":
policy = ice.GatherOnce
fmt.Println("Using GatherOnce policy - gathering will complete after initial collection")
case "continually":
policy = ice.GatherContinually
fmt.Printf("Using GatherContinually policy - monitoring for network changes every %v\n", monitorInterval)
default:
log.Fatalf("Invalid mode: %s. Use 'once' or 'continually'", gatheringMode)
}

// Create logger
loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = logging.LogLevelDebug

// Create ICE agent with the specified gathering policy using AgentOptions
agent, err := ice.NewAgentWithOptions(
ice.WithNetworkTypes([]ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6}),
ice.WithCandidateTypes([]ice.CandidateType{ice.CandidateTypeHost}),
ice.WithContinualGatheringPolicy(policy),
ice.WithNetworkMonitorInterval(monitorInterval),
)
if err != nil {
log.Fatalf("Failed to create agent: %v", err)
}

defer func() {
if closeErr := agent.Close(); closeErr != nil {
log.Printf("Failed to close agent: %v", closeErr)
}
}()

// Track candidates
candidateCount := 0
candidateMap := make(map[string]ice.Candidate)

// Set up candidate handler
err = agent.OnCandidate(func(candidate ice.Candidate) {
if candidate == nil {
if policy == ice.GatherOnce {
fmt.Println("\n=== Gathering completed (no more candidates) ===")
}

return
}

candidateCount++
candidateID := candidate.String()

if _, exists := candidateMap[candidateID]; !exists {
candidateMap[candidateID] = candidate
fmt.Printf("[%s] Candidate #%d: %s\n", time.Now().Format("15:04:05"), candidateCount, candidate)
}
})
if err != nil {
log.Fatalf("Failed to set candidate handler: %v", err) //nolint:gocritic
}

// Start gathering
fmt.Println("\n=== Starting candidate gathering ===")
err = agent.GatherCandidates()
if err != nil {
log.Fatalf("Failed to start gathering: %v", err)
}

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Create a context for periodic status checks
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Periodically check and display gathering state
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
state, err := agent.GetGatheringState() //nolint:contextcheck
if err != nil {
log.Printf("Failed to get gathering state: %v", err)

continue
}

localCandidates, err := agent.GetLocalCandidates() //nolint:contextcheck
if err != nil {
log.Printf("Failed to get local candidates: %v", err)

continue
}

fmt.Printf("\n[%s] Status: GatheringState=%s, Candidates=%d\n",
time.Now().Format("15:04:05"), state, len(localCandidates))

if policy == ice.GatherContinually {
fmt.Println("Tip: Try changing network interfaces (connect/disconnect WiFi, enable/disable network adapters)")
fmt.Println(" New candidates will be discovered automatically!")
}
}
}
}()

// Wait for interrupt signal
fmt.Println("\nPress Ctrl+C to exit...")
<-sigChan

fmt.Println("\n=== Shutting down ===")
cancel()

// Display final statistics
state, _ := agent.GetGatheringState()
localCandidates, _ := agent.GetLocalCandidates()

fmt.Printf("\nFinal Statistics:\n")
fmt.Printf(" Gathering Policy: %s\n", policy)
fmt.Printf(" Gathering State: %s\n", state)
fmt.Printf(" Total Candidates Discovered: %d\n", candidateCount)
fmt.Printf(" Unique Candidates: %d\n", len(candidateMap))
fmt.Printf(" Current Active Candidates: %d\n", len(localCandidates))
}
Loading
Loading