diff --git a/taxonomy-api/taxonomy-actors/src/main/scala/org/sunbird/actors/LockActor.scala b/taxonomy-api/taxonomy-actors/src/main/scala/org/sunbird/actors/LockActor.scala index 4f36f002c..981a19c25 100644 --- a/taxonomy-api/taxonomy-actors/src/main/scala/org/sunbird/actors/LockActor.scala +++ b/taxonomy-api/taxonomy-actors/src/main/scala/org/sunbird/actors/LockActor.scala @@ -17,7 +17,6 @@ import org.sunbird.utils.Constants import java.text.SimpleDateFormat import java.util.concurrent.CompletionException import javax.inject.Inject -import scala.collection.immutable.{List, Map} import scala.concurrent.{ExecutionContext, Future} class LockActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor{ @@ -36,94 +35,144 @@ class LockActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor } } + // ─── Private helpers ──────────────────────────────────────────────────────── + + /** Throws ClientException if the X-device-Id header is absent from the request. */ + private def validateDeviceHeader(request: Request): Unit = + if (!request.getRequest.containsKey(Constants.X_DEVICE_ID)) + throw new ClientException("ERR_LOCK_CREATION_FAILED", "X-device-Id is missing in headers") + + /** Builds the standard lock-OK response with lockKey, expiresAt, expiresIn. */ + private def lockOkResponse(lockId: Any, expiresAt: Date): Response = + ResponseHandler.OK + .put("lockKey", lockId) + .put("expiresAt", formatExpiryDate(expiresAt)) + .put("expiresIn", defaultLockExpiryTime / 60) + + /** + * Sets identifier on the request then reads external props from Cassandra. + */ + private def readLockExternalProps(request: Request, resourceId: String): Future[Response] = { + request.put("identifier", resourceId) + val externalProps = DefinitionNode.getExternalProps( + request.getContext.get("graph_id").asInstanceOf[String], + request.getContext.get("version").asInstanceOf[String], + request.getContext.get("schemaName").asInstanceOf[String]) + oec.graphService.readExternalProps(request, externalProps) + } + + // ─── Operations ───────────────────────────────────────────────────────────── + @throws[Exception] private def create(request: Request) = { val newDateObj = createExpiryTime() val resourceId = request.getRequest.getOrDefault(Constants.RESOURCE_ID, "").asInstanceOf[String] - if(!request.getRequest.containsKey(Constants.X_DEVICE_ID)) throw new ClientException("ERR_LOCK_CREATION_FAILED", "X-device-Id is missing in headers") - if(!request.getRequest.containsKey(Constants.X_AUTHENTICATED_USER_ID)) throw new ClientException("ERR_LOCK_CREATION_FAILED", "You are not authorized to lock this resource") - if(request.getRequest.isEmpty) throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING","Error due to required request is missing") - val schemaValidator = SchemaValidatorFactory.getInstance(request.getContext.get("schemaName").asInstanceOf[String],request.getContext.get("version").asInstanceOf[String]) + validateDeviceHeader(request) + if (!request.getRequest.containsKey(Constants.X_AUTHENTICATED_USER_ID)) + throw new ClientException("ERR_LOCK_CREATION_FAILED", "You are not authorized to lock this resource") + if (request.getRequest.isEmpty) + throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING", "Error due to required request is missing") + val schemaValidator = SchemaValidatorFactory.getInstance( + request.getContext.get("schemaName").asInstanceOf[String], + request.getContext.get("version").asInstanceOf[String]) schemaValidator.validate(request.getRequest) - validateResource(request).flatMap( res =>{ - val versionKey: String = res.getMetadata.getOrDefault("versionKey","").asInstanceOf[String] - val channel: String = res.getMetadata.getOrDefault("channel","").asInstanceOf[String] - request.put("identifier", resourceId) - val externalProps = DefinitionNode.getExternalProps(request.getContext.get("graph_id").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String], request.getContext.get("schemaName").asInstanceOf[String]) - oec.graphService.readExternalProps(request, externalProps).map(response => { - if (!ResponseHandler.checkError(response)) { - if(request.getRequest.get("userId") == response.getResult.asScala.toMap.getOrElse("createdby", "") && - request.getRequest.get("deviceId") == response.getResult.asScala.toMap.getOrElse("deviceid", "") && - request.getRequest.get("resourceType") == response.getResult.asScala.toMap.getOrElse("resourcetype", "")){ - Future { - ResponseHandler.OK.put("lockKey", response.getResult.asScala.toMap.getOrElse("lockid", "")).put("expiresAt", formatExpiryDate(response.getResult.asScala.toMap.getOrElse("expiresat", "").asInstanceOf[Date])).put("expiresIn", defaultLockExpiryTime / 60) - } - } - else if (request.getRequest.get("userId") == response.getResult.asScala.toMap.getOrElse("createdby", "")) - throw new ClientException("RESOURCE_SELF_LOCKED", "Error due to self lock , Resource already locked by user ") - else { - val creatorInfoStr = response.getResult.get("creatorinfo").asInstanceOf[String] - val creatorInfoMap = JsonUtils.convertJSONString(creatorInfoStr).asInstanceOf[java.util.Map[String, String]] - val userName = Option(creatorInfoMap.get("name")).getOrElse("another user") - throw new ClientException("RESOURCE_LOCKED", s"The resource is already locked by $userName ") - } - } - else { - val lockId = UUID.randomUUID() - request.getRequest.remove("resourceId") - request.getRequest.remove("userId") - request.put("lockid", lockId) - request.put("expiresat", newDateObj) - request.put("createdon", new Timestamp(new Date().getTime)) - oec.graphService.saveExternalPropsWithTtl(request, defaultLockExpiryTime).flatMap { response => - if (ResponseHandler.checkError(response)) { - throw new ServerException("ERR_WHILE_SAVING_TO_CASSANDRA", "Error while saving external props to Cassandra") - } - else { - updateContent(request, channel, res.getGraphId, res.getObjectType, resourceId, versionKey, lockId) - Future { - ResponseHandler.OK.put("lockKey", lockId).put("expiresAt", formatExpiryDate(newDateObj)).put("expiresIn", defaultLockExpiryTime / 60) - } - } - } - } - }).flatMap( f => f).recoverWith { case e: CompletionException => throw e.getCause } + + validateResource(request).flatMap(res => { + val versionKey = res.getMetadata.getOrDefault("versionKey", "").asInstanceOf[String] + val channel = res.getMetadata.getOrDefault("channel", "").asInstanceOf[String] + readLockExternalProps(request, resourceId).map(response => { + if (!ResponseHandler.checkError(response)) + handleExistingLock(request, response, resourceId, versionKey, channel, res, newDateObj) + else + createNewLock(request, resourceId, versionKey, channel, res, newDateObj) + }).flatMap(f => f).recoverWith { case e: CompletionException => throw e.getCause } }) } + /** Handles the case where a lock record already exists in Cassandra. */ + private def handleExistingLock( + request: Request, response: Response, + resourceId: String, versionKey: String, channel: String, + res: org.sunbird.graph.dac.model.Node, + newDateObj: Date): Future[Response] = { + val existing = response.getResult.asScala.toMap + val sameUser = request.getRequest.get("userId") == existing.getOrElse("createdby", "") + val sameDevice = request.getRequest.get("deviceId") == existing.getOrElse("deviceid", "") + val sameType = request.getRequest.get("resourceType") == existing.getOrElse("resourcetype","") + if (sameUser && sameDevice && sameType) { + Future(lockOkResponse( + existing.getOrElse("lockid", ""), + existing.getOrElse("expiresat", "").asInstanceOf[Date])) + } else if (sameUser) { + throw new ClientException("RESOURCE_SELF_LOCKED", "Error due to self lock , Resource already locked by user ") + } else { + val creatorInfoStr = response.getResult.get("creatorinfo").asInstanceOf[String] + val creatorInfoMap = JsonUtils.convertJSONString(creatorInfoStr).asInstanceOf[java.util.Map[String, String]] + val userName = Option(creatorInfoMap.get("name")).getOrElse("another user") + throw new ClientException("RESOURCE_LOCKED", s"The resource is already locked by $userName ") + } + } + + /** Persists a fresh lock entry to Cassandra then updates the content node. */ + private def createNewLock( + request: Request, + resourceId: String, versionKey: String, channel: String, + res: org.sunbird.graph.dac.model.Node, + newDateObj: Date): Future[Response] = { + val lockId = UUID.randomUUID() + request.getRequest.remove("resourceId") + request.getRequest.remove("userId") + request.put("lockid", lockId) + request.put("expiresat", newDateObj) + request.put("createdon", new Timestamp(new Date().getTime)) + oec.graphService.saveExternalPropsWithTtl(request, defaultLockExpiryTime).flatMap { response => + if (ResponseHandler.checkError(response)) + throw new ServerException("ERR_WHILE_SAVING_TO_CASSANDRA", "Error while saving external props to Cassandra") + else { + updateContent(request, channel, res.getGraphId, res.getObjectType, resourceId, versionKey, lockId) + Future(lockOkResponse(lockId, newDateObj)) + } + } + } + @throws[Exception] private def refresh(request: Request) = { val newDateObj = createExpiryTime() val resourceId = request.getRequest.getOrDefault(Constants.RESOURCE_ID, "").asInstanceOf[String] - val userId = request.getRequest.getOrDefault(Constants.X_AUTHENTICATED_USER_ID, "").asInstanceOf[String] - if(!request.getRequest.containsKey(Constants.X_DEVICE_ID)) throw new ClientException("ERR_LOCK_CREATION_FAILED", "X-device-Id is missing in headers") - if (!(request.getRequest.containsKey("resourceType") && request.getRequest.containsKey("resourceId") && - request.getRequest.containsKey("lockId"))) throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING", "Error due to required request is missing") - validateResource(request).flatMap( res =>{ + val userId = request.getRequest.getOrDefault(Constants.X_AUTHENTICATED_USER_ID, "").asInstanceOf[String] + validateDeviceHeader(request) + if (!(request.getRequest.containsKey("resourceType") && + request.getRequest.containsKey("resourceId") && + request.getRequest.containsKey("lockId"))) + throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING", "Error due to required request is missing") + + validateResource(request).flatMap(res => { val contentLockId = res.getMetadata.get("lockKey") - if(request.getRequest.getOrDefault("lockId", " ") != contentLockId) + if (request.getRequest.getOrDefault("lockId", " ") != contentLockId) throw new ClientException("RESOURCE_LOCKED", "Lock key and request lock key does not match") - request.put("identifier", resourceId) - val externalProps = DefinitionNode.getExternalProps(request.getContext.get("graph_id").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String], request.getContext.get("schemaName").asInstanceOf[String]) - // TODO : update proper username - val creatorInfo = JsonUtils.serialize(Map("name" -> "username", "id" -> userId).asJava) - val resourceInfo = JsonUtils.serialize(Map("contentType" -> res.getMetadata.getOrDefault("contentType", ""), "framework" -> res.getMetadata.getOrDefault("framework", ""), - "identifier" -> resourceId, "mimeType" -> res.getMetadata.getOrDefault("mimeType", "")).asJava) - oec.graphService.readExternalProps(request, externalProps).map(response => { + // TODO: update proper username + val creatorInfo = JsonUtils.serialize(Map("name" -> "username", "id" -> userId).asJava) + val resourceInfo = JsonUtils.serialize(Map( + "contentType" -> res.getMetadata.getOrDefault("contentType", ""), + "framework" -> res.getMetadata.getOrDefault("framework", ""), + "identifier" -> resourceId, + "mimeType" -> res.getMetadata.getOrDefault("mimeType", "")).asJava) + + readLockExternalProps(request, resourceId).map(response => { if (ResponseHandler.checkError(response)) { - val createLockReq = request - createLockReq.put("resourceInfo", resourceInfo) - createLockReq.put("creatorInfo", creatorInfo) - createLockReq.put("createdBy", userId) if (contentLockId == request.getRequest.getOrDefault("lockId", "")) { + val createLockReq = request + createLockReq.put("resourceInfo", resourceInfo) + createLockReq.put("creatorInfo", creatorInfo) + createLockReq.put("createdBy", userId) createLockReq.getRequest.remove("lockId") create(createLockReq) - } - else throw new ClientException("ERR_LOCK_REFRESHING_FAILED", "no data found from db for refreshing lock") - } - else { - val lockId = response.getResult.asScala.toMap.getOrElse("lockid", "") - val createdBy = response.getResult.asScala.toMap.getOrElse("createdby", "") + } else + throw new ClientException("ERR_LOCK_REFRESHING_FAILED", "no data found from db for refreshing lock") + } else { + val existing = response.getResult.asScala.toMap + val lockId = existing.getOrElse("lockid", "") + val createdBy = existing.getOrElse("createdby", "") if (createdBy != userId) throw new ClientException("ERR_LOCK_REFRESHING_FAILED", "Unauthorized to refresh this lock") request.put("fields", List("expiresat")) @@ -131,91 +180,83 @@ class LockActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor oec.graphService.updateExternalPropsWithTtl(request, defaultLockExpiryTime).flatMap { response => if (ResponseHandler.checkError(response)) throw new ServerException("ERR_WHILE_UPDAING_TO_CASSANDRA", "Error while updating external props to Cassandra") - else { - Future { - ResponseHandler.OK.put("lockKey", lockId).put("expiresAt", formatExpiryDate(newDateObj)).put("expiresIn", defaultLockExpiryTime / 60) - } - } + else + Future(lockOkResponse(lockId, newDateObj)) } } - }).flatMap( f => f).recoverWith { case e: CompletionException => throw e.getCause } + }).flatMap(f => f).recoverWith { case e: CompletionException => throw e.getCause } }).recoverWith { case e: CompletionException => throw e.getCause } } @throws[Exception] private def retire(request: Request) = { val resourceId = request.getRequest.getOrDefault(Constants.RESOURCE_ID, "").asInstanceOf[String] - val userId = request.getRequest.getOrDefault(Constants.X_AUTHENTICATED_USER_ID, "").asInstanceOf[String] - if(!request.getRequest.containsKey(Constants.X_DEVICE_ID)) throw new ClientException("ERR_LOCK_CREATION_FAILED", "X-device-Id is missing in headers") - if(!(request.getRequest.containsKey("resourceType") && request.getRequest.containsKey("resourceId"))) throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING","Error due to required request is missing") - validateResource(request).flatMap( res =>{ - request.put("identifier", resourceId) - val externalProps = DefinitionNode.getExternalProps(request.getContext.get("graph_id").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String], request.getContext.get("schemaName").asInstanceOf[String]) - oec.graphService.readExternalProps(request, externalProps).map(response => { + val userId = request.getRequest.getOrDefault(Constants.X_AUTHENTICATED_USER_ID, "").asInstanceOf[String] + validateDeviceHeader(request) + if (!(request.getRequest.containsKey("resourceType") && request.getRequest.containsKey("resourceId"))) + throw new ClientException("ERR_LOCK_CREATION_FIELDS_MISSING", "Error due to required request is missing") + + validateResource(request).flatMap(res => { + readLockExternalProps(request, resourceId).map(response => { if (!ResponseHandler.checkError(response)) { val createdBy = response.getResult.asScala.toMap.getOrElse("createdby", "") if (createdBy != userId) throw new ClientException("ERR_LOCK_RETIRING_FAILED", "Unauthorized to retire lock") request.put("identifiers", List(resourceId)) - oec.graphService.deleteExternalProps(request) flatMap { response => + oec.graphService.deleteExternalProps(request).flatMap { response => if (ResponseHandler.checkError(response)) throw new ServerException("ERR_WHILE_DELETING_FROM_CASSANDRA", "Error while deleting external props from Cassandra") - else { - Future { - ResponseHandler.OK() - } - } + else + Future(ResponseHandler.OK()) } - } - else throw new ClientException("ERR_LOCK_RETIRING_FAILED","no data found from db for retiring lock") - }).flatMap( f => f).recoverWith { case e: CompletionException => throw e.getCause } + } else + throw new ClientException("ERR_LOCK_RETIRING_FAILED", "no data found from db for retiring lock") + }).flatMap(f => f).recoverWith { case e: CompletionException => throw e.getCause } }).recoverWith { case e: CompletionException => throw e.getCause } } @throws[Exception] private def list(request: Request) = { - val filters = request.getRequest.getOrDefault("filters", new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] + val filters = request.getRequest.getOrDefault("filters", new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] val resourceId = filters.getOrDefault("resourceId", "") - if (!request.getRequest.containsKey(Constants.X_DEVICE_ID)) - throw new ClientException("ERR_LOCK_CREATION_FAILED", "X-device-Id is missing in headers") - if (resourceId.isInstanceOf[String]) { + validateDeviceHeader(request) + if (resourceId.isInstanceOf[String]) request.getRequest.put("identifier", resourceId) - } - else { - request.getRequest.put("identifiers", resourceId.asInstanceOf[java.util.List[String]].asScala.toList) - } - val externalProps = DefinitionNode.getExternalProps(request.getContext.get("graph_id").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String], request.getContext.get("schemaName").asInstanceOf[String]) + else + request.getRequest.put("identifiers", resourceId.asInstanceOf[java.util.List[String]].asScala.toList) + val externalProps = DefinitionNode.getExternalProps( + request.getContext.get("graph_id").asInstanceOf[String], + request.getContext.get("version").asInstanceOf[String], + request.getContext.get("schemaName").asInstanceOf[String]) oec.graphService.readExternalProps(request, externalProps).map(response => { if (!ResponseHandler.checkError(response)) { Future { val formattedLockDataList = response.getResult.values().asScala.map { lockData => val lockDataMap = lockData.asInstanceOf[java.util.Map[String, AnyRef]].asScala.toMap - val updatedLockDataMap = lockDataMap.updated("createdon", formatExpiryDate(lockDataMap("createdon").asInstanceOf[Date])) + val updatedLockDataMap = lockDataMap + .updated("createdon", formatExpiryDate(lockDataMap("createdon").asInstanceOf[Date])) .updated("expiresat", formatExpiryDate(lockDataMap("expiresat").asInstanceOf[Date])) updatedLockDataMap.asJava }.toList.asJava ResponseHandler.OK.put("count", response.getResult.size()).put("data", formattedLockDataList) } - } - else throw new ClientException("ERR_LOCK_LISTING_FAILED","error while fetching lock list data from db") - }).flatMap( f => f).recoverWith { case e: CompletionException => throw e.getCause } + } else + throw new ClientException("ERR_LOCK_LISTING_FAILED", "error while fetching lock list data from db") + }).flatMap(f => f).recoverWith { case e: CompletionException => throw e.getCause } } - def updateContent(request : Request, channel: String, graphId: String, objectType: String, resourceId: String, versionKey: String, lockId: UUID)(implicit oec: OntologyEngineContext, ec: ExecutionContext) = { + def updateContent(request: Request, channel: String, graphId: String, objectType: String, + resourceId: String, versionKey: String, lockId: UUID)(implicit oec: OntologyEngineContext, ec: ExecutionContext) = { val contentUpdateReq = new Request() - contentUpdateReq.setContext(new util.HashMap[String, AnyRef]() { - { - putAll(request.getContext) - } - }) - contentUpdateReq.getContext.put("graph_id", graphId) + contentUpdateReq.setContext(new util.HashMap[String, AnyRef]() {{ putAll(request.getContext) }}) + contentUpdateReq.getContext.put("graph_id", graphId) contentUpdateReq.getContext.put("objectType", objectType) - contentUpdateReq.getContext.put("channel", channel) + contentUpdateReq.getContext.put("channel", channel) contentUpdateReq.getContext.put(Constants.SCHEMA_NAME, objectType.toLowerCase) - contentUpdateReq.getContext.put(Constants.VERSION, version) - contentUpdateReq.getContext.put(Constants.IDENTIFIER, resourceId) + contentUpdateReq.getContext.put(Constants.VERSION, version) + contentUpdateReq.getContext.put(Constants.IDENTIFIER, resourceId) contentUpdateReq.put("versionKey", versionKey) - contentUpdateReq.put("lockKey", lockId.toString) + contentUpdateReq.put("lockKey", lockId.toString) DataNode.update(contentUpdateReq).recoverWith { case e: CompletionException => throw e.getCause } } @@ -223,22 +264,17 @@ class LockActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor val resourceId = request.getRequest.getOrDefault(Constants.RESOURCE_ID, "").asInstanceOf[String] if (resourceId.isEmpty) throw new ClientException("ERR_INVALID_RESOURCE_ID", s"Invalid resourceId: '${resourceId}' ") val getRequest = new Request() - getRequest.setContext(new util.HashMap[String, AnyRef]() { - { - putAll(request.getContext) - } - }) - getRequest.getContext.put(Constants.SCHEMA_NAME, Constants.LOCK_SCHEMA_NAME) - getRequest.getContext.put(Constants.VERSION, Constants.LOCK_SCHEMA_VERSION) + getRequest.setContext(new util.HashMap[String, AnyRef]() {{ putAll(request.getContext) }}) + getRequest.getContext.put(Constants.SCHEMA_NAME, Constants.LOCK_SCHEMA_NAME) + getRequest.getContext.put(Constants.VERSION, Constants.LOCK_SCHEMA_VERSION) getRequest.put(Constants.IDENTIFIER, resourceId) DataNode.read(getRequest)(oec, ec).map(node => { if (null != node && StringUtils.equalsAnyIgnoreCase(node.getIdentifier, resourceId)) node - else - throw new ClientException("ERR_RESOURCE_NOT_FOUND", "Error as resource validation failed ") + else throw new ClientException("ERR_RESOURCE_NOT_FOUND", "Error as resource validation failed ") })(ec) } - def createExpiryTime():Date = { + def createExpiryTime(): Date = { val expiryTimeMillis = System.currentTimeMillis() + (defaultLockExpiryTime * 1000) new Date(expiryTimeMillis) } @@ -248,6 +284,4 @@ class LockActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")) dateFormat.format(date) } - - }