Skip to content

Conversation

@egov-arindam
Copy link
Collaborator

@egov-arindam egov-arindam commented Jun 27, 2025

Central Instance changes for MUKTASoft

Summary by CodeRabbit

  • New Features

    • Introduced support for tenant-aware Kafka topic routing and processing.
    • Added configuration options for central instance environments, including tenant ID and topic pattern settings.
  • Enhancements

    • SQL queries now dynamically resolve schema placeholders based on tenant ID for multi-state tenancy.
    • Notification and estimate processing now use tenant-specific topic resolution for improved message delivery.
  • Chores

    • Updated migration script to support Flyway migrations across multiple schemas.

	1. Estimate Service changes
@coderabbitai
Copy link

coderabbitai bot commented Jun 27, 2025

Walkthrough

The changes introduce multi-state and tenant-aware capabilities to the estimate service. Kafka consumers and producers are updated to use tenant-specific topic patterns and message routing. SQL queries now support dynamic schema resolution based on tenant ID. New configuration properties and constants are added, and migration scripts are enhanced to support multi-schema Flyway migrations.

Changes

File(s) Change Summary
.../config/EstimateServiceConfiguration.java Added estimateTopicPattern property for Kafka topic pattern configuration.
.../consumer/DenormalizeAndEnrichEstimateConsumer.java Updated listener to use topic pattern; added tenant ID logic; enhanced MDC context and logging.
.../producer/EstimateProducer.java Modified push method to include tenant ID and resolve tenant-specific topic names.
.../repository/EstimateRepository.java Integrated multi-state utility for dynamic schema replacement in queries based on tenant ID.
.../repository/rowmapper/EstimateQueryBuilder.java Updated SQL queries to use {schema} placeholder for table names.
.../service/EstimateService.java Updated all producer calls to include tenant ID as first argument.
.../service/NotificationService.java Modified notification push logic and method signatures to include tenant ID.
.../util/EstimateServiceConstant.java Added TENANTID_MDC_STRING constant.
.../resources/application.properties Added Kafka topic pattern and central instance configuration properties.
.../resources/db/migrate.sh Enhanced migration script to support multi-schema Flyway migrations with debug output and new flags.

Sequence Diagram(s)

sequenceDiagram
    participant Kafka as Kafka Broker
    participant Consumer as DenormalizeAndEnrichEstimateConsumer
    participant Producer as EstimateProducer
    participant Repo as EstimateRepository
    participant MultiUtil as MultiStateInstanceUtil

    Kafka->>Consumer: Receive message (topic pattern)
    Consumer->>Consumer: Check topic matches configured save/update
    Consumer->>Consumer: Set tenant ID in MDC
    Consumer->>Producer: push(tenantId, topic, message)
    Producer->>MultiUtil: Resolve topic for tenant
    Producer->>Kafka: Send message to tenant-specific topic
    Consumer->>Repo: (if needed) Query with checkCentralInstance
    Repo->>MultiUtil: Replace {schema} with tenant schema
Loading

Poem

In a warren of tenants, each schema unique,
Our code now listens with pattern so sleek.
Kafka hops topics, Flyway migrates anew,
Messages routed by tenant—oh what a view!
With schemas and patterns, our system’s robust—
A rabbit’s delight, in multi-state we trust! 🐇

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df7c5d7 and 4481220.

📒 Files selected for processing (10)
  • backend/estimates/src/main/java/org/egov/config/EstimateServiceConfiguration.java (1 hunks)
  • backend/estimates/src/main/java/org/egov/consumer/DenormalizeAndEnrichEstimateConsumer.java (3 hunks)
  • backend/estimates/src/main/java/org/egov/producer/EstimateProducer.java (1 hunks)
  • backend/estimates/src/main/java/org/egov/repository/EstimateRepository.java (4 hunks)
  • backend/estimates/src/main/java/org/egov/repository/rowmapper/EstimateQueryBuilder.java (1 hunks)
  • backend/estimates/src/main/java/org/egov/service/EstimateService.java (3 hunks)
  • backend/estimates/src/main/java/org/egov/service/NotificationService.java (3 hunks)
  • backend/estimates/src/main/java/org/egov/util/EstimateServiceConstant.java (1 hunks)
  • backend/estimates/src/main/resources/application.properties (1 hunks)
  • backend/estimates/src/main/resources/db/migrate.sh (1 hunks)
🧰 Additional context used
🪛 Shellcheck (0.10.0)
backend/estimates/src/main/resources/db/migrate.sh

[warning] 7-7: In POSIX sh, string replacement is undefined.

(SC3060)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)

🔇 Additional comments (22)
backend/estimates/src/main/java/org/egov/util/EstimateServiceConstant.java (1)

87-87: LGTM! Well-implemented constant addition.

The constant follows proper naming conventions and aligns with the multi-tenant MDC logging pattern being introduced.

backend/estimates/src/main/java/org/egov/config/EstimateServiceConfiguration.java (1)

63-64: LGTM! Proper configuration property addition.

The new estimateTopicPattern property follows established patterns and correctly supports the tenant-aware Kafka topic handling being introduced.

backend/estimates/src/main/java/org/egov/repository/rowmapper/EstimateQueryBuilder.java (1)

30-51: LGTM! Consistent multi-tenant schema pattern implementation.

The systematic addition of {schema} placeholders to all table references enables proper tenant-specific database schema resolution. The pattern is applied consistently across both fetch and count queries.

backend/estimates/src/main/java/org/egov/service/EstimateService.java (3)

73-73: LGTM! Consistent tenant-aware message routing.

The addition of tenant ID as the first parameter to producer.push enables proper tenant-specific Kafka topic resolution.


130-130: LGTM! Consistent tenant-aware message routing.

The tenant ID parameter addition maintains consistency with the multi-tenant messaging pattern.


150-150: LGTM! Consistent tenant-aware message routing.

The tenant ID parameter is correctly propagated for the previous estimate workflow status update.

backend/estimates/src/main/resources/application.properties (3)

134-136: Central instance configuration looks correct.

The central instance properties are properly configured with:

  • state.level.tenantid.length=2 for 2-character state codes
  • is.environment.central.instance=true to enable central instance mode
  • state.level.tenant.id=od for Odisha state identification

129-129: SMS additional field configuration added.

The sms.isAdditonalFieldRequired=true property enables additional fields in SMS processing, which aligns with the notification service changes.


131-131: Verify the Kafka topic pattern regex for correctness.

The regex pattern ((^[a-zA-Z]+-)?save-estimate|(^[a-zA-Z]+-)?update-estimate) should be validated to ensure it matches the intended topic naming convention. The pattern allows optional alphabetic prefixes followed by hyphens before the base topic names.

#!/bin/bash
# Test the regex pattern against various topic names to ensure it matches correctly
echo "Testing Kafka topic pattern regex..."

# Test cases for the pattern
pattern="((^[a-zA-Z]+-)?save-estimate|(^[a-zA-Z]+-)?update-estimate)"

test_topics=(
    "save-estimate"
    "update-estimate"
    "od-save-estimate"
    "od-update-estimate"
    "punjab-save-estimate"
    "punjab-update-estimate"
    "123-save-estimate"  # Should not match (numeric prefix)
    "save-estimate-extra"  # Should not match (extra suffix)
    "other-topic"  # Should not match
)

for topic in "${test_topics[@]}"; do
    if echo "$topic" | grep -qE "^${pattern}$"; then
        echo "$topic matches"
    else
        echo "$topic does not match"
    fi
done
backend/estimates/src/main/java/org/egov/repository/EstimateRepository.java (4)

33-34: Autowired dependency injection is correct.

The MultiStateInstanceUtil is properly injected using @Autowired annotation.


78-86: Error handling in checkCentralInstance method is appropriate.

The method properly catches InvalidTenantIdException and converts it to a CustomException with a descriptive error message. The schema placeholder replacement logic is correctly implemented.


58-58: Query execution with tenant-aware schema resolution.

The getEstimate method correctly applies schema placeholder replacement before executing the query, ensuring tenant-specific database schema access.


75-75: Count query execution with tenant-aware schema resolution.

The getEstimateCount method consistently applies the same schema placeholder replacement pattern as the main query method.

backend/estimates/src/main/java/org/egov/producer/EstimateProducer.java (1)

15-16: MultiStateInstanceUtil dependency injection is correct.

The MultiStateInstanceUtil is properly injected using @Autowired annotation for tenant-aware topic routing.

backend/estimates/src/main/java/org/egov/service/NotificationService.java (4)

106-106: Tenant ID parameter correctly passed to notification method.

The checkAdditionalFieldAndPushONSmsTopic method call correctly includes the tenant ID extracted from the estimate object.


131-131: Consistent tenant ID parameter passing in approve action.

The tenant ID is consistently passed to the notification method in the approve action path, maintaining consistency with the reject action.


291-291: Method signature updated to include tenant ID parameter.

The checkAdditionalFieldAndPushONSmsTopic method signature correctly includes the tenantId parameter for tenant-aware message routing.


298-298: Producer calls updated with tenant ID for both SMS types.

Both SMS request types (with and without additional fields) correctly include the tenant ID as the first parameter when calling the producer, ensuring consistent tenant-aware message routing.

Also applies to: 303-303

backend/estimates/src/main/java/org/egov/consumer/DenormalizeAndEnrichEstimateConsumer.java (4)

31-32: State-level tenant ID configuration injection is correct.

The @Value annotation properly injects the state.level.tenant.id configuration property for use in MDC context.


58-59: Topic pattern listening replaces specific topic listening.

The change from specific topics to topic pattern listening enables multi-tenant topic consumption. The commented-out line shows the previous implementation for reference.


66-75: Topic matching logic and MDC context management.

The implementation correctly checks if the received topic matches the configured save or update topics, sets the MDC context with the tenant ID, and processes the message accordingly. The conditional logic ensures only relevant topics are processed.


71-71: Producer call updated with tenant ID from estimate.

The producer call correctly includes the tenant ID extracted from the estimate object, ensuring tenant-aware message routing for the enriched estimate.

Comment on lines +7 to +11
for schemaname in ${schemas//,/ }
do
echo "the schema name : ${baseurl}${schemasetter}${schemaname}"
flyway -url=${baseurl}${schemasetter}${schemaname} -table=$SCHEMA_TABLE -user=$FLYWAY_USER -password=$FLYWAY_PASSWORD -locations=$FLYWAY_LOCATIONS -baselineOnMigrate=true -outOfOrder=true -ignoreMissingMigrations=true migrate
done
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix shell compatibility and quoting issues.

The script has several issues that should be addressed:

  1. POSIX compatibility: The string replacement ${schemas//,/ } is bash-specific and won't work in POSIX sh
  2. Missing quotes: Multiple variables lack proper quoting which can cause word splitting and globbing issues

Apply this diff to fix the issues:

-#!/bin/sh
+#!/bin/bash
 baseurl=$DB_URL
 echo "the baseurl : $DB_URL"
 schemasetter="?currentSchema="
 schemas=$SCHEMA_NAME
 echo "the schemas : $schemas"
 for schemaname in ${schemas//,/ }
 do
     echo "the schema name : ${baseurl}${schemasetter}${schemaname}"
-    flyway -url=${baseurl}${schemasetter}${schemaname}  -table=$SCHEMA_TABLE -user=$FLYWAY_USER -password=$FLYWAY_PASSWORD -locations=$FLYWAY_LOCATIONS -baselineOnMigrate=true -outOfOrder=true -ignoreMissingMigrations=true migrate
+    flyway -url="${baseurl}${schemasetter}${schemaname}" -table="$SCHEMA_TABLE" -user="$FLYWAY_USER" -password="$FLYWAY_PASSWORD" -locations="$FLYWAY_LOCATIONS" -baselineOnMigrate=true -outOfOrder=true -ignoreMissingMigrations=true migrate
 done

Alternatively, if you must use POSIX sh, replace the string substitution with a different approach:

 #!/bin/sh
 baseurl=$DB_URL
 echo "the baseurl : $DB_URL" 
 schemasetter="?currentSchema="
-schemas=$SCHEMA_NAME
-echo "the schemas : $schemas"
-for schemaname in ${schemas//,/ }
+echo "the schemas : $SCHEMA_NAME"
+IFS=','
+for schemaname in $SCHEMA_NAME
 do
+    IFS=' '
     echo "the schema name : ${baseurl}${schemasetter}${schemaname}"
-    flyway -url=${baseurl}${schemasetter}${schemaname}  -table=$SCHEMA_TABLE -user=$FLYWAY_USER -password=$FLYWAY_PASSWORD -locations=$FLYWAY_LOCATIONS -baselineOnMigrate=true -outOfOrder=true -ignoreMissingMigrations=true migrate
+    flyway -url="${baseurl}${schemasetter}${schemaname}" -table="$SCHEMA_TABLE" -user="$FLYWAY_USER" -password="$FLYWAY_PASSWORD" -locations="$FLYWAY_LOCATIONS" -baselineOnMigrate=true -outOfOrder=true -ignoreMissingMigrations=true migrate
 done
🧰 Tools
🪛 Shellcheck (0.10.0)

[warning] 7-7: In POSIX sh, string replacement is undefined.

(SC3060)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)


[info] 10-10: Double quote to prevent globbing and word splitting.

(SC2086)

🤖 Prompt for AI Agents
In backend/estimates/src/main/resources/db/migrate.sh around lines 7 to 11, the
script uses bash-specific string replacement ${schemas//,/ } which breaks POSIX
sh compatibility, and variables are unquoted causing potential word splitting
and globbing issues. To fix, replace the string substitution with a
POSIX-compatible method such as using IFS and read or a loop to split the
schemas variable by commas, and add double quotes around all variable expansions
to prevent word splitting and globbing.

Comment on lines +23 to 27
public void push(String tenantId, String topic, Object value) {
//kafkaTemplate.send(topic, value);
String updatedTopic = centralInstanceUtil.getStateSpecificTopicName(tenantId, topic);
kafkaTemplate.send(updatedTopic, value);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Remove commented code and improve method documentation.

The method signature change to include tenantId is correct for multi-tenant support. However, the commented-out code should be removed for cleaner code.

 public void push(String tenantId, String topic, Object value) {
-    //kafkaTemplate.send(topic, value);
     String updatedTopic = centralInstanceUtil.getStateSpecificTopicName(tenantId, topic);
     kafkaTemplate.send(updatedTopic, value);
 }

Consider adding JavaDoc to document the new parameter:

+/**
+ * Pushes a message to a tenant-specific Kafka topic
+ * @param tenantId The tenant ID for state-specific topic routing
+ * @param topic The base topic name
+ * @param value The message payload
+ */
 public void push(String tenantId, String topic, Object value) {
     String updatedTopic = centralInstanceUtil.getStateSpecificTopicName(tenantId, topic);
     kafkaTemplate.send(updatedTopic, value);
 }
🤖 Prompt for AI Agents
In backend/estimates/src/main/java/org/egov/producer/EstimateProducer.java
around lines 23 to 27, remove the commented-out kafkaTemplate.send(topic, value)
line to clean up the code. Additionally, add JavaDoc comments to the push method
to describe its purpose and parameters, including the new tenantId parameter, to
improve code documentation and clarity.

EstimateRequest enrichedEstimateRequest = denormalizeAndEnrichEstimateService.denormalizeAndEnrich(estimateRequest);
producer.push(serviceConfiguration.getEnrichEstimateTopic(), enrichedEstimateRequest);

if (topic.contains(serviceConfiguration.getSaveEstimateTopic()) || topic.contains(serviceConfiguration.getUpdateEstimateTopic())){
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Consider using String.equals() for topic comparison.

The current implementation uses contains() which could potentially match unintended topics. Consider using exact string matching or more precise pattern matching for better reliability.

-if (topic.contains(serviceConfiguration.getSaveEstimateTopic()) || topic.contains(serviceConfiguration.getUpdateEstimateTopic())){
+if (topic.equals(serviceConfiguration.getSaveEstimateTopic()) || 
+    topic.equals(serviceConfiguration.getUpdateEstimateTopic()) ||
+    topic.endsWith("-" + serviceConfiguration.getSaveEstimateTopic()) ||
+    topic.endsWith("-" + serviceConfiguration.getUpdateEstimateTopic())){

This approach would be more precise for tenant-prefixed topics while avoiding false matches.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (topic.contains(serviceConfiguration.getSaveEstimateTopic()) || topic.contains(serviceConfiguration.getUpdateEstimateTopic())){
if (topic.equals(serviceConfiguration.getSaveEstimateTopic()) ||
topic.equals(serviceConfiguration.getUpdateEstimateTopic()) ||
topic.endsWith("-" + serviceConfiguration.getSaveEstimateTopic()) ||
topic.endsWith("-" + serviceConfiguration.getUpdateEstimateTopic())) {
🤖 Prompt for AI Agents
In
backend/estimates/src/main/java/org/egov/consumer/DenormalizeAndEnrichEstimateConsumer.java
at line 66, replace the use of topic.contains(...) with exact string matching
using String.equals() to avoid unintended matches. Update the condition to
compare the topic string exactly with
serviceConfiguration.getSaveEstimateTopic() and
serviceConfiguration.getUpdateEstimateTopic() using equals() or
equalsIgnoreCase() as appropriate, ensuring more precise and reliable topic
matching.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants