@@ -66,8 +66,8 @@ func (c *kafkaSourceConf) validate() error {
6666 return nil
6767}
6868
69- func (c * kafkaSourceConf ) GetReaderConfig () kafkago.ReaderConfig {
70- return kafkago.ReaderConfig {
69+ func (c * kafkaSourceConf ) GetReaderConfig () * kafkago.ReaderConfig {
70+ return & kafkago.ReaderConfig {
7171 Brokers : strings .Split (c .Brokers , "," ),
7272 GroupID : c .GroupID ,
7373 Topic : c .Topic ,
@@ -102,6 +102,10 @@ func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) e
102102 return err
103103 }
104104 k .sc = kConf
105+ if err := k .sc .GetReaderConfig ().Validate (); err != nil {
106+ conf .Log .Errorf ("kafka souce config validate err: %v" , err )
107+ return fmt .Errorf ("kafka souce config validate err: %v" , err )
108+ }
105109 tlsConfig , err := cert .GenTLSConfig (ctx , configs )
106110 if err != nil {
107111 conf .Log .Errorf ("kafka tls conf error: %v" , err )
@@ -173,13 +177,18 @@ func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler
173177 TLS : k .tlsConfig ,
174178 SASLMechanism : k .mechanism ,
175179 }
176- reader := kafkago .NewReader (readerConfig )
180+ reader := kafkago .NewReader (* readerConfig )
177181 k .reader = reader
178- err := k .reader .SetOffset (kafkago .LastOffset )
179- if err != nil {
180- k .connected = false
181- sch (api .ConnectionDisconnected , err .Error ())
182- return err
182+ if len (k .sc .GroupID ) < 1 {
183+ err := k .reader .SetOffset (kafkago .LastOffset )
184+ if err != nil {
185+ k .connected = false
186+ sch (api .ConnectionDisconnected , err .Error ())
187+ return fmt .Errorf ("kafka source SetOffset error: %v" , err )
188+ } else {
189+ k .connected = true
190+ sch (api .ConnectionConnected , "" )
191+ }
183192 } else {
184193 k .connected = true
185194 sch (api .ConnectionConnected , "" )
@@ -193,6 +202,7 @@ func (k *KafkaSource) handleConnectedSch(err error) {
193202 k .connected = false
194203 k .sch (api .ConnectionDisconnected , err .Error ())
195204 } else if ! k .connected && err == nil {
205+ k .connected = true
196206 k .sch (api .ConnectionConnected , "" )
197207 }
198208}
@@ -231,9 +241,11 @@ func (k *KafkaSource) Rewind(offset interface{}) error {
231241 default :
232242 return fmt .Errorf ("%v can't be set as offset" , offset )
233243 }
234- if err := k .reader .SetOffset (offsetV ); err != nil {
235- conf .Log .Errorf ("kafka offset error: %v" , err )
236- return fmt .Errorf ("set kafka offset failed, err:%v" , err )
244+ if len (k .sc .GroupID ) < 1 {
245+ if err := k .reader .SetOffset (offsetV ); err != nil {
246+ conf .Log .Errorf ("kafka offset error: %v" , err )
247+ return fmt .Errorf ("set kafka offset failed, err:%v" , err )
248+ }
237249 }
238250 return nil
239251}
0 commit comments