Gluing the Sliced/Diced Metadata in a Distributed way
Problem Description
The Metadata Gluer is responsible for gluing metadata of content from multiple sources. The content metadata can arrive in many pieces, and the metadata aggregator is responsible for gluing these pieces together. For example, let's say we have a content C1 in XML/JSON, and its metadata arrives in many pieces C1P1 (content1 Piece1), C1P2 … over a discrete interval of time (not necessarily continuous). For simplicity, let's assume there is a queue for queuing all these piece-by-piece updates 'CnPn'. Each piece can insert/update/delete different parts of the content XML/JSON. Note that the final metadata of the content must be consistent. Also note that, as and when pieces are glued, the current state of the content is sent as an update to downstream components, which process this update to the content.
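To make the gluing concrete, here is a minimal sketch in Python; the piece format (op/path/value) is purely hypothetical, and real pieces could equally be partial XML/JSON documents, patches, or deltas.

```python
# Hypothetical piece format: each piece inserts, updates, or deletes one field.
content = {"id": "C1", "title": "Old title", "tags": ["a"]}

pieces = [
    {"op": "update", "path": "title", "value": "New title"},   # C1P1
    {"op": "insert", "path": "rating", "value": 4.5},          # C1P2
    {"op": "delete", "path": "tags"},                          # C1P3
]

def glue(doc: dict, piece: dict) -> dict:
    if piece["op"] == "delete":
        doc.pop(piece["path"], None)
    else:                        # "insert" and "update" both set the field
        doc[piece["path"]] = piece["value"]
    return doc

for piece in pieces:             # pieces must be applied in arrival order
    content = glue(content, piece)
    # after every piece, the current state of C1 is sent to downstream components
```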
Challenge
The challenge here is to build an active-active solution where multiple instances of the Metadata Gluer run concurrently. This is extremely important since the data load can be very high, especially in a multi-tenant deployment where the granularity of the pieces is small and the updates are frequent.
Solution1: Active-Passive
Before we get into the active-active solution, let's try to understand the active-passive one. There is a single Metadata Gluer instance which processes the pieces one after the other for all contents, glues each piece update in turn, and notifies the downstream components. If the active instance goes down, there is a passive instance which can take over from the active one and continue.
Pros and Cons
Pros
Cons
That brings us to the active-active solution, which addresses all these concerns.
Solution2: Active-Active (Distributed Gluing)
There are multiple instances of the Metadata Gluer processing the updates concurrently. As and when the updates arrive, they are picked up by a free active instance, which starts processing them.
Just adding multiple instances will not solve the problem, since there is a constraint that the updates (C1P1, C1P2, C1P3 … C1Pn) of a particular content (C1) have to be processed in order and the final state of the content (C1) has to be consistent. The above architecture does not address this constraint, since updates C1P1, C1P2 … C1Pn can be adjacent in the queue and multiple gluer instances can take these updates simultaneously. As a result, the final state of the content (C1) might be inconsistent.
To address this challenge, let's tweak the above architecture and make it flexible enough to handle this issue. Let's have a separate queue for each Metadata Gluer instance, as shown below.
Queue creation and the forwarding of events to each queue are taken care of by many message-bus technologies like RabbitMQ and Kafka, so the queue-management responsibility is offloaded to the message bus.
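As a minimal sketch, assuming RabbitMQ with the pika client, a fanout exchange can mirror every piece update to one queue per gluer instance (the exchange and queue names here are hypothetical):

```python
import pika

# Each gluer instance gets its own queue bound to a fanout exchange, so every
# instance receives a full mirror of the update stream and can filter it locally.
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="metadata-updates", exchange_type="fanout")

for instance_queue in ("gluer-0", "gluer-1", "gluer-2"):   # hypothetical names
    channel.queue_declare(queue=instance_queue, durable=True)
    channel.queue_bind(exchange="metadata-updates", queue=instance_queue)

# Producers publish each piece update once; the exchange copies it to every queue.
channel.basic_publish(exchange="metadata-updates", routing_key="",
                      body=b'{"contentId": "C1", "piece": "C1P1"}')
```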
Here comes the Hash trick
Many distributed persistence technologies like Cassandra, Couchbase, and MongoDB rely on hash-based partitioning for data distribution across nodes. The same technique can be used here for load distribution across gluer instances: each node is responsible for gluing the content for a range of hash values.
For instance, in the above example, let's say Gluer1 has a hash range of 1-30, Gluer2 has 31-60 and Gluer3 has 61-99. Since all of them have a mirror of the update events, each gluer filters the events based on its hash range. Gluer1 only processes contents whose hash falls in 1-30 and rejects all events outside its range. Therefore Gluer1 processes the updates (C1P1, C1P2, C1P3 … C1Pn) in order, thereby ensuring the consistency of the contents.
Note that the load is shared among the nodes while consistency is still ensured.
Gluer1 Flow of events
Similarly, all the other instances filter the events based on their hash ranges and process them.
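For concreteness, here is a minimal sketch of the range filter described above; the 1-99 hash space and the per-instance ranges mirror the example, and the `bucket` helper is an assumption, not an existing API.

```python
import hashlib

# Hypothetical per-instance range assignment, matching the example above.
HASH_RANGES = {"gluer1": (1, 30), "gluer2": (31, 60), "gluer3": (61, 99)}

def bucket(content_id: str) -> int:
    """Map a content id deterministically onto the 1-99 hash space."""
    digest = int(hashlib.md5(content_id.encode()).hexdigest(), 16)
    return digest % 99 + 1

def should_process(instance: str, content_id: str) -> bool:
    """True if this instance owns the content; all other events are rejected."""
    low, high = HASH_RANGES[instance]
    return low <= bucket(content_id) <= high
```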
Let's work out the details.
Establishing communication among the Nodes
One of the key requirements of any distributed technology is to have some sort of communication among the nodes, both for discovering each other and for exchanging the information they need in order to coordinate. There are many ways of achieving this; the Gossip protocol used in Cassandra, for example, is fairly extensive and complicated and is not required in our case. One simple way is to use a centralised persistence to which all the nodes have access, with each node updating its information in that persistence.
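As an illustration, the shared record can be as small as one entry per node. The `cluster_store` dictionary below is just a stand-in for whatever centralised persistence is used (a database table, a Redis hash, a ZooKeeper znode, ...), not a real API.

```python
import time

# Stand-in for the centralised persistence all gluer nodes can reach.
cluster_store = {}   # instanceId -> {"slot": int, "heartbeat": float}

def register(instance_id: str, slot: int) -> None:
    """Record this node's slot so the other nodes can see it."""
    cluster_store[instance_id] = {"slot": slot, "heartbeat": time.time()}

def heartbeat(instance_id: str) -> None:
    """Refreshed periodically; a stale heartbeat marks the node as dead."""
    cluster_store[instance_id]["heartbeat"] = time.time()
```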
Adding a new Node to the cluster or When a node dies in the cluster
There are many approaches that can be used. One simple approach is to assign the nodes a sequence number as they are added, as shown below.
Note that the Gluer "instanceId" doesn't matter; it can be anything. But the sequence number matters: it indicates the node's slot in the cluster. A new node can compute its sequence number by itself by referring to the current state of the cluster in the centralised persistence.
When a node dies in the middle of processing, all the resources that fall under its hash range will not be processed until the node comes back. When the node comes back, or a new node is added (which is the same thing), it should pick the empty slot while selecting its sequence number and resume processing the resources from that slot's queue.
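A sketch of the slot selection, assuming the cluster state read from the centralised persistence is available as a mapping of instanceId to its claimed slot (the shape is hypothetical):

```python
def next_free_slot(cluster_state: dict) -> int:
    """Pick the lowest sequence number not claimed by any live node.

    cluster_state maps instanceId -> {"slot": int, ...} as recorded in the
    centralised persistence; entries of dead nodes are assumed to have been
    pruned (e.g. via a stale-heartbeat check), so a failed node leaves a hole
    and a returning or replacement node reclaims exactly that slot and queue.
    """
    taken = {entry["slot"] for entry in cluster_state.values()}
    slot = 0
    while slot in taken:
        slot += 1
    return slot
```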
Queue Creation
Once the node determines its node ID, it checks whether a queue with that name exists. If it exists, it attaches to that queue and starts processing; if not, it creates a new queue with that name.
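Here is a sketch of the attach-or-create step, again assuming RabbitMQ with pika; `queue_declare` is effectively idempotent, so it creates the queue if it is missing and simply attaches to the existing one otherwise (node and exchange names are hypothetical):

```python
import pika

node_id = "gluer-1"          # derived from the sequence number chosen above
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Create the queue if it does not exist, attach to it if it does.
channel.queue_declare(queue=node_id, durable=True)
channel.queue_bind(exchange="metadata-updates", queue=node_id)

def on_update(ch, method, properties, body):
    # Filter by slot / hash range (see below) and glue the piece if it is ours.
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=node_id, on_message_callback=on_update)
channel.start_consuming()
```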
So far so good. But still it doesn’t fly. We need to figure out the following.
How can the nodes determine the hash range themselves?
Assuming that the nodes have determined their IDs, when the events arrive in the queue they can decide by themselves whether an event needs to be processed, using simple math as given below.
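A minimal sketch of that math: each node processes an event only when the content's hash, modulo the cluster size, equals its own sequence number (the helper name and the choice of digest are assumptions).

```python
import hashlib

def owns(content_id: str, my_sequence_number: int, cluster_size: int) -> bool:
    # Python's built-in hash() is randomised per process for strings, so use a
    # deterministic digest so every node computes the same value.
    digest = int(hashlib.md5(content_id.encode()).hexdigest(), 16)
    return digest % cluster_size == my_sequence_number

# e.g. the node with sequence number 1 in a 3-node cluster processes an update
# only if owns(event_content_id, 1, 3) is True.
```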
Note that in the previous discussion we had assigned contiguous hash ranges (1-30, 31-60). The method above is a much better alternative, which distributes the load well.
How can nodes update their hash range when new nodes are added/deleted? Or, how do we scale based on load?
It might be required to add new nodes to the cluster when there is high load in the system. However, the above method cannot scale up/down in accordance with the load. It is still an active-active solution, but it doesn't scale up/down. :(
Adding a new node to the cluster during high load also carries a subtle side effect. With the hashing approach, when a new node is added all the nodes recompute their hash range. If you think about it for a minute and work it out, we can clearly spot a problem: since the hash range has changed, all the nodes start dropping the resources that now belong to the new node [i.e. the nodes start dropping the resources which are no longer in their range but might have been before]. As a result, all the updates of resources falling under the new node are lost. A new node therefore has to be added only when all the queues are empty. Clearly this is not auto-scalable, since we would like to add a new node when there is a lot of load in the system, not when there is no load. [Note: work in progress to figure out a way of auto scaling.]
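To make the side effect concrete, here is a small illustration (using the same hypothetical modulo scheme as above) of how ownership changes when the cluster grows from 3 to 4 nodes:

```python
import hashlib

def owner(content_id: str, cluster_size: int) -> int:
    digest = int(hashlib.md5(content_id.encode()).hexdigest(), 16)
    return digest % cluster_size

# Contents whose owning slot changes when a fourth node joins: the nodes that
# owned them before now drop their queued updates, so those updates are lost.
moved = [cid for cid in ("C1", "C2", "C3", "C4", "C5", "C6")
         if owner(cid, 3) != owner(cid, 4)]
print(moved)
```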
How do we handle the failure of nodes?
The failure of nodes is still handled with the above approach. If a node fails, then all the updates of contents that fall in its hash range are not propagated. But when the node comes back, or a new node is added (which is the same thing), it takes the sequence number of the node that died and can hence start processing the events just like before.
Well the most important question…..
Isn’t this over-engineering?
Well, that is subjective. If something is engineered for the sake of it and doesn't serve any purpose, then it is over-engineering. But if something is done to solve a pertinent problem, then calling it over-engineering is incorrect. Also, most of the approach taken above is very simple and serves the required purpose. The right question to ask is "Is it required?". That brings us to the following section.
Pros and Cons
Pros
Cons
We analyzed the pros and cons of the active-passive and active-active approaches to gluing content metadata. The active-passive approach does solve the problem in a simple way, but it clearly doesn't handle large data loads. If there is a need to support multi-tenancy, where the data load will be higher, the active-active approach is the way to go. But it comes with its own baggage. The decision clearly depends on the requirements.