This article provides practical solution designs to enrich information in incoming messages and align them with Quantexa's Streaming Tier (Document Model) definitions, leveraging lookup principles to ensure seamless integration and efficient operation.
Introduction
The Quantexa Streaming Tier is a critical component for enabling near-real-time data ingestion and processing. Ensuring that input messages adhere to the required schema and include necessary data is essential for maintaining system performance, reliability, and data integrity.
Data streaming design principles to enrich input messages
When input schemas do not fully align with the Quantexa Streaming tier schema, particularly when enrichment or augmentation is required to align the input Kafka message with the Record Extraction service, the following design recommendations can bridge the gap effectively.
Conform Data Using a Mapper Application
If incoming Kafka events lack the necessary data, a bespoke mapper application can bridge the gap, providing functionality that is not natively supported by the Quantexa Streaming tier. For instance, in a vehicle claims fraud detection use case, raw Kafka messages may reference external data sources (e.g., vehicle makes and models) without including full details. A mapper application can enhance these messages by performing lookups to enrich the incoming message to align with the document model.
This article will be particularly valuable if you are looking to understand the principles of extending the Quantexa Streaming tier to accommodate non-standard functional or non-functional requirements, including the implementation of a mapper service.
The diagram below illustrates the design approach for integrating a data mapper application.
Static lookups
When reference data (e.g., a list of claim types) does not change frequently, it can be stored as configuration (Quanfiguration) files within the Quantexa deployment repository. If the data is structured and relatively small, JSON is recommended for flexibility and ease of parsing. CSV may be suitable for tabular data with simple structures. A mapper application can use these files to enrich Kafka messages, ensuring all necessary details are included.
Dynamic lookups
For data sources that are updated periodically (e.g., vehicle makes and models), storing references in an Elasticsearch index is recommended. A mapper application can query the Elasticsearch index to retrieve the required data, enriching Kafka messages and aligning them with the required schema.
If leveraging Quantexa’s native functions for maintainability and extendibility, it is recommended to load these references as Foreign Documents in Elasticsearch, ensuring seamless integration with the Quantexa platform:
- Set up a data-source-reference module
- Set up a generic loader for each reference file
object GenElasticLoader extends GenericLoaderScript[RefVehModel]( loader = new Elastic7GenericLoader(), mappingSettings = None ) { val metrics = qssMetricsRepository}
- For each reference file, create a foreign document config. E.g.:
reference-data-config.json
{ "referencevehmake": { "dataModelRedactorObject": "com.quantexa.xxx.common.model.RefVehMakeRedactor", "dataModelRootClass": "com.quantexa.xxx.common.model.RefVehMake" }}
- Add these into the
application.yml
:
foreign-document.config: locations: - ${config-files.dir}/reference-vehmake-config.json
- Using the Document Clients, lookup the document:
val documentRequestVehMake = DocumentsRequest(Set(DocumentId(referenceVehMakeType, vehMakeLookupKey)))val secureDocumentRequestVehMake = SecureDocumentsRequest(documentRequestVehMake, sc)val documentSearchVehMake = clients.documentClient .documentRequest(secureDocumentRequestVehMake) .recoverWith { case e => Future(DocumentsResponse(Seq.empty, 0L)) }
API based lookups
When additional information needed to construct the document model is available from external systems, integrating an API-based dynamic lookup within the mapper pre-ingest application can be effective. These API lookups enable real-time enrichment from third-party or internal systems.
Data streaming lookup design architecture
Diagram below illustrates how a mapper application can be employed to perform lookups and acts as an intermediary between the upstream system and Quantexa’s Streaming tier Document Ingest services.
- Upstream system publishes raw data into Raw Data Input topic
- The mapper application reads raw data messages from Raw Data Input topic.
- The mapper application performs the lookup approach:
- Extracting necessary information from Elasticsearch (Dynamic lookup - 3A) or
- Using a static reference file added to the Quanfiguration service (Static lookup - 3B) or
- Calling an API to fetch information from an external system (API-based lookup - 3C)
- Mapper application publishes the document model message into Record Extraction Input topic
- Record Extraction service reads document model message from Record Extraction Input topic
- Record Extraction service publishes compounds and cleansed document model messages to Document Ingest Notify topic
- Document Ingest service reads cleansed data and compounds from Document Ingest Notify topic
- Document Ingest service publishes success message to Document Ingest Success topic
Lookup considerations
Although the lookup design approach in this article belongs primarily outside of the remit of Quantexa platform, below considerations are useful if you consider containing a lookup approach in your solution.
- Lookup approach overhead: Implementing any lookup approach can introduce additional overhead, potentially affecting system performance and processing times.
- Static lookup: Deploying large reference files (e.g. exceeding 1MB) as
ConfigMaps
in Kubernetes may not be feasible due to limitations within a Kubernetes cluster. - Dynamic lookup: Continuous indexing of records in Elasticsearch/OpenSearch may impact performance, particularly in a high-volume, MUC platform hence the need for scaling up Elasticsearch/OpenSearch may become necessary to accommodate increased data loads.
- API lookup: Integrating with external systems for API-based lookup can introduce complexities related to connectivity, security, integration, and data accessibility.
Conclusion
Designing a Kafka-based streaming pipeline for the Quantexa platform requires careful planning and adherence to best practices. By ensuring schema compliance, leveraging appropriate lookup services, and optimizing transformations, you can build a scalable, reliable, and high-performing solution.
For complex implementations, collaboration with a solutions architect is recommended to align your design with Quantexa’s standards and long-term objectives.