Casbin Informer Watcher is a Kubernetes informer-based watcher for Casbin. This watcher enables real-time policy synchronization across multiple Casbin enforcer instances by watching Kubernetes Custom Resource Definitions (CRDs).
- Real-time Updates: Uses Kubernetes informers to watch CRD changes without periodic polling
- Event-Driven: Reacts immediately to create, update, and delete events
- Concurrency-Safe: Safe to use with
SyncedEnforcerin multi-threaded environments - Graceful Reconnection: Handles disconnections and resource version drift automatically
- Incremental Updates: Supports both full policy reload and incremental policy updates
- GitOps Compatible: Changes applied via GitOps become effective quickly across all instances
go get github.com/casbin/casbin-informer-watcherpackage main
import (
"log"
"github.com/casbin/casbin/v2"
informerwatcher "github.com/casbin/casbin-informer-watcher"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// Load Kubernetes configuration
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
log.Fatalf("Failed to load kubeconfig: %v", err)
}
// Create dynamic client
client, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create dynamic client: %v", err)
}
// Define the GVR for your policy CRD
gvr := schema.GroupVersionResource{
Group: "casbin.org",
Version: "v1",
Resource: "policies",
}
// Create the watcher
watcher, err := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{})
if err != nil {
log.Fatalf("Failed to create watcher: %v", err)
}
defer watcher.Close()
// Initialize the enforcer
e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
if err != nil {
log.Fatalf("Failed to create enforcer: %v", err)
}
// Set the watcher for the enforcer
err = e.SetWatcher(watcher)
if err != nil {
log.Fatalf("Failed to set watcher: %v", err)
}
// By default, the watcher's callback is automatically set to the
// enforcer's LoadPolicy() in the SetWatcher() call.
// You can change it by explicitly setting a callback.
err = watcher.SetUpdateCallback(informerwatcher.DefaultUpdateCallback(e))
if err != nil {
log.Fatalf("Failed to set callback: %v", err)
}
log.Println("Watcher is running and monitoring policy changes...")
select {} // Keep the program running
}package main
import (
"log"
"time"
"github.com/casbin/casbin/v2"
informerwatcher "github.com/casbin/casbin-informer-watcher"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// Load Kubernetes configuration
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
log.Fatalf("Failed to load kubeconfig: %v", err)
}
// Create dynamic client
client, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create dynamic client: %v", err)
}
// Define the GVR for your policy CRD
gvr := schema.GroupVersionResource{
Group: "casbin.org",
Version: "v1",
Resource: "policies",
}
// Create watcher with custom options
options := informerwatcher.WatcherOptions{
LocalID: "instance-1", // Custom instance identifier
IgnoreSelf: true, // Ignore updates from this instance
ResyncPeriod: 30 * time.Second, // Resync period with API server
}
watcher, err := informerwatcher.NewWatcher(client, gvr, "default", options)
if err != nil {
log.Fatalf("Failed to create watcher: %v", err)
}
defer watcher.Close()
// Initialize the enforcer
e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
if err != nil {
log.Fatalf("Failed to create enforcer: %v", err)
}
// Set the watcher
err = e.SetWatcher(watcher)
if err != nil {
log.Fatalf("Failed to set watcher: %v", err)
}
// Custom callback that logs updates
customCallback := func(msg string) {
log.Printf("Policy update received: %s\n", msg)
informerwatcher.DefaultUpdateCallback(e)(msg)
}
err = watcher.SetUpdateCallback(customCallback)
if err != nil {
log.Fatalf("Failed to set callback: %v", err)
}
log.Println("Watcher is running with custom configuration...")
select {} // Keep the program running
}package main
import (
"log"
"github.com/casbin/casbin/v2"
informerwatcher "github.com/casbin/casbin-informer-watcher"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// Setup client and GVR (same as basic example)
config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
client, _ := dynamic.NewForConfig(config)
gvr := schema.GroupVersionResource{
Group: "casbin.org",
Version: "v1",
Resource: "policies",
}
watcher, _ := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{})
defer watcher.Close()
// Use SyncedEnforcer for concurrency-safe operations
e, err := casbin.NewSyncedEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
if err != nil {
log.Fatalf("Failed to create synced enforcer: %v", err)
}
err = e.SetWatcher(watcher)
if err != nil {
log.Fatalf("Failed to set watcher: %v", err)
}
log.Println("SyncedEnforcer is running with watcher...")
select {}
}- LocalID (string): Unique identifier for this watcher instance. Auto-generated if not provided.
- IgnoreSelf (bool): If true, ignores updates triggered by this watcher instance. Default: false.
- ResyncPeriod (time.Duration): Period for the informer to resync with the API server. Default: 30 seconds.
- OptionalUpdateCallback (func(string)): Optional callback function set during initialization.
- The watcher uses Kubernetes informers to monitor Custom Resource Definitions (CRDs) that represent Casbin policies.
- When a CRD is created, updated, or deleted, the informer triggers the corresponding event handler.
- The event handler processes the change and invokes the registered callback function.
- The callback typically triggers the enforcer to reload its policy or apply incremental updates.
- All running instances with the same watcher configuration receive the same updates, keeping policies synchronized.
The watcher supports all standard Casbin policy update operations:
Update: Full policy reloadUpdateForAddPolicy: Add a single policy ruleUpdateForRemovePolicy: Remove a single policy ruleUpdateForAddPolicies: Add multiple policy rulesUpdateForRemovePolicies: Remove multiple policy rulesUpdateForRemoveFilteredPolicy: Remove filtered policy rulesUpdateForUpdatePolicy: Update a single policy ruleUpdateForUpdatePolicies: Update multiple policy rulesUpdateForSavePolicy: Save policy to storage
You'll need to define a CRD for your Casbin policies. Here's a basic example:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: policies.casbin.org
spec:
group: casbin.org
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
policy:
type: string
scope: Namespaced
names:
plural: policies
singular: policy
kind: PolicyRun the test suite:
go test -v ./...Run tests with coverage:
go test -v -coverprofile=coverage.out ./...
go tool cover -html=coverage.outContributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
- Casbin - An authorization library that supports access control models like ACL, RBAC, ABAC
- Redis Watcher - Redis-based watcher for Casbin
- Etcd Watcher - Etcd-based watcher for Casbin
- GORM Adapter - GORM adapter for Casbin
- Ent Adapter - Ent adapter for Casbin