31
31
import io .streamnative .pulsar .handlers .kop .exceptions .KoPTopicException ;
32
32
import io .streamnative .pulsar .handlers .kop .format .EntryFormatter ;
33
33
import io .streamnative .pulsar .handlers .kop .format .EntryFormatterFactory ;
34
+ import io .streamnative .pulsar .handlers .kop .migration .metadata .MigrationMetadata ;
35
+ import io .streamnative .pulsar .handlers .kop .migration .metadata .MigrationMetadataManager ;
36
+ import io .streamnative .pulsar .handlers .kop .migration .metadata .MigrationStatus ;
34
37
import io .streamnative .pulsar .handlers .kop .offset .OffsetAndMetadata ;
35
38
import io .streamnative .pulsar .handlers .kop .offset .OffsetMetadata ;
36
39
import io .streamnative .pulsar .handlers .kop .security .SaslAuthenticator ;
91
94
import org .apache .commons .collections4 .ListUtils ;
92
95
import org .apache .commons .lang3 .NotImplementedException ;
93
96
import org .apache .commons .lang3 .tuple .Pair ;
97
+ import org .apache .kafka .clients .producer .Producer ;
98
+ import org .apache .kafka .clients .producer .ProducerRecord ;
94
99
import org .apache .kafka .common .Node ;
95
100
import org .apache .kafka .common .TopicPartition ;
96
101
import org .apache .kafka .common .acl .AclOperation ;
105
110
import org .apache .kafka .common .record .InvalidRecordException ;
106
111
import org .apache .kafka .common .record .MemoryRecords ;
107
112
import org .apache .kafka .common .record .MutableRecordBatch ;
113
+ import org .apache .kafka .common .record .Record ;
108
114
import org .apache .kafka .common .record .RecordBatch ;
109
115
import org .apache .kafka .common .requests .AbstractRequest ;
110
116
import org .apache .kafka .common .requests .AbstractResponse ;
@@ -193,6 +199,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
193
199
private final ScheduledExecutorService executor ;
194
200
private final PulsarAdmin admin ;
195
201
private final MetadataStoreExtended metadataStore ;
202
+ private final MigrationMetadataManager migrationMetadataManager ;
196
203
private final SaslAuthenticator authenticator ;
197
204
private final Authorizer authorizer ;
198
205
private final AdminManager adminManager ;
@@ -235,7 +242,7 @@ private String getCurrentTenant(String defaultTenant) {
235
242
&& authenticator .session () != null
236
243
&& authenticator .session ().getPrincipal () != null
237
244
&& authenticator .session ().getPrincipal ().getTenantSpec () != null ) {
238
- String tenantSpec = authenticator .session ().getPrincipal ().getTenantSpec ();
245
+ String tenantSpec = authenticator .session ().getPrincipal ().getTenantSpec ();
239
246
return extractTenantFromTenantSpec (tenantSpec );
240
247
}
241
248
// fallback to using system (default) tenant
@@ -291,7 +298,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
291
298
boolean skipMessagesWithoutIndex ,
292
299
RequestStats requestStats ,
293
300
OrderedScheduler sendResponseScheduler ,
294
- KafkaTopicManagerSharedState kafkaTopicManagerSharedState ) throws Exception {
301
+ KafkaTopicManagerSharedState kafkaTopicManagerSharedState ,
302
+ MigrationMetadataManager migrationMetadataManager ) throws Exception {
295
303
super (requestStats , kafkaConfig , sendResponseScheduler );
296
304
this .pulsarService = pulsarService ;
297
305
this .tenantContextManager = tenantContextManager ;
@@ -300,6 +308,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
300
308
this .executor = pulsarService .getExecutor ();
301
309
this .admin = pulsarService .getAdminClient ();
302
310
this .metadataStore = pulsarService .getLocalMetadataStore ();
311
+ this .migrationMetadataManager = migrationMetadataManager ;
303
312
final boolean authenticationEnabled = pulsarService .getBrokerService ().isAuthenticationEnabled ()
304
313
&& !kafkaConfig .getSaslAllowedMechanisms ().isEmpty ();
305
314
this .authenticator = authenticationEnabled
@@ -801,6 +810,16 @@ private void completeSendOperationForThrottling(long msgSize) {
801
810
}
802
811
}
803
812
813
+ private void produceToKafka (String topic , MemoryRecords records , MigrationMetadata migrationMetadata ) {
814
+ Producer <String , ByteBuffer > producer =
815
+ migrationMetadataManager .getKafkaProducerForTopic (topic , "public/default" ,
816
+ migrationMetadata .getKafkaClusterAddress ());
817
+ for (Record record : records .records ()) {
818
+ producer .send (new ProducerRecord <>(topic , record .value ()));
819
+ }
820
+ producer .flush ();
821
+ }
822
+
804
823
@ Override
805
824
protected void handleProduceRequest (KafkaHeaderAndRequest produceHar ,
806
825
CompletableFuture <AbstractResponse > resultFuture ) {
@@ -814,6 +833,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
814
833
}
815
834
final Map <TopicPartition , PartitionResponse > unauthorizedTopicResponsesMap = new ConcurrentHashMap <>();
816
835
final Map <TopicPartition , PartitionResponse > invalidRequestResponses = new HashMap <>();
836
+ final Map <TopicPartition , PartitionResponse > migrationInProgressResponses = new HashMap <>();
817
837
final Map <TopicPartition , MemoryRecords > authorizedRequestInfo = new ConcurrentHashMap <>();
818
838
int timeoutMs = produceRequest .timeout ();
819
839
String namespacePrefix = currentNamespacePrefix ();
@@ -848,43 +868,55 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
848
868
mergedResponse .putAll (response );
849
869
mergedResponse .putAll (unauthorizedTopicResponsesMap );
850
870
mergedResponse .putAll (invalidRequestResponses );
871
+ mergedResponse .putAll (migrationInProgressResponses );
851
872
resultFuture .complete (new ProduceResponse (mergedResponse ));
852
873
});
853
874
}
854
875
};
855
876
856
877
produceRequest .partitionRecordsOrFail ().forEach ((topicPartition , records ) -> {
857
- try {
858
- validateRecords (produceHar .getRequest ().version (), records );
859
- } catch (ApiException ex ) {
860
- invalidRequestResponses .put (topicPartition ,
861
- new ProduceResponse .PartitionResponse (Errors .forException (ex )));
862
- completeOne .run ();
863
- return ;
864
- }
865
- final String fullPartitionName = KopTopic .toString (topicPartition , namespacePrefix );
866
- authorize (AclOperation .WRITE , Resource .of (ResourceType .TOPIC , fullPartitionName ))
867
- .whenCompleteAsync ((isAuthorized , ex ) -> {
868
- if (ex != null ) {
869
- log .error ("Write topic authorize failed, topic - {}. {}" ,
870
- fullPartitionName , ex .getMessage ());
871
- unauthorizedTopicResponsesMap .put (topicPartition ,
872
- new ProduceResponse .PartitionResponse (Errors .TOPIC_AUTHORIZATION_FAILED ));
873
- completeOne .run ();
874
- return ;
878
+ migrationMetadataManager .getMigrationMetadata (topicPartition .topic (), "public/default" , ctx .channel ())
879
+ .thenAccept (migrationMetadata -> {
880
+ MigrationStatus migrationStatus = migrationMetadata .getMigrationStatus ();
881
+ if (migrationStatus == MigrationStatus .NOT_STARTED ) {
882
+ produceToKafka (topicPartition .topic (), records , migrationMetadata );
883
+ } else if (migrationStatus == MigrationStatus .STARTED ) {
884
+ migrationInProgressResponses .put (topicPartition ,
885
+ new ProduceResponse .PartitionResponse (
886
+ Errors .forCode (Errors .REBALANCE_IN_PROGRESS .code ())));
875
887
}
876
- if (!isAuthorized ) {
877
- unauthorizedTopicResponsesMap .put (topicPartition ,
878
- new ProduceResponse .PartitionResponse (Errors .TOPIC_AUTHORIZATION_FAILED ));
888
+ try {
889
+ validateRecords (produceHar .getRequest ().version (), records );
890
+ } catch (ApiException ex ) {
891
+ invalidRequestResponses .put (topicPartition ,
892
+ new ProduceResponse .PartitionResponse (Errors .forException (ex )));
879
893
completeOne .run ();
880
894
return ;
881
895
}
882
- authorizedRequestInfo .put (topicPartition , records );
883
- completeOne .run ();
884
- }, ctx .executor ());
896
+ final String fullPartitionName = KopTopic .toString (topicPartition , namespacePrefix );
897
+ authorize (AclOperation .WRITE , Resource .of (ResourceType .TOPIC , fullPartitionName ))
898
+ .whenCompleteAsync ((isAuthorized , ex ) -> {
899
+ if (ex != null ) {
900
+ log .error ("Write topic authorize failed, topic - {}. {}" ,
901
+ fullPartitionName , ex .getMessage ());
902
+ unauthorizedTopicResponsesMap .put (topicPartition ,
903
+ new ProduceResponse .PartitionResponse (
904
+ Errors .TOPIC_AUTHORIZATION_FAILED ));
905
+ completeOne .run ();
906
+ return ;
907
+ }
908
+ if (!isAuthorized ) {
909
+ unauthorizedTopicResponsesMap .put (topicPartition ,
910
+ new ProduceResponse .PartitionResponse (
911
+ Errors .TOPIC_AUTHORIZATION_FAILED ));
912
+ completeOne .run ();
913
+ return ;
914
+ }
915
+ authorizedRequestInfo .put (topicPartition , records );
916
+ completeOne .run ();
917
+ }, ctx .executor ());
918
+ });
885
919
});
886
-
887
-
888
920
}
889
921
890
922
private void validateRecords (short version , MemoryRecords records ) {
@@ -1555,7 +1587,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
1555
1587
}
1556
1588
String namespacePrefix = currentNamespacePrefix ();
1557
1589
MessageFetchContext .get (this , transactionCoordinator , fetch , resultFuture ,
1558
- fetchPurgatory , namespacePrefix ).handleFetch ();
1590
+ fetchPurgatory , namespacePrefix , migrationMetadataManager ).handleFetch ();
1559
1591
}
1560
1592
1561
1593
@ Override
0 commit comments