Databases: how do they work under the hood? Key takeaways from the brilliant book “Designing Data-Intensive Applications”, as a quick recap of the core concepts.
DB engine classifications
- Type of load: OLTP (transaction processing) vs OLAP (data warehousing and analytics)
- Relational vs NoSQL, document vs columnar, graph vs triple-store (semantic facts storage)
- Even within the NoSQL camp you can find wide-column databases that descend from Bigtable (Cassandra, HBase) vs purely key-value stores like Riak, Redis or DynamoDB.
- Classical RDBMS vs MPP (massively parallel processing) engines: usually relational as well, but queries benefit from parallel execution across nodes – Impala, ClickHouse, Presto
- Data access patterns: through a database / through a service call (REST or RPC) / through asynchronous message passing using a message queue or an actor framework
Schema on read vs schema on write, which usually involves forward and backward schema compatibility, i.e. the schemas of the writer and the reader do not need to match completely.
How data are stored
There are two key approaches for data storage organisation:
- Based on a Log-Structured Merge-tree (LSM) engine. Generally friendly to sequential writes, which can give high write throughput, and efficient for range queries because keys are kept sorted. Writes first go to an in-memory balanced search tree (usually red-black or AVL) called the memtable. From time to time the memtable is flushed to disk as an SSTable (sorted string table): a log-structured storage segment of key-value pairs in which entries are sorted by key. SSTable files are later merged during compaction. Compaction is a dedicated background process that merges SSTables: for a key that appears in several segments it keeps only the most recent value (last update wins) and discards entries that were marked as deleted with tombstones. Various strategies are available: time-based vs size-tiered vs levelled compaction (levelled uses less disk space but has higher write amplification). Detailed overview of the implementation of an LSM-based storage engine in Tarantool DB – here (in Russian).
- In B-tree based engines, segments have a fixed size, usually the page size: the disk is read and (over)written one page at a time (even if just one field was changed), as opposed to the append-only approach of the LSM-tree. WAL (write-ahead log, also called redo log) is append-only and records every B-tree modification before it is applied to the pages, so the tree can be restored after a failure in the middle of a write. Latches (lightweight locks) are used to deal with concurrent operations.
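The LSM write path described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the class name `TinyLSM`, the `memtable_limit` parameter and the `TOMBSTONE` marker are made up for the example, a plain dict stands in for the balanced tree (it is sorted only at flush time), and "disk" segments are just Python lists.

```python
from typing import Any, Dict, List, Optional, Tuple

TOMBSTONE = object()  # hypothetical marker that stands for a deleted key


class TinyLSM:
    def __init__(self, memtable_limit: int = 2) -> None:
        self.memtable_limit = memtable_limit
        self.memtable: Dict[str, Any] = {}                  # stand-in for a balanced tree
        self.sstables: List[List[Tuple[str, Any]]] = []     # oldest first, each sorted by key

    def put(self, key: str, value: Any) -> None:
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def delete(self, key: str) -> None:
        # Deletes are writes too: they append a tombstone instead of removing data.
        self.put(key, TOMBSTONE)

    def flush(self) -> None:
        # Write the memtable out as a new immutable segment sorted by key.
        if self.memtable:
            self.sstables.append(sorted(self.memtable.items(), key=lambda kv: kv[0]))
            self.memtable = {}

    def get(self, key: str) -> Optional[Any]:
        if key in self.memtable:
            value = self.memtable[key]
            return None if value is TOMBSTONE else value
        for segment in reversed(self.sstables):             # newest segment first
            for k, v in segment:                            # real engines use a sparse index
                if k == key:
                    return None if v is TOMBSTONE else v
        return None

    def compact(self) -> None:
        # Merge all segments: later segments overwrite earlier ones (last update wins),
        # then drop the keys whose latest entry is a tombstone.
        merged: Dict[str, Any] = {}
        for segment in self.sstables:
            merged.update(segment)
        live = sorted(((k, v) for k, v in merged.items() if v is not TOMBSTONE),
                      key=lambda kv: kv[0])
        self.sstables = [live] if live else []


db = TinyLSM(memtable_limit=2)
db.put("a", 1)
db.put("b", 2)      # second write triggers a flush
db.put("a", 3)
db.delete("b")      # triggers another flush, now two segments exist
db.compact()        # one segment remains, holding only the live key "a"
```

Note how every logical write may be rewritten again during flush and compaction, which is exactly where write amplification comes from.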
Generally B-trees are faster for reads and LSM-trees are faster for writes, but, as usual, it depends.
Write amplification – a single logical write results in multiple physical write operations under the hood of the db engine.
An index can be either key -> reference, where the reference points to the row stored in a heap file (handy in the presence of multiple secondary indexes, since all of them refer to the same location), or a clustered index, where the row is stored directly within the index without an additional level of lookup. A compromise between the two is a covering index, which stores not the complete row but some of its columns. There are special kinds of indexes for specific data, e.g. the R-tree for multidimensional data.
De-facto classical approach for data warehouses (centralised data store from multiple sources):
- Star schema (dimensional modelling): fact table (events) vs dimensions (SCD – slowly changing dimensions – who, where, when, etc).
- Snowflake schema ~ star, but dimensions are further subdivided into sub-dimensions.
Another interesting point of view is the system of record (facts, ground truth), usually normalised, vs derived data (transformed and enriched, denormalised).
Normal forms are the cornerstones of data organisation in an RDBMS.
Fact tables are wide (100+ columns), but a query usually requires just a small subset of the columns. In a document database the whole document is read from disk to get just a few attributes; OLTP engines are row-oriented, i.e. they read the whole row (100+ columns) and parse it. Hence columnar data storage is beneficial for analytical usage. Additional advantages are compression (the simplest approach is to store each distinct value of a column once and keep only the ids of those values per row) and vectorised processing (applying a predicate to several entries with a single CPU instruction). If the entries within a column are sorted, retrieval is also faster, but second and third sort keys help much less, and all columns have to keep the same row order so that a complete row can still be reassembled. NOTE: a column-family based db engine (Cassandra/HBase) is not the same as a column-oriented one!
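A small sketch of the column-oriented layout with the simplest compression mentioned above (dictionary encoding). The table contents and the helper name `dictionary_encode` are invented for the illustration; real engines add bitmaps, run-length encoding and SIMD on top of this idea.

```python
from typing import List, Tuple


def dictionary_encode(column: List[str]) -> Tuple[List[str], List[int]]:
    """Store each distinct value once and replace the column by small integer ids."""
    dictionary = sorted(set(column))
    index = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [index[value] for value in column]


# Row-oriented input: (date, product, quantity)
rows = [
    ("2024-01-01", "apple", 3),
    ("2024-01-01", "pear", 1),
    ("2024-01-02", "apple", 5),
]

# Column-oriented layout: one list per column, all in the same row order.
dates, products, quantities = (list(col) for col in zip(*rows))

product_dict, product_ids = dictionary_encode(products)

# SELECT SUM(quantity) WHERE product = 'apple' touches only two of the columns.
apple_id = product_dict.index("apple")
total = sum(q for pid, q in zip(product_ids, quantities) if pid == apple_id)
```

The query never reads the `dates` column, which is the main saving for wide fact tables.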
How data are spread across the nodes:
A replica is a node storing a full copy of the database. It may be the leader (master or primary), which accepts writes and may also serve reads, but usually read load is offloaded to followers (slaves), which receive the change stream (replication log) from the leader and serve reads from clients. Failover is the process of handling a failure of the leader. There are several common approaches for organising replication:
- single leader
- multi leader (examples are multiple datacenters / clients with offline operation / collaborative editing). The common issue is how to resolve write conflicts.
Replication of changes between nodes is done through the replication log, which can be either:
- statement based replication (caveats are operations with side effects – now(), rand(), sequence numbers, triggers)
- physical replication – coupled to storage engine internals ~ shipping the changes recorded in the write-ahead log (WAL)
- logical (row-based) replication
Replication may be synchronous / asynchronous / semi-synchronous. Asynchronous replication weakens the durability (data persistence guarantee) of writes but increases speed. Different replication topologies exist: circular, all-to-all, star. An alternative is to rely on an erasure coding scheme, which allows lost data to be recovered with lower storage overhead than full replication.
When the data is too big for a single node, partition-based data distribution is used: each node is responsible for a portion of the data and the dataset is spread across nodes, so no single node keeps everything. Each node may be the leader for some partitions and a follower for others. Db engines are quite creative in how they name a partition:
- shard – Elasticsearch/MongoDB
- tablet – Bigtable
- vnode – Cassandra
- region – HBase
- vbucket – Couchbase
The most naive forms of partitioning are:
- key-range based, when entries are routed to a particular node based on ranges of keys (0-1000 to the 1st node, 1001-2000 to another one); the key distribution most likely will not be uniform
- hash based (DHT – Distributed Hash Table), where the target node is determined by a formula (hash_function(key) mod number_of_nodes = node_number); rebalancing becomes a nightmare, because changing the number of nodes moves most of the keys.
If load is skewed, partitions that contain hot keys (responsible for a disproportionately large volume of records or requests) will experience high load and become hot spots. A common example is time-based partitioning, where all current writes land in the same partition.
Consistent hashing extends the idea of using a hash for routing requests by assigning a number of token ranges (ranges of hash values) to each node. The number of such ranges can be fixed or dynamic: if the size of a partition exceeds some limit, it is split further and the data is transferred to the corresponding nodes (HBase, MongoDB).
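To make the difference between `hash mod N` and token ranges tangible, here is a rough sketch that counts how many keys change owner when a fifth node joins a four-node cluster. Everything in it is an assumption made for the demo: the `HashRing` class, the `tokens_per_node` value, the node names and the choice of MD5 as a stable hash.

```python
import bisect
import hashlib


def stable_hash(value: str) -> int:
    """Deterministic hash (Python's built-in hash() of str is salted per process)."""
    return int(hashlib.md5(value.encode()).hexdigest(), 16)


def mod_n_node(key: str, num_nodes: int) -> int:
    """Naive routing: hash_function(key) mod number_of_nodes."""
    return stable_hash(key) % num_nodes


class HashRing:
    """Each node owns several tokens; a key belongs to the next token clockwise."""

    def __init__(self, nodes, tokens_per_node: int = 64) -> None:
        self.tokens_per_node = tokens_per_node
        self.ring = []  # sorted list of (token, node)
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        for i in range(self.tokens_per_node):
            bisect.insort(self.ring, (stable_hash(f"{node}#{i}"), node))

    def node_for(self, key: str) -> str:
        # First token >= hash(key); wrap around to the start of the ring.
        idx = bisect.bisect_left(self.ring, (stable_hash(key),))
        return self.ring[idx % len(self.ring)][1]


keys = [f"key-{i}" for i in range(1000)]

# hash mod N: going from 4 to 5 nodes
moved_mod = sum(mod_n_node(k, 4) != mod_n_node(k, 5) for k in keys)

# token ranges: the same change
ring = HashRing(["n0", "n1", "n2", "n3"])
before = {k: ring.node_for(k) for k in keys}
ring.add_node("n4")
after = {k: ring.node_for(k) for k in keys}
moved_ring = sum(before[k] != after[k] for k in keys)

print(f"mod N moved {moved_mod} keys, ring moved {moved_ring} keys")
```

With the ring, only keys that now fall into the new node's token ranges move, and they all move to the new node; with `mod N` most keys are reshuffled between existing nodes as well.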
Secondary indexes in such a setting become more complex. The index can be partitioned either:
- by document – every node contains indexes only for the data in its local partitions, so a query needs scatter/gather/rank across all partitions
- by key (term) – a dedicated global index contains references to all partitions containing the term; the downside is that the index itself has to be sharded as well
How requests are routed from the client to the partition that contains the data:
- routing tier – often relies on ZooKeeper (Kafka/HBase)
- any node may be contacted – in this case it becomes the coordinator node that forwards the request to the node with the data; knowledge about partitions is spread through a gossip protocol.
Nightmare of data consistency
When two or more nodes accept writes, it is an open question how to handle writes for the same key:
- Conflict-free replicated datatypes (two way merge functions)
- Mergeable persistent data structure (three way merge functions)
- Operational transformation
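As an illustration of the first option, here is a minimal grow-only counter (G-Counter), one of the simplest CRDTs. The class and method names are chosen for this sketch only. Each replica increments its own slot, and the two-way merge takes the per-replica maximum, so merging is commutative, associative and idempotent.

```python
from typing import Dict


class GCounter:
    """Grow-only counter: one slot per replica, merge = element-wise max."""

    def __init__(self, replica_id: str) -> None:
        self.replica_id = replica_id
        self.counts: Dict[str, int] = {}

    def increment(self, amount: int = 1) -> None:
        # A replica only ever touches its own slot.
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + amount

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter") -> None:
        # Two-way merge: safe to apply in any order and any number of times.
        for replica_id, count in other.counts.items():
            self.counts[replica_id] = max(self.counts.get(replica_id, 0), count)


a, b = GCounter("a"), GCounter("b")
a.increment(2)      # concurrent updates on two replicas
b.increment(3)
a.merge(b)
b.merge(a)
a.merge(b)          # repeating a merge changes nothing
```

After the exchange both replicas report the same total without any coordination.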
One of the key approaches to conflict resolution in a multi-replica setup without a leader is the version vector. It contains a version number per key, per replica, so it can distinguish whether two operations are concurrent or one causally depends on the other. Lamport timestamps are used for total ordering, but they cannot tell whether two operations are really concurrent or causally dependent; their advantage is that they are more compact.
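A sketch of how two version vectors for the same key can be compared. The function name `compare` and the returned labels are assumptions for this example; a vector is modelled as a dict from replica id to counter, with missing replicas counted as 0.

```python
from typing import Dict


def compare(a: Dict[str, int], b: Dict[str, int]) -> str:
    """Return how version vector a relates to b: before, after, equal or concurrent."""
    replicas = set(a) | set(b)
    a_ahead = any(a.get(r, 0) > b.get(r, 0) for r in replicas)
    b_ahead = any(b.get(r, 0) > a.get(r, 0) for r in replicas)
    if a_ahead and b_ahead:
        return "concurrent"   # neither saw the other: a real conflict to resolve
    if a_ahead:
        return "after"        # a causally follows b, so a can overwrite b
    if b_ahead:
        return "before"
    return "equal"
```

For example, `compare({"r1": 2, "r2": 1}, {"r1": 1, "r2": 1})` reports that the first version supersedes the second, while `compare({"r1": 2}, {"r2": 1})` reports concurrent writes that need merging or sibling handling.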
There are several tricks to improve data consistency and availability in such setups:
- Read repair – during a regular read operation, read from multiple nodes and update the replicas that returned a stale value
- Anti-entropy – a background process that checks for inconsistencies between nodes
- Sloppy quorums. A quorum means an operation is acknowledged only after the required number of votes from several nodes, usually an absolute majority; in a sloppy quorum, nodes other than the ones designated for the key may accept the request.
- Hinted handoff – writes are temporarily stored on another node (e.g. the coordinator) when the target node is not responding and handed over once it recovers, which increases write availability
But even if we talk about a good old single-node database, various issues may happen:
- dirty read – one transaction reads uncommitted writes of another transaction
- dirty write – one transaction overwrites (uncommitted) data from another transaction
- non-repeatable reads – values for the same rows differ during the same transaction, sometimes called read skew. Seeing a mix of old and new data is especially harmful for backups and analytics.
- phantom reads – during a single transaction rows are added or removed for the same query; put another way, a write in one transaction changes the result of a read in another
Transaction – a set of reads and/or writes executed as a single operation: if it succeeds, all changes are committed; if it fails, it is aborted/rolled back. It is the key component of OLTP engines that offer ACID guarantees:
- Atomicity – is about handling halfway changes: the transaction either commits or rolls back, all or nothing.
- Consistency – data invariants are always true, i.e. the db is in a good state before and after any transaction; depending on the application, it may rely on the db's isolation and atomicity
- Isolation – handling simultaneous access to the same records, pretending that no one else is concurrently accessing the database (various levels exist). Note that it is different from single-object write operations such as compare-and-set, the so-called “lightweight” transactions in distributed systems.
- Durability – the state of committed transactions persists in case of system failure
With replicated data, replication lag calls for additional guarantees (these are not part of ACID):
- Reading your own writes – read-after-write consistency (for example for cross-device access)
- Monotonic reads – a user never sees older data after having seen newer data, which can happen when successive reads go to different replicas (a possible solution is to always read from the same replica)
- Consistent prefix reads – if a sequence of writes happens in a certain order, anyone reading those writes will see them in the same order
Read committed – a read operation sees only committed values. The default transaction isolation level for many dbs (PostgreSQL, Oracle). Implemented with row-level locks for writes plus remembering both the old committed value and the new value set by the transaction that holds the write lock; reads return the old value until the new value is actually committed.
Snapshot isolation – each transaction reads from a consistent snapshot of the db taken at the start of the transaction; sometimes called repeatable read.
MVCC (multi-version concurrency control) – maintaining the state of the db at different points in time. Sometimes it can be used as an alternative to locking. Key principles: readers never block writers and vice versa; the db keeps the old committed versions that transactions in progress may still need. Updates are usually implemented as delete + insert, tagged with the transaction id. An index in this setup may either refer to all versions of a value or use a copy-on-write approach (B-trees may be used).
One of the anomalies related to simultaneous writes is the lost update, which occurs in read-modify-write cycles (incrementing a counter or modifying a list). A possible workaround is an atomic write operation, but dbs usually don't support it for all data types. An alternative is explicit locking by application code. Many db engines provide lost update detection, which requires the transaction to be retried. For non-transactional dbs an atomic compare-and-set operation can be used to prevent lost updates. For replicated data conflict resolution techniques are needed: the common one is last write wins, which is error-prone; commutative atomic operations work well.
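A sketch of the compare-and-set approach to the read-modify-write cycle. The `Register` class is a stand-in invented for this example: its internal lock simulates the atomicity a database would provide for a single compare-and-set call, while the increment itself is done optimistically and retried on conflict.

```python
import threading


class Register:
    """A single value with an atomic compare-and-set (the lock simulates the db)."""

    def __init__(self, value: int = 0) -> None:
        self._value = value
        self._lock = threading.Lock()

    def read(self) -> int:
        with self._lock:
            return self._value

    def compare_and_set(self, expected: int, new: int) -> bool:
        with self._lock:
            if self._value != expected:
                return False          # somebody else changed it: caller must retry
            self._value = new
            return True


def increment(register: Register) -> None:
    # Read-modify-write cycle made safe: retry until nobody interfered.
    while True:
        current = register.read()
        if register.compare_and_set(current, current + 1):
            return


def worker(register: Register, times: int) -> None:
    for _ in range(times):
        increment(register)


counter = Register(0)
threads = [threading.Thread(target=worker, args=(counter, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because a failed compare-and-set never writes, no increment is lost and the final value equals the total number of increments.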
Write skew happens when the same objects are read by multiple transactions and then different objects among them are updated by different transactions. The best option is to rely on the serialisable isolation level: transactions may actually execute in parallel, but the result is the same as with serial execution. Examples where it actually matters: games (moving different figures to the same cell), booking systems (preventing double booking) or double spending from an account. The common pattern for write skew is: select + logic + write. Ways to implement serialisability:
- rely on actual serial execution on a single thread (as in Redis), sometimes with transactions submitted as stored procedures.
- to scale serial execution, use a partition per CPU core => cross-partition coordination is very slow.
- serial execution needs the data to fit in memory; anti-caching handles the rest (abort the transaction, load the data into memory, retry)
- Two-phase locking (2PL) – readers and writers block each other (the opposite of snapshot isolation), using shared and exclusive locks. Predicate locks, which cover all objects matching a search condition, perform poorly and are rarely implemented in modern dbs. Index-range locking is an approximation of predicate locks.
- optimistic concurrency control techniques – serialisable snapshot isolation (SSI): try to detect conflicts and abort transactions only if conflicts are present. Another approach is to materialise conflicts: create rows beforehand and lock them.
For distributed databases, in order to tackle atomic commits and offer consistency guarantees in situations like a consistent snapshot or a foreign key reference across partitions (when a transaction fails on some nodes but succeeds on others), the two-phase commit protocol (2PC) is commonly used:
- save data at nodes
- send prepare requests to participating nodes
- send commit if all participants responded with yes in the 1st stage, otherwise send abort
It involves a coordinator (transaction manager). If the coordinator fails after participants have voted yes to prepare but before they receive the commit or abort decision, the participants are left in doubt: they cannot commit or abort on their own and have to wait for the coordinator to recover. XA (eXtended Architecture) is a standard for implementing 2PC across heterogeneous technologies ~ a C API for interacting with the transaction coordinator.
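The steps of 2PC can be sketched in a few lines. This is an in-process toy with hypothetical `Participant` and `two_phase_commit` names: there is no network, no timeouts and no durable coordinator log, which are precisely the parts that make the real protocol hard.

```python
from typing import Any, List


class Participant:
    def __init__(self, name: str, can_commit: bool = True) -> None:
        self.name = name
        self.can_commit = can_commit   # simulates a constraint violation or crash
        self.state = "init"
        self.pending: Any = None

    def prepare(self, data: Any) -> bool:
        # Phase 1: vote. Voting yes is a promise to be able to commit later.
        if not self.can_commit:
            self.state = "aborted"
            return False
        self.pending = data
        self.state = "prepared"
        return True

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.pending = None
        self.state = "aborted"


def two_phase_commit(participants: List[Participant], data: Any) -> bool:
    # Phase 1: collect votes from every participant.
    votes = [p.prepare(data) for p in participants]
    # A real coordinator would durably log its decision at this point.
    if all(votes):
        for p in participants:         # Phase 2: everyone said yes, so commit
            p.commit()
        return True
    for p in participants:             # Phase 2: at least one no, so abort everywhere
        p.abort()
    return False


ok_nodes = [Participant("p1"), Participant("p2")]
mixed_nodes = [Participant("p1"), Participant("p2", can_commit=False)]
committed = two_phase_commit(ok_nodes, {"order": 42})
rejected = two_phase_commit(mixed_nodes, {"order": 43})
```

Between `prepare` returning yes and the phase 2 call a participant sits in the `prepared` state, which is the in-doubt window discussed above.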
Total order broadcast (atomic broadcast) – reliable delivery of messages (if a msg reaches one node => it will be delivered to all) + totally ordered delivery (msgs are delivered to all nodes in the same order), even if nodes or the network are faulty. ~ asynchronous append-only log
Transaction isolation levels are safety guarantees and are not the same as distributed consistency (coordinating the state of replicas in the face of delays and faults): consistency is not about a set of operations within a transaction, but about the state of an individual object replicated among several nodes. Linearizability is a recency guarantee, also called atomic / strong / immediate consistency; it can be treated as the impression(!) of a single copy of the data with atomic operations. Even a strict quorum cannot guarantee it. Read repair can help for reads and writes, but not for compare-and-set / increment-and-get, as these require consensus among the data nodes.
Fundamental characteristics of consensus:
- epoch sequence number with every request to filter stale data
- choosing leader
- vote on leader’s proposals.
Key differences in comparison with 2PC: the 2PC coordinator is not elected, while consensus algorithms elect their leader, and we do not need all nodes to agree, a majority of votes is sufficient. Another feature of a good consensus algorithm is the ability to add new nodes (dynamic membership extension). Consensus services for writes: ZooKeeper, etcd. Consensus algorithms: Paxos, Raft, Zab.
Alternatives for ACID
BASE (Basically Available, Soft state, Eventual consistency) – favours availability over consistency, a weakening of ACID
Another common concept is the CAP theorem: a system is either consistent or available when it is (network) partitioned (network failure between nodes). Consistency here in reality means linearizability: an update of an object by key in a distributed system is atomic, i.e. all reads after the write return the new value. Partition tolerance actually means tolerance to network partitions, i.e. network instability between nodes/racks/datacenters.
A MapReduce cluster can be thought of by analogy with chaining simple unix command line tools (awk, sed, grep, sort, uniq, xargs) across multiple nodes. Jobs usually do not modify their input, i.e. they don't have side effects other than producing output on the distributed filesystem. Mapper – a function that is called once for every record and can generate any number of key-value outputs, which are then sorted by the engine. Reducer – takes all records for the same key and iterates over the collection of values for each key. A file in HDFS is considered a separate partition that can be processed by an independent mapper task.
The idea is to run compute near the data: mapper code (usually just jar files) is copied to the nodes with the data. Partitioning by key guarantees that all values for a key end up at the same reducer. Reducers connect to mappers and download the result of the map stage (sorted key-value pairs for their partition); this is called the shuffle. The job succeeds when all mappers and reducers finish, i.e. execution time depends on the slowest reducer. It provides an all-or-nothing guarantee: output from failed tasks is discarded. It is not the same as MPP, where the focus is on parallel execution of analytical queries on a cluster of machines. MapReduce and distributed filesystems can be thought of as a generic OS that can run arbitrary programs, for example: query engine – Hive, OLTP – HBase, MPP – Impala.
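The map, shuffle and reduce stages can be mimicked in a single process with the classic word count. This is only a sketch: `run_job`, `mapper`, `reducer` and the `num_reducers` parameter are names invented here, lists of strings stand in for HDFS files, and CRC32 is an arbitrary choice of partitioning hash.

```python
import zlib
from collections import defaultdict
from itertools import groupby
from operator import itemgetter
from typing import Dict, Iterable, Iterator, List, Tuple


def mapper(line: str) -> Iterator[Tuple[str, int]]:
    """Called once per input record; may emit any number of key-value pairs."""
    for word in line.split():
        yield word.lower(), 1


def reducer(key: str, values: Iterable[int]) -> Tuple[str, int]:
    """Called once per key with all values for that key."""
    return key, sum(values)


def run_job(partitions: List[List[str]], num_reducers: int = 2) -> Dict[str, int]:
    # Map stage: one mapper task per input partition; output is partitioned by key,
    # so all values for a key end up at the same reducer.
    reducer_inputs: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for partition in partitions:
        for line in partition:
            for key, value in mapper(line):
                target = zlib.crc32(key.encode()) % num_reducers
                reducer_inputs[target].append((key, value))

    # Shuffle + reduce stage: each reducer sorts its input by key and
    # iterates over the values grouped per key.
    results: Dict[str, int] = {}
    for records in reducer_inputs.values():
        records.sort(key=itemgetter(0))
        for key, group in groupby(records, key=itemgetter(0)):
            out_key, out_value = reducer(key, (value for _, value in group))
            results[out_key] = out_value
    return results


word_counts = run_job([["the quick fox", "the dog"], ["The end"]])
```

The input partitions are never modified; the only effect of the job is its output, which is what makes retrying failed tasks safe.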
Dataflow engines (Spark, Tez, Flink) can be used to run workflows from Hive or Pig. DAG (directed acyclic graph) represents the flow of data between operators. For graph processing there is the Pregel model (bulk synchronous parallel model of computation): a vertex has state and at every iteration receives messages (fault tolerant and durable) from other vertices (for example along the edges).
Map-side joins:
- broadcast hash join – the small table is copied to every mapper and loaded into an in-memory hash table
- partitioned hash join (bucketed map join) – both join inputs have the same number of partitions, so a hash join can be applied to each partition independently
- map-side merge join – when both inputs are not only partitioned but also sorted by the same key
Reduce-side joins:
- sort-merge join – may use a secondary sort (user-birth->[activity list])
- skewed join or sharded join – records for hot keys are spread over multiple reducers (the other join input is replicated to all of those reducers). Hive requires hot keys to be specified explicitly and uses a map-side join for them.
The Hive metastore contains info about the encoding format, the names of the directories where data is stored, the number of partitions and the keys by which the data is partitioned and sorted.
Event stream: producers (publishers/senders) and consumers (subscribers/recipients). Events are grouped into topics or streams. Publish/subscribe model – a messaging system is used to notify consumers about new events. If producers are faster than consumers: dropping / backpressure / buffering. Direct messaging vs message brokers (queues). Multiple consumers can be used for load balancing, i.e. each msg goes to one of the consumers, vs fan-out, when each msg is delivered to all consumers. It may be important to wait for an acknowledgement from the consumer before deleting a msg from the queue. Msg reordering may happen due to retries (redelivery).
JMS/AMQP based (RabbitMQ) vs log-based (Kafka) brokers. Topic – a group of partitions ~ files (possibly on different machines). Offset – sequence number within a partition; msgs within a partition are totally ordered. There is no ordering guarantee across different partitions. A consumer group keeps an offset per partition to mark what it has read so far. Scalability is achieved by assigning a whole partition to a consumer node, i.e. the number of consumers doing useful work is at most the number of partitions. Slow processing of one msg holds up all subsequent msgs in the partition.
Changelog or change data capture (CDC) – observing all changes to a db and extracting them in a form in which they can be replicated to other systems. Event sourcing – a similar concept to CDC, but not at the level of db operations: instead the events are application specific. The difference is history of modifications vs current state: in CDC an event carries the complete row for a key, so older events for that key can be discarded; in event sourcing later events do not override prior ones and an event does not contain the mechanics of the state update, so the full history is needed. Command -> validation -> event ~ fact.
Command query responsibility segregation (CQRS) – separating the form in which data is written from how it is read ~ different read views.
Complex event processing (CEP) – the roles of query and data are reversed: queries are stored to find patterns, while the data is transient. An example is the percolator feature of Elasticsearch. Stream analytics uses probabilistic algorithms: Bloom filters for set membership, HyperLogLog for cardinality estimation.
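For a feel of the probabilistic structures mentioned above, here is a minimal Bloom filter. The sizes (`num_bits`, `num_hashes`) and the way hash functions are derived from SHA-256 with a seed prefix are arbitrary choices for this sketch, not a tuned implementation.

```python
import hashlib
from typing import Iterator


class BloomFilter:
    """Set membership with no false negatives and a tunable false positive rate."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, item: str) -> Iterator[int]:
        # Derive several hash functions by prefixing the item with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item: str) -> bool:
        # False means definitely absent; True means probably present.
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


seen_users = BloomFilter()
for user in ["alice", "bob"]:
    seen_users.add(user)
```

The structure never stores the items themselves, which is why it fits unbounded streams: memory stays fixed and only the false positive rate grows as more items are added.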
Within stream processing, estimating the time of an event is not so straightforward: time of the action + time it was sent to the server + time it was processed by the server.
Various time-based windows are possible:
- tumbling – fixed-length, non-overlapping windows, for example events that fall into the same wall-clock 1-min interval
- hopping – fixed-length but overlapping windows; can be computed by aggregating adjacent tumbling windows
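Window assignment for the two types above boils down to integer arithmetic on the event timestamp. The sketch below assumes timestamps and window sizes in whole seconds and half-open windows `[start, end)`; the function names are made up for the example.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end) in seconds


def tumbling_window(ts: int, size: int) -> Window:
    """The single fixed-length, non-overlapping window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def hopping_windows(ts: int, size: int, hop: int) -> List[Window]:
    """All windows of length size, starting every hop seconds, that contain ts."""
    windows = []
    start = ts - ts % hop            # latest window start that is <= ts
    while start > ts - size:         # window still covers ts
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)


one_minute = tumbling_window(1000, 60)
five_minute_hops = hopping_windows(1000, size=300, hop=60)
```

An event belongs to exactly one tumbling window but to `size / hop` hopping windows, which is why hopping aggregates are often built from 1-hop tumbling buckets.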
Joins on streams require some state. If the ordering of events is not determined, the join becomes nondeterministic. This is the slowly changing dimension (SCD) problem; it is addressed by adding an id for the particular version of the joined record, with the downside that it breaks log compaction. Types of joins:
- stream-table – CDC is used to keep a local point-in-time copy (cache) of the table in sync
- table-table ~ materialised view maintenance
Microbatching and checkpointing provide exactly-once semantics ONLY IF there are no side effects; otherwise rely on idempotence (running many times has the same effect as running once, e.g. setting a value vs incrementing it) or distributed transactions.
Control of load:
- client side backpressure or flow control
- exponential backoff to fight overload
- phi accrual failure detection (Akka, Cassandra), TCP retransmission – measure response times and their variability (jitter) and adjust timeouts according to the observed response time distribution
- quality of service (QoS – prioritisation and scheduling of packets) + admission control (rate limiting senders)
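A client-side retry helper with exponential backoff might look like the sketch below. The function name, the default delays and the added random jitter (a common companion to backoff that keeps clients from retrying in lockstep) are assumptions of this example; the `sleep` parameter exists only so the waiting can be replaced in tests.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry operation, doubling the upper bound of the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise                                   # give up: propagate the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))             # random wait up to the current bound
    raise RuntimeError("max_attempts must be at least 1")


# Usage: an operation that fails twice before succeeding.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("server overloaded")
    return "ok"


recorded_delays: list = []
result = call_with_backoff(flaky, sleep=recorded_delays.append)
```

Capping the delay with `max_delay` keeps a long outage from turning into multi-minute waits, while the growing bound gives an overloaded server room to recover.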
Byzantine fault tolerance
When a node tries to trick the system, it is a Byzantine fault; reaching consensus in such an environment is the Byzantine generals problem: nodes malfunction and do not obey the protocol. The partially synchronous model with crash-recovery faults is the common model for distributed systems. Safety (nothing bad happens) and liveness (something good eventually happens). Limping but not dead nodes. Fencing token – a number that increases every time a lock is granted, which lets the storage reject writes from nodes that inadvertently act on an expired lock; it works only in an environment where nodes are honest.
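The fencing token idea can be sketched with two toy classes. `LockService` and `Storage` are hypothetical names for this example; lease expiry is not modelled, the scenario simply assumes the first client paused long enough for its lock to expire.

```python
from typing import Optional


class LockService:
    """Hands out a strictly increasing token with every lock grant."""

    def __init__(self) -> None:
        self._token = 0

    def acquire(self) -> int:
        self._token += 1
        return self._token


class Storage:
    """Remembers the highest token seen and rejects writes with older ones."""

    def __init__(self) -> None:
        self.highest_token = 0
        self.value: Optional[str] = None

    def write(self, token: int, value: str) -> bool:
        if token < self.highest_token:
            return False              # stale lock holder: fence it off
        self.highest_token = token
        self.value = value
        return True


locks, storage = LockService(), Storage()

token_a = locks.acquire()             # client A gets the lock, then stalls (e.g. long GC pause)
token_b = locks.acquire()             # lease expired, client B gets the lock
b_ok = storage.write(token_b, "written by B")
a_ok = storage.write(token_a, "written by A")   # A wakes up, still believing it holds the lock
```

The check lives in the storage, not in the client, because the paused client has no way of knowing that its lock expired. A lying client could simply send a larger token, which is why this protects against honest mistakes only.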