6060import org .apache .fluss .server .coordinator .CoordinatorContext ;
6161import org .apache .fluss .server .entity .FetchReqInfo ;
6262import org .apache .fluss .server .entity .LakeBucketOffset ;
63+ import org .apache .fluss .server .entity .LogUserContext ;
6364import org .apache .fluss .server .entity .NotifyKvSnapshotOffsetData ;
6465import org .apache .fluss .server .entity .NotifyLakeTableOffsetData ;
6566import org .apache .fluss .server .entity .NotifyLeaderAndIsrData ;
@@ -450,21 +451,30 @@ public void appendRecordsToLog(
450451 int timeoutMs ,
451452 int requiredAcks ,
452453 Map <TableBucket , MemoryLogRecords > entriesPerBucket ,
454+ LogUserContext userContext ,
453455 Consumer <List <ProduceLogResultForBucket >> responseCallback ) {
454456 if (isRequiredAcksInvalid (requiredAcks )) {
455457 throw new InvalidRequiredAcksException ("Invalid required acks: " + requiredAcks );
456458 }
457459
458460 long startTime = System .currentTimeMillis ();
459461 Map <TableBucket , ProduceLogResultForBucket > appendResult =
460- appendToLocalLog (entriesPerBucket , requiredAcks );
462+ appendToLocalLog (entriesPerBucket , requiredAcks , userContext );
461463 LOG .debug ("Append records to local log in {} ms" , System .currentTimeMillis () - startTime );
462464
463465 // maybe do delay write operation.
464466 maybeAddDelayedWrite (
465467 timeoutMs , requiredAcks , entriesPerBucket .size (), appendResult , responseCallback );
466468 }
467469
470+ public void appendRecordsToLog (
471+ int timeoutMs ,
472+ int requiredAcks ,
473+ Map <TableBucket , MemoryLogRecords > entriesPerBucket ,
474+ Consumer <List <ProduceLogResultForBucket >> responseCallback ) {
475+ appendRecordsToLog (timeoutMs , requiredAcks , entriesPerBucket , null , responseCallback );
476+ }
477+
468478 /**
469479 * Fetch records from a replica. Currently, we will return the fetched records immediately.
470480 *
@@ -474,17 +484,27 @@ public void appendRecordsToLog(
474484 public void fetchLogRecords (
475485 FetchParams params ,
476486 Map <TableBucket , FetchReqInfo > bucketFetchInfo ,
487+ LogUserContext userContext ,
477488 Consumer <Map <TableBucket , FetchLogResultForBucket >> responseCallback ) {
478489 long startTime = System .currentTimeMillis ();
479- Map <TableBucket , LogReadResult > logReadResults = readFromLog (params , bucketFetchInfo );
490+ Map <TableBucket , LogReadResult > logReadResults =
491+ readFromLog (params , bucketFetchInfo , userContext );
480492 if (LOG .isTraceEnabled ()) {
481493 LOG .trace (
482494 "Fetch log records from local log in {} ms" ,
483495 System .currentTimeMillis () - startTime );
484496 }
485497
486498 // maybe do delay fetch log operation.
487- maybeAddDelayedFetchLog (params , bucketFetchInfo , logReadResults , responseCallback );
499+ maybeAddDelayedFetchLog (
500+ params , bucketFetchInfo , logReadResults , userContext , responseCallback );
501+ }
502+
503+ public void fetchLogRecords (
504+ FetchParams params ,
505+ Map <TableBucket , FetchReqInfo > bucketFetchInfo ,
506+ Consumer <Map <TableBucket , FetchLogResultForBucket >> responseCallback ) {
507+ fetchLogRecords (params , bucketFetchInfo , null , responseCallback );
488508 }
489509
490510 /**
@@ -496,14 +516,15 @@ public void putRecordsToKv(
496516 int requiredAcks ,
497517 Map <TableBucket , KvRecordBatch > entriesPerBucket ,
498518 @ Nullable int [] targetColumns ,
519+ LogUserContext userContext ,
499520 Consumer <List <PutKvResultForBucket >> responseCallback ) {
500521 if (isRequiredAcksInvalid (requiredAcks )) {
501522 throw new InvalidRequiredAcksException ("Invalid required acks: " + requiredAcks );
502523 }
503524
504525 long startTime = System .currentTimeMillis ();
505526 Map <TableBucket , PutKvResultForBucket > kvPutResult =
506- putToLocalKv (entriesPerBucket , targetColumns , requiredAcks );
527+ putToLocalKv (entriesPerBucket , targetColumns , requiredAcks , userContext );
507528 LOG .debug (
508529 "Put records to local kv storage and wait generate cdc log in {} ms" ,
509530 System .currentTimeMillis () - startTime );
@@ -514,6 +535,16 @@ public void putRecordsToKv(
514535 timeoutMs , requiredAcks , entriesPerBucket .size (), kvPutResult , responseCallback );
515536 }
516537
538+ public void putRecordsToKv (
539+ int timeoutMs ,
540+ int requiredAcks ,
541+ Map <TableBucket , KvRecordBatch > entriesPerBucket ,
542+ @ Nullable int [] targetColumns ,
543+ Consumer <List <PutKvResultForBucket >> responseCallback ) {
544+ putRecordsToKv (
545+ timeoutMs , requiredAcks , entriesPerBucket , targetColumns , null , responseCallback );
546+ }
547+
517548 /** Lookup a single key value. */
518549 @ VisibleForTesting
519550 protected void lookup (TableBucket tableBucket , byte [] key , Consumer <byte []> responseCallback ) {
@@ -927,7 +958,9 @@ private void addFetcherForReplicas(
927958
928959 /** Append log records to leader replicas of the buckets. */
929960 private Map <TableBucket , ProduceLogResultForBucket > appendToLocalLog (
930- Map <TableBucket , MemoryLogRecords > entriesPerBucket , int requiredAcks ) {
961+ Map <TableBucket , MemoryLogRecords > entriesPerBucket ,
962+ int requiredAcks ,
963+ LogUserContext userContext ) {
931964 Map <TableBucket , ProduceLogResultForBucket > resultForBucketMap = new HashMap <>();
932965 for (Map .Entry <TableBucket , MemoryLogRecords > entry : entriesPerBucket .entrySet ()) {
933966 TableBucket tb = entry .getKey ();
@@ -950,8 +983,8 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
950983 resultForBucketMap .put (
951984 tb ,
952985 new ProduceLogResultForBucket (tb , baseOffset , appendInfo .lastOffset () + 1 ));
953- tableMetrics .incLogBytesIn (appendInfo .validBytes ());
954- tableMetrics .incLogMessageIn (appendInfo .numMessages ());
986+ tableMetrics .incLogBytesIn (appendInfo .validBytes (), userContext );
987+ tableMetrics .incLogMessageIn (appendInfo .numMessages (), userContext );
955988 } catch (Exception e ) {
956989 if (isUnexpectedException (e )) {
957990 LOG .error ("Error append records to local log on replica {}" , tb , e );
@@ -973,7 +1006,8 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
9731006 private Map <TableBucket , PutKvResultForBucket > putToLocalKv (
9741007 Map <TableBucket , KvRecordBatch > entriesPerBucket ,
9751008 @ Nullable int [] targetColumns ,
976- int requiredAcks ) {
1009+ int requiredAcks ,
1010+ LogUserContext userContext ) {
9771011 Map <TableBucket , PutKvResultForBucket > putResultForBucketMap = new HashMap <>();
9781012 for (Map .Entry <TableBucket , KvRecordBatch > entry : entriesPerBucket .entrySet ()) {
9791013 TableBucket tb = entry .getKey ();
@@ -997,8 +1031,8 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
9971031 tableMetrics .incKvMessageIn (entry .getValue ().getRecordCount ());
9981032 tableMetrics .incKvBytesIn (entry .getValue ().sizeInBytes ());
9991033 // metric for cdc log of kv
1000- tableMetrics .incLogBytesIn (appendInfo .validBytes ());
1001- tableMetrics .incLogMessageIn (appendInfo .numMessages ());
1034+ tableMetrics .incLogBytesIn (appendInfo .validBytes (), userContext );
1035+ tableMetrics .incLogMessageIn (appendInfo .numMessages (), userContext );
10021036 } catch (Exception e ) {
10031037 if (isUnexpectedException (e )) {
10041038 LOG .error ("Error put records to local kv on replica {}" , tb , e );
@@ -1051,7 +1085,9 @@ public void limitScan(
10511085 }
10521086
10531087 public Map <TableBucket , LogReadResult > readFromLog (
1054- FetchParams fetchParams , Map <TableBucket , FetchReqInfo > bucketFetchInfo ) {
1088+ FetchParams fetchParams ,
1089+ Map <TableBucket , FetchReqInfo > bucketFetchInfo ,
1090+ LogUserContext userContext ) {
10551091 Map <TableBucket , LogReadResult > logReadResult = new HashMap <>();
10561092 boolean isFromFollower = fetchParams .isFromFollower ();
10571093 int limitBytes = fetchParams .maxFetchBytes ();
@@ -1109,7 +1145,7 @@ public Map<TableBucket, LogReadResult> readFromLog(
11091145 if (isFromFollower ) {
11101146 serverMetricGroup .replicationBytesOut ().inc (recordBatchSize );
11111147 } else {
1112- tableMetrics .incLogBytesOut (recordBatchSize );
1148+ tableMetrics .incLogBytesOut (recordBatchSize , userContext );
11131149 }
11141150 } catch (Exception e ) {
11151151 if (isUnexpectedException (e )) {
@@ -1299,6 +1335,7 @@ private void maybeAddDelayedFetchLog(
12991335 FetchParams params ,
13001336 Map <TableBucket , FetchReqInfo > bucketFetchInfo ,
13011337 Map <TableBucket , LogReadResult > logReadResults ,
1338+ LogUserContext userContext ,
13021339 Consumer <Map <TableBucket , FetchLogResultForBucket >> responseCallback ) {
13031340 long bytesReadable = 0 ;
13041341 boolean errorReadingData = false ;
@@ -1365,7 +1402,8 @@ private void maybeAddDelayedFetchLog(
13651402 delayedResponse .putAll (expectedErrorBuckets );
13661403 responseCallback .accept (delayedResponse );
13671404 },
1368- serverMetricGroup );
1405+ serverMetricGroup ,
1406+ userContext );
13691407
13701408 // try to complete the request immediately, otherwise put it into the
13711409 // delayedFetchLogManager; this is because while the delayed fetch log operation is
0 commit comments