Open
Description
Expected behavior
Setup:
- topic has a schema uploaded to the registry
- schemaValidationEnforced is set to true on the topic
- isAllowAutoUpdateSchema is set to false on the namespace
- producer is created successfully with a JSON schema that follows the definition from the registry
Expected:
- producer can only publish messages that are compliant with the schema
Actual behavior
The producer created with a schema can publish non-compliant messages. The payload is not validated against the schema.
Steps to reproduce
The first three setup steps can be performed through the admin REST API:
# Step 1: upload the JSON schema definition (a "SchemaTest" record with a
# string userName and an int userAge) to the schema registry for the topic.
curl --request POST \
--url $BASE_URL/admin/v2/schemas/$TENANT/$NAMESPACE/$TOPIC/schema \
--header 'Content-Type: application/json' \
--data '{
"type": "JSON",
"schema": "{\n \"type\": \"record\",\n \"name\": \"SchemaTest\",\n \"fields\": [\n {\n \"name\": \"userName\",\n \"type\": \"string\"\n },\n {\n \"name\": \"userAge\",\n \"type\": \"int\"\n }\n ]\n}",
"properties": {}
}'
# Step 2: set schemaValidationEnforced=true on the (persistent) topic.
curl --request POST \
--url $BASE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/schemaValidationEnforced \
--header 'Content-Type: application/json' \
--data true
# Step 3: set isAllowAutoUpdateSchema=false on the namespace.
curl --request POST \
--url $BASE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/isAllowAutoUpdateSchema \
--header 'Content-Type: application/json' \
--data false
The code:
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/apache/pulsar-client-go/pulsar"
pulsarauth "github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/google/uuid"
)
// Payload types published by the reproducer. Each field carries both a json
// tag (used when the value is JSON-encoded) and a matching avro tag.
type (
	// SchemaTest mirrors the record registered for the topic: a string
	// userName and an int userAge.
	SchemaTest struct {
		UserName string `avro:"userName" json:"userName"`
		UserAge  int    `avro:"userAge" json:"userAge"`
	}

	// WrongSchema deliberately uses field names that do not appear in the
	// registered record, so it serves as the non-compliant payload.
	WrongSchema struct {
		NotUserName string `avro:"notUserName" json:"notUserName"`
		NotUserAge  int    `avro:"notUserAge" json:"notUserAge"`
	}
)
// main reproduces the report that a producer created with a JSON schema can
// publish payloads that do not match that schema: it repeatedly sends a
// WrongSchema value through a producer whose schema describes SchemaTest.
// It exits with status 1 if setup or any send fails.
//
// NOTE(review): Pulsar documents schemaValidationEnforced as making the
// broker reject producers that connect without a schema; it is not described
// as validating individual message payloads. The Go client's JSON schema
// presumably just JSON-encodes Value without checking it against schemaDef,
// so badMsg being accepted may be expected behaviour rather than a broker
// bug — confirm against the client/broker versions in use. Rejecting such
// payloads would require client-side validation before Send.
func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

	// os.Exit skips deferred calls, so all cleanup lives in run and has
	// already happened by the time we exit here.
	if err := run(); err != nil {
		slog.Error("Schema producer failed", "error", err)
		os.Exit(1)
	}
}

// run creates the Pulsar client and producer and then publishes one message
// per second until a send fails. Setup errors are returned instead of being
// logged and ignored, so a nil client or producer is never used or closed.
func run() error {
	// Load settings here or hard-code them.
	token := "insert-your-token-here"
	authProvider := pulsarauth.NewAuthenticationToken(token)

	// client.CreatePulsarClient and settings come from the reporter's own
	// project and are not shown in this snippet.
	pulsarClient, err := client.CreatePulsarClient(settings.Url, authProvider)
	if err != nil {
		return fmt.Errorf("creating the pulsar client: %w", err)
	}
	defer pulsarClient.Close()

	producerName := fmt.Sprintf("SchemaProducer-%s", uuid.NewString()[0:6])

	// Same record definition that was uploaded to the schema registry.
	schemaDef := `{
"type": "record",
"name": "SchemaTest",
"fields": [
{
"name": "userName",
"type": "string"
},
{
"name": "userAge",
"type": "int"
}
]
}`
	schemaExample := pulsar.NewJSONSchema(schemaDef, nil)

	producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
		Name:   producerName,
		Topic:  settings.TopicAddress,
		Schema: schemaExample,
	})
	if err != nil {
		return fmt.Errorf("creating the pulsar producer: %w", err)
	}
	defer producer.Close()

	ctx := context.Background()
	for {
		// To publish a compliant payload instead, send:
		//   &SchemaTest{UserName: "JohnDoe", UserAge: 30}
		badMsg := &WrongSchema{
			NotUserName: "JohnDoe",
			NotUserAge:  30,
		}

		msgID, sendErr := producer.Send(ctx, &pulsar.ProducerMessage{
			Value: badMsg,
		})
		if sendErr != nil {
			return fmt.Errorf("sending message: %w", sendErr)
		}
		slog.Info("Published message: ", slog.String("messageId", msgID.String()), slog.Any("message", badMsg))

		time.Sleep(time.Second)
	}
}
The schema stored in the registry (HTTP response dump):
{
"version": 0,
"type": "JSON",
"timestamp": 1728915217796,
"data": "{\n \"type\": \"record\",\n \"name\": \"SchemaTest\",\n \"fields\": [\n {\n \"name\": \"userName\",\n \"type\": \"string\"\n },\n {\n \"name\": \"userAge\",\n \"type\": \"int\"\n }\n ]\n}",
"properties": {}
}
System configuration
Pulsar version: 3.0.6.8
Metadata
Metadata
Assignees
Labels
No labels
Activity