Saturday 31 July 2021

A view into state [backdated draft]

Note: This post is backdated to the date of the last draft (31 Jul 2021) because I changed my job and role and didn't want to bias / inform it with that. It's an unfinished fragment of my thinking at that point in time; I just cleaned it up a little and added references where necessary, but it's still rough and incomplete.

"In principle, any stream processor could be used for materialized views"
(Kleppmann, 2017, p. 467)

Martin Kleppmann recently gave a great keynote in which he summarized his ideas in a new framework of unordered vs. ordered events and mutable vs. immutable state. In the accompanying paper he also gave an outlook on a new wave of tools that "abstract away" these consistency decisions in event-driven systems, in particular "Materialized Views", e.g. Materialize - a great overview of the different patterns is Jamie's article.

I've been a fan of this idea not just since the famous "database turned inside out" (1) quote, but since I first worked with Spanner and saw firsthand how it turned into an SQL system, which required a surprising amount of abstraction of internal (consistency) concepts. Most surprising was that, fundamentally, Spanner is a messaging system (in other words it is truly distributed and not built on replication paradigms) and could even be used as a queue by observing changes "inside out". In most databases those changes are row-based and often used (with CDC) as the basis for event sourcing architectures, sadly mixing physical considerations with domain events. But I am interested in one level lower: the changes within a domain event entity, and the lineage, the reason why. This interest comes from Dataflow / Beam and its consistent view of temporal locality, something I've been keen on since agent-based simulation using Erlang's actor inbox (2). Beam doesn't use "wall clock" time but time in the sense of event-time flow, an ordered sequence of events, or the spacetime / promise-theory goal that "to explain a process ... we need to be able to ... tell a story about its behaviours". I'm interested in exposing this state and making it legible and explainable.


Wouldn't it be great to have a database supporting per-row state (consistency) metadata instead of state hidden in a stream processing system?



What I'd like would be temporal tables, but ordered by causal consistency (e.g. Lamport or vector clocks), with not only the data but also the state of the row as metadata. The standard answer to this is sagas in microservice architectures. However, sagas are usually seen more as a variant of a two-phase commit, often in a strict workflow engine, a way to achieve consistency as in the proverbial shopping cart or payment example. How the internal state of a saga's workflow is managed usually stays opaque or becomes a UI gimmick, as in AWS Step Functions or Cloudflare's stateful Workers. Consistency here usually refers to the workflow, and the domain object is created as an output of that workflow.
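
To make the idea concrete, here's a minimal sketch (the names Row, RowState and Observe are invented for illustration, not an existing API): the row carries its own lifecycle / consistency state plus a Lamport clock, so transitions are ordered causally rather than by wall-clock time.

```go
package main

import "fmt"

// RowState is the consistency/lifecycle state attached to a row as metadata,
// living next to the data itself rather than hidden in a stream processor.
type RowState string

const (
	StateDraft      RowState = "draft"      // not all mandatory fields set yet
	StateConsistent RowState = "consistent" // all mandatory fields set as of Clock
	StateArchived   RowState = "archived"
)

// Row couples the domain data with per-row state metadata and a Lamport
// clock, so state transitions are causally ordered instead of wall-clock ordered.
type Row struct {
	Key   string
	Data  map[string]any
	State RowState
	Clock uint64 // Lamport timestamp of the last observed transition
}

// Observe merges an incoming event's clock and advances the row's own clock,
// following the usual Lamport rule: max(local, remote) + 1.
func (r *Row) Observe(remoteClock uint64) {
	if remoteClock > r.Clock {
		r.Clock = remoteClock
	}
	r.Clock++
}

func main() {
	r := Row{Key: "order-42", Data: map[string]any{"amount": 99.0}, State: StateDraft}
	r.Observe(7) // an enrichment event arrives carrying Lamport time 7
	r.State = StateConsistent
	fmt.Printf("%s is %s as of logical time %d\n", r.Key, r.State, r.Clock)
}
```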

A better example than sagas for what I mean is the LMAX Disruptor (which is also actor-inspired), or trading systems in general, where one object (the transaction) needs to be enriched. For instance, risk scores, party data or cost factors are loaded into the object to make it auditable, stateless and independently processable. This way the object can be sent to different engines in an embarrassingly parallel fashion and "reassembled" by a Reactor (the reactive version of Balking, or more generally Teeing). In trading or bidding systems the reactor may also decide to throw the object away to save resources before all subprocesses return, e.g. load shedding if it's unlikely to generate revenue or likely to be fraud (the object may be sent to a fast machine learning model in parallel). This "concurrent chain of responsibility" is also a common pattern with Go's channels, where it's called "Fan Out, Fan In".
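
A toy version of that fan-out / fan-in reactor in Go might look like this (Txn, enrichment and fanOut are illustrative names; a real engine call would of course be an RPC, not a closure): the transaction is sent to several engines concurrently, and the reactor either reassembles it or sheds it if a deadline passes before all enrichments arrive.

```go
package main

import (
	"fmt"
	"time"
)

// Txn is the object being enriched; each engine fills in one field.
type Txn struct {
	ID        string
	RiskScore float64
	Party     string
	Cost      float64
}

// enrichment is one engine's partial result, applied to the Txn on fan-in.
type enrichment func(*Txn)

// fanOut sends the transaction to several engines concurrently; each returns
// its enrichment on a shared channel.
func fanOut(t Txn, engines []func(Txn) enrichment) <-chan enrichment {
	out := make(chan enrichment, len(engines))
	for _, e := range engines {
		e := e
		go func() { out <- e(t) }()
	}
	return out
}

func main() {
	engines := []func(Txn) enrichment{
		func(t Txn) enrichment { return func(out *Txn) { out.RiskScore = 0.12 } }, // risk model
		func(t Txn) enrichment { return func(out *Txn) { out.Party = "ACME" } },   // party data
		func(t Txn) enrichment { return func(out *Txn) { out.Cost = 1.5 } },       // cost factors
	}

	t := Txn{ID: "tx-1"}
	results := fanOut(t, engines)

	// Fan in: the "reactor" reassembles the object, but may give up early
	// (load shedding) if enrichments don't arrive in time.
	deadline := time.After(100 * time.Millisecond)
	for i := 0; i < len(engines); i++ {
		select {
		case apply := <-results:
			apply(&t)
		case <-deadline:
			fmt.Println("shedding", t.ID, "before all engines returned")
			return
		}
	}
	fmt.Printf("reassembled: %+v\n", t)
}
```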

But the use cases for this are much wider: basically any reactive user interface, including APIs that hide state transitions - what's called the Gateway API, Aggregator or Gateway Aggregation pattern. For instance, to show a user profile page in the frontend, dozens of services may need to be called - yet the profile itself should appear consistent to the user when shown (who doesn't hate it when lists change while elements are still being loaded). Or an API might allow updating the same profile, yet behind the scenes change many services (which would be a saga). This is essentially the same problem as external serializability and CRDTs converging towards it, which is Kleppmann's research area. At the same time, I want to avoid reinventing distributed objects (CORBA...) and transaction managers; state transitions should not be enforced but should themselves be optional, observable events. Most importantly, I want a trigger (not in the database sense) that independently moves the state forward based on rules ("all required fields have values now"), for instance by sending a message to another service (like an actor).
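
Sketched in Go, such a rule-based trigger could be as simple as this (required, complete and watch are invented names; the rule is the "all required fields have values now" example from above): it observes field-level updates and, once the rule holds, emits an event instead of enforcing the transition itself.

```go
package main

import "fmt"

// required lists the fields a profile needs before it may be shown as consistent.
var required = []string{"name", "email", "avatar"}

// complete is the rule: all required fields have values now.
func complete(profile map[string]string) bool {
	for _, f := range required {
		if profile[f] == "" {
			return false
		}
	}
	return true
}

// watch observes field-level updates and, once the rule holds, moves the
// state forward by emitting an event for another service (actor-style),
// instead of enforcing the transition inside the database.
func watch(updates <-chan [2]string, events chan<- string) {
	profile := map[string]string{}
	for u := range updates {
		profile[u[0]] = u[1]
		if complete(profile) {
			events <- "profile is consistent, notify renderer"
			return
		}
	}
}

func main() {
	updates := make(chan [2]string)
	events := make(chan string, 1)
	go watch(updates, events)

	for _, u := range [][2]string{{"name", "Ada"}, {"email", "ada@example.org"}, {"avatar", "a.png"}} {
		updates <- u
	}
	fmt.Println(<-events)
}
```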

Currently most solutions I've seen basically use key/value stores with some kind of idempotency key (RFC) that is checked on every transition, moving forward with a lock-free read-modify-write instruction - RocksDB, for instance, has a merge operator. Redis in particular, because it offers hashes and sets with field-level mutations, but also actively participates in CRDT research and, similarly to Spanner, has messaging support. But these databases either hold state in memory or don't allow granular time travel - in particular they don't have a way to attach an external state to row metadata, e.g. "this row is consistent as of X", "this row has all mandatory fields set", "the uncommitted read version of this row is 60% dirty" or "this row is archived".
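
For comparison, this is roughly the idempotency-key pattern those solutions implement, reduced to a toy in-memory store (entry, store and apply are invented names, and the mutex stands in for the store's atomic merge / compare-and-set): a transition is applied at most once per idempotency key, and the row's version only moves forward.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is the stored value plus a version used for optimistic concurrency
// and the set of idempotency keys already applied to it.
type entry struct {
	version int
	fields  map[string]string
	applied map[string]bool
}

// store stands in for the key/value system; real solutions would use e.g.
// Redis hashes or a RocksDB merge operator instead of this map and mutex.
type store struct {
	mu   sync.Mutex
	rows map[string]*entry
}

// apply is an idempotent read-modify-write: if the idempotency key was seen
// before, the transition is skipped; otherwise the field is merged in and
// the version advances.
func (s *store) apply(rowKey, idemKey, field, value string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.rows[rowKey]
	if !ok {
		e = &entry{fields: map[string]string{}, applied: map[string]bool{}}
		s.rows[rowKey] = e
	}
	if e.applied[idemKey] {
		return false // already applied, a retry is a no-op
	}
	e.fields[field] = value
	e.applied[idemKey] = true
	e.version++
	return true
}

func main() {
	s := &store{rows: map[string]*entry{}}
	fmt.Println(s.apply("profile-1", "req-abc", "email", "ada@example.org")) // true
	fmt.Println(s.apply("profile-1", "req-abc", "email", "ada@example.org")) // false, duplicate
}
```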

The other solution is to rely on event sourcing (usually Kafka) and use materialization to create a view into state. Modern, fast query engines like Dremio (Apache Arrow is nice) in the Hadoop world, or, at the opposite end, in-memory real-time streaming databases like SingleStore (formerly MemSQL), offer fast insights but only in small, optimized use cases - think materialized views in OLAP - either because the stream processing (e.g. Spark SQL) is micro-batched or because in-memory doesn't scale. Data warehouses like BigQuery, AWS Elastic Views and Impala added similar functionality to Materialize, which adds some consistency with a unified model by using timely dataflow (Naiad) instead of shuffling and thereby offers snapshot consistency. But it abstracts away time, again relying on CDC from sources (and thus on the physical data model) and ignoring late events. Fundamentally that's still (in-memory) OLAP, but at least with control and without nightly ETL / ELT jobs. BigQuery has time travel and snapshots, dbt (which works with Materialize) can write a consistent invocation ID, and CosmosDB allows choosing consistency models - it's just a different way of hiding that ETL / ELT. Data Observability defines freshness SLOs for these kinds of approaches, but (de)normalization, especially in microservice architectures / data meshes, is the tricky part because (incremental) recomputation becomes NP-complete. I guess that's why fast in-memory databases (IMDB) and in-memory data grids (IMDG) like HANA don't even really have streaming ingestion; instead they favour (distributed) transaction semantics where locks are explicit.

Another option is continuous query models, with Aurora perhaps the first one - not AWS Aurora; when it comes to streams, academia had a head start. Basically the argument is that the pipeline (with snapshots) is the view. Maybe that's why Kafka (or rather Vectorized's Redpanda) offers real-time querying over streams by using the log itself as the database - hence the name ksqlDB - but it still just shuffles whatever data is available and doesn't really expose a relational model, nor time in the sense of a consistent snapshot the way Spanner does, nor the flexibility of reasoning about late events that Beam (and Dataflow) gives you. I guess another option to expose time would simply be to use a time-series database and aggregate in there, or the opposite, recalculate upon every change, spreadsheet-style, or to change consistency guarantees for reads and timestamp them (like Spanner). But it's not time alone (and time itself can be a problem); it's the state of the value that's interesting.

I'd love to write such a database, and given Go's strong concurrency patterns I think Ryo Nakao's approach to building a time-series database would be a great starting point.
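
Purely as a thought experiment, the API I'd want might look something like this (all names are invented, nothing here exists): the state of a row is first-class, time-travelable metadata, and transitions can be watched like events.

```go
// Package statedb is a rough sketch of the API I have in mind: a row store
// where the consistency state of each row is first-class, time-travelable
// metadata, and where state transitions can be observed like events.
package statedb

import "time"

// StateDB is the hypothetical interface; every name here is invented.
type StateDB interface {
	// Put writes a field of a row; the store records the causal clock.
	Put(key, field string, value []byte) error

	// StateOf answers questions like "is this row consistent as of t?"
	// or "which mandatory fields are still missing?".
	StateOf(key string, asOf time.Time) (state string, missing []string, err error)

	// Watch streams state transitions of a row so a rule engine or another
	// service (an actor) can react to them instead of polling.
	Watch(key string) (<-chan string, error)
}
```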


1) Martin Kleppmann published the legendary "turning the database inside out" talk in March 2015, followed by his seminal posts on logs in May; in June Google released the Dataflow paper, and in August Tyler Akidau published the core concepts of Dataflow for a wide audience, defining now-standard terms like "unbounded" and the various windowing strategies; a year later Apache Beam had its first release. The respective books followed in 2017 and 2018.

2) Reminds me of Software Transactional Memory (STM), a failed experiment of the early 21st century. Some of the ideas of the Actor Model, like chronological encapsulation to facilitate time travel and transactions, and promise-based dataflow pipelines, made it into concepts such as Goblins and agent-based modelling for simulations (fun fact: OOP was invented for simulations).
