Quantexa integrates Kafka as its primary streaming platform to process, ingest, and generate networks in near real-time, leveraging its powerful Entity Resolution, and Big Data capabilities. Quantexa provides out-of-the-box applications such as Kafka Record-Extraction and Kafka Document-Ingest, designed to streamline data transformations, from raw records to cleansed documents, via synchronous and asynchronous methods.
The blog explores the benefits of Kafka's message pipeline, the transformation processes, and the integration of Quantexa services for real-time fraud detection, graph-building, and scoring. It also highlights Kafka's role in consumer groups, partitioning strategies, and debugging streaming issues, ensuring efficient throughput and performance.
In summary, the post underscores the importance of understanding document complexity, optimizing graph builds, and using well-configured logging to ensure a seamless streaming solution.
Why do we use Kafka at Quantexa?
Kafka is an open-source distributed event streaming platform. This means it allows messages to be sent through a pipeline in near real-time fashion. Receiving information instantly has many benefits, and within Quantexa, it is possible to harness the capabilities of Entity Resolution as well as the other features the Quantexa Platform offers.
The most common surround the processing, data ingestion, and network generation of incoming data. This data can be industry agnostic with the key focus being to utilize Quantexa and to do it with a focus on 'Big Data'.
What do we have out of the box?
Quantexa provides out-of-the-box applications such as Kafka Record-Extraction, Kafka Document-Ingest, and Entity Ingest (as of Quantexa 2.7). In this blog post, we have examples utilizing Record-Extraction and Document-Ingest services.
Record-Extraction
The Kafka-Record-Extraction is an application that can read a message from a Kafka topic and begin to parse, cleanse, and extract records from it. This application leverages Fusion code already written as part of the ETL for any given data-source. There are three types of setup within this application:
- NoOpTransform - Direct transformation from the raw record to the cleansed document.
- SyncTransform - A synchronous transformation is required to get from the raw document to the cleansed document. E.g. mapping of fields to the raw fusion model.
- AsyncTransform - An asynchronous transformation is required to get to the cleansed document. E.g. an elastic lookup or a file read is required to bring information into the transformation.
The setup of this application and in-depth code sample can be found on the Docs site: Record Extraction service.
Document-Ingest
The Kafka-Document-Ingest is used to load documents into Elasticsearch. These documents come in the form of a DocumentIndexInputs, which is produced by the Record-Extraction in the first stage. The Ingest can load to two indices types: search, and resolve.
More on the Document Ingest on the Docs site: Document Ingest service.
The Kafka-Entity-Ingest (split from Kafka-Document-Ingest since 2.7) updates the Entity Store after Kafka-Document-Ingest. When the input message is received, all Entities linked to the Document are re-resolved, and the relevant Entities are updated.
In cases where the upstream data system cannot generate the required Record Extraction schema in the Input topic, a Mapper or Translator service can be employed to align these systems. The flow of this data transformation can be visualized below:
- Upstream System publishes raw data into Raw Data Input Topic.
- Data Mapper reads raw data messages from Raw Data Input Topic. Data Mapper maps, or translates the raw data message schema to the document model schema.
- The Data Mapper employs various lookup approaches:
- Extracting necessary information from Elasticsearch/OpenSearch (Dynamic Lookup - 3A)
- Using a static reference file added to the Quanfiguration service (Static Lookup - 3B)
- Calling an API to fetch information from an external system (API-based Dynamic Lookup - 3C).
- Data Mapper publishes the document model message into Record Extraction Input Topic.
- Record Extraction Service reads document model message from Record Extraction Input Topic.
- Record Extraction Service publishes compounds and cleansed document model messages to Document Ingest Notify Topic.
- Document Ingest Service reads cleansed data and compounds from Document Ingest Notify Topic.
- Document Ingest Service publishes success message to Document Ingest Success Topic.
Using Quantexa services within a streaming Kafka solution
Different use-cases require different levels of processing. It is possible to leverage the different Quantexa services such as Graph API, Scoring, Task Loading, and Search by using their available REST Clients within a Quantexa Application. The below outlines the flow of how this is orchestrated.
- A document ID is ingested from the Document Ingest Success Topic.
- The Record Processor starts a processing pipeline that can use services 3 - 6. These are sent as REST requests to Gateway which are routed appropriately.
- App-Resolve can be used for Graph Building and Expansions.
- App-Security is used to authenticate and apply any security policies
- App-Scoring can be used to apply the Assess scoring framework on top of any generated networks
- App-Investigate can be used to load investigations and tasks as Alerts.
- Once the Record Processer has performed its operations a message is sent to a success topic.
Quantexa Kafka streaming considerations
Data schemas and format
Quantexa supports two main types of message formats: JSON and AVRO. Both of these have different considerations and will affect how you set up your pipelines and configuration.
Document complexity
Designing the Document Model and relevant ETL for a data source that is being streamed will differ from when you are using it in a batch context. When data is streamed, the document is being created in near real-time and is generally stateless. It is possible to use the AsyncTransform configuration to make calls to other resources like Elastic or file-storage to retrieve information but this does not replicate a tool like Spark in which joining datasets is trivial. Therefore, understanding how the data will be received and how that can be processed is fundamental.
Resources and Graph Complexity
Building out graphs is expensive in both time and resources. Optimizing the expansions of a graph build within the Graph API is essential to ensure unnecessary Entities, Documents, and Edges do not consume resources for no extra value. Optimizing the graph you are trying to build depends on the use-case and which type of relationships you are trying to capture.
Debugging
Debugging issues in the Kafka pipeline requires an understanding of the processing steps and which services are being used within any given stage. Within any Big Data processing pipeline data is not always consistent and with Entity Resolution and Network Generation inputs can yield significantly different processing times.
Identifying bottlenecks
Bottlenecks can arise when several records are holding up a queue due to the amount of processing required. For example, if an input document is linked to a large network and requires several graph expansions this can occupy threads that then hold up resources to process incoming records. To identify areas that are causing build-ups, it is important to ensure logging is activated efficiently. This means enabling out-of-the-box Quantexa logging as well as implementing logging within added components.
Tracing messages through the pipeline and identifying the time taken at each stage can help identify the areas that are slowing the system down. At the top level, checking the topic timestamps for a given message will help narrow which application the issue exists.
Jaeger is a useful tool that you can easily deploy to capture data on requests flowing through Quantexa services. Each component handling a request generates data points called "spans", which include information like start time, end time, and operational metadata. These spans create a comprehensive trace of the request's journey enabling analysis of slow responses or failures.
Informative logging
As mentioned above good logging will allow the route of an issue to be identified quickly. When implementing Quantexa service calls (e.g. making a resolver request) adding logging is important. This logging can take various levels ERROR, WARN, INFO, DEBUG, or TRACE and the developer should consider what information should be at which level. Adding stage times for any REST calls can be an effective log to measure performance.
Run Time
If configuration is not being picked up correctly it is important to make sure several things have been addressed:
- Has the correct spring profile(s) been enabled within the app? This may be profiled at run time with the -Dspring.profiles.active argument.
- Has Libpostal been configured correctly? You will need to provide the following at run time to tell the Record-Extractor where to find Libpostal:
-Djava.library.path=/quantexa/libpostal -DlibpostalDataDir=/quantexa/libpostal
- It is possible to use -DstubParser=true instead of the above to disable the use of Libpostal
Kafka
Understanding consumer groups and partitioning strategy
Kafka Consumer Groups allow messages to be balanced across various applications if they share the same consumer group ID. This can be extremely beneficial if the purpose is to spread the load across many different instances of the application, improving throughout. However, it is important to assess if running too many processes may overload other services. In the case of Resolver requests or Task Loading it is possible to overload these services and therefore having an appropriate strategy is important.
Choosing the right partitioning strategy is important. Kafka implements a default where the key hash is used to map messages to partitions. Null key messages are sent to a partition in a round-robin fashion. Therefore, if you have specific keys considering how they can be balanced appropriately is necessary.
Configuration settings
There are many configuration settings that allow a consumer/producer to determine when and how many records to retrieve at a time. Within Quantexa applications this is known as Batch-Size and the batch-size determines how many concurrent processes can take place. This number has to be determined with consideration to the complexity of the data and processing that happens on the incoming records to avoid timeouts and bottlenecks.
Smoke testing a Quantexa Kafka streaming solution
Performing a smoke test is an important process and should be completed to ensure that the environment has been set up correctly, and that you have the ability to connect to the instance installed. Depending on the environment you may have to deal with various authentication methods but the following will identify key things to look out for on connecting using a basic authentication.
Pre-requisites
- Basic fusion code is set up to be used in the extractor
- Development Topics set up (these can be created automatically if cluster policy allows)
Kafka configuration
The two key settings that need to be updated are the bootstrap servers for
quantexa.document-ingest.kafka.clients.consumer.common.bootstrap.servers:${bootstrapServers}
quantexa.document-ingest.kafka.clients.producer.common.bootstrap.servers:${bootstrapServers}
The bootstrap servers can have multiple entries, e.g. localhost:9092,locahost:9093 and the connection happens over TCP so no other protocol like HTTP should be included in these addresses.
For other options on authorization please consult the Kafka open-source documentation.
Update the Helm Charts [Optional]
If deploying using Kubernetes the Quantexa Helm Charts do provide placeholders for the Kafka applications. These include record-extraction, document-ingest, entity-ingest, and three additional app placeholders in which other services can be added.
Testing the solution
Once the bootstrap servers and any other connection configuration has been set up, it's time to deploy the applications. Going with the simplest deployment of using a bash script, adding in the Kafka service and the relevant JVM arguments (similar to the standard mid-tier) will suffice. Once started, checking the logging will help identify the status of the application. Let's focus on the record-extraction.
On start-up, the configuration will be printed out and can be inspected. Looking at bootstrap.servers, verify that this is the expected address. Furthermore, inspect any other customisation in config - such as group.id
or auto.offset.reset
. If no values are being populated, please refer to the Run Time section above.
Now that the application has connected successfully, a test message can be sent. The easiest way to send individual messages is to do this via a UI. Confluent and other paid solutions offer this but there are open-source variants that will work; a project should establish which platform is approved and can be used for their needs. Sending a test JSON message in the correct format to the record-extraction-notify topic is the first step (schema set out in Record Extraction API).
If successful, the following logs should be produced. Importantly, a success message should be sent to a Kafka topic called document-ingest-notify (by default). Using a Kafka UI interface, validating this should be straightforward.
The DEBUG logs are enabled above which allows the user to see stage by stage what is being processed.