Building a Quine Streaming Graph: Ingest Streams
Quine is optimized to process high volumes of data in motion and then stream out high-quality insights in real time. The ingest stream is where a streaming graph starts. It connects to data producers, transforms the data, and then populates a streaming graph to be analyzed by standing queries.
A Quine streaming graph combines multiple sources to detect high-value patterns.
Let’s get under the hood to understand how ingest streams work.
Quine is fundamentally a stream-oriented data processor that uses a graph data model. This provides optimal integration with streaming data producers and consumers such as Kafka and Kinesis. Quine builds on this streaming foundation to provide batch-like capabilities by converting data stored in files to streaming data to load into the graph.
Ingest Stream Concepts
What is an Ingest Stream?
An ingest stream connects a data source to Quine and prepares the emitted data for the streaming graph. Within the ingest stream, an ingest query, written in Cypher, updates the streaming graph nodes and edges as data is received.
Backpressuring Ingest Streams
Inevitably, when streaming data producers outpace consumers, the consumer will become overwhelmed. In Quine, when an ingest stream begins to receive more data than it can process, it manages the dataflow using "backpressure" to avoid becoming overwhelmed.
A backpressured system does not buffer; instead, it causes upstream producers to *not* send data at a rate greater than it can process. The problem with buffering is that a buffer will eventually run out of space. And then what? The system must decide what to do when the buffer is full: drop new results, drop old results, crash the system, or backpressure.
Backpressure is a protocol defining how to send a logical signal UP the stream with information about the downstream consumer's readiness to receive more data. That backpressure signal follows the same path as data moving downstream, but in reverse. If downstream is not ready to consume, then upstream does not send.
Quine uses Akka Streams, a Reactive Streams implementation of backpressure built on top of the actor model, to ensure that the ingestion and processing of streams are resilient.
Info
Curious about the operational challenges associated with reactive streams? Read the Reactive Manifesto to understand the problems faced by every streaming processor in a high-volume data pipeline.
Including asynchronous, non-blocking backpressure is the only method to ensure that all data from a high-volume stream is processed without data loss or processing delays.
All Nodes Exist
With a graph data model, nodes are the primary unit of data — much like a “row” is the primary unit of data in a relational database. However, unlike traditional graph data systems, a Quine user never has to create a node directly. Instead, the system functions as if all nodes exist.
Quine represents every possible node as an existing “empty node” with no interesting history. As data streams into the system, the node becomes interesting, and Quine creates a history for the node.
We added an idFrom function to Cypher that takes any number of arguments and deterministically produces a node ID from that data. This is similar to a consistent-hashing strategy, except that the ID produced by this function is always an ID that conforms to the type chosen for the ID provider.
You will use idFrom in the ingest query part of every ingest stream that you create. For example, the absolute minimum ingest query to load incoming data into the graph is simply a wrapper around the idFrom function.
MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that
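As a slightly richer sketch, an ingest query can derive IDs for several nodes from the same record and connect them with edges; the field names here ($that.srcIp, $that.dstIp) and the host label are hypothetical, not from a real data source.
MATCH (src), (dst)
WHERE id(src) = idFrom('host', $that.srcIp)
  AND id(dst) = idFrom('host', $that.dstIp)
CREATE (src)-[:connected_to]->(dst)
SET src.ip = $that.srcIp,
    dst.ip = $that.dstIp
Because idFrom is deterministic, every record that mentions the same IP address resolves to the same node, so repeated observations accumulate on one node instead of creating duplicates.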
Historical Versioning
Each node in the graph records all of its historical changes over time. When a node’s properties or edges are changed, the change event and timestamp are saved to an append-only log for that particular node. This historical log can be replayed up to any desired moment in time, allowing for the system to quickly answer questions using the state of the graph as it was in the past. This is a technique known as Event Sourcing, applied individually to each node.
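As a sketch of what this enables, a historical query can ask for the state of the graph at a past moment. This example assumes the atTime query parameter on the Cypher query endpoint (check the API documentation for the exact parameter name and format); the timestamp value is illustrative.
❯ curl -s -X POST "http://localhost:8080/api/v1/query/cypher?atTime=1652813768161" \
    -H "Content-Type: text/plain" \
    -d "MATCH (n) RETURN count(n)"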
Syntax and Structure
The first step when defining an ingest stream is to understand the overall shape of your data. This includes identifying the data elements necessary for standing queries to use in a MATCH clause.
An ingest stream is defined by setting a type, as described in the API documentation. Quine supports eight types of ingest streams. Each type has a unique form and requires a specific structure to configure properly.
For example, constructing an ingest stream via the /api/v1/ingest/{name} API endpoint to read data from standard input and store each line as a node looks similar to the example below.
{
  "type": "StandardInputIngest",
  "format": {
    "type": "CypherLine",
    "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
  }
}
Quine natively reads from standard input, passing each line into the Cypher query as $that. A unique node ID is generated using idFrom($that), and each line is stored as a line property on the corresponding node in the streaming graph.
Info
When creating an ingest stream via the API, you have the opportunity to give the stream a meaningful name. For example, you can name the above ingest stream standardIn to make it easier to reference in your application.
Alternatively, when creating an ingest stream via a recipe, Quine automatically assigns a name to each stream using the format INGEST-#, where the first ingest stream defined in the recipe is INGEST-1 and subsequent ingest streams are numbered in order.
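Putting those pieces together, creating the stream above with the name standardIn might look like the following sketch, assuming Quine is running locally on the default port 8080.
❯ curl -X POST "http://localhost:8080/api/v1/ingest/standardIn" \
    -H "Content-Type: application/json" \
    -d '{
      "type": "StandardInputIngest",
      "format": {
        "type": "CypherLine",
        "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
      }
    }'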
Here is the same ingest stream defined in a Quine Recipe.
ingestStreams:
  - type: StandardInputIngest
    format:
      type: CypherLine
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n.line = $that
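To run the recipe, save it to a file and pass it to Quine with the -r flag; the file name here is an assumption.
❯ java -jar quine-x.x.x.jar -r standard-in-recipe.yml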
Ingest Stream Reporting
Inspecting Ingest Streams via the API
Quine exposes a series of API endpoints that enable you to monitor and manage ingest streams while in operation. The complete endpoint definitions are available in the API documentation.
- List all running ingest streams
- Look up a running ingest stream
- Pause an ingest stream
- Unpause an ingest stream
- Cancel a running ingest stream
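For example, pausing and then resuming INGEST-1 might look like the sketch below; treat the HTTP methods and the /pause and /start paths as assumptions and confirm them against the API documentation.
❯ curl -X PUT "http://localhost:8080/api/v1/ingest/INGEST-1/pause"
❯ curl -X PUT "http://localhost:8080/api/v1/ingest/INGEST-1/start"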
Let’s take a look at the information available from the INGEST-1 ingest stream in the Ethereum Tag Propagation Recipe.
Start the recipe.
❯ java -jar quine-x.x.x.jar -r ethereum
List the ingest streams started by the Ethereum recipe using the /api/v1/ingest endpoint.
❯ curl -s "http://localhost:8080/api/v1/ingest" | jq '. | keys'
[
"INGEST-1",
"INGEST-2"
]
The Ethereum recipe creates two ingest streams: INGEST-1 and INGEST-2.
Now, view the ingest stream stats using the /api/v1/ingest/INGEST-1 endpoint.
❯ curl -s "http://localhost:8080/api/v1/ingest/INGEST-1" | jq
{
  "name": "INGEST-1",
  "status": "Running",
  "settings": {
    "format": {
      "query": "MATCH (BA), (minerAcc), (blk), (parentBlk)\nWHERE\n  id(blk) = idFrom('block', $that.hash)\n  AND id(parentBlk) = idFrom('block', $that.parentHash)\n  AND id(BA) = idFrom('block_assoc', $that.hash)\n  AND id(minerAcc) = idFrom('account', $that.miner)\nCREATE\n  (minerAcc)<-[:mined_by]-(blk)-[:header_for]->(BA),\n  (blk)-[:preceded_by]->(parentBlk)\nSET\n  BA:block_assoc,\n  BA.number = $that.number,\n  BA.hash = $that.hash,\n  blk:block,\n  blk = $that,\n  minerAcc:account,\n  minerAcc.address = $that.miner",
      "parameter": "that",
      "type": "CypherJson"
    },
    "url": "https://ethereum.demo.thatdot.com/blocks_head",
    "parallelism": 16,
    "type": "ServerSentEventsIngest"
  },
  "stats": {
    "ingestedCount": 57,
    "rates": {
      "count": 57,
      "oneMinute": 0.045556443551085735,
      "fiveMinute": 0.06175571100053622,
      "fifteenMinute": 0.04159128290271318,
      "overall": 0.07659077758191643
    },
    "byteRates": {
      "count": 78451,
      "oneMinute": 62.49789862393008,
      "fiveMinute": 84.92629746711795,
      "fifteenMinute": 57.22987512826503,
      "overall": 105.41446006900763
    },
    "startTime": "2022-05-17T18:56:08.161500Z",
    "totalRuntime": 744041
  }
}
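Individual fields are easy to pull out of this response with jq. For example, to check only the number of records ingested so far:
❯ curl -s "http://localhost:8080/api/v1/ingest/INGEST-1" | jq '.stats.ingestedCount'
57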
Reporting on Ingest Stream Progress Using a Status Query
When creating an ingest stream via a recipe, you can add a status query that runs continuously. For example, the status query below reports the number of nodes in the graph as ingestion progresses.
statusQuery:
  cypherQuery: MATCH (n) RETURN count(n)
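In a recipe, statusQuery sits at the top level alongside ingestStreams. A minimal sketch combining the two pieces shown in this post:
ingestStreams:
  - type: StandardInputIngest
    format:
      type: CypherLine
      query: MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that
statusQuery:
  cypherQuery: MATCH (n) RETURN count(n)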
Ingest Stream Blog Series
This is just the beginning. There’s lots more to cover. Over the next few weeks, we will cover the most common ingest streams in separate blog posts.
- Ingesting data from an Internet Source
- Ingesting Multiple Sources/CSV Data
- Ingesting Log Files
- Ingesting Data from Kafka
- Quine in a Data Pipeline
Try Ingesting Data Into Quine Yourself
If you want to try Quine yourself, you can download it here. In addition to the Ethereum recipe, take a look at the Wikipedia Ingest and Apache Log Analytics recipes for different ingest stream examples.
If you have questions or want to check out the community, join the Quine Slack or visit our GitHub page.