This blog post discusses how to set up and use Network Pruning within a Quantexa batch build. Network Pruning reduces the complexity of a Network so that Investigators receive a less noisy version, containing only the most important paths to analyze.
So what’s Network Pruning all about?
Network Pruning describes the process of taking a Network and removing any Nodes and Edges that are deemed irrelevant to an Investigation post Scoring.
Note: The definition of “irrelevant” Nodes and Edges depends on the Network traversals that are used in your project’s Scores.
Why is the feature useful?
Since Network Pruning reduces the size of the Network, the generated Network is easier to explore and investigate. If a Network still contained these "irrelevant" Nodes and Edges, it could complicate an Investigation and make conclusions less clear.
Generally, a visual user interface (UI), such as the one Quantexa uses, is easier to navigate when extraneous information is removed. Pruning can also reduce the file size and complexity of exported data, which makes any analysis carried out outside Quantexa more productive.
Why would you prune a Network?
When loading Tasks into an instance of the Quantexa UI, we typically load an accompanying Investigation that contains the Nodes and Edges relevant to any triggered Scores.
Loading an unpruned Network is therefore not recommended for the following reasons:
- Networks can become very large, impacting the performance of the Task Loading job.
- Large Networks can also hinder Investigation load times within the UI, so pruning helps your workflow be more efficient.
- Finally, with large Networks it can be difficult to identify the Nodes associated with the Scores that have triggered within a Task. Consequently, end users must spend significantly more time working through the noise to understand the Network.
Network Pruning example
The following diagrams, Unpruned Network and Pruned Network, illustrate the impact of pruning a Network.
The Graph Script output for a particular seed customer may look similar to the following:
Figure 1. Unpruned Network
Following Scoring, we find that the Network has triggered a Score that looks for a seed customer trading with another customer through account Entities. The second customer Document and telephone Entity have not triggered any Scores. We can prune the Network by removing all Nodes and Edges that did not appear in the results of any triggered Scores.
Figure 2. Pruned Network
How do we prune Networks?
This depends on the version of The Quantexa Platform that your project is using.
If you are using Assess on version 2.1.0 or more recent, there is a Network Pruning helper method that you can use. Project Example has an example implementation of Network Pruning in Assess which you can follow for your project.
If you are using a version before 2.1.0 or are using Scoring Framework 1, then you must add custom Network Pruning code to your project. See Custom Network Pruning for further details.
What did you find out during implementation?
- Prior to Task Loading, GraphWithIdAndFacts must be converted to EntityGraph, which is an expensive operation. Pruning before this conversion improves performance.
- Task Loading is quicker and more reliable.
- The resulting Networks are much simpler to interact with than unpruned Networks.
- Having the Networks programmatically expanded to only the Nodes of interest saves Investigators the significant effort of recreating them manually.
It is possible to prune the Entity Graph directly; however, this method should be avoided for the following reasons:
- Pruning the Entity Graph directly involves writing and reading EntityGraph to and from disk, which some projects have found problematic.
- Converting from GraphWithIdAndFacts to EntityGraph is more performant when the graphs are smaller. Pruning before converting helps with this.
When using Assess, implementing Network Pruning on version 2.1.0 or newer requires noticeably less effort than on pre-2.1.0 codebases.
- Prior to Assess, it is the project’s responsibility to collect the Documents and Entities of interest required as input for pruning.
- You should narrow down the pool of Networks eligible for pruning to just the Networks that are going to be used for Task Loading, otherwise you will spend unnecessary effort pruning Networks which will not be used.
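The narrowing step described in the last point can be sketched with plain Scala collections. This is a minimal illustration only: `GraphRow`, `AlertDecision`, and `graphsToLoad` are hypothetical names invented here, not Quantexa classes, and the run dates are made up. On a real project the same idea would typically be expressed as a Spark left-semi join, as in the custom pruning script later in this post.

```scala
object EligibleGraphsSketch {
  // Hypothetical stand-ins for project types; these are not Quantexa classes.
  final case class GraphRow(id: String, nodeCount: Int)
  final case class AlertDecision(customerId: String, customerAlert: Boolean, runDate: String)

  /** Keep only graphs whose id belongs to a customer that alerted in the given run. */
  def graphsToLoad(
    graphs: Seq[GraphRow],
    decisions: Seq[AlertDecision],
    runDate: String
  ): Seq[GraphRow] = {
    val alertedIds: Set[String] =
      decisions
        .filter(d => d.customerAlert && d.runDate == runDate)
        .map(_.customerId)
        .toSet
    // Equivalent in spirit to a left-semi join on the graph id
    graphs.filter(g => alertedIds.contains(g.id))
  }

  val graphs: Seq[GraphRow] =
    Seq(GraphRow("c1", 120), GraphRow("c2", 45), GraphRow("c3", 300))

  val decisions: Seq[AlertDecision] = Seq(
    AlertDecision("c1", customerAlert = true, runDate = "2022-04-01"),
    AlertDecision("c2", customerAlert = false, runDate = "2022-04-01"),
    AlertDecision("c3", customerAlert = true, runDate = "2022-03-01") // alerted in an earlier run
  )

  // Only c1 alerted in the current run, so only its graph is worth pruning.
  val eligible: Seq[GraphRow] = graphsToLoad(graphs, decisions, "2022-04-01")
}
```

Filtering first means the more expensive pruning step only runs over the graphs that will actually reach Task Loading.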
Custom Network Pruning
Important: This method should only be used if you are on a version of the platform before 2.1.0, or using Scoring Framework 1.
At a high level, the following steps are performed:
- Get all Node IDs from the graph (from the Graph Script DSL output).
- Get the set of Nodes you want to keep (from the Scoring output).
- Perform a diff between the two to retrieve the Nodes that you want to remove.
- Apply the updates to the unpruned graph to prune it.
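The four steps above can be sketched on a toy graph model. This is a simplified illustration under stated assumptions: `ToyGraph`, `Edge`, and the Node names are invented for this example and are not the Quantexa `Graph` API; the data only loosely mirrors the earlier seed customer example.

```scala
object PruningStepsSketch {
  // Toy model invented for illustration; NOT the Quantexa Graph classes.
  final case class Edge(source: String, target: String)
  final case class ToyGraph(nodeIds: Set[String], edges: Set[Edge])

  /** Steps 1 to 3: diff all Node IDs against the IDs that Scoring marked as relevant. */
  def nodesToRemove(graph: ToyGraph, nodesToKeep: Set[String]): Set[String] =
    graph.nodeIds.diff(nodesToKeep)

  /** Step 4: drop the unwanted Nodes and every Edge that touches one of them. */
  def prune(graph: ToyGraph, nodesToKeep: Set[String]): ToyGraph = {
    val remove = nodesToRemove(graph, nodesToKeep)
    ToyGraph(
      nodeIds = graph.nodeIds.diff(remove),
      edges = graph.edges.filterNot(e => remove(e.source) || remove(e.target))
    )
  }

  // Hypothetical data: a seed customer trading with a second customer through
  // accounts, plus a Document and a telephone Entity that no Score touched.
  val unpruned: ToyGraph = ToyGraph(
    nodeIds = Set("seedCustomer", "accountA", "accountB", "customer2", "otherDocument", "telephone"),
    edges = Set(
      Edge("seedCustomer", "accountA"),
      Edge("accountA", "accountB"),
      Edge("accountB", "customer2"),
      Edge("customer2", "otherDocument"),
      Edge("customer2", "telephone")
    )
  )

  // Node IDs that appeared in the results of triggered Scores
  val scoredNodes: Set[String] = Set("seedCustomer", "accountA", "accountB", "customer2")

  val pruned: ToyGraph = prune(unpruned, scoredNodes)
}
```

In this sketch the pruned graph keeps the four scored Nodes and the three Edges between them, while the unscored Document and telephone Entity disappear together with their Edges.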
There are two parts to the code: a utility file containing the functions, and the main pruning code which applies those functions.
Example custom utility file containing the functions
```scala
package com.quantexa.<yourClientName>.graph.scripting.dsl.eng.utils

import com.quantexa.<yourClientName>.scoring.model.TaskRelatedNodesModel.CustomerRelatedNodes
import com.quantexa.graphdsl.eng.GraphWithIdAndFacts
import com.quantexa.graphdsl.Graph
import com.quantexa.resolver.core.EntityGraph.{DocumentId, EntityId, NodeId}
import org.apache.spark.sql.{Dataset, SparkSession}

private[eng] object DslENGUtils {

  /**
    * NOTE - if you are on version 1.6.2 upwards, you should not need to define this implicit class
    * as the removeNodes method is available on Graph
    */
  implicit class GraphWithRemoveNodes(graph: Graph) {

    /**
      * Remove Nodes from the graph, if they exist, along with any edges that link to them.
      */
    def removeNodes(ids: Seq[NodeId]): Graph = {
      val updatedDocuments = graph.documents.filterNot(doc => ids.contains(doc.documentId))
      val updatedEntities = graph.entities.filterNot(ent => ids.contains(ent.entityId))
      val updatedEdges = graph.edges.filterNot(edge => ids.contains(edge.source) || ids.contains(edge.target))
      Graph(updatedDocuments, updatedEntities, updatedEdges, graph.parameters)
    }
  }

  case class NodesToRemove(id: String, docIds: Seq[DocumentId], entIds: Seq[EntityId])

  def getNodesToRemove(
    graphs: Dataset[GraphWithIdAndFacts],
    scoredNodes: Dataset[CustomerRelatedNodes]
  )(implicit spark: SparkSession): Dataset[NodesToRemove] = {
    import spark.implicits._

    graphs.joinWith(scoredNodes, graphs.col("id") === scoredNodes.col("customerId"), "inner")
      .mapPartitions { partition =>
        partition.map { graphAndNodes =>
          val id = graphAndNodes._1.id
          val docsToKeep = graphAndNodes._2.relatedNodes.relatedDocuments
          val entitiesToKeep = graphAndNodes._2.relatedNodes.relatedEntityRecords.map(_.entityId)
          val allDocIds = graphAndNodes._1.graph.documents.map(_.documentId)
          val allEntityIds = graphAndNodes._1.graph.entities.map(_.entityId)
          val docIdsToRemove = allDocIds.diff(docsToKeep)
          val entIdsToRemove = allEntityIds.diff(entitiesToKeep)
          NodesToRemove(id, docIdsToRemove, entIdsToRemove)
        }
      }
  }

  /**
    * Returns a new GraphWithIdAndFacts containing a graph with nodesToRemove removed
    */
  def pruneGraph(graphWithIdAndFacts: GraphWithIdAndFacts, nodesToRemove: NodesToRemove): GraphWithIdAndFacts = {
    val prunedGraph = graphWithIdAndFacts.graph.removeNodes(nodesToRemove.docIds ++ nodesToRemove.entIds)
    graphWithIdAndFacts.copy(graph = prunedGraph)
  }
}
```
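One design point in the utility above may be worth considering for very large graphs: `removeNodes` calls `contains` on a `Seq`, which scans the sequence for every Document, Entity, and Edge. A minimal sketch of the alternative, using plain strings in place of `NodeId` (an assumption made purely for illustration, as are the function names), builds a `Set` once and reuses it for the membership checks:

```scala
object SetLookupSketch {
  // Plain strings stand in for NodeId here; this is an illustration, not the Quantexa types.

  /** Mirrors the shape of the utility: a linear scan of `ids` for every element of `all`. */
  def removeWithSeq(all: Seq[String], ids: Seq[String]): Seq[String] =
    all.filterNot(id => ids.contains(id))

  /** Same result, but the IDs are hashed once so each membership check is cheap. */
  def removeWithSet(all: Seq[String], ids: Seq[String]): Seq[String] = {
    val idSet: Set[String] = ids.toSet
    all.filterNot(idSet) // a Set[String] can be used directly as a String => Boolean predicate
  }

  val allIds: Seq[String] = Seq("doc-1", "doc-2", "ent-1", "ent-2")
  val toRemove: Seq[String] = Seq("doc-2", "ent-2")

  val viaSeq: Seq[String] = removeWithSeq(allIds, toRemove)
  val viaSet: Seq[String] = removeWithSet(allIds, toRemove)
}
```

Both functions return the same Nodes in the same order; whether the difference matters in practice depends on how many Nodes are removed per graph.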
Example custom Network Pruning code
Warning: The logic in the following code snippet, up to the .join(latestAlertedCustomers, Seq("id"), "leftsemi") call, can vary between different projects, different versions of The Quantexa Platform, and between Assess and Scoring Framework 1.
At a high level, the code before the .join(latestAlertedCustomers, Seq("id"), "leftsemi") call performs the following:
- Reading in a Dataset[GraphWithIdAndFacts].
- Getting the run date of the current Scoring run.
- Getting the customers that have activated an alert in this Scoring run, using the run date.
- Reading in the ‘scored Nodes’: a data set of related Entity and Document IDs per customer. There is an example code snippet of how to gather these related Nodes in Assess prior to 2.1.0 in Appendix: Gathering Related Nodes in Assess prior to version 2.1.0.
```scala
package com.quantexa.<yourClientName>.graph.scripting.dsl.eng

import io.circe.config.syntax._
import com.quantexa.<yourClientName>.graph.scripting.dsl.config.GraphScriptingConfig
import com.quantexa.<yourClientName>.graph.scripting.dsl.eng.utils.DslENGUtils._
import com.quantexa.<yourClientName>.scoring.batch.alerting.model.{AlertingDecisionAudit, CustomerToRelatedScorecards}
import com.quantexa.<yourClientName>.scoring.model.TaskRelatedNodesModel.CustomerRelatedNodes
import com.quantexa.etl.api.MetadataModel.MetadataRunId
import com.quantexa.graphdsl.eng.{GraphScriptingSparkUtils, _}
import com.quantexa.scriptrunner.util.metrics.QSSMetricsRepository
import com.quantexa.scriptrunner.{QuantexaSparkScript, TypedSparkScriptIncremental}
import io.circe.generic.auto.exportDecoder
import org.apache.log4j.Logger
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * PruneAndConvertToEntityGraphs runs after Scoring and before Task Loading.
  * It takes the DSL graph output, filters out any graphs that are not associated with alerted seed customers,
  * and prunes the remaining DSL graphs. The pruned graphs are written out for conversion to Entity Graphs
  * and downstream use in Task Loading.
  */
object PruneAndConvertToEntityGraphs extends TypedSparkScriptIncremental[GraphScriptingConfig] with GraphScriptingSparkUtils {

  override val name: String = "PruneGraphs"

  val fileDependencies = Map.empty[String, String]
  val scriptDependencies = Set.empty[QuantexaSparkScript]

  def run(
    sparkSession: SparkSession,
    logger: Logger,
    args: Seq[String],
    projectConfig: GraphScriptingConfig,
    qssMetricsRepository: QSSMetricsRepository,
    metadataRunId: MetadataRunId
  ): Unit = {
    import sparkSession.implicits._
    implicit val _: SparkSession = sparkSession

    val graphScriptConfig = projectConfig.graphScriptConfig
    val engModeConfig = projectConfig.engModeConfig
    val graphScriptingDataRoot = graphScriptConfig.graphScriptRoot

    // Inputs
    val graphs: Dataset[GraphWithIdAndFacts] =
      sparkSession.read.parquet(graphScriptConfig.outputPath).as[GraphWithIdAndFacts]
    val latestScorecards =
      sparkSession.read.parquet(graphScriptConfig.scorecardInputPath).as[CustomerToRelatedScorecards]
    val runDate = latestScorecards.head.runDate
    val latestAlertedCustomers =
      sparkSession.read.parquet(graphScriptConfig.alertedCustomersInputPath).as[AlertingDecisionAudit]
        .filter(dec => dec.customerAlert && dec.runDate == runDate) // Keep customers that alerted in this run
        .withColumnRenamed("customerId", "id")
    val scoredNodes =
      sparkSession.read.parquet(graphScriptConfig.ScoredNodesInputPath).as[CustomerRelatedNodes]

    // Output
    val destinationPath = graphScriptConfig.prunedGraphPath

    // Only keep DSL graphs that are required for Task Loading
    val graphsToLoad: Dataset[GraphWithIdAndFacts] = graphs
      .join(latestAlertedCustomers, Seq("id"), "leftsemi")
      .as[GraphWithIdAndFacts]

    // Collect document and entity nodes that we want to remove
    val unscoredNodes: Dataset[NodesToRemove] = getNodesToRemove(graphsToLoad, scoredNodes)

    // Join the unscored Nodes to the DSL graphs so that each row has a sequence of Nodes to remove
    val prunedGraphs: Dataset[GraphWithIdAndFacts] = graphsToLoad
      .joinWith(unscoredNodes, graphsToLoad.col("id") === unscoredNodes.col("id"))
      .map { case (graph, nodes) => pruneGraph(graph, nodes) }

    prunedGraphs.write.mode("overwrite").parquet(destinationPath)
  }
}
```
Appendix: Gathering Related Nodes in Assess prior to version 2.1.0
Simple example of custom code for gathering related Nodes in Assess
```scala
// Imports omitted for brevity.
object ExampleRelatedNodes extends TypedSparkScriptIncremental[GraphScriptingConfig] {

  def run(
    sparkSession: SparkSession,
    logger: Logger,
    args: Seq[String],
    projectConfig: GraphScriptingConfig,
    qssMetricsRepository: QSSMetricsRepository,
    metadataRunId: MetadataRunId
  ): Unit = {
    // Needed for the .as[...] conversion below
    import sparkSession.implicits._

    // Where these settings live in your config is project-specific
    val subjectScorecardPath = projectConfig.scorecardInputPath
    val scoreIdsToIgnoreAsString: Set[String] = projectConfig.scoreIdsToIgnoreAsString
    val scoreIdsToIgnore = scoreIdsToIgnoreAsString.map(ScoreId(_))

    val allScorecards = sparkSession.read.parquet(subjectScorecardPath).as[ScorecardResult[DocumentId]]

    val relatedDocumentsAndEntitiesFromScorecardResult =
      getRelatedDocumentsAndEntities(allScorecards, scoreIdsToIgnore, documentIdToGraphId)
  }

  def documentIdToGraphId(docId: DocumentId): String = {
    docId.value // This varies depending on what you set the graph ID to in your Graph Script DSL stages
  }

  /** Function to extract related Documents and Entities from subject level Scores and related Scores in [[ScorecardResult]].
    *
    * @param scorecardDS        Dataset of Scorecard results
    * @param scoreIdsToIgnore   Score IDs to ignore
    * @param subjectIdToGraphId Function to convert [[Subject]] to [[String]] which must be aligned with the `id` of [[GraphWithIdAndFacts GraphWithIdAndFacts]]
    * @tparam Subject Subject data type of the Scorecard
    * @return Data set of related Documents and Entities
    */
  def getRelatedDocumentsAndEntities[Subject <: Product](
    scorecardDS: Dataset[ScorecardResult[Subject]],
    scoreIdsToIgnore: Set[ScoreId] = Set.empty,
    subjectIdToGraphId: Subject => String
  )(
    implicit scorecardEncoder: Encoder[ScorecardResult[Subject]]
  ): Dataset[RelatedDocumentsAndEntitiesFromScorecardResult] = {
    import scorecardDS.sparkSession.implicits._

    scorecardDS.map(scorecard => {
      // Drop any Scores that should not contribute Nodes to the pruned Network
      val filteredScoreSummary: Map[ScoreId, WeightedScore] = scorecard.scoreSummary
        .filterNot { case (scoreId, _) => scoreIdsToIgnore(scoreId) }
      val filteredRelatedScores: Map[ScoreHitId, StandardScoreOutputWithSeverity] = scorecard.relatedScores
        .filterNot { case (ScoreHitId(scoreId, _), _) => scoreIdsToIgnore(scoreId) }

      val relatedDocumentsFromSubjectLevelScores =
        filteredScoreSummary.flatMap { case (_, weightedScore) => weightedScore.relatedDocuments }.toSet
      val relatedEntitiesFromSubjectLevelScores =
        filteredScoreSummary.flatMap { case (_, weightedScore) => weightedScore.relatedEntities }.toSet
      val relatedDocumentsFromRelatedScores =
        filteredRelatedScores.flatMap { case (_, sso) => sso.relatedDocuments }.toSet
      val relatedEntitiesFromRelatedScores =
        filteredRelatedScores.flatMap { case (_, sso) => sso.relatedEntities }.toSet

      // Union the Nodes found by subject-level Scores and related Scores
      val relatedDocuments = relatedDocumentsFromSubjectLevelScores ++ relatedDocumentsFromRelatedScores
      val relatedEntities = relatedEntitiesFromSubjectLevelScores ++ relatedEntitiesFromRelatedScores

      RelatedDocumentsAndEntitiesFromScorecardResult(
        subjectIdToGraphId(scorecard.subjectId),
        relatedDocuments,
        relatedEntities
      )
    })
  }
}
```
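To make the effect of scoreIdsToIgnore concrete, here is a small sketch in plain Scala collections. It assumes a heavily simplified shape in which each Score ID maps directly to the Node IDs it flagged; the Score names, Node names, and the `nodesToKeep` helper are all hypothetical and not part of the Quantexa API.

```scala
object IgnoredScoresSketch {
  /** Union the Node IDs of every Score that is not in the ignore set. */
  def nodesToKeep(
    relatedByScore: Map[String, Set[String]],
    scoreIdsToIgnore: Set[String]
  ): Set[String] =
    relatedByScore
      .filterNot { case (scoreId, _) => scoreIdsToIgnore(scoreId) }
      .values
      .flatten
      .toSet

  // Hypothetical Scores and the Nodes each one reported as related
  val relatedByScore: Map[String, Set[String]] = Map(
    "TradesWithCustomer" -> Set("seedCustomer", "accountA", "customer2"),
    "SharedTelephone" -> Set("seedCustomer", "telephone")
  )

  val keepAll: Set[String] = nodesToKeep(relatedByScore, Set.empty)
  val keepWithoutTelephoneScore: Set[String] = nodesToKeep(relatedByScore, Set("SharedTelephone"))
}
```

Ignoring a Score means its Nodes no longer count as "of interest", so any Node reported only by that Score (the telephone in this sketch) becomes a candidate for pruning.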
Build information
Version 1.6.0 TQP, April 2022
For more technical details on Quantexa products and services, take a look at our Documentation site.