The poor man's version of Materialize that I implemented is the following:
Step 1) Find all "paths" between tables
* Use the postgres information schema to get all relations
* Use the npm library graph-cycles to see if there are any graph cycles. If so, those relations go on a blacklist.
* Use the npm library toposort to sort the graph
* Traverse the graph and find all possible paths from and to tables
* Generate SQL queries to look up "affected" other tables. Input: table name + id. Output: list of table names + ids (sketched below)
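Roughly, a sketch of this step in Node.js/TypeScript with the `pg` client; here the path discovery is hand-rolled (a DFS with a cycle guard) instead of using the npm libraries above, and only the single-hop lookup SQL is generated:

```typescript
import { Pool } from "pg";

const pool = new Pool(); // connection settings come from the usual PG* env vars

// One edge per foreign key: child table -> parent table via fkColumn.
type Edge = { child: string; parent: string; fkColumn: string };

async function loadForeignKeys(): Promise<Edge[]> {
  const { rows } = await pool.query(`
    SELECT tc.table_name   AS child,
           ccu.table_name  AS parent,
           kcu.column_name AS fk_column
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON kcu.constraint_name = tc.constraint_name
    JOIN information_schema.constraint_column_usage ccu
      ON ccu.constraint_name = tc.constraint_name
    WHERE tc.constraint_type = 'FOREIGN KEY'`);
  return rows.map((r: any) => ({ child: r.child, parent: r.parent, fkColumn: r.fk_column }));
}

// Enumerate every child -> parent path from a changed table up to the target
// ("root") table. The `seen` list doubles as the cycle guard / blacklist.
function findPaths(edges: Edge[], from: string, to: string, seen: string[] = []): Edge[][] {
  if (from === to) return [[]];
  if (seen.includes(from)) return [];
  const paths: Edge[][] = [];
  for (const edge of edges.filter((e) => e.child === from)) {
    for (const rest of findPaths(edges, edge.parent, to, [...seen, from])) {
      paths.push([edge, ...rest]);
    }
  }
  return paths;
}

// Turn a path into lookup SQL: given changed row ids in the starting table,
// which root-table ids are affected? (Multi-hop paths would chain joins here;
// omitted for brevity.)
function pathToLookupSql(path: Edge[]): string {
  if (path.length === 0) return `SELECT unnest($1::bigint[]) AS root_id`; // change was in the root table itself
  const first = path[0];
  return `SELECT DISTINCT ${first.fkColumn} AS root_id FROM ${first.child} WHERE id = ANY($1)`;
}
```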
Step 2) Track changes in database using a transactional outbox
* Create an outbox table with fields: id, timestamp, event, table_name, row id, jsonb payload
* After running migrations, have a script that ensures every table has insert/update/delete triggers that write a row into the outbox for each of these events (sketched below)
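A sketch of this step (same Node.js/TypeScript + `pg` setup; the outbox columns and the assumption that every tracked table has a bigint `id` column are illustrative):

```typescript
import { Pool } from "pg";

const pool = new Pool();

// One generic trigger function writes every change into the outbox.
// Assumes every tracked table has a bigint/serial "id" primary key.
const setupSql = `
  CREATE TABLE IF NOT EXISTS outbox (
    id         bigserial PRIMARY KEY,
    created_at timestamptz NOT NULL DEFAULT now(),
    event      text        NOT NULL,
    table_name text        NOT NULL,
    row_id     bigint      NOT NULL,
    payload    jsonb
  );

  CREATE OR REPLACE FUNCTION outbox_capture() RETURNS trigger AS $$
  BEGIN
    IF TG_OP = 'DELETE' THEN
      INSERT INTO outbox (event, table_name, row_id, payload)
      VALUES (TG_OP, TG_TABLE_NAME, OLD.id, to_jsonb(OLD));
      RETURN OLD;
    ELSE
      INSERT INTO outbox (event, table_name, row_id, payload)
      VALUES (TG_OP, TG_TABLE_NAME, NEW.id, to_jsonb(NEW));
      RETURN NEW;
    END IF;
  END;
  $$ LANGUAGE plpgsql;
`;

// Run after migrations: make sure every application table has the trigger.
// (EXECUTE FUNCTION needs PostgreSQL 11+; use EXECUTE PROCEDURE on older versions.)
async function ensureOutboxTriggers(): Promise<void> {
  await pool.query(setupSql);
  const { rows } = await pool.query(`
    SELECT table_name FROM information_schema.tables
    WHERE table_schema = 'public'
      AND table_type = 'BASE TABLE'
      AND table_name <> 'outbox'`);
  for (const { table_name } of rows) {
    await pool.query(`
      DROP TRIGGER IF EXISTS outbox_capture ON "${table_name}";
      CREATE TRIGGER outbox_capture
        AFTER INSERT OR UPDATE OR DELETE ON "${table_name}"
        FOR EACH ROW EXECUTE FUNCTION outbox_capture();`);
  }
}
```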
Step 3) Compute (you want to do reporting for a certain table (the "target table") which has relationships with other tables, and for this table you want a "materialized" view)
* Have a script on a loop that takes all the outbox entries since the last processed id
* Use the queries from step 1 to find out which ids of the "target table" are affected
* Then only materialize / compute those ids
* Store in Elasticsearch (in our case); the processing loop is sketched below
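And a sketch of the processing loop; `lookupAffectedRootIds` and `rematerialize` are hypothetical helpers standing in for the step 1 path queries and the denormalizing SQL + Elasticsearch indexing:

```typescript
import { Pool } from "pg";

const pool = new Pool();

// Hypothetical helpers: lookupAffectedRootIds runs the step 1 path queries,
// rematerialize runs the denormalizing SQL and writes the docs to Elasticsearch.
declare function lookupAffectedRootIds(tableName: string, rowIds: number[]): Promise<number[]>;
declare function rematerialize(rootIds: number[]): Promise<void>;

let lastProcessedId = 0; // in practice persisted, so the loop can resume after a restart

async function processOutboxOnce(): Promise<void> {
  const { rows } = await pool.query(
    `SELECT id, table_name, row_id FROM outbox
     WHERE id > $1 ORDER BY id LIMIT 1000`,
    [lastProcessedId]
  );
  if (rows.length === 0) return;

  // Group changed row ids per table, then map them to affected root-table ids.
  const byTable = new Map<string, number[]>();
  for (const r of rows) {
    byTable.set(r.table_name, [...(byTable.get(r.table_name) ?? []), Number(r.row_id)]);
  }
  const affected = new Set<number>();
  for (const [table, ids] of byTable) {
    for (const id of await lookupAffectedRootIds(table, ids)) affected.add(id);
  }

  await rematerialize([...affected]);
  lastProcessedId = Number(rows[rows.length - 1].id);
}

// The loop: poll, process, sleep a bit, repeat.
async function main() {
  for (;;) {
    await processOutboxOnce();
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}
```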
This is not a solution if you are Facebook scale. But as a small SaaS company with not too many transactions this works brilliantly.
And with more tenants you can just scale this up by sharding the outbox.
Bonus points: have elasticsearch (ingest node) and a postgres replica on the same host as the "compute" script. So if you have a lot of queries calculating the "dependencies" you get better performance.
Kafka Connect can do all this for you if you configure it properly. You would use a Postgres "source" connector called Debezium that tracks all changes via Postgres logical replication. All row changes then flow in real time to Kafka topics. Keeping the data updated in real time in Elasticsearch is handled by another off-the-shelf Kafka connector (a "sink" connector).
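Roughly, that configuration amounts to registering two connectors with the Kafka Connect REST API; a sketch (Node 18+ for the global `fetch`; host names, credentials and topics are placeholders, and some property names vary between connector versions):

```typescript
// Register a Debezium Postgres source and an Elasticsearch sink with the
// Kafka Connect REST API (default worker port 8083).
async function registerConnector(name: string, config: Record<string, string>) {
  await fetch("http://localhost:8083/connectors", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ name, config }),
  });
}

async function main() {
  await registerConnector("pg-source", {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "db.internal",       // placeholder
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",            // placeholder
    "database.dbname": "app",
    "database.server.name": "app",            // called "topic.prefix" in newer Debezium releases
  });

  await registerConnector("es-sink", {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200", // placeholder
    "topics": "app.public.orders",                 // Debezium topic naming: <server name>.<schema>.<table>
    "key.ignore": "false",
  });
}

main();
```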
What you are describing is having the data in Elasticsearch in the same format as the data in PostgreSQL, which is easy.
You could also, for instance, create a script that uses a PostgreSQL logical replication connection (just like Debezium does) to stream the changes into Elasticsearch, without having a full Kafka Connect setup and all the training and maintenance that comes with it.
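A minimal sketch of that kind of script; note this variant polls a logical decoding slot over a normal connection rather than holding a true streaming replication connection, and it assumes the wal2json output plugin is installed on the server:

```typescript
import { Pool } from "pg";

const pool = new Pool();

// One-time setup (requires the wal2json output plugin):
//   SELECT pg_create_logical_replication_slot('es_sync', 'wal2json');

async function drainSlotOnce(): Promise<void> {
  // pg_logical_slot_get_changes consumes what it returns; use the
  // *_peek_changes variant instead if you need to re-read after failures.
  const { rows } = await pool.query(
    `SELECT lsn, xid, data
     FROM pg_logical_slot_get_changes('es_sync', NULL, NULL)`
  );
  for (const row of rows) {
    const tx = JSON.parse(row.data); // wal2json emits one JSON document per transaction
    // ... transform tx.change[] and index the results into Elasticsearch
  }
}
```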
What I am describing is computing the data into a materialized format before storing it in Elasticsearch, so it's more efficient for Elasticsearch to work with and there is no longer any need for joins.
So instead of recomputing the materialized data in its entirety every time, you want to be smarter about it.
The root table of the materialized data depends on maybe 5 other tables. So if data in those 5 other tables changes, you need to know whether it has a relationship with a row in the "root table", and then re-materialize only those rows.
Materialize does this by having its own SQL dialect in which you define your materialized view, which is compiled into clever algorithms and run on an execution engine to get to this result.
What I am doing is just having a lookup graph + queries to see which table/id entries are invalidated. And I re-materialize using normal SQL and some extra processing in a Node.js script to make the result more optimal for Elasticsearch.
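For the "normal SQL" part, the re-materialization can be a single denormalizing query per batch of affected root ids; a sketch with hypothetical orders/customers/order_items tables:

```typescript
import { Pool } from "pg";

const pool = new Pool();

// Build one Elasticsearch-ready document per affected order, with the joined
// data embedded so no joins are needed at query time.
async function materializeOrders(orderIds: number[]): Promise<{ id: number; doc: unknown }[]> {
  const { rows } = await pool.query(
    `SELECT o.id,
            jsonb_build_object(
              'id',       o.id,
              'status',   o.status,
              'customer', to_jsonb(c),
              'items',    (SELECT coalesce(jsonb_agg(to_jsonb(i)), '[]'::jsonb)
                           FROM order_items i WHERE i.order_id = o.id)
            ) AS doc
     FROM orders o
     JOIN customers c ON c.id = o.customer_id
     WHERE o.id = ANY($1)`,
    [orderIds]
  );
  return rows.map((r: any) => ({ id: Number(r.id), doc: r.doc }));
}
```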
The biggest problem we've encountered with existing tools in the Kafka ecosystem (and the homegrown solutions that we've seen) is that nearly all of them sacrifice consistency. Debezium and most other Kafka Connect plugins will produce duplicate records upon restart, for example, that are very difficult to correctly deduplicate downstream. Things look right when you first turn on the plugin, but a week later when your Kafka Connect cluster restarts, a bit of incorrectness seeps in.
Materialize, by contrast, has been explicitly designed to preserve the consistency present in your upstream system. Our PostgreSQL source, for example, ensures that transactions committed to PostgreSQL appear atomically in Materialize, even when those transactions span multiple tables. See our "consistency guarantees" docs for some more information [0]. We have some additional features coming down the pipe, too, like allowing you to guarantee that your queries against Materialize reflect the latest data in your upstream sources [1].
Failures while communicating with the external systems (the kv store and Elasticsearch in your example) are usually where this falls down. It's easy to build a system that's consistent ~90% of the time, but if you want to build a system where things like failures during a snapshot write or failures during export to Elasticsearch are handled properly, it starts getting complex (you will need to find ways to recover and retract data, or build smarts into the consumer to query around aborts, or find a way to do a 2PC-esque dance with the external system a la Kafka's transaction support, etc.). Getting to full consistency isn't easy.
This has been my experience too. Instead of going the logical replication route I tend to leverage the transactional outbox to achieve consistency in the application layer instead.
So when I transact data into tables I immediately fetch the latest outbox id.
And then when querying Elasticsearch, I first fetch the last outbox id of the data it has processed.
This way I know whether the transaction was already processed into Elasticsearch or not. If not, repeat until the outbox id of Elasticsearch is equal to or higher than the outbox id of the mutation.
This way I don't have to use logical replication or a k/v store, and I can just use a script that fetches and processes the latest outbox changes in a loop.
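A sketch of that read-your-writes check, assuming an outbox table like the one above and a hypothetical `fetchProcessedOutboxId` that returns the consumer's high-water mark (e.g. stored alongside the Elasticsearch index):

```typescript
import { Pool } from "pg";

const pool = new Pool();

// Hypothetical: wherever the consumer records its progress (a doc in ES or a
// one-row Postgres table it updates after each processed batch).
declare function fetchProcessedOutboxId(): Promise<number>;

// 1) Right after committing a write, remember the current outbox high-water mark.
async function currentOutboxId(): Promise<number> {
  const { rows } = await pool.query(`SELECT coalesce(max(id), 0) AS id FROM outbox`);
  return Number(rows[0].id);
}

// 2) Before reading from Elasticsearch, wait until the consumer has caught up.
async function waitUntilIndexed(mutationOutboxId: number): Promise<void> {
  for (;;) {
    if ((await fetchProcessedOutboxId()) >= mutationOutboxId) return;
    await new Promise((resolve) => setTimeout(resolve, 100)); // retry shortly
  }
}
```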
I looked in the source of Materialize and it looks like this is exactly what they are doing.
They are using Debezium + Kafka to receive the WAL changes, and they send the processed WAL offsets back to Debezium + Postgres via a Kafka topic.
This way they can achieve consistency.
It's very hard for Kafka Connect plugins to maintain consistency in all scenarios - both because of the semantics of some upstream databases, and because of the guarantees the connect API itself offers. Hopefully KIP-618 will eliminate more of the edge cases though.
We do something similar, but in 2), instead of using the outbox pattern, we make use (in several different settings) of integers that are guaranteed to increment in commit order, then each consumer can track where their cursor is on the feed of changes. This requires some more care to get that sequence number generated in a safe way, but it means that publishers of changes don't need one outbox per consumer or similar.
Then you can have "processes" that query for new data in an input table, and update aggregates/derived tables from that simply by "select * ... where ChangeSequenceNumber > @MaxSequenceNumberFromPreviousExecution"...
The idea here implemented for Microsoft SQL for the OLTP case:
I've worked on specific instances of this problem at a few companies and it's hard! The number of ways distributed systems can fail only serves to compound the complexity here as well. Each time I've built one of these systems it's taken years to get it almost good enough to keep customers happy and to meet our performance goals. However, the resulting systems were plagued by deep-seated correctness bugs that were the by-products of early decisions made by engineers who didn't understand correctness.
It's really exciting to see a system getting built by people who know what they are doing and will hopefully be a more "correct" solution.
I've been following Materialize as their blog posts are a great source of inspiration when working on OctoSQL[0] (a CLI SQL dataflow engine), but I was a bit surprised by how few data sources they support (basically Kafka and Postgres, based on their docs). Now that they're switching/pivoting to being a database themselves, this makes much more sense.
I also think the architecture is really cool. Cloud-native is the way to go for modern databases and will make adoption much easier than something you'd have to host on bare metal. One question though, does this mean the open-source version is basically deprecated now and further development is closed-source, or does the open-source project represent the "compute" part of the "next gen Materialize"?
The core components of Materialize have always been licensed under the BSL [0], with a conversion to the Apache v2.0 License four years from the date of publication. These components (now called the storage, compute, and adapter layers) continue to be developed in the open under the same license, but are no longer packaged or supported for on-premise deployment.
Timely Dataflow and Differential Dataflow [1] are the open source frameworks that have always been at the heart of the compute layer in Materialize.
If you're referencing the downloadable binary when you mention "the open-source version": we decided the opportunity cost of splitting our attention and continuing to release updated binary versions by "rebundling" Materialize is too high. We have a small team and ambitious plans for Materialize (maybe some `WITH RECURSIVE`?), so we don't plan to release updates to the binary.
From a technical point of view Materialize seems absolutely fantastic - everything that the Firebase databases are trying to be, except with full SQL support (well, minimal SQL support - there's no RETURNING, ON CONFLICT or json - but probably good enough, we at least get JOINs and CTEs).
But unfortunately they only seem to be interested in enterprise customers: Not only is it not open source, but there's no open sign up and pricing for compute nodes isn't publicly available!
> well, minimal SQL support - there's no RETURNING, ON CONFLICT or json
We support both `INSERT ... RETURNING` [0] and the `jsonb` data type [1]. The only feature in your list that we're actually missing is UPSERT (i.e., `INSERT ... ON CONFLICT`). We have a tracking issue [2] if you're interested in following along.
> But unfortunately they only seem to be interested in enterprise customers: Not only is it not open source, but there's no open sign up and pricing for compute nodes isn't publicly available!
Rest assured: we're working towards open sign up! We're at the very beginning of our early access period.
The downside is they pivoted away from supporting running it yourself. The technology is certainly exciting but also changes the target demographic. I'm curious to see how it plays out
Such a shame, I had an ideal use case for a product I'm working on, but there's no way I'm shackling it to cloud vendors. Just not a good fit for this product.
I really hope materialize eventually inspires something like it in the open source world.
Please also take a look at https://github.com/risingwavelabs/risingwave if you are looking for advanced streaming databases. It is under the Apache License and also supports on-prem deployment (Docker, Kubernetes) with a full feature set of distributed clustering, compute-storage disaggregation, etc.
Let me know if you have any questions.
I'm not sure that 100% of the bits you'd need to run it are available, and even if they are it's unsupported, which for many businesses is a non-starter.
Wow that's sad, I was really excited by the promise of this project. But going cloud native means I won't be able to use it in any of my projects any time soon.
I’m starting to play with differential-dataflow in a new Rust project. On the one hand, it’s cool. On the other, it could use better documentation.
With Materialize the database, it really depends on price/performance whether I could use it (could it be really cheap when idle, like Aurora Serverless?), but two things that would make it easier to use would be the ability to purchase it through AWS Marketplace and to deploy it/use it in my organization’s own AWS account. As an enterprise dev team senior manager I then do not have to go through a vendor approval process or deal with my procurement department, nor do I need to worry about third party data control.
A lot of folks are reacting to the fact that it seems like this new version won't be able to be self-hosted. And while I get why that's a turn-off to many orgs, there's a massive market for turnkey, managed data products like this. You can get really far without having to staff data or infra engineers. The way I see it, the next phase of "tech" company will deemphasize the amount of in-house engineering outside of their core areas of innovation. There will, of course, be many companies that remain engineering-driven, but I see that as less the assumed norm going forward.
Does this fix how much of an insane memory hog Materialize is? Some queries are just impossible if you can't use disk. This is why I was forced to stick with Flink. Even though Materialize makes things appear stupid simple and easy with SQL, I found that you can only do the most simple streaming views with it. You can't even do unique counts with it for very long without breaking, and there are no probabilistic alternatives.
Bad for big data. Great for small and simple data sets. But who is using Kafka with small data?
Also they do not integrate at all with custom data types in Postgres IME. E.g. an enumeration in your table will mean materialize can’t read the table as a source. Lame.
As mentioned in the blog post, clusters allow horizontal scalability and daisy chaining, so you can allocate more memory for your views even if you run up against the limits of how much memory you can fit on a single machine. We've got plans in the works to support out-of-core execution, too.
> Also they do not integrate at all with custom data types in Postgres IME. E.g. an enumeration in your table will mean materialize can’t read the table as a source. Lame.
We're aware of this and are working on a fix. There are two tracking issues, if you'd like to follow along:
I use PG with an alternative materialized views implementation[0] that is pure PlPgSQL, exposes real tables that can be written to from triggers, and allows the views to be marked stale too.
This means hand-coding triggers to keep the materializations up to date, or else to mark them as out of date (because maybe some operations would be slow or hard to hand-code triggers for), but this works remarkably well.
As a bonus, I get an update history table that can be used to generate updates to external systems.
In principle one can get the AST for a VIEW's query from the PG catalog and use that to generate triggers on the tables it queries to keep it up to date. In practice that's only trivial for some kinds of queries, and I've not written such a tool yet.
I've been following the Materialize project and Frank McSherry's work for a long time, and seeing them go cloud native should really democratize the use cases for the mainstream. Yes, there are trade-offs associated with this, but it will make it usable by the vast, vast majority of the market.
Also I love the tagline "Consistency, Scalability, Low Latency: Pick Three"
It lets you create materialized views that are automatically updated in an incremental way as underlying data changes, without recomputing the whole query. Previously they supported running those queries on Postgres/Kafka, now they've added their own persistence layer and horizontal scalability.
I have worked at Materialize since 2019. The elevator pitch is that it is a database that lets you maintain the results of queries in memory (and now S3) and have them update in real time so the current result is always available.
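Since Materialize speaks the Postgres wire protocol, a toy sketch of that from a client's point of view looks like the following (connection details are placeholders, and real deployments would typically define sources over Kafka or Postgres rather than inserting into a plain table):

```typescript
import { Client } from "pg";

// Materialize is wire-compatible with Postgres, so an ordinary pg client works.
// Connection details are placeholders (6875 is the classic default port).
const client = new Client({ host: "localhost", port: 6875, user: "materialize", database: "materialize" });

async function demo() {
  await client.connect();
  await client.query(`CREATE TABLE orders (customer text, amount int)`);
  await client.query(`
    CREATE MATERIALIZED VIEW totals AS
    SELECT customer, sum(amount) AS total FROM orders GROUP BY customer`);

  await client.query(`INSERT INTO orders VALUES ('acme', 10), ('acme', 5)`);

  // The view is maintained incrementally as writes arrive; reading it returns
  // already-computed results rather than re-running the aggregation.
  const { rows } = await client.query(`SELECT * FROM totals`);
  console.log(rows);
  await client.end();
}

demo();
```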
Can you describe some use cases? The only one that comes to mind is powering dashboards.
That said, do you support persistent tables or time travel? (E.g., accessing the contents of the view as it was at time t). If not, how should a situation where multiple independent clients read the view be handled? If two people load the same dashboard but see differences based on when the table was read, that might cause confusion.
No, but I think the changes that need to be made in order to do so are tractable, so I expect we will eventually (I’m not an expert in this area of the code, so please don’t take this as an official statement).
Step 1) Find all "paths" between tables
* Use the postgres information schema to get all relations * Use npm library graph-cycles to see if there are any graph cycles. If so.. some relations go on a blacklist. * Use npm library topopsort to sort the graph * Traverse the graph and find all possible paths from and to tables * Generate SQL queries to look up "affected" other tables. Input: table name + id Output: list of table names + ids
Step 2) Track changes in database using a transactional outbox
* Create outbox table fields: id, timestamp, event, table_name, table id, jsonb payload * After running migrations have a script that ensures every table has triggers on insert,update,delete that would insert a row on each of these events in the outbox
Step 3) Compute ( You want to do reporting for a certain table ( target table ) which has relationships with other tables and for this table you want a "materialized" view )
* Have a script on a loop that takes all the outbox entries since last time processed * Use the queries from step1 to find out which ids of "target table" are affected * Then only materialize / compute those ids * Store in Elasticsearch ( in our case )
This is not a solution if you are Facebook scale. But as a small SAAS company with not too many transactions this works brilliantly. And with more tenants you can just scale this up by sharding the inbox.
Bonus points: have elasticsearch (ingest node) and a postgres replica on the same host as the "compute" script. So if you have a lot of queries calculating the "dependencies" you get better performance.