Ads click aggregation — [Notes]
--
· Data / BOE
· Functional Requirement
· Challenges
· Query API Design
· Data Model
· Datastore?
· Two paths approach🔀
· High-Level Design
· Design Deep Dive
∘ Scaling our Kafka Queue components
∘ Scale the Aggregation Service
∘ Scale the Database
∘ Fault tolerance
∘ Data Integrity: Reconciliation Service
· Ways to serve aggregates
∘ Client-side events counting
∘ Counting service
· References
· Doubts
· TODO
Data / BOE
1 billion ad clicks per day and 2 million total ads.
- 1 billion DAU (Daily Active Users); assuming each user clicks one ad daily, that gives 1 billion ad-click events per day.
- Ad-click QPS = 10⁹ events / 10⁵ seconds ≈ 10,000 QPS
- Assuming peak QPS is 5 times the average QPS, peak QPS ≈ 50,000 QPS
- Assuming a single ad-click event takes 0.1 KB of storage, the daily storage requirement is 0.1 KB × 10⁹ = 100 GB, which means roughly 3 TB of storage per month.
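A quick back-of-the-envelope check of these numbers in code (a minimal sketch; the constants simply mirror the assumptions above):
# Back-of-the-envelope sanity check for the numbers above.
events_per_day = 1_000_000_000          # 1 billion ad clicks per day
seconds_per_day = 86_400                # roughly 10^5 seconds
avg_qps = events_per_day / seconds_per_day     # ~11.6k, rounded to ~10k in the text
peak_qps = avg_qps * 5                         # ~50k QPS at peak
event_size_kb = 0.1                            # assumed size of one click event
daily_storage_gb = events_per_day * event_size_kb / 1_000_000   # ~100 GB per day
monthly_storage_tb = daily_storage_gb * 30 / 1_000              # ~3 TB per month
print(f"avg={avg_qps:,.0f} qps, peak={peak_qps:,.0f} qps, daily={daily_storage_gb:.0f} GB, monthly={monthly_storage_tb:.1f} TB")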
Functional Requirement
- Return the number of click events in the last M minutes for a particular advertisement.
- Return the top N ads clicked in the last M minutes.
Challenges
- The ads logging system does not guarantee exactly-once delivery of event messages, so the pipeline must ensure there are no duplicate events.
- Some events may arrive later than expected.
- Different parts of the system can go down, so we need to plan for system recovery.
- The SLA for the counting service should be a P99 latency below 8 ms for scanning and filtering users' raw events at a peak of 200k QPS.
Query API Design
API-1: Aggregate the number of clicks of ad-id in the last M minutes
GET API: /v1/ads/{:ad_id}/aggregated_count
Query Params:
1. Start_Time: Default value can be current_time minus 1 minute
2. To_Time: Default value can be current_time
3. Filter_Identifier: Optional field. We can define a dedicated set of filter identifiers, e.g. filter=001 could mean filtering by IP address.
Response: Return aggregated event count for a given ad_id
ad_id: The ad identifier
count: Number of times the particular ad was clicked.
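For illustration, a sample request and response might look like this (the ad_id, epoch timestamps, filter value, and count are made-up values, not part of any fixed contract):
GET /v1/ads/ad_1001/aggregated_count?start_time=1688039100&to_time=1688039160&filter=001
{ "ad_id": "ad_1001", "count": 4900 }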
API-2: Return Top N clicked Ad-Ids in the last M minutes.
GET API: /v1/ads/top_clicked_ads
Query Params:
1. Start_Time: Default value can be current_time minus 1 minute
2. To_Time: Default value can be current_time
3. N: Number of Top Ads to return, default value can be 10
4. Filter_Identifier: Optional field. We can define a dedicated set of filter identifiers, e.g. filter=001 could mean filtering by IP address.
Response: Return the top N most clicked ads in the given window
ad_ids: List of the ad identifiers
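Similarly, a sample request and response for the top-N API might look like this (the ad IDs and the value of N are made up for illustration):
GET /v1/ads/top_clicked_ads?start_time=1688039100&to_time=1688039160&n=3
{ "ad_ids": ["ad_1001", "ad_2002", "ad_3003"] }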
Data Model
Now let's visualize what our data model looks like. A real-world scenario would have more fields, but for learning purposes we will consider a small data model.
Below are sample raw-data columns we can consider for now:
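An illustrative set might be (the column names are assumptions; a real event would carry more fields):
- ad_id: identifier of the clicked ad
- click_timestamp: epoch time at which the click happened
- user_id, ip, country: attributes useful for filtering and fraud detection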
It’s also important to store data in an aggregated form for quicker API responses, assuming that the ad click events are aggregated every minute.
For filters, we need to store the data in a format where filters can be applied at query time without a lot of computation.
We store the aggregated count grouped by filter_id for every (ad_id, click_minute) pair.
Now we can have all the filter_config in a separate table.
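Putting this together, an illustrative layout of the aggregated and filter tables might be (the names are assumptions for learning purposes):
aggregated_clicks: ad_id | click_minute | filter_id | count
filter_config: filter_id | ip | country | domain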
Datastore?
It depends on a lot of factors:
- whether the data is read-heavy, write-heavy, or both;
- whether our data is relational;
- whether the data is growing continuously, since scaling a relational database is tricky;
- whether we have a lot of analytical queries (SUM, COUNT), in which case an OLAP database will help us more.
We have 2 kinds of data, raw data, and aggregated data.
For raw data, as discussed in previous sections, we have nearly 50K peak QPS, which means our system is write-heavy. We also have a lot of analytical queries that drive the logic of our APIs. Our data is also growing continuously, so we need a scalable distributed database.
In such cases, NoSQL Databases like Cassandra make a lot of sense because they are optimized for write-heavy systems and are easily scalable.
The only difference between raw data and aggregated data is that aggregated data should be stored in a database that can handle read-heavy systems since the dashboard would be queried very frequently. We can use Cassandra for aggregated data as well.
The only additional thing we would need is a migration service for raw data. Raw data will not be queried frequently and will be mostly used only for analytical purposes by data science or machine learning engineers. So moving raw data into cold storage after a certain period of time should be considered.
Two paths approach🔀
- Real-time counting, or the fast path: may be inaccurate
- Batched counting, or the slow path: accurate
High-Level Design
We will go over all the components shown in the diagram in detail, but before that we need to get some important questions out of the way.
Q1. Why do we need two separate databases for raw data and aggregated data?
It's always better to store raw data without any transformation because it can serve as a backup. If our streaming service ever has a bug, we can always run a backfill service to repair the data.
Q2. Why do we need the second queue? Can't we push data directly from the Data Aggregation Service to the aggregation database?
Well, we need an exactly-once processing system. Popular streaming frameworks like Kafka Streams or Apache Flink achieve exactly-once processing using a two-phase commit protocol (atomic commit). More about it here.
Let's now go deep into the Aggregation Service:
The Map-Reduce framework is a good choice for implementing the aggregation service. It will have three kinds of nodes: Map, Aggregator, and Reduce nodes.
Each node is responsible for a single task and sends its result to the next node.
Map Node:
- This node takes care of any necessary transformation and filters/re-routes the ad-click events to a particular aggregator node based on ad_id.
- The re-routing is generally a hash function, which ensures that events with the same ad_id reach the same aggregator node. If the same ad_id were routed to different aggregator nodes, we would not be able to ensure correctness.
Aggregator Node:
- An aggregator node can count ad-click events every minute in memory.
- It may need an in-memory heap for supporting use cases like top 100 ad click events.
Reduce Node:
- Reduce node reduces results from all the aggregator nodes into a single result form, which will then be written to another Kafka queue and eventually will reach the aggregator database.
The below figure gives a small example of what the map-reduce paradigm looks like.
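To make the flow concrete, here is a minimal single-process Python sketch of the map, aggregate, and reduce steps (the node count, event fields, and 1-minute window are assumptions for illustration, not the actual implementation):
import heapq
from collections import defaultdict

NUM_AGGREGATORS = 3   # assumed number of aggregator nodes

def map_node(event):
    # Route an event to an aggregator based on a hash of ad_id,
    # so the same ad_id always lands on the same aggregator.
    return hash(event["ad_id"]) % NUM_AGGREGATORS

def aggregate(events):
    # Each aggregator counts clicks per (ad_id, minute) for its own partition.
    partitions = [defaultdict(int) for _ in range(NUM_AGGREGATORS)]
    for e in events:
        minute = e["event_time"] // 60
        partitions[map_node(e)][(e["ad_id"], minute)] += 1
    return partitions

def reduce_node(partitions, top_n=2):
    # Merge the partial counts and keep the top-N ads using a heap.
    merged = defaultdict(int)
    for part in partitions:
        for key, count in part.items():
            merged[key] += count
    top = heapq.nlargest(top_n, merged.items(), key=lambda kv: kv[1])
    return dict(merged), top

events = [{"ad_id": "ad_1", "event_time": 61},
          {"ad_id": "ad_1", "event_time": 65},
          {"ad_id": "ad_2", "event_time": 70}]
print(reduce_node(aggregate(events)))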
One main thing that we need to discuss here is how the filtering criteria are supported using map-reduce.
In previous sections, we discussed that we will have pre-defined filter ids to represent every filter.
We will need to aggregate based on each filtering criteria that we have defined. Here the aggregated data will be the “fact data” and filters will be the “dimensional data”. Such a schema is called a star schema. One can understand more about star schema over here.
The problem with this approach is that if we have a lot of filters, we end up defining a lot of aggregations. In such cases it is better to filter each request in the API service and serve each read request from an OLAP database like Apache Pinot.
Design Deep Dive
So far we have seen all the components in the system and defined clear responsibilities for each.
Now we need to go a bit deeper into how exactly the system achieves those things; from here on the actual meat of the design starts.
Data Recalculation: What happens if there is a major bug in the aggregation service, or some events get dropped?
We will need a flow to backfill the data from the raw database to the aggregation service. In such cases a separate service is recommended, because we would not want to impact the current real-time data stream. This will be a batched job, run as and when required.
Just a couple of service additions to our previous HLD will do, as shown below.
Which “time” should we consider for our aggregation service: event time or processing time?
If we recollect from the previous section our aggregation service aggregates ad_click events every minute.
A quick definition of both:
Event time is when the event occurs on the client side, i.e. in our case the moment the user clicked on a particular ad.
Processing time is when the ad-click event is processed by our aggregation service.
There are pros and cons to both; let us understand each in detail.
- One major flaw of processing time: if an event arrives late, the server will never be able to detect that the event is late, and we might end up with inaccurate results.
- At the same time, if we use event time we have to deal with late-arriving events ourselves, which is not a trivial task, and clients might also generate incorrect event times.
Keeping all this in mind, since we need data accuracy, we will go with event time.
Then how do we handle delayed events? By using watermarks. A watermark tracks the progress of event time and essentially defines how much delay the system is willing to accept. Every aggregation occurs in a tumbling-window fashion (there are other kinds of windows as well; for simplicity we consider tumbling windows here, and you can read more about windows here). By extending our window by, say, 15 seconds, all events delayed by up to 15 seconds can still be included in the current window.
How long should the watermark be? It really depends on the use case: if the watermark is too small, data accuracy might take a hit but latency will be low; if the watermark is huge, data accuracy will not be an issue but the system will have higher latency.
We can always tolerate a tiny bit of inaccuracy since we have a recalculation service to fix those inaccuracies.
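A hand-rolled sketch of 1-minute tumbling windows with a 15-second watermark, to make the idea concrete (the window size and lateness come from the example above; everything else, including the helper names, is assumed):
from collections import defaultdict

WINDOW_SEC = 60
ALLOWED_LATENESS_SEC = 15

windows = defaultdict(lambda: defaultdict(int))   # window_start -> ad_id -> count
closed = {}                                       # emitted (finalized) windows
max_event_time_seen = 0

def on_event(ad_id, event_time):
    # Assign the event to its tumbling window unless the watermark already passed it.
    global max_event_time_seen
    max_event_time_seen = max(max_event_time_seen, event_time)
    watermark = max_event_time_seen - ALLOWED_LATENESS_SEC
    window_start = (event_time // WINDOW_SEC) * WINDOW_SEC
    if window_start + WINDOW_SEC <= watermark:
        return False    # too late; left to the recalculation/backfill flow
    windows[window_start][ad_id] += 1
    # Emit any window that the watermark has fully passed.
    for ws in [w for w in list(windows) if w + WINDOW_SEC <= watermark]:
        closed[ws] = dict(windows.pop(ws))
    return True

on_event("ad_1", 10); on_event("ad_1", 55)
on_event("ad_2", 130)    # watermark moves to 115, so window [0, 60) is emitted
on_event("ad_1", 58)     # rejected: its window closed once the watermark passed 60
print(closed)            # {0: {'ad_1': 2}}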
There are many more details that are too verbose to cover here and would take us off track from the system design, so we will skip them.
How do we guarantee that every event is processed (no data loss)? And how do we ensure every event is processed only once, with no duplication in the data (exactly-once processing)?
Since we cannot afford data inaccuracy in our system, it is very important to answer the above questions in detail.
There are two scenarios where duplicated events can occur:
1. A client might send the same event multiple times, possibly maliciously. Such cases must be handled by fraud/risk-control components.
2. Server outage. If our aggregation service fails just after consuming a message but before committing the offset back to Kafka, the same messages will be delivered again on restart, which leads to multiple processing.
One of the most popular and straightforward solutions is to record the consumer offset in external storage like HDFS. Apache Flink calls this kind of feature checkpointing. Read more about it here.
This approach also has issues and must be handled with care; see the flow in the following figure to understand it better.
Here the aggregator reads messages based on the last offset saved in HDFS. Since we have consumed messages up to offset 100, even if the aggregator fails we restart it from offset 100.
But the problem in the above flow is that we save the offset before pushing the result to downstream Kafka. What if our aggregation service fails right after saving the offset? We would lose the processed data, and the new aggregator node would not reprocess it since the offset has already been committed. Therefore we need to save the offset only after we have pushed our result to the downstream Kafka stream, as shown below.
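A toy sketch of that ordering, using in-memory stand-ins for the downstream queue and the offset store (none of these names come from a real client library; a real pipeline would use Kafka plus HDFS or Flink checkpointing):
class OffsetStore:
    # Stand-in for external storage such as HDFS.
    def __init__(self):
        self.offset = 0
    def save(self, offset):
        self.offset = offset

def process_batch(records, start_offset, downstream, offset_store):
    counts = {}
    for r in records:                         # aggregate the batch in memory
        counts[r["ad_id"]] = counts.get(r["ad_id"], 0) + 1
    downstream.append(counts)                 # 1. push the result downstream first
    offset_store.save(start_offset + len(records))   # 2. only then commit the offset
    # If we crash between steps 1 and 2, the batch is re-read and re-emitted,
    # so the downstream write must be idempotent (or wrapped in a transaction).

downstream, store = [], OffsetStore()
process_batch([{"ad_id": "ad_1"}, {"ad_id": "ad_1"}], 100, downstream, store)
print(downstream, store.offset)               # [{'ad_1': 2}] 102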
Now let's come to one of the most important sections: scalability.
Scaling our Kafka Queue components
I am not going into details here about how to scale the Kafka queue, since I have already covered in detail how we can scale producers, consumers, and brokers in a Distributed Message Queue Setup.
I will just quickly add some important details here:
- Hashing the ad_id key to determine the Kafka partition is important because we want events with the same ad_id to land on the same partition.
- Since each consumer is mapped to a set of partitions, this also ensures that all ad-click events with the same ad_id reach the same consumer, avoiding data inaccuracy (see the small sketch after this list).
- The number of partitions should be decided and created up front so that we do not have to add partitions later, which would change our message-to-partition mapping. Creating the required number of partitions beforehand is always a wise decision since changing the partition count later is complicated. (How do we determine the partition count beforehand?)
- Topic physical sharding: creating one topic is definitely not enough; it is better to create topics per region (US, Europe, etc.) or per device type (mobile, desktop, etc.).
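A minimal sketch of keyed partitioning (the partition count and hash function here are illustrative; real Kafka clients hash the record key, typically with murmur2, to pick the partition):
import hashlib

NUM_PARTITIONS = 64   # pre-created up front, as discussed above

def partition_for(ad_id: str) -> int:
    # Deterministic hash so every event for the same ad_id lands on the same partition.
    digest = hashlib.md5(ad_id.encode()).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

print(partition_for("ad_1001"), partition_for("ad_1001"), partition_for("ad_2002"))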
Scale the Aggregation Service
Since our aggregation service is stateless, we can easily scale it horizontally, adding and removing nodes as and when required.
How do we increase the throughput of our aggregation service?
Option 1: Allocate events with different ad_ids to different threads and process them in parallel.
Option 2: Use a resource manager like Apache Hadoop YARN to deploy the aggregation service; it is better to think of this as a multi-processing setup.
Option 2 is widely used in real-life systems.
Scale the Database
Cassandra natively supports horizontal scaling via consistent hashing. Data is evenly distributed across nodes with a properly defined replication factor (for fault tolerance). Each node stores data based on its hash range in the ring and also holds copies according to its virtual nodes.
If we add a new node to the cluster, the virtual nodes are automatically rebalanced across the nodes; no manual resharding is required.
How do we handle the hotspot issue?
- Some ads can be very popular, and the event volume for such an ad might be too large for a single aggregator node.
- One way to handle such a problem is to allocate more aggregator nodes for popular ads. But we need to detect that there is a hotspot issue before scaling it.
- We will have to perform benchmarking tests beforehand and determine the number of events that can be handled by each aggregator node in a tumbling window.
- Let's say the number is 300. If we receive more events than that, the aggregator node requests the YARN resource manager to allocate more nodes for the operation, and the data is computed by multiple aggregator nodes. Once the calculation is done, the result is written back to the original aggregator node. A toy sketch of this idea follows this list.
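A toy sketch of that hotspot handling (the capacity of 300 comes from the example above; the splitting logic and names are assumptions):
EVENTS_PER_NODE = 300   # benchmarked capacity of one aggregator node per window

def count_with_extra_nodes(events):
    # Request enough extra nodes to stay under the per-node capacity.
    extra = max(0, -(-len(events) // EVENTS_PER_NODE) - 1)   # ceiling division minus the original node
    nodes = 1 + extra
    chunks = [events[i::nodes] for i in range(nodes)]        # spread the hot ad's events
    partial_counts = [len(chunk) for chunk in chunks]        # each node counts its share
    return sum(partial_counts), nodes                        # merged back on the original node

print(count_with_extra_nodes([{"ad_id": "hot_ad"}] * 1000))  # (1000, 4)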
There are more sophisticated techniques like Global Local Aggregation and Split Distinct Aggregation, which I am not covering here.
Fault tolerance
Now that we have seen scalability, it is also important to discuss what happens if each of these systems fails and how we are going to recover.
The only component we need to discuss for fault tolerance is the aggregation service, since the other services are managed and come with their own failure recovery. For distributed-queue fault tolerance you can refer to my previous article.
If our aggregation service fails, we can always recover the last state from the committed offset in external storage such as HDFS, and then rebuild the aggregated result by consuming from the upstream Kafka broker starting at that offset. But this approach can be time-consuming, since we have to rebuild the whole result again; in such cases it is better to also save a snapshot in HDFS alongside the offset. We can then recover the system from the latest committed offset plus the latest snapshot. Apache Flink provides the ability to save snapshots out of the box.
Data Integrity: Reconciliation Service
Every streaming pipeline needs a data reconciliation step to determine whether there are any inconsistencies.
Here is what we can do to achieve data integrity: run a nightly batch job that checks for differences between the aggregation database and the raw database. For higher accuracy requirements, we can verify the aggregation over a smaller window, such as 1 hour.
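A sketch of what the nightly reconciliation check could look like (the data sources are stubbed as plain Python lists/dicts; in practice this would query the raw store and the aggregation store for the same window):
from collections import Counter

def reconcile(raw_events, aggregated_counts):
    # Recompute counts from raw events and report ads whose counts disagree.
    recomputed = Counter(e["ad_id"] for e in raw_events)
    return {
        ad_id: (recomputed.get(ad_id, 0), aggregated_counts.get(ad_id, 0))
        for ad_id in set(recomputed) | set(aggregated_counts)
        if recomputed.get(ad_id, 0) != aggregated_counts.get(ad_id, 0)
    }   # a non-empty result triggers the recalculation/backfill flow

raw = [{"ad_id": "ad_1"}, {"ad_id": "ad_1"}, {"ad_id": "ad_2"}]
print(reconcile(raw, {"ad_1": 2, "ad_2": 5}))   # {'ad_2': (1, 5)}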
This is what our final design looks like after all the additions.
Final HLD
Thanks for reading this far; please provide feedback in the comments. If you like the design, please upvote. The intention is to get more people's views on the design.
Ways to serve aggregates
Client-side events counting
Raw events are served to clients, and they have their own service/logic to compute aggregates.
- Ads servers reach out to the data store to fetch raw events.
- The business components run ad-hoc logic to compute counts.
Counting service
- The event data store has a counting layer to serve counts.
- The service exposes a set of generic counting APIs.
Doubts
- Count-min sketch? And why have 60 copies? (as mentioned in Reference #1)
- Why a count-min sketch of size 1000 * 8? (as mentioned in Reference #1)
- Why a min-heap for top K? (as mentioned in Reference #1)