| Introduction |
| ------------------------------------------------------------ |
| |
| It's a common scenario that actors interact with a system |
| simultaneously and separately from each other for |
| arbitrarily long periods of time. |
| |
| An interaction takes place in two isolated steps: |
| |
| 1. The current state of an entity or aggregate is read. |
| 2. At least one action is performed that changes the state |
| of a previously read entity or aggregate. |
| |
| If access is synchronized by a locking mechanism, actors |
| hinder each other. Without such a mechanism, operations can |
| lead to conflicts if the same entity or aggregate is |
| changed. This can be the reason for data loss and |
| inconsistency. |
| |
| coherence addresses this problem by storing state |
| transformations as immutable events in an event store. If |
| an event is appended, it can be determined whether it |
| causes a conflict and this can be resolved manually or |
| automatically. |
| |
| coherence follows the command-query separation (CQS) |
| principle. Events are read from the event store and |
| projected into separate read models. |
| |
| Events have a sequence number to preserve order. To trace |
| individual state changes an event source and timestamp is |
| stored. |
| |
| |
| Installation |
| ------------------------------------------------------------ |
| |
| This library can be installed from Clojars [1]. |
| |
| |
| Walkthrough |
| ------------------------------------------------------------ |
| |
| coherence comes with a JDBC implementation for storing and |
| retrieving events. A JDBC driver for a database compatible |
| with HoneySQL [2] is required. |
| |
| The example below initializes a SQLite database (which |
| requires the org.xerial/sqlite-jdbc driver). |
| |
| (use 'coherence.core) |
| (use 'coherence.jdbc.core) |
| |
| (def honeysql-opts {:dialect :ansi |
| :quoted-snake true}) |
| |
| (def table-names {:event :events |
| :action :actions |
| :effect :effects |
| :trigger :triggers}) |
| |
| (def store-opts {:sql honeysql-opts |
| :tables table-names}) |
| |
| (def ds {:dbtype "sqlite" :dbname "test.db"}) |
| |
| (def store (->Store ds store-opts)) |
| |
| (coherence.jdbc.core/init-schema store) |
| |
| |
| Actions |
| ------------------------------------------------------------ |
| |
| An actor has a reason for applying a patch that changes an |
| aggregate. Patches may associate values with specified keys |
| and dissociate keys from an aggregate. |
| |
| ;; Alice adds two books to the store |
| (rebase! store |
| 1 |
| [{:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :create |
| :actor [:user "alice"] |
| :aggregate [:book "Neuromancer"] |
| :patch {:assoc {:author "William Gibson" |
| :genre "SciFi"}}}} |
| {:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :create |
| :actor [:user "alice"] |
| :aggregate [:book "Count Zero"] |
| :patch {:assoc {:author "William Gibson" |
| :genre "Cyberpunk"}}}}]) |
| |
| ; => {:result :ok :events [{:seq-no 1 ...} {:seq-no 2 ...}]} |
| |
| An action cannot be appended to the event store if the |
| affected aggregate was changed in the meantime. The conflict |
| is returned and the caller has to resolve it. When appending |
| an action to the event store, a list of sequence numbers is |
| passed that should be treated as resolved. |
| |
| ;; Bob adds two books to the store |
| (rebase! store |
| 1 |
| [{:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :create |
| :actor [:user "bob"] |
| :aggregate [:book "Mona Lisa Overdrive"] |
| :patch {:assoc {:author "William Gibson" |
| :genre "Cyberpunk"}}}} |
| {:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :create |
| :actor [:user "bob"] |
| :aggregate [:book "Neuromancer"] |
| :patch {:assoc {:author "William Gibson" |
| :genre "Cyberpunk"}}}}]) |
| |
| ; => {:result :conflict |
| ; :events [{:seq-no 3 ...}] |
| ; :conflict {:ours {:seq-no 4 ...} |
| ; :theirs {:seq-no 1 ...}}} |
| |
| ;; Bob resolves the conflict |
| (rebase! store |
| 1 |
| [{:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :create |
| :actor [:user "bob"] |
| :aggregate [:book "Mona Lisa Overdrive"] |
| :patch {:assoc {:author "William Gibson" |
| :genre "Cyberpunk"}}}} |
| {:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :update |
| :actor [:user "bob"] |
| :aggregate [:book "Neuromancer"] |
| :patch {:assoc {:genre "Cyberpunk"}}}}] |
| :resolved [1]) |
| |
| ; => {:result :ok :events [{:seq-no 3 ...} {:seq-no 4 ...}]} |
| |
| |
| Effects & projection |
| ------------------------------------------------------------ |
| |
| Stored information may depend on external resources. |
| Sometimes data must be made unreadable after a certain point |
| in time due to data protection regulations, for example. In |
| such a case the payload may be stored in encrypted form. The |
| key required for decryption has an expiration date. |
| |
| In coherence *actions* can be linked to any number of |
| triggers, which in turn are triggered by an effect for a |
| reason. If, for instance, a decryption key expires an effect |
| event is appended to the event store. If an effect is read |
| in during projection, the actions of all affected aggregates |
| are replayed. |
| |
| ;; Alice stores two receipts with encrypted credit card numbers |
| (rebase! store |
| 2 |
| [{:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :payment |
| :actor [:user "alice"] |
| :aggregate [:receipt 1] |
| :patch {:assoc {:no "avd2dlg3g93fm1osu" |
| :total 100}}} |
| :triggers [[:encryption-key 1]]} |
| {:timestamp (java.time.Instant/now) |
| :source :bookstore |
| :action {:reason :payment |
| :actor [:user "alice"] |
| :aggregate [:receipt 2] |
| :patch {:assoc {:no "kepf62djsdk8fw21" |
| :total 200}}} |
| :triggers [[:encryption-key 2]]}]) |
| |
| ; => {:result :ok :events [{:seq-no 5 ...} {:seq-no 6 ...}]} |
| |
| ;; the encryption key of the first receipt expires |
| (rebase! store |
| 6 |
| [{:timestamp (java.time.Instant/now) |
| :source :keystore |
| :effect {:reason :key-expired |
| :trigger [:encryption-key 1]}}]) |
| |
| ; => {:result :ok :events [{:seq-no 7 ...}]} |
| |
| ;; receipt 1 is replayed |
| (def xf (map (fn [{seq-no :seq-no {:keys [:aggregate]} :action}] |
| {:seq-no seq-no :aggregate aggregate}))) |
| |
| (stream-events xf conj [] store :offset 6) |
| |
| ; => [{:seq-no 5, :aggregate [:receipt 1]} |
| ; {:seq-no 6, :aggregate [:receipt 2]}] |
| |
| |
| References |
| ------------------------------------------------------------ |
| [1]: Clojars |
| [2]: HoneySQL |
| |
| |
| Links |
| ------------------------------------------------------------ |
| main.zip |
| GitHub |