Building a Quine Streaming Graph: Ingest Streams

Michael Aglietti

Quine Ingest Streams

Quine is optimized to process high volumes of data in motion and then stream out high-quality insights in real-time. The ingest stream is where a streaming graph starts. It connects to data producers, transforms the data, then populates a streaming graph to be analyzed by standing queries.

Quine streaming graph: high-volume data in, high-value data out.

Quine streaming graph combines multiple sources to detect high value patterns.

Let’s get under the hood to understand how ingest streams work.

Quine is fundamentally a stream-oriented data processor that uses a graph data model. This provides optimal integration with streaming data producers and consumers such as Kafka and Kinesis. Quine builds on this streaming foundation to provide batch-like capabilities by converting data stored in files to streaming data to load into the graph.

Ingest Stream Concepts

What is an Ingest Stream?

An ingest stream connects a data source to Quine and prepares the emitted data for the streaming graph. Within the ingest stream, an ingest query, written in Cypher, updates the streaming graph nodes and edges as data is received.

Backpressuring Ingest Streams

Inevitably, when streaming data producers outpace consumers, the consumer will become overwhelmed. In Quine, as an ingest stream begins to get more data than it can process, it manages the dataflow to avoid becoming overwhelmed using “backpressure.”

A backpressured system does not buffer. Instead, it prevents upstream producers from sending data at a rate greater than it can process. The problem with buffering is that a buffer will eventually run out of space. And then what? The system must decide what to do when the buffer is full: drop new results, drop old results, crash the system, or backpressure.
 
Backpressure is a protocol defining how to send a logical signal UP the stream with information about the downstream consumer's readiness to receive more data. That backpressure signal follows the same path as data moving downstream, but in reverse. If downstream is not ready to consume, then upstream does not send.
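To make the demand signal concrete, here is a minimal sketch in Python. It is not how Quine or Akka Streams implements backpressure. The class `DemandDrivenSource` and the function `run_consumer` are hypothetical names invented for illustration. The idea shown is only that the producer emits nothing until the consumer signals how much it is ready to receive.

```python
from typing import Iterator, List


class DemandDrivenSource:
    """Hypothetical producer that emits only what downstream has requested."""

    def __init__(self, items: Iterator[int]) -> None:
        self._items = items
        self.emitted = 0  # how many items have left the producer so far

    def request(self, n: int) -> List[int]:
        """Demand signal travelling upstream: 'I am ready for n more items'."""
        batch: List[int] = []
        for _ in range(n):
            try:
                batch.append(next(self._items))
            except StopIteration:
                break  # the source is exhausted
        self.emitted += len(batch)
        return batch


def run_consumer(source: DemandDrivenSource, capacity: int, ticks: int) -> List[int]:
    """Consumer that asks for at most `capacity` items per tick, so nothing piles up."""
    processed: List[int] = []
    for _ in range(ticks):
        processed.extend(source.request(capacity))
    return processed


# The producer could emit a million items, but it only ever sends what was asked for.
source = DemandDrivenSource(iter(range(1_000_000)))
processed = run_consumer(source, capacity=3, ticks=4)
print(len(processed), source.emitted)
```

Because the producer is driven entirely by `request`, there is no buffer between the two sides that could fill up: the number of items emitted always equals the number of items the consumer asked for.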

Quine uses a reactive stream implementation of backpressure, Akka Streams, built on top of the actor model to ensure that the ingestion and processing of streams are resilient.

Info

Curious about the operational challenge associated with reactive streams? Read the Reactive Manifesto to understand the problems faced by every streaming processor in a high-volume data pipeline.

Including asynchronous, non-blocking backpressure is the only method to ensure that all data from a high-volume stream is processed without data loss or processing delays.

All Nodes Exist

With a graph data model, nodes are the primary unit of data — much like a “row” is the primary unit of data in a relational database. However, unlike traditional graph data systems, a Quine user never has to create a node directly. Instead, the system functions as if all nodes exist.

Quine represents every possible node as an existing “empty node” with no interesting history. As data streams into the system, the node becomes interesting, and Quine creates a history for the node.

We added an idFrom function to Cypher that takes any number of arguments and deterministically produces a node ID from that data. This is similar to a consistent-hashing strategy, except that the ID produced from this function is always an ID that conforms to the type chosen for the ID provider.

You will use idFrom in the ingest query part of every ingest stream that you create. For example, the absolute minimum ingest query to load incoming data into the graph is simply a wrapper around the idFrom function.

MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that
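The property that matters here is determinism: the same arguments always yield the same node ID. The Python sketch below illustrates that property only. It does not reproduce Quine's actual idFrom algorithm, and the function `id_from`, the fixed namespace, and the use of UUIDs as the ID type are all assumptions made for the example.

```python
import json
import uuid

# Hypothetical fixed namespace; any constant UUID would do for this illustration.
NAMESPACE = uuid.UUID(int=0)


def id_from(*args) -> uuid.UUID:
    """Deterministically map any number of JSON-serializable arguments to a UUID.

    Illustrative stand-in only, not Quine's implementation of idFrom.
    """
    # Serialize the arguments canonically so equal inputs give equal strings.
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    # uuid5 hashes the namespace and name, so the result is always a valid UUID.
    return uuid.uuid5(NAMESPACE, canonical)


block_id = id_from("block", "0xabc")
same_block_id = id_from("block", "0xabc")
account_id = id_from("account", "0xabc")

print(block_id == same_block_id)  # same arguments, same ID
print(block_id == account_id)     # different arguments, different ID
```

This is why two ingest streams that see the same value can refer to the same node without coordinating: each computes the ID locally from the data.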

Historical Versioning

Each node in the graph records all of its historical changes over time. When a node’s properties or edges are changed, the change event and timestamp are saved to an append-only log for that particular node. This historical log can be replayed up to any desired moment in time, allowing for the system to quickly answer questions using the state of the graph as it was in the past. This is a technique known as Event Sourcing, applied individually to each node.
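As a rough illustration of per-node event sourcing, the Python sketch below keeps an append-only list of property changes for a single node and replays it up to a chosen timestamp. The types `PropertyChange` and `NodeJournal` are hypothetical, the sketch assumes events are appended in timestamp order, and it says nothing about how Quine actually stores or replays its journals.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass(frozen=True)
class PropertyChange:
    """One change event. A value of None means the property was removed."""
    timestamp: int
    key: str
    value: Any


@dataclass
class NodeJournal:
    """Append-only log of changes for a single node (hypothetical type)."""
    events: List[PropertyChange] = field(default_factory=list)

    def append(self, event: PropertyChange) -> None:
        # Assumption: callers append events in non-decreasing timestamp order.
        self.events.append(event)

    def state_at(self, timestamp: int) -> Dict[str, Any]:
        """Replay the log up to and including `timestamp`."""
        state: Dict[str, Any] = {}
        for event in self.events:
            if event.timestamp > timestamp:
                break
            if event.value is None:
                state.pop(event.key, None)
            else:
                state[event.key] = event.value
        return state


journal = NodeJournal()
journal.append(PropertyChange(10, "line", "first"))
journal.append(PropertyChange(20, "line", "second"))
journal.append(PropertyChange(30, "seen", True))

print(journal.state_at(15))
print(journal.state_at(30))
```

Nothing is ever overwritten in the log, so asking for the node's state at an earlier moment is just a shorter replay.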

Syntax and Structure

The first step when defining an ingest stream is to understand the overall shape of your data. This includes identifying the data elements necessary for standing queries to use in a MATCH.

An ingest stream is defined by setting a type described in the API documentation. Quine supports eight types of ingest streams. Each type has its own form and requires a specific structure to configure properly.

For example, constructing an ingest stream via the /api/v1/ingest/{name} API endpoint to read data from standard input and store each line as a node looks similar to the example below.

{
  "type": "StandardInputIngest",
  "format": {
    "type": "CypherLine",
    "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
  }
}

Quine natively reads from standard input, passing each line into a Cypher query as the parameter $that. A node ID is generated deterministically using idFrom($that). Then, each line is stored as a line property on the corresponding node in the streaming graph.
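For readers who prefer to script this step, here is a hedged Python sketch that prepares the HTTP request for the definition above without sending it. The host and port `http://localhost:8080`, the use of the POST method, the helper name `build_ingest_request`, and the stream name `standardIn` are assumptions for illustration. Check the API documentation for the exact contract before relying on it.

```python
import json
import urllib.request

# Assumption: a local Quine instance listening on the default-looking address.
QUINE_URL = "http://localhost:8080"


def build_ingest_request(name: str) -> urllib.request.Request:
    """Prepare (but do not send) a request that defines a standard-input ingest stream."""
    body = {
        "type": "StandardInputIngest",
        "format": {
            "type": "CypherLine",
            "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that",
        },
    }
    return urllib.request.Request(
        url=f"{QUINE_URL}/api/v1/ingest/{name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",  # assumption: creation uses POST on this endpoint
    )


request = build_ingest_request("standardIn")
print(request.get_method(), request.full_url)

# To actually create the stream against a running instance, uncomment:
# urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the URL and payload first.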

Info

When creating an ingest stream via the API, you can give the stream a meaningful name. For example, you can name the above ingest stream standardIn to make it easier to reference in your application.

Alternatively, when you create an ingest stream via a recipe, Quine automatically assigns each stream a name of the form INGEST-#. The first ingest stream defined in the recipe is INGEST-1, and subsequent ingest streams are named in order, with # counting up.

Here is the same ingest stream defined in a Quine Recipe.

ingestStreams:
  - type: StandardInputIngest
    format:
      type: CypherLine
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n.line = $that

Ingest Stream Reporting

Inspecting Ingest Streams via the API

Quine exposes a series of API endpoints that enable you to monitor and manage ingest streams while in operation. The complete endpoint definitions are available in the API documentation.

Let’s take a look at the information available from the INGEST-1 ingest stream from the Ethereum Tag Propagation Recipe.

Start the recipe.

❯ java -jar quine-x.x.x.jar -r ethereum

List the ingest streams started by the Ethereum recipe using the /api/v1/ingest endpoint.

❯ curl -s "http://localhost:8080/api/v1/ingest" | jq '. | keys'
[
  "INGEST-1",
  "INGEST-2"
]

The Ethereum recipe creates two ingest streams: INGEST-1 and INGEST-2.

Now, view the ingest stream stats using the /api/v1/ingest/INGEST-1 endpoint.

❯ curl -s "http://localhost:8080/api/v1/ingest/INGEST-1" | jq
{
  "name": "INGEST-1",
  "status": "Running",
  "settings": {
    "format": {
      "query": "MATCH (BA), (minerAcc), (blk), (parentBlk)\nWHERE\n  
    id(blk) = idFrom('block', $that.hash)\n  AND id(parentBlk) = 
    idFrom('block', $that.parentHash)\n  AND id(BA) = 
    idFrom('block_assoc', $that.hash)\n  AND id(minerAcc) = 
    idFrom('account', $that.miner)\nCREATE\n  
    (minerAcc)<-[:mined_by]-(blk)-[:header_for]->(BA),\n  
    (blk)-[:preceded_by]->(parentBlk)\nSET\n  BA:block_assoc,\n  
     BA.number = $that.number,\n  BA.hash = $that.hash,\n  
     blk:block,\n  blk = $that,\n  minerAcc:account,\n  
     minerAcc.address = $that.miner",
      "parameter": "that",
      "type": "CypherJson"
    },
    "url": "https://ethereum.demo.thatdot.com/blocks_head",
    "parallelism": 16,
    "type": "ServerSentEventsIngest"
  },
  "stats": {
    "ingestedCount": 57,
    "rates": {
      "count": 57,
      "oneMinute": 0.045556443551085735,
      "fiveMinute": 0.06175571100053622,
      "fifteenMinute": 0.04159128290271318,
      "overall": 0.07659077758191643
    },
    "byteRates": {
      "count": 78451,
      "oneMinute": 62.49789862393008,
      "fiveMinute": 84.92629746711795,
      "fifteenMinute": 57.22987512826503,
      "overall": 105.41446006900763
    },
    "startTime": "2022-05-17T18:56:08.161500Z",
    "totalRuntime": 744041
  }
}
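The stats block is plain JSON, so it is easy to post-process. The Python sketch below works on a trimmed copy of the response shown above and derives two figures: records per second and average bytes per record. The helper `summarize` is hypothetical. Treating totalRuntime as milliseconds is an inference rather than something stated in this post, although the rate it produces roughly agrees with the reported "overall" rate.

```python
import json

# Trimmed copy of the INGEST-1 response shown above.
response_text = """
{
  "name": "INGEST-1",
  "status": "Running",
  "stats": {
    "ingestedCount": 57,
    "byteRates": {"count": 78451},
    "totalRuntime": 744041
  }
}
"""


def summarize(ingest: dict) -> dict:
    """Derive simple throughput figures from an ingest stream status document."""
    stats = ingest["stats"]
    # Assumption: totalRuntime is reported in milliseconds.
    runtime_seconds = stats["totalRuntime"] / 1000
    return {
        "name": ingest["name"],
        "status": ingest["status"],
        "records_per_second": stats["ingestedCount"] / runtime_seconds,
        "average_record_bytes": stats["byteRates"]["count"] / stats["ingestedCount"],
    }


summary = summarize(json.loads(response_text))
print(summary)
```

In practice the same function could be fed the body returned by the /api/v1/ingest/INGEST-1 endpoint instead of the embedded string.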

Reporting on Ingest Stream progress using a Status Query

When creating an ingest stream via a recipe, you can add a status query that runs continuously. For example, the status query below counts the nodes in the graph, and Quine prints the result along with a link to the visualization in the web UI.

statusQuery:
  cypherQuery: MATCH (n) RETURN count(n)

Ingest Stream Blog Series

This is just the beginning. There’s lots more to cover. Over the next few weeks, we will cover the most common ingest streams in separate blog posts.

Try Adding Ingest Data To Quine Yourself

If you want to try Quine yourself, you can download it here. In addition to the Ethereum recipe, take a look at the Wikipedia Ingest and Apache Log Analytics recipes for different ingest stream examples.

If you have questions or want to check out the community, join the Quine Slack or visit our GitHub page.