Skip to content

Commit c1ac206

Browse files
authored
Fix the sync issue of Abha Details in Elasticsearch (#133)
* fix: abha details sync to ES * fix: add environment variables in common properties file * fix: extend the connection timeout
1 parent 4388d59 commit c1ac206

10 files changed

Lines changed: 1105 additions & 79 deletions

File tree

E:/uat_new/wildfly-30.0.0.Final/wildfly-30.0.0.Final/Logs/fhir-api.log.json

Lines changed: 615 additions & 0 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,30 @@
5353
</properties>
5454

5555
<dependencies>
56+
<dependency>
57+
<groupId>co.elastic.logging</groupId>
58+
<artifactId>logback-ecs-encoder</artifactId>
59+
<version>1.3.2</version>
60+
</dependency>
61+
<dependency>
62+
<groupId>org.springframework.boot</groupId>
63+
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
64+
</dependency>
65+
<dependency>
66+
<groupId>co.elastic.clients</groupId>
67+
<artifactId>elasticsearch-java</artifactId>
68+
<version>8.11.0</version>
69+
</dependency>
70+
71+
<dependency>
72+
<groupId>org.springframework.boot</groupId>
73+
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
74+
</dependency>
75+
<dependency>
76+
<groupId>co.elastic.clients</groupId>
77+
<artifactId>elasticsearch-java</artifactId>
78+
<version>8.11.0</version>
79+
</dependency>
5680

5781
<dependency>
5882
<groupId>org.springframework.boot</groupId>

src/main/environment/common_ci.properties

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,13 @@ spring.redis.host=@env.REDIS_HOST@
126126
cors.allowed-origins=@env.CORS_ALLOWED_ORIGINS@
127127

128128
hipSystemUrl= @env.HIP_SYSTEM_URL@
129+
130+
# Elasticsearch Configuration
131+
elasticsearch.host=@env.ELASTICSEARCH_HOST@
132+
elasticsearch.port=@env.ELASTICSEARCH_PORT@
133+
elasticsearch.username=@env.ELASTICSEARCH_USERNAME@
134+
elasticsearch.password=@env.ELASTICSEARCH_PASSWORD@
135+
elasticsearch.index.beneficiary=@env.ELASTICSEARCH_INDEX_BENEFICIARY@
136+
137+
# Enable/Disable ES (for gradual rollout)
138+
elasticsearch.enabled=@env.ELASTICSEARCH_ENABLED@

src/main/environment/common_docker.properties

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,14 @@ spring.redis.host=${REDIS_HOST}
125125

126126

127127
hipSystemUrl= ${HIP_SYSTEM_URL}
128+
129+
# Elasticsearch Configuration
130+
elasticsearch.host=${ELASTICSEARCH_HOST}
131+
elasticsearch.port=${ELASTICSEARCH_PORT}
132+
elasticsearch.username=${ELASTICSEARCH_USERNAME}
133+
elasticsearch.password=${ELASTICSEARCH_PASSWORD}
134+
elasticsearch.index.beneficiary=${ELASTICSEARCH_INDEX_BENEFICIARY}
135+
136+
# Enable/Disable ES (for gradual rollout)
137+
elasticsearch.enabled=${ELASTICSEARCH_ENABLED}
138+

src/main/environment/common_example.properties

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,15 @@ logging.file.name=logs/fhir-api.log
119119

120120
cors.allowed-origins=http://localhost:*
121121

122-
hipSystemUrl= <Enter HIP request URL>
122+
hipSystemUrl= <Enter HIP request URL>
123+
124+
# Elasticsearch Configuration
125+
elasticsearch.host=localhost
126+
elasticsearch.port=9200
127+
elasticsearch.username=elastic
128+
elasticsearch.password=piramalES
129+
elasticsearch.index.beneficiary=beneficiary_index
130+
131+
# Enable/Disable ES (for gradual rollout)
132+
elasticsearch.enabled=true
133+

src/main/java/com/wipro/fhir/FhirApiApplication.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import org.springframework.data.redis.core.RedisTemplate;
3131
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
3232
import org.springframework.data.redis.serializer.StringRedisSerializer;
33+
import org.springframework.scheduling.annotation.EnableAsync;
3334

3435
import com.wipro.fhir.data.users.User;
3536

3637
@SpringBootApplication
38+
@EnableAsync
3739
public class FhirApiApplication {
3840

3941
public static void main(String[] args) {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package com.wipro.fhir.config;
2+
3+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
4+
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
5+
import co.elastic.clients.transport.ElasticsearchTransport;
6+
import co.elastic.clients.transport.rest_client.RestClientTransport;
7+
import org.apache.http.HttpHost;
8+
import org.apache.http.auth.AuthScope;
9+
import org.apache.http.auth.UsernamePasswordCredentials;
10+
import org.apache.http.impl.client.BasicCredentialsProvider;
11+
import org.apache.http.impl.nio.reactor.IOReactorConfig;
12+
import org.elasticsearch.client.RestClient;
13+
import org.elasticsearch.client.RestClientBuilder;
14+
import org.springframework.beans.factory.annotation.Value;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.scheduling.annotation.EnableAsync;
18+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
19+
20+
import java.util.concurrent.Executor;
21+
22+
@Configuration
23+
@EnableAsync
24+
public class ElasticsearchConfig {
25+
26+
@Value("${elasticsearch.host}")
27+
private String esHost;
28+
29+
@Value("${elasticsearch.port}")
30+
private int esPort;
31+
32+
@Value("${elasticsearch.username}")
33+
private String esUsername;
34+
35+
@Value("${elasticsearch.password}")
36+
private String esPassword;
37+
38+
@Value("${elasticsearch.connection.timeout:10000}")
39+
private int connectionTimeout;
40+
41+
@Value("${elasticsearch.socket.timeout:120000}")
42+
private int socketTimeout;
43+
44+
@Value("${elasticsearch.max.connections:200}")
45+
private int maxConnections;
46+
47+
@Value("${elasticsearch.max.connections.per.route:100}")
48+
private int maxConnectionsPerRoute;
49+
50+
@Bean
51+
public ElasticsearchClient elasticsearchClient() {
52+
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
53+
credentialsProvider.setCredentials(
54+
AuthScope.ANY,
55+
new UsernamePasswordCredentials(esUsername, esPassword)
56+
);
57+
58+
RestClientBuilder builder = RestClient.builder(
59+
new HttpHost(esHost, esPort, "http")
60+
);
61+
62+
// Apply timeout configurations
63+
builder.setRequestConfigCallback(requestConfigBuilder ->
64+
requestConfigBuilder
65+
.setConnectTimeout(connectionTimeout)
66+
.setSocketTimeout(socketTimeout)
67+
.setConnectionRequestTimeout(connectionTimeout)
68+
);
69+
70+
// Apply connection pool settings
71+
builder.setHttpClientConfigCallback(httpClientBuilder ->
72+
httpClientBuilder
73+
.setDefaultCredentialsProvider(credentialsProvider)
74+
.setMaxConnTotal(maxConnections)
75+
.setMaxConnPerRoute(maxConnectionsPerRoute)
76+
.setDefaultIOReactorConfig(
77+
IOReactorConfig.custom()
78+
.setSoTimeout(socketTimeout)
79+
.build()
80+
)
81+
);
82+
83+
RestClient restClient = builder.build();
84+
85+
ElasticsearchTransport transport = new RestClientTransport(
86+
restClient,
87+
new JacksonJsonpMapper()
88+
);
89+
90+
return new ElasticsearchClient(transport);
91+
}
92+
93+
@Bean(name = "esAsyncExecutor")
94+
public Executor asyncExecutor() {
95+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
96+
executor.setCorePoolSize(5);
97+
executor.setMaxPoolSize(20);
98+
executor.setQueueCapacity(500);
99+
executor.setThreadNamePrefix("es-sync-");
100+
executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
101+
executor.initialize();
102+
return executor;
103+
}
104+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package com.wipro.fhir.service.elasticsearch;
2+
3+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
4+
import co.elastic.clients.elasticsearch._types.Refresh;
5+
import co.elastic.clients.elasticsearch.core.GetRequest;
6+
import co.elastic.clients.elasticsearch.core.GetResponse;
7+
import co.elastic.clients.elasticsearch.core.UpdateRequest;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.scheduling.annotation.Async;
14+
import org.springframework.stereotype.Service;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
/**
20+
* Lightweight ES sync service for FHIR-API
21+
* Only updates ABHA-related fields without fetching full beneficiary data
22+
*/
23+
@Service
24+
public class AbhaElasticsearchSyncService {
25+
26+
private static final Logger logger = LoggerFactory.getLogger(AbhaElasticsearchSyncService.class);
27+
28+
@Autowired
29+
private ElasticsearchClient esClient;
30+
31+
@Value("${elasticsearch.index.beneficiary}")
32+
private String beneficiaryIndex;
33+
34+
@Value("${elasticsearch.enabled}")
35+
private boolean esEnabled;
36+
37+
private final ObjectMapper objectMapper = new ObjectMapper();
38+
39+
/**
40+
* Update ABHA details in Elasticsearch after ABHA is created/updated
41+
* This method updates only ABHA fields, doesn't require full beneficiary data
42+
*/
43+
@Async("esAsyncExecutor")
44+
public void updateAbhaInElasticsearch(Long benRegId, String healthId, String healthIdNumber, String createdDate) {
45+
if (!esEnabled) {
46+
logger.debug("Elasticsearch is disabled, skipping ABHA sync");
47+
return;
48+
}
49+
50+
if (benRegId == null) {
51+
logger.warn("benRegId is null, cannot sync ABHA to ES");
52+
return;
53+
}
54+
55+
int maxRetries = 3;
56+
int retryDelay = 2000; // 2 seconds
57+
58+
for (int attempt = 1; attempt <= maxRetries; attempt++) {
59+
try {
60+
logger.info("Syncing ABHA details to ES for benRegId: {} (attempt {}/{})", benRegId, attempt, maxRetries);
61+
62+
Map<String, Object> abhaData = new HashMap<>();
63+
abhaData.put("healthID", healthId);
64+
abhaData.put("abhaID", healthIdNumber);
65+
abhaData.put("abhaCreatedDate", createdDate);
66+
67+
String documentId = String.valueOf(benRegId);
68+
boolean exists = checkDocumentExists(documentId);
69+
70+
if (exists) {
71+
UpdateRequest<Object, Object> updateRequest = UpdateRequest.of(u -> u
72+
.index(beneficiaryIndex)
73+
.id(documentId)
74+
.doc(abhaData)
75+
.refresh(Refresh.True)
76+
.docAsUpsert(false)
77+
.retryOnConflict(3)
78+
);
79+
80+
esClient.update(updateRequest, Object.class);
81+
logger.info("Successfully updated ABHA in ES: benRegId={}", benRegId);
82+
return;
83+
84+
} else {
85+
logger.warn("Document not found in ES for benRegId={} (attempt {}/{})", benRegId, attempt, maxRetries);
86+
if (attempt < maxRetries) {
87+
Thread.sleep(retryDelay * attempt);
88+
}
89+
}
90+
91+
} catch (java.net.SocketTimeoutException e) {
92+
logger.error("Timeout updating ABHA in ES for benRegId {} (attempt {}/{}): {}",
93+
benRegId, attempt, maxRetries, e.getMessage());
94+
if (attempt < maxRetries) {
95+
try {
96+
Thread.sleep(retryDelay * attempt);
97+
} catch (InterruptedException ie) {
98+
Thread.currentThread().interrupt();
99+
return;
100+
}
101+
}
102+
} catch (Exception e) {
103+
logger.error("Error updating ABHA in ES for benRegId {} (attempt {}/{}): {}",
104+
benRegId, attempt, maxRetries, e.getMessage());
105+
if (attempt == maxRetries) {
106+
logger.error("Failed to sync ABHA after {} attempts for benRegId {}", maxRetries, benRegId);
107+
}
108+
}
109+
}
110+
}
111+
112+
/**
113+
* Check if document exists in ES
114+
*/
115+
private boolean checkDocumentExists(String documentId) {
116+
try {
117+
GetRequest getRequest = GetRequest.of(g -> g
118+
.index(beneficiaryIndex)
119+
.id(documentId)
120+
);
121+
122+
GetResponse<Object> response = esClient.get(getRequest, Object.class);
123+
return response.found();
124+
125+
} catch (Exception e) {
126+
logger.debug("Document not found or error checking: {}", e.getMessage());
127+
return false;
128+
}
129+
}
130+
131+
/**
132+
* Retry sync after 5 seconds if document wasn't found
133+
* (Handles race condition where ABHA is saved before beneficiary is synced to ES)
134+
*/
135+
@Async("esAsyncExecutor")
136+
private void retryAfterDelay(Long benRegId, String healthId, String healthIdNumber, String createdDate) {
137+
try {
138+
Thread.sleep(5000); // Wait 5 seconds
139+
logger.info("Retrying ABHA sync for benRegId: {}", benRegId);
140+
updateAbhaInElasticsearch(benRegId, healthId, healthIdNumber, createdDate);
141+
} catch (InterruptedException e) {
142+
Thread.currentThread().interrupt();
143+
logger.error("Retry interrupted for benRegId: {}", benRegId);
144+
}
145+
}
146+
147+
/**
148+
* Update multiple ABHA addresses (comma-separated)
149+
*/
150+
@Async("esAsyncExecutor")
151+
public void updateMultipleAbhaAddresses(Long benRegId, String commaSeparatedHealthIds,
152+
String healthIdNumber, String createdDate) {
153+
if (!esEnabled || benRegId == null) {
154+
return;
155+
}
156+
157+
// For multiple ABHA addresses, store as comma-separated string
158+
updateAbhaInElasticsearch(benRegId, commaSeparatedHealthIds, healthIdNumber, createdDate);
159+
}
160+
}

0 commit comments

Comments
 (0)