Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,141 +37,153 @@ class FrameworkActor @Inject()(implicit oec: OntologyEngineContext) extends Base
}
}

// ─── Private helpers ─────────────────────────────────────────────────────────

/**
* Builds a Request scoped to the channel schema, reads the channel node,
* and returns it. Used by both create() and publish().
*/
private def readChannelNode(base: Request, channelId: String): Future[Node] = {
val req = new Request()
req.setContext(new util.HashMap[String, AnyRef]() {{ putAll(base.getContext) }})
req.getContext.put(Constants.SCHEMA_NAME, Constants.CHANNEL_SCHEMA_NAME)
req.getContext.put(Constants.VERSION, Constants.CHANNEL_SCHEMA_VERSION)
req.put(Constants.IDENTIFIER, channelId)
DataNode.read(req)
}

/**
* Fetches the framework hierarchy from the configured backing store.
* Falls back to Redis when Cassandra is disabled.
*/
private def fetchFrameworkFromStorage(request: Request, frameworkId: String): Future[Map[String, AnyRef]] =
if (Platform.getBoolean("service.db.cassandra.enabled", true))
FrameworkManager.getFrameworkHierarchy(request)
else {
val frameworkStr = RedisCache.get("fw:" + frameworkId, (_: String) => "{}")
Future(JsonUtils.deserialize(frameworkStr, classOf[java.util.Map[String, AnyRef]]).asScala.toMap)
}

/**
* Reads the framework SubGraph, builds its complete hierarchy metadata,
* persists it, and returns the publish-OK response.
*/
private def persistFrameworkHierarchy(request: Request, frameworkId: String): Future[Response] = {
val getFrameworkReq = new Request()
getFrameworkReq.setContext(new util.HashMap[String, AnyRef]() {{ putAll(request.getContext) }})
getFrameworkReq.getContext.put(Constants.SCHEMA_NAME, Constants.FRAMEWORK_SCHEMA_NAME)
getFrameworkReq.getContext.put(Constants.VERSION, Constants.FRAMEWORK_SCHEMA_VERSION)
getFrameworkReq.put(Constants.IDENTIFIER, frameworkId)
DataSubGraph.read(getFrameworkReq).map { data =>
val frameworkHierarchy = FrameworkManager.getCompleteMetadata(frameworkId, data, includeRelations = true)
CategoryCache.setFramework(frameworkId, frameworkHierarchy)
val hierarchy = ScalaJsonUtils.serialize(frameworkHierarchy)
if (Platform.getBoolean("service.db.cassandra.enabled", true)) {
val req = new Request(request)
req.put("hierarchy", hierarchy)
req.put("identifier", frameworkId)
oec.graphService.saveExternalProps(req)
} else RedisCache.set("fw:" + frameworkId, hierarchy)
ResponseHandler.OK.put(Constants.PUBLISH_STATUS,
s"Publish Event for Framework Id '$frameworkId' is pushed Successfully!")
}
}

// ─── Operations ──────────────────────────────────────────────────────────────

@throws[Exception]
private def create(request: Request): Future[Response] = {
RequestUtil.restrictProperties(request)
val code = request.getRequest.getOrDefault(Constants.CODE, "").asInstanceOf[String]
val code = request.getRequest.getOrDefault(Constants.CODE, "").asInstanceOf[String]
val channel = request.getRequest.getOrDefault(Constants.CHANNEL, "").asInstanceOf[String]
if (StringUtils.isNotBlank(code) && StringUtils.isNotBlank(channel)) {
request.getRequest.put(Constants.IDENTIFIER, code)
val getChannelReq = new Request()
getChannelReq.setContext(new util.HashMap[String, AnyRef]() {
{
putAll(request.getContext)
}
})
getChannelReq.getContext.put(Constants.SCHEMA_NAME, Constants.CHANNEL_SCHEMA_NAME)
getChannelReq.getContext.put(Constants.VERSION, Constants.CHANNEL_SCHEMA_VERSION)
getChannelReq.put(Constants.IDENTIFIER, channel)
DataNode.read(getChannelReq).map(node => {
readChannelNode(request, channel).flatMap { node =>
if (null != node && StringUtils.equalsAnyIgnoreCase(node.getIdentifier, channel)) {
FrameworkManager.validateTranslationMap(request)
DataNode.create(request).map(frameNode => {
ResponseHandler.OK.put(Constants.NODE_ID, frameNode.getIdentifier).put("versionKey", frameNode.getMetadata.get("versionKey"))
})
DataNode.create(request).map { frameNode =>
ResponseHandler.OK
.put(Constants.NODE_ID, frameNode.getIdentifier)
.put("versionKey", frameNode.getMetadata.get("versionKey"))
}
} else throw new ClientException("ERR_INVALID_CHANNEL_ID", "Please provide valid channel identifier")
}).flatMap(f => f)
}
} else throw new ClientException("ERR_INVALID_REQUEST", "Invalid Request. Please Provide Required Properties!")

}


@throws[Exception]
private def read(request: Request): Future[Response] = {
val frameworkId = request.get("identifier").asInstanceOf[String]
val returnCategories: java.util.List[String] = request.get("categories").asInstanceOf[String].split(",").filter(category => StringUtils.isNotBlank(category) && !StringUtils.equalsIgnoreCase(category, "null")).toList.asJava
val frameworkId = request.get("identifier").asInstanceOf[String]
val returnCategories: java.util.List[String] = request.get("categories").asInstanceOf[String]
.split(",")
.filter(c => StringUtils.isNotBlank(c) && !StringUtils.equalsIgnoreCase(c, "null"))
.toList.asJava
request.getRequest.put("categories", returnCategories)

if (StringUtils.isNotBlank(frameworkId)) {
val framework = FrameworkCache.get(frameworkId, returnCategories)
if(framework != null){
Future {
ResponseHandler.OK.put(Constants.FRAMEWORK, framework)
}
val cached = FrameworkCache.get(frameworkId, returnCategories)
if (cached != null) {
Future(ResponseHandler.OK.put(Constants.FRAMEWORK, cached))
} else {
val frameworkData: Future[Map[String, AnyRef]] = if (Platform.getBoolean("service.db.cassandra.enabled", true))
FrameworkManager.getFrameworkHierarchy(request) else {
val frameworkStr = RedisCache.get("fw:"+frameworkId, (key: String) => "{}")
Future(JsonUtils.deserialize(frameworkStr, classOf[java.util.Map[String, AnyRef]]).asScala.toMap)
}
frameworkData.map(framework => {
fetchFrameworkFromStorage(request, frameworkId).flatMap { framework =>
if (framework.isEmpty) {
DataNode.read(request).map(node => {
DataNode.read(request).map { node =>
if (null != node && StringUtils.equalsAnyIgnoreCase(node.getIdentifier, frameworkId)) {
val framework = NodeUtil.serialize(node, null, request.getContext.get(Constants.SCHEMA_NAME).asInstanceOf[String], request.getContext.get(Constants.VERSION).asInstanceOf[String])
ResponseHandler.OK.put(Constants.FRAMEWORK, framework)
val fw = NodeUtil.serialize(node, null,
request.getContext.get(Constants.SCHEMA_NAME).asInstanceOf[String],
request.getContext.get(Constants.VERSION).asInstanceOf[String])
ResponseHandler.OK.put(Constants.FRAMEWORK, fw)
} else throw new ClientException("ERR_INVALID_REQUEST", "Invalid Request. Please Provide Required Properties!")
})
}
} else {
Future {
val filterFrameworkData: Map[String, AnyRef] = FrameworkManager.filterFrameworkCategories(framework.asJava, returnCategories)
val filterFrameworkData = FrameworkManager.filterFrameworkCategories(framework.asJava, returnCategories)
FrameworkCache.save(filterFrameworkData, returnCategories)
val javaMap: java.util.Map[String, AnyRef] = filterFrameworkData.asJava
ResponseHandler.OK.put(Constants.FRAMEWORK, javaMap)
ResponseHandler.OK.put(Constants.FRAMEWORK, filterFrameworkData.asJava)
}
}
}).flatMap(f => f)
}
}
} else throw new ClientException("ERR_INVALID_REQUEST", "Invalid Request. Please Provide Required Properties!")
}

@throws[Exception]
private def update(request: Request): Future[Response] = {
RequestUtil.restrictProperties(request)
DataNode.update(request).map(node => {
DataNode.update(request).map(node =>
ResponseHandler.OK.put("node_id", node.getIdentifier).put("versionKey", node.getMetadata.get("versionKey"))
})
)
}

@throws[Exception]
private def retire(request: Request): Future[Response] = {
request.getRequest.put("status", "Retired")
DataNode.update(request).map(node => {
DataNode.update(request).map(node =>
ResponseHandler.OK.put("node_id", node.getIdentifier).put("identifier", node.getIdentifier)
})
)
}


@throws[Exception]
@throws[Exception]
private def publish(request: Request): Future[Response] = {
RequestUtil.restrictProperties(request)
val frameworkId = request.getRequest.getOrDefault(Constants.IDENTIFIER, "").asInstanceOf[String]
val channel = request.getRequest.getOrDefault(Constants.CHANNEL, "").asInstanceOf[String]
val getChannelReq = new Request()
getChannelReq.setContext(new util.HashMap[String, AnyRef]() {
{
putAll(request.getContext)
}
})
getChannelReq.getContext.put(Constants.SCHEMA_NAME, Constants.CHANNEL_SCHEMA_NAME)
getChannelReq.getContext.put(Constants.VERSION, Constants.CHANNEL_SCHEMA_VERSION)
getChannelReq.put(Constants.IDENTIFIER, channel)
DataNode.read(getChannelReq).map(node => {
val channel = request.getRequest.getOrDefault(Constants.CHANNEL, "").asInstanceOf[String]
readChannelNode(request, channel).flatMap { node =>
if (null != node && StringUtils.equalsAnyIgnoreCase(node.getIdentifier, channel)) {
val name = node.getMetadata.getOrDefault("name", "").asInstanceOf[String]
val name = node.getMetadata.getOrDefault("name", "").asInstanceOf[String]
val description = node.getMetadata.getOrDefault("description", "").asInstanceOf[String]
request.getRequest.putAll(Map("name" -> name, "description" -> description).asJava)
if(StringUtils.isNotBlank(frameworkId)){
val getFrameworkReq = new Request()
getFrameworkReq.setContext(new util.HashMap[String, AnyRef]() {
{
putAll(request.getContext)
}
})
getFrameworkReq.getContext.put(Constants.SCHEMA_NAME, Constants.FRAMEWORK_SCHEMA_NAME)
getFrameworkReq.getContext.put(Constants.VERSION, Constants.FRAMEWORK_SCHEMA_VERSION)
getFrameworkReq.put(Constants.IDENTIFIER, frameworkId)
val subGraph: Future[SubGraph] = DataSubGraph.read(getFrameworkReq)
subGraph.map(data => {
val frameworkHierarchy = FrameworkManager.getCompleteMetadata(frameworkId, data, true)
CategoryCache.setFramework(frameworkId, frameworkHierarchy)
val hierarchy = ScalaJsonUtils.serialize(frameworkHierarchy)
if (Platform.getBoolean("service.db.cassandra.enabled", true)) {
val req = new Request(request)
req.put("hierarchy", hierarchy)
req.put("identifier", frameworkId)
oec.graphService.saveExternalProps(req)
} else RedisCache.set("fw:"+frameworkId, hierarchy)
ResponseHandler.OK.put(Constants.PUBLISH_STATUS, s"Publish Event for Framework Id '${frameworkId}' is pushed Successfully!")
})
} else throw new ClientException("ERR_INVALID_FRAMEWORK_ID", "Please provide valid framework identifier")
if (StringUtils.isNotBlank(frameworkId))
persistFrameworkHierarchy(request, frameworkId)
else
throw new ClientException("ERR_INVALID_FRAMEWORK_ID", "Please provide valid framework identifier")
} else throw new ClientException("ERR_INVALID_CHANNEL_ID", "Please provide valid channel identifier")
}).flatMap(f => f)
}
}

//TODO:
private def copy(request: Request): Future[Response] = {
RequestUtil.restrictProperties(request)
FrameworkManager.copyHierarchy(request)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,44 +70,75 @@ object FrameworkManager {
})
}

def getCompleteMetadata(id: String, subGraph: SubGraph, includeRelations: Boolean)(implicit oec: OntologyEngineContext, ec: ExecutionContext): util.Map[String, AnyRef] = {
/**
* Builds and returns the filtered metadata map for a single node in the subgraph.
* Applies JSON property conversion, key-name normalisation, and schema field filtering.
*/
private def buildFilteredNodeMetadata(id: String, subGraph: SubGraph): util.Map[String, AnyRef] = {
val nodes = subGraph.getNodes
val relations = subGraph.getRelations
val node = nodes.get(id)
if (null == node) {
throw new ClientException("ERR_NODE_NOT_FOUND", s"Node with ID '$id' not found in SubGraph. Available nodes: ${nodes.keySet()}")
}
val metadata = node.getMetadata
val node = nodes.get(id)
if (null == node)
throw new ClientException("ERR_NODE_NOT_FOUND",
s"Node with ID '$id' not found in SubGraph. Available nodes: ${nodes.keySet()}")

val metadata = node.getMetadata
val objectType = node.getObjectType.toLowerCase().replace("image", "")
val channel = node.getMetadata.getOrDefault("channel", "all").asInstanceOf[String]
val definition: ObjectCategoryDefinition = DefinitionNode.getObjectCategoryDefinition("", objectType, channel)
val channel = node.getMetadata.getOrDefault("channel", "all").asInstanceOf[String]
val definition: ObjectCategoryDefinition =
DefinitionNode.getObjectCategoryDefinition("", objectType, channel)

val jsonProps = DefinitionNode.fetchJsonProps(node.getGraphId, schemaVersion, objectType, definition)
val updatedMetadata: util.Map[String, AnyRef] = (metadata.entrySet().asScala.filter(entry => null != entry.getValue)
.map((entry: util.Map.Entry[String, AnyRef]) => handleKeyNames(entry, null) -> convertJsonProperties(entry, jsonProps)).toMap ++
Map("objectType" -> node.getObjectType, "identifier" -> node.getIdentifier, "languageCode" -> NodeUtil.getLanguageCodes(node))).asJava
val updatedMetadata: util.Map[String, AnyRef] =
(metadata.entrySet().asScala
.filter(entry => null != entry.getValue)
.map(entry => handleKeyNames(entry, null) -> convertJsonProperties(entry, jsonProps))
.toMap ++ Map(
"objectType" -> node.getObjectType,
"identifier" -> node.getIdentifier,
"languageCode" -> NodeUtil.getLanguageCodes(node)
)).asJava

val fields = DefinitionNode.getMetadataFields(node.getGraphId, schemaVersion, objectType, definition)
if (fields.nonEmpty) updatedMetadata.asScala.filter(entry => fields.contains(entry._1)).asJava
else updatedMetadata
}

val fields =DefinitionNode.getMetadataFields(node.getGraphId, schemaVersion, objectType, definition)
val filteredData: util.Map[String, AnyRef] = if(fields.nonEmpty) updatedMetadata.asScala.filter(entry => fields.contains(entry._1)).asJava else updatedMetadata
/**
* Returns the out-relations of a node sorted by their sequence index.
*/
private def sortedOutRelations(id: String, subGraph: SubGraph): util.List[Relation] = {
val node = subGraph.getNodes.get(id)
val relations = subGraph.getRelations
relations.asScala
.filter(rel => StringUtils.equals(rel.getStartNodeId.toString(), node.getIdentifier))
.sortBy(rel => rel.getMetadata.get("IL_SEQUENCE_INDEX").asInstanceOf[Long])(Ordering.Long)
.toList.asJava
}

val relationDef = DefinitionNode.getRelationDefinitionMap(node.getGraphId, schemaVersion, objectType, definition)
val outRelations = relations.asScala.filter((rel: Relation) => {
StringUtils.equals(rel.getStartNodeId.toString(), node.getIdentifier)
}).sortBy((rel: Relation) => rel.getMetadata.get("IL_SEQUENCE_INDEX").asInstanceOf[Long])(Ordering.Long).toList.asJava
def getCompleteMetadata(id: String, subGraph: SubGraph, includeRelations: Boolean)(implicit oec: OntologyEngineContext, ec: ExecutionContext): util.Map[String, AnyRef] = {
val filteredData = buildFilteredNodeMetadata(id, subGraph)

if (includeRelations) {
val node = subGraph.getNodes.get(id)
val objectType = node.getObjectType.toLowerCase().replace("image", "")
val channel = node.getMetadata.getOrDefault("channel", "all").asInstanceOf[String]
val definition = DefinitionNode.getObjectCategoryDefinition("", objectType, channel)
val relationDef = DefinitionNode.getRelationDefinitionMap(node.getGraphId, schemaVersion, objectType, definition)
val outRelations = sortedOutRelations(id, subGraph)

if(includeRelations){
val relMetadata = getRelationAsMetadata(relationDef, outRelations, "out")
val childHierarchy = relMetadata.map(x => (x._1, x._2.asScala.map(a => {
val identifier = a.getOrElse("identifier", "")
val childNode = nodes.get(identifier)
val index = a.getOrElse("index", 1).asInstanceOf[Number]
val metaData = (childNode.getMetadata.asScala ++ Map("index" -> index)).asJava
childNode.setMetadata(metaData)
if("associations".equalsIgnoreCase(x._1)){
getCompleteMetadata(childNode.getIdentifier, subGraph, false)
} else {
getCompleteMetadata(childNode.getIdentifier, subGraph, true)
}
}).toList.asJava))
val childHierarchy = relMetadata.map { case (relKey, children) =>
relKey -> children.asScala.map { a =>
val identifier = a.getOrElse("identifier", "")
val childNode = subGraph.getNodes.get(identifier)
val index = a.getOrElse("index", 1).asInstanceOf[Number]
childNode.setMetadata((childNode.getMetadata.asScala ++ Map("index" -> index)).asJava)
if ("associations".equalsIgnoreCase(relKey))
getCompleteMetadata(childNode.getIdentifier, subGraph, includeRelations = false)
else
getCompleteMetadata(childNode.getIdentifier, subGraph, includeRelations = true)
}.toList.asJava
}
(filteredData.asScala ++ childHierarchy).asJava
} else {
filteredData
Expand Down
Loading