@@ -81,6 +81,10 @@ type Transport struct {
8181 // Default to 6s.
8282 MetadataTTL time.Duration
8383
84+ // Topic names for the metadata cached by this transport. If this field is left blank,
85+ // metadata information of all topics in the cluster will be retrieved.
86+ MetadataTopics []string
87+
8488 // Unique identifier that the transport communicates to the brokers when it
8589 // sends requests.
8690 ClientID string
@@ -235,14 +239,15 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
235239 p = & connPool {
236240 refc : 2 ,
237241
238- dial : t .dial (),
239- dialTimeout : t .dialTimeout (),
240- idleTimeout : t .idleTimeout (),
241- metadataTTL : t .metadataTTL (),
242- clientID : t .ClientID ,
243- tls : t .TLS ,
244- sasl : t .SASL ,
245- resolver : t .Resolver ,
242+ dial : t .dial (),
243+ dialTimeout : t .dialTimeout (),
244+ idleTimeout : t .idleTimeout (),
245+ metadataTTL : t .metadataTTL (),
246+ metadataTopics : t .MetadataTopics ,
247+ clientID : t .ClientID ,
248+ tls : t .TLS ,
249+ sasl : t .SASL ,
250+ resolver : t .Resolver ,
246251
247252 ready : make (event ),
248253 wake : make (chan event ),
@@ -276,14 +281,15 @@ type connPool struct {
276281 // Immutable fields of the connection pool. Connections access these field
277282 // on their parent pool in a ready-only fashion, so no synchronization is
278283 // required.
279- dial func (context.Context , string , string ) (net.Conn , error )
280- dialTimeout time.Duration
281- idleTimeout time.Duration
282- metadataTTL time.Duration
283- clientID string
284- tls * tls.Config
285- sasl sasl.Mechanism
286- resolver BrokerResolver
284+ dial func (context.Context , string , string ) (net.Conn , error )
285+ dialTimeout time.Duration
286+ idleTimeout time.Duration
287+ metadataTTL time.Duration
288+ metadataTopics []string
289+ clientID string
290+ tls * tls.Config
291+ sasl sasl.Mechanism
292+ resolver BrokerResolver
287293 // Signaling mechanisms to orchestrate communications between the pool and
288294 // the rest of the program.
289295 once sync.Once // ensure that `ready` is triggered only once
@@ -592,13 +598,16 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) {
592598 var notify event
593599 done := ctx .Done ()
594600
601+ req := & meta.Request {
602+ TopicNames : p .metadataTopics ,
603+ }
604+
595605 for {
596606 c , err := p .grabClusterConn (ctx )
597607 if err != nil {
598608 p .update (ctx , nil , err )
599609 } else {
600610 res := make (async , 1 )
601- req := & meta.Request {}
602611 deadline , cancel := context .WithTimeout (ctx , p .metadataTTL )
603612 c .reqs <- connRequest {
604613 ctx : deadline ,
0 commit comments