Skip to content

Unable to pull messages from imported JS #1481

@z0mb1ek

Description

@z0mb1ek

Observed behavior

I make simple importing for JS and cannot correctly pull it. Thread in slack here

Expected behavior

cannot make durable consumer and pull

Server and client version

server 2.12.2 jnats 2.24.0

Host environment

No response

Steps to reproduce

make simple config for server:

jetstream {
  max_file_store: 10GB
}

accounts {
  "hydra-aggregator": {
    users: [
      { user: "hydra-aggregator", password: "pass" }
    ]

    exports: [
    { service: "$JS.API.>" }
    ]

    jetstream: enabled
    jetstream {
      max_mem: 256M
      max_file: 1G
      max_streams: 100
      max_consumers: 500
    }
  }

  "push-manager": {
    users: [
      { user: "push-manager", password: "pass" }
    ]

    imports: [
    { service: { account: "hydra-aggregator", subject: "$JS.API.>"}, to: "$JS.HYDRA_AGGREGATOR.API.>"  }
    ]

    jetstream: enabled
    jetstream {
      max_mem: 256M
      max_file: 1G
      max_streams: 100
      max_consumers: 500
    }
  }
}

push to JS:

val connect = Nats
        .connect(
            Options
                .Builder()
                .server("localhost")
                .userInfo("hydra-aggregator", "pass")
                .maxReconnects(-1)
                .build(),
        )

    connect.jetStreamManagement().addStream(
        StreamConfiguration
            .builder()
            .name("HYDRA_AGGREGATOR")
            .storageType(StorageType.File)
            .subjects("hydra_aggregator.external.kafka.test.event")
            .maxAge(Duration.ofDays(7))
            .duplicateWindow(Duration.ofDays(1))
            .retentionPolicy(RetentionPolicy.Limits)
            .build(),
    )

    connect.jetStream().publish("hydra_aggregator.external.kafka.test.event","test1".toByteArray())
    connect.jetStream().publish("hydra_aggregator.external.kafka.test.event","test2".toByteArray())
    connect.jetStream().publish("hydra_aggregator.external.kafka.test.event","test3".toByteArray())
    connect.jetStream().publish("hydra_aggregator.external.kafka.test.event","test4".toByteArray())
    connect.jetStream().publish("hydra_aggregator.external.kafka.test.event","test5".toByteArray())

and try to get:

val connect = Nats
        .connect(
            Options
                .Builder()
                .server("localhost")
                .userInfo("push-manager", "pass")
                .maxReconnects(-1)
                .build(),
        )

    connect.getStreamContext(
        "HYDRA_AGGREGATOR",
        JetStreamOptions
            .builder()
            .domain("HYDRA_AGGREGATOR")
            .build(),
    ).createOrUpdateConsumer(
        ConsumerConfiguration.builder()
            .filterSubjects("hydra_aggregator.external.kafka.test.event")
            .maxAckPending(1)
            .durable("hydra_aggregator_external_kafka_test_event")
            .build(),
    ).consume(
        ConsumeOptions.builder().build(), connect.createDispatcher(),
    ) { msg ->
        println(msg)

        msg.ack()
    }

i see in logs:

NatsJetStreamMessage |hydra_aggregator.external.kafka.test.event|$JS.ACK.HYDRA_AGGREGATOR.hydra_aggregator_external_kafka_test_event.11.1.11.1763469021328867004.4|test1|
NatsJetStreamMessage |hydra_aggregator.external.kafka.test.event|$JS.ACK.HYDRA_AGGREGATOR.hydra_aggregator_external_kafka_test_event.12.1.12.1763469021328867004.4|test1|
Nov 18, 2025 4:16:49 PM io.nats.client.impl.ErrorListenerLoggerImpl heartbeatAlarm
SEVERE: heartbeatAlarm, Connection: 34, Subscription: 1890795193, Consumer Name: hydra_aggregator_external_kafka_test_event, lastStreamSequence: 1, lastConsumerSequence: 1

and see then in cli:

nats --server localhost:4222 --user push-manager --password pass consumer info --js-domain HYDRA_AGGREGATOR
? Select a Stream HYDRA_AGGREGATOR
? Select a Consumer hydra_aggregator_external_kafka_test_event
Information for Consumer HYDRA_AGGREGATOR > hydra_aggregator_external_kafka_test_event created 2025-11-18 15:45:46

Configuration:

                    Name: hydra_aggregator_external_kafka_test_event
               Pull Mode: true
          Filter Subject: hydra_aggregator.external.kafka.test.event
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1
       Max Waiting Pulls: 512

State:

            Host Version: 2.12.2
      Required API Level: 0 hosted at level 2
  Last Delivered Message: Consumer sequence: 12 Stream sequence: 1 Last delivery: 18.61s ago
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 1 out of maximum 1
    Redelivered Messages: 1
    Unprocessed Messages: 4
           Waiting Pulls: 1 of maximum 512

Metadata

Metadata

Assignees

No one assigned

    Labels

    defectSuspected defect such as a bug or regression

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions