Closed Shards Prevent Startup #12
Description
Describe the bug
To preface, I am new to Kinesis and the KCL, and this could be a configuration issue (either with Kinesis itself or with the KCL). If I'm doing something incredibly wrong, please let me know. I'd be happy if this was "user error". 😛
As a basic example, if I have a Kinesis stream with a single shard, everything works as expected. When the worker starts up, it takes a lease for that shard, and starts processing records. However, if I re-shard the stream to instead have two shards, then AWS closes the first shard, and creates two new shards.
The result is that the KCL worker repeatedly takes out a lease on the closed shard, and the corresponding job immediately exits. Combined with the fact that the worker only starts a job for a single shard per shard sync, and always iterates over shards ordered by ID (so older shards come first), this means the worker never actually starts jobs for the other (valid) shards. On every sync it grabs a lease for the closed shard, the polling job exits immediately because the shard is closed, and the cycle repeats. It never stops revisiting the closed shard, and because it only looks at one shard per sync, it never starts jobs for the two valid shards.
Reproduction steps
- Create a Kinesis Data Stream with a single shard.
- Re-shard the data stream to have 2 shards (which will close the original shard; a sketch of scripting this is shown after this list).
- Start a KCL worker.
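Steps 1 and 2 can also be scripted against the Kinesis API directly. The following is only a rough sketch using aws-sdk-go-v2 (not part of the library or my repro; the stream name and region match my setup), shown to illustrate what the re-shard does: after UpdateShardCount, the parent shard appears in ListShards with a non-nil EndingSequenceNumber, i.e. it is closed.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

func main() {
	ctx := context.Background()
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion("us-east-2"))
	if err != nil {
		log.Fatal(err)
	}
	client := kinesis.NewFromConfig(awsCfg)

	// Re-shard from 1 to 2 shards: Kinesis closes the parent shard and
	// creates two child shards.
	_, err = client.UpdateShardCount(ctx, &kinesis.UpdateShardCountInput{
		StreamName:       aws.String("caleb-testing"),
		TargetShardCount: aws.Int32(2),
		ScalingType:      types.ScalingTypeUniformScaling,
	})
	if err != nil {
		log.Fatal(err)
	}

	// A closed shard is one whose EndingSequenceNumber is set. (In practice
	// you may need to wait for the stream to return to ACTIVE before the
	// new shards show up.)
	out, err := client.ListShards(ctx, &kinesis.ListShardsInput{
		StreamName: aws.String("caleb-testing"),
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, s := range out.Shards {
		closed := s.SequenceNumberRange.EndingSequenceNumber != nil
		fmt.Printf("%s closed=%v\n", aws.ToString(s.ShardId), closed)
	}
}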
The KCL worker itself is as simple as it gets. I observe the same behavior when using enhanced fan-out or polling.
package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
	"github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
	"github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

type RecordProcessor struct{}

type RecordProcessorFactory struct{}

func (f *RecordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
	return &RecordProcessor{}
}

func (p *RecordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {}

func (p *RecordProcessor) Initialize(input *interfaces.InitializationInput) {}

func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput) {}

func main() {
	// Separately, I have no idea why, but the library seems incapable of figuring out the
	// Kinesis service endpoint on its own. Not specifying it manually results in errors
	// where it seemingly tries to use an empty string as a service endpoint, but that's
	// probably a problem for a separate issue.
	cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
	cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"

	kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)
	if err := kcl.Start(); err != nil {
		fmt.Printf("[!] failed to start kcl worker: %v\n", err)
		return
	}
	defer kcl.Shutdown()

	// Block until interrupted.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, os.Kill)
	<-signals
}
Running the above example with a single shard, before re-sharding, looks like this:
[screenshot of worker log output omitted]
Running after re-sharding (where shardId-000000000000 is now closed) looks like:
[screenshot of worker log output omitted]
As you can see, no polling shard consumers are started and no records are processed: the worker only attempted to start a single consumer, which immediately exited. When a shard sync triggered ~60 seconds later, it tried to pick up the same closed shard again and still did not start processing records.
Expected behavior
The worker starts up, takes leases up to MaxLeasesForWorker, and immediately starts a polling (or enhanced fan-out) job for each leased shard.
Additional context
There are multiple things going on here that don't make sense to me.
- Why does the KCL worker only start a job for a single shard on each sync? According to the documentation, it should take out leases up to MaxLeasesForWorker and start jobs for each shard that it leases, but in practice it only takes out one lease per sync. If you have many shards and use the default shard sync interval (1 minute), it will take a very long time for the worker to start up and listen on all the expected shards. Even with a small number of shards, say 10, and 2 workers, it would take at least 5 full minutes for a single worker to start up, assuming each worker leases 5 shards (these are made-up numbers just to illustrate the point, not a configuration I'm suggesting for production; see the arithmetic sketch after this list).
- Why does the KCL worker attempt to start jobs for closed shards? Because it iterates over shards in ID order, and because of the one-job-per-sync behavior above, the worker gets stuck attempting to lease a closed shard forever and never progresses to the other shards.
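To make the arithmetic in the first bullet explicit (same made-up numbers, and assuming exactly one lease is taken per sync):
package main

import (
	"fmt"
	"time"
)

func main() {
	// 10 shards split across 2 workers, one lease taken per shard sync,
	// default sync interval of 1 minute.
	shardsPerWorker := 10 / 2
	syncInterval := time.Minute
	fmt.Println(time.Duration(shardsPerWorker) * syncInterval) // 5m0s before all consumers on one worker are running
}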
Regarding the "only starting one job per sync" question, it seems this happens because of the break statement here. I'm struggling to understand why this happens at all, to be honest.
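To illustrate what I think is happening, here is a toy simulation of my reading of that loop. This is not the library's code, just the pattern as I understand it: shards are visited in ID order, at most one lease is taken per sync because of the break, and a lease on a closed shard is immediately given up again, so the closed parent starves the open children indefinitely.
package main

import (
	"fmt"
	"sort"
)

// Toy model of how I understand the shard sync to behave; NOT the library's
// actual code, just a simulation of the pattern I think I'm seeing.
type shard struct {
	id     string
	closed bool
}

func shardSync(shards []shard, leased map[string]bool) {
	// Shards are visited in ID order, i.e. oldest (possibly closed) first.
	sort.Slice(shards, func(i, j int) bool { return shards[i].id < shards[j].id })
	for _, s := range shards {
		if leased[s.id] {
			continue
		}
		leased[s.id] = true
		if s.closed {
			// The consumer for a closed shard exits immediately, and the
			// lease is given up again before the next sync.
			fmt.Printf("sync: leased %s (closed) -> consumer exits, lease released\n", s.id)
			delete(leased, s.id)
		} else {
			fmt.Printf("sync: leased %s -> consumer started\n", s.id)
		}
		// Only one shard is picked up per sync because of the break.
		break
	}
}

func main() {
	shards := []shard{
		{id: "shardId-000000000000", closed: true}, // parent, closed by the re-shard
		{id: "shardId-000000000001"},
		{id: "shardId-000000000002"},
	}
	leased := map[string]bool{}
	for i := 0; i < 3; i++ {
		shardSync(shards, leased) // the same closed shard is picked every time
	}
}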
Is it normal for a worker to take an extremely long time to start up? Is this a bug? Do people just set the shard sync interval much lower than the default in practice? I can't find anything in the upstream AWS Java KCL documentation that mentions this; it all seems to indicate that, on startup, a worker takes out leases up to the max lease limit and starts the appropriate processing jobs.
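For reference, this is the kind of tuning I'm asking about. I'm assuming here that MaxLeasesForWorker and the shard sync interval are exposed as exported fields on the config struct (the way KinesisEndpoint is in my repro above); I haven't verified the exact field names, and if the interval is only the hard-coded defaultShardSyncInterval constant then that knob may not exist at all.
// Hypothetical tuning in main() before constructing the worker; the field
// names below are my assumption and may not match the actual config struct.
cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"
cfg.MaxLeasesForWorker = 10        // let a single worker hold more leases at once
cfg.ShardSyncIntervalMillis = 5000 // sync every 5s instead of every minute
kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)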