Skip to content

Conversation

@RagingRedRiot
Copy link
Contributor

@RagingRedRiot RagingRedRiot commented Aug 5, 2025

Description of the change

Abnormal Security API adapter

Type of change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Related issues

BHISSOC/tracking#15

@RagingRedRiot
Copy link
Contributor Author

I tested this adapter against the API for all log/api types other than cases.

I used the following tests to test Error handling and cases log ingestion.

package usp_abnormal_security

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/refractionPOINT/go-uspclient"
)

const (
	TestInstallationKey = "<InstallationKey>"
	TestOid             = "<OID>"
	TestSensorSeedKey   = "<SEED>"
	TestHostname        = "<HOSTNAME>"
)

func TestInternalServerErrorRetryMechanism(t *testing.T) {
	var requestCount int
	var mu sync.Mutex

	// Create a mock server to control responses to the adapter
	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		requestCount++
		currentCount := requestCount
		mu.Unlock()

		switch {
		// Cases base queries
		case strings.Contains(r.URL.Path, "/cases") && !strings.Contains(r.URL.Path, "/cases/"):

			// Respond with an error on the second query
			if currentCount == 2 {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(`{"error": "Internal Server Error"}`))
				return
			}

			w.Header().Set("Content-Type", "application/json")
			response :=
				// Docs provided sample: https://app.swaggerhub.com/apis/abnormal-security/abx/1.4.3#/Cases/v1_cases_retrieve
				map[string]interface{}{
					"cases": []map[string]interface{}{
						{
							"caseId":         "1234",
							"description":    "string",
							"severity_level": "LOW",
							"confidence":     "LOW",
							"last_modified":  "2025-08-05T17:22:20.307Z",
							"first_observed": "2025-08-05T17:22:20.307Z",
							"created":        "2025-08-05T17:22:20.307Z",
						},
					},
					"pageNumber":     1,
					"nextPageNumber": 0,
				}
			json.NewEncoder(w).Encode(response)
		// Cases detail queries
		case strings.Contains(r.URL.Path, "/cases/"):

			// Return 500 error for first 2 requests
			if currentCount <= 4 {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(`{"error": "Internal Server Error"}`))
				return
			}
			w.Header().Set("Content-Type", "application/json")
			response :=
				// Docs provided sample: https://app.swaggerhub.com/apis/abnormal-security/abx/1.4.3#/Cases/v1_cases_retrieve_2
				map[string]interface{}{
					"caseId":              "1234",
					"case_status":         "Action Required",
					"severity":            "Potential Account Takeover",
					"affectedEmployee":    "FirstName LastName",
					"customerVisibleTime": "2025-08-05T16:20:48.531Z",
					"firstObserved":       "2020-06-09T17:42:59Z",
					"analysis":            "string",
					"remediation_status":  "Not remediated",
					"severity_level":      "NONE",
					"confidence":          "LOW",
				}
			json.NewEncoder(w).Encode(response)

		}
	}))

	defer mockServer.Close()

	// Create adapter configuration with mock server URL
	config := AbnormalSecurityConfig{
		ClientOptions: uspclient.ClientOptions{
			OnWarning: func(msg string) {
				t.Logf("Warning: %s", msg)
			},
			OnError: func(err error) {
				t.Logf("Error: %v", err)
			},
			DebugLog: func(msg string) {
				t.Logf("Debug: %s", msg)
			},
		},
		AccessToken:          "test-token",
		BaseURL:              mockServer.URL,
		MaxConcurrentWorkers: 1,
	}

	config.ClientOptions.Identity.InstallationKey = TestInstallationKey
	config.ClientOptions.Identity.Oid = TestOid
	config.ClientOptions.SensorSeedKey = TestSensorSeedKey
	config.ClientOptions.Hostname = TestHostname
	config.ClientOptions.Platform = "json"
	config.ClientOptions.Architecture = "usp_adapter"

	adapter, stopped, err := NewAbnormalSecurityAdapter(config)
	if err != nil {
		t.Fatalf("Failed to create adapter: %v", err)
	}
	defer func() {
		adapter.Close()
		<-stopped
	}()

	// Create a test API configuration
	api := &Api{
		key:                "cases",
		endpoint:           "/cases",
		since:              time.Now().Add(-1 * queryInterval * time.Second),
		idField:            "caseId",
		timeField:          "lastModifiedTime",
		timeFieldSecondary: "customerVisibleTime",
		active:             true,
		dedupe:             make(map[string]int64),
		responseType:       &AbnormalSecurityCasesResponse{},
		detailResponseType: &AbnormalSecurityFlatSingleResponse{},
	}

	// First Base Attempt -- should succeed
	_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/cases", api, false)

	if err != nil {
		t.Errorf("expected success, but got error: %s", err.Error())
	}

	// Second Base Attempt -- should fail and NOT schedule a retry
	_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/cases", api, false)

	expectedError := "server error 500 scheduled for retry"
	if err.Error() == expectedError {
		t.Errorf("expected error but not error '%s', got %s", expectedError, err.Error())
	}
	if err == nil {
		t.Errorf("expected err, but got success.")
	}

	// First Detail Attempt -- should fail
	_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/cases/1234", api, true)

	if err == nil {
		t.Error("Expected error from initial 500 response, but got none")
	}
	if err.Error() != expectedError {
		t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
	}

	time.Sleep(100 * time.Millisecond)

	// Second Detail Attempt -- should fail
	_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/cases/1234", api, true)
	if err == nil {
		t.Error("Expected error from initial 500 response, but got none")
	}
	if err.Error() != expectedError {
		t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
	}
	time.Sleep(100 * time.Millisecond)

	// Third Detail Attempt -- should Succeed
	_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/cases/1234", api, true)
	if err != nil {
		t.Errorf("Expected a success, got an error: %s", err.Error())
	}

	mu.Lock()
	finalRequestCount := requestCount
	mu.Unlock()

	t.Logf("Test completed with %d requests", finalRequestCount)
	time.Sleep(100 * time.Millisecond)
}

func TestRetryScheduling(t *testing.T) {
	config := AbnormalSecurityConfig{
		ClientOptions: uspclient.ClientOptions{
			OnWarning: func(msg string) { t.Logf("Warning: %s", msg) },
			OnError:   func(err error) { t.Logf("Error: %v", err) },
			DebugLog:  func(msg string) { t.Logf("Debug: %s", msg) },
		},
		AccessToken:          "test-token",
		BaseURL:              "https://example.com",
		MaxConcurrentWorkers: 1,
	}

	config.ClientOptions.Identity.InstallationKey = TestInstallationKey
	config.ClientOptions.Identity.Oid = TestOid
	config.ClientOptions.SensorSeedKey = TestSensorSeedKey
	config.ClientOptions.Hostname = TestHostname
	config.ClientOptions.Platform = "json"
	config.ClientOptions.Architecture = "usp_adapter"

	adapter, stopped, err := NewAbnormalSecurityAdapter(config)
	if err != nil {
		t.Fatalf("Failed to create adapter: %v", err)
	}
	defer func() {
		adapter.Close()
		<-stopped
	}()

	ctx := context.Background()
	api := &Api{key: "test", endpoint: "/test"}

	// Test that we can schedule retries without blocking
	retry1 := InternalServerErrorRetry{
		ctx:       ctx,
		pageUrl:   "https://example.com/test1",
		api:       api,
		details:   false,
		retryTime: time.Now(),
		attempt:   1,
	}

	retry2 := InternalServerErrorRetry{
		ctx:       ctx,
		pageUrl:   "https://example.com/test2",
		api:       api,
		details:   false,
		retryTime: time.Now(),
		attempt:   2,
	}

	retry3 := InternalServerErrorRetry{
		ctx:       ctx,
		pageUrl:   "https://example.com/test3",
		api:       api,
		details:   false,
		retryTime: time.Now(),
		attempt:   3,
	}

	// These should not block
	select {
	case adapter.chRetry <- retry1:
		t.Log("Successfully scheduled retry 1")
	case <-time.After(100 * time.Millisecond):
		t.Error("Failed to schedule retry 1 - channel blocked")
	}

	select {
	case adapter.chRetry <- retry2:
		t.Log("Successfully scheduled retry 2")
	case <-time.After(100 * time.Millisecond):
		t.Error("Failed to schedule retry 2 - channel blocked")
	}

	select {
	case adapter.chRetry <- retry3:
		t.Log("Successfully scheduled retry 3")
	case <-time.After(100 * time.Millisecond):
		t.Error("Failed to schedule retry 3 - channel blocked")
	}

	// Give the retry manager a moment to process
	time.Sleep(200 * time.Millisecond)
}

func TestRetryChannelCapacity(t *testing.T) {
	config := AbnormalSecurityConfig{
		ClientOptions: uspclient.ClientOptions{
			OnWarning: func(msg string) { t.Logf("Warning: %s", msg) },
			OnError:   func(err error) { t.Logf("Error: %v", err) },
			DebugLog:  func(msg string) { t.Logf("Debug: %s", msg) },
		},
		AccessToken:          "test-token",
		BaseURL:              "https://example.com",
		MaxConcurrentWorkers: 1,
	}

	config.ClientOptions.Identity.InstallationKey = TestInstallationKey
	config.ClientOptions.Identity.Oid = TestOid
	config.ClientOptions.SensorSeedKey = TestSensorSeedKey
	config.ClientOptions.Hostname = TestHostname
	config.ClientOptions.Platform = "json"
	config.ClientOptions.Architecture = "usp_adapter"

	adapter, stopped, err := NewAbnormalSecurityAdapter(config)
	if err != nil {
		t.Fatalf("Failed to create adapter: %v", err)
	}
	defer func() {
		adapter.Close()
		<-stopped
	}()

	ctx := context.Background()
	api := &Api{key: "test", endpoint: "/test"}

	// Test that the retry channel has significant capacity (should be 1000 based on code)
	// We'll test with a smaller number to keep test fast
	testCapacity := 100

	for i := 0; i < testCapacity; i++ {
		retry := InternalServerErrorRetry{
			ctx:       ctx,
			pageUrl:   fmt.Sprintf("https://example.com/test%d", i),
			api:       api,
			details:   false,
			retryTime: time.Now(),
			attempt:   1,
		}

		select {
		case adapter.chRetry <- retry:
			// Successfully queued
		case <-time.After(10 * time.Millisecond):
			t.Errorf("Retry channel blocked after %d entries, expected capacity of at least %d", i, testCapacity)
			return
		}
	}

	t.Logf("Successfully queued %d retry requests without blocking", testCapacity)
}

func TestDoWithRetryServerErrorHandling(t *testing.T) {
	// Test different server error codes
	testCases := []struct {
		statusCode  int
		expectRetry bool
		description string
	}{
		{500, true, "Internal Server Error"},
		{502, true, "Bad Gateway"},
		{503, true, "Service Unavailable"},
		{504, true, "Gateway Timeout"},
		{400, false, "Bad Request (not a server error)"},
		{401, false, "Unauthorized (not a server error)"},
		{403, false, "Forbidden (not a server error)"},
		{404, false, "Not Found (not a server error)"},
	}

	for _, tc := range testCases {
		t.Run(tc.description, func(t *testing.T) {
			mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				w.WriteHeader(tc.statusCode)
				w.Write([]byte(fmt.Sprintf(`{"error": "%s"}`, tc.description)))
			}))
			defer mockServer.Close()

			config := AbnormalSecurityConfig{
				ClientOptions: uspclient.ClientOptions{
					OnWarning: func(msg string) { t.Logf("Warning: %s", msg) },
					OnError:   func(err error) { t.Logf("Error: %v", err) },
					DebugLog:  func(msg string) { t.Logf("Debug: %s", msg) },
				},
				AccessToken:          "test-token",
				BaseURL:              mockServer.URL,
				MaxConcurrentWorkers: 1,
			}

			config.ClientOptions.Identity.InstallationKey = TestInstallationKey
			config.ClientOptions.Identity.Oid = TestOid
			config.ClientOptions.SensorSeedKey = TestSensorSeedKey
			config.ClientOptions.Hostname = TestHostname
			config.ClientOptions.Platform = "json"
			config.ClientOptions.Architecture = "usp_adapter"

			adapter, stopped, err := NewAbnormalSecurityAdapter(config)
			if err != nil {
				t.Fatalf("Failed to create adapter: %v", err)
			}
			defer func() {
				adapter.Close()
				<-stopped
			}()

			api := &Api{
				key:                "threats",
				endpoint:           "/threats/",
				since:              time.Now().Add(-1 * time.Hour),
				idField:            "threatId",
				timeField:          "receivedTime",
				active:             true,
				dedupe:             make(map[string]int64),
				responseType:       &AbnormalSecurityThreatsResponse{},
				detailResponseType: &AbnormalSecurityThreatsFlatSingleResponse{},
			}

			_, _, err = adapter.getEvents(context.Background(), mockServer.URL+"/threats", api, true)

			if tc.expectRetry {
				// Should get a "server error X scheduled for retry" message
				if err == nil {
					t.Errorf("Expected error for status %d, got none", tc.statusCode)
				} else if !contains(err.Error(), "scheduled for retry") {
					t.Errorf("Expected retry error for status %d, got: %s", tc.statusCode, err.Error())
				}
			} else {
				// Should get a different error (not scheduled for retry)
				if err != nil && contains(err.Error(), "scheduled for retry") {
					t.Errorf("Did not expect retry for status %d, but got retry error: %s", tc.statusCode, err.Error())
				}
			}
		})
	}
}

// Helper function to check if a string contains a substring
func contains(s, substr string) bool {
	if len(substr) == 0 {
		return true
	}
	if len(s) < len(substr) {
		return false
	}
	for i := 0; i <= len(s)-len(substr); i++ {
		if s[i:i+len(substr)] == substr {
			return true
		}
	}
	return false
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant