-
Notifications
You must be signed in to change notification settings - Fork 339
SPL practice:solve space time collision problem of trillion scale calculations
Dataset A contains the time and space information of n source objects A1, …, An, and each piece of information includes three attributes: ID (iA), location (lA) and time (tA). It can be assumed that the same Ai will not appear twice in A at the same time, or in other words, no two pieces of information have the same iA and tA. Dataset B, which has the same structure as A, contains m target objects B1, ..., Bm (with the similar attributes iB, lB, tB) that are to be confirmed whether they collided with A. Likewise, it can be assumed that Bi will not appear twice in B at the same time.
This article involves many set-oriented operations. Instead of using the term “record” to refer to information of data set, we use the set-related term “member”.
Group the dataset A by iA to get n subsets, and still name these subsets A1..., An. And correspondingly, split the dataset B into m subsets B1…, Bm. If ‘a’ belongs to the subset Ai and ‘b’ belongs to the subset Bj, and a.lA=b.lB and |a.tA-b.tB|<=1 minute (this time length can be changed), then we consider that ‘a’ collides with ‘b’, and that object Ai and object Bj collided once.
Rule 1: The number of collisions of each Ai member is counted as once at most, which means that if a collides with b1 and b2, we consider that only one collision occurs between Ai and Bj.
Rule 2: The member of Bj that once collided will no longer be identified as having had a collision. For example, if b collides with both a1 and a2, and assume a1.tA<a2.tA, then only the collision between a1 and b is identified as collision, and the collision between a2 and b is no longer identified as collision.
Objective: find the top 10 objects Bj with the highest similarity for each Ai.
The formula for calculating the similarity ‘r’ is: r(Ai, Bj)=E/U. where, the molecule E refers to the total number of collisions between Ai and Bj calculated based on the above rules;
The denominator U refers to the total number of members of Ai and Bj after deduplication, which can be calculated using |Ai|+|Bj|-E’, where E’ refers to the number of Bj members that collide with a certain Ai member.
Dataset A
Field name | Field meaning | Data type | Sample data |
---|---|---|---|
iA | ID | String | A0001 |
lA | Location | Long | 10001 |
tA | Time | Datetime | 2023-08-28 10:00:00 |
Dataset B
Field name | Field meaning | Data type | Sample data |
---|---|---|---|
iB | ID | String | B0001 |
lB | Location | Long | 10001 |
tB | Time | Datetime | 2023-08-28 10:00:00 |
tA and tB are accurate to the second, and the time span is 30 days.
The total number of records of Dataset A is 21 million rows, with daily addition of approximately 700,000 rows.
The total number of records of Dataset B is 15 million rows, with daily addition of approximately 500,000 rows.
The scale of n (the number of Ai) is 2.5 million, and that of m (the number of Bj) is 1.5 million.
The number of locations is 10, which means the possibility of values of lA and lB.
We hope to obtain the result within 15 minutes on a 40C128G server.
The amount of data is not large and the data can be fully loaded into in-memory database. However, the calculation process is complicated. Since it is difficult to work out in SQL alone, external program (Java or Python) is needed. As a result, the overall performance is very low, and it took more than two hours to perform this task.
n is 2.5 million and m is 1.5 million. If we calculate the similarity of each pair of Ai and Bj according to the above-mentioned definition, we have to calculate 2.5 million * 1.5 million = 3.75 trillion pairs. Even if each CPU can calculate the similarity of one pair of members in only one microsecond (in fact, such complex set-oriented calculations cannot be worked out quickly), it would take several hours on the current multi-CPU environment. Obviously, this hard traversal method is not feasible.
For a pair of Ai and Bj, according to the similarity calculation formula, we know that if there is no collision between the members of Ai and Bj, then E=0, and the similarity is also 0, and hence there is no need to perform the subsequent TopN calculation.
Assume that the data in A are evenly distributed and the average number of members in each Ai is less than 10 (21 million/2.5 million), and that the Bj that collides with Ai must satisfy the condition |tA-tB|<=1 minute, if the data in B are evenly distributed, then there are approximately 350 (500,000/1440) members per minute on average. The 10 members of Ai will collide with a maximum of 350210=7000 B members (between one minute before and after each A member). The average number of members of Bj is also 10 (15 million/1.5 million), and the average number of B members in Bj is only 7000/10=700 after distributing 7000 B members into Bj. In other words, there are only 700 Bj that have the similarity not equal to 0 with Ai on average, which is much smaller than the total number of Bj (1.5 million, a difference of over 2,000 times). Considering the condition lA=lB, if all objects are also evenly distributed (which is unlikely), then the average number of Bj that have the similarity not equal to 0 with Ai can be further reduced by 10 times (10 locations).
Based on the above information, we design the following algorithms:
1. For each Ai, find the set of members of B that may collide with each member of Ai based on the time and location conditions, and denote the set as B’:
B’=Ai.conj(B.select(tB>=tA-60 && tB<=tA+60 && lA==lB))
Note that during the calculation of B’, the corresponding B members will be filtered for each Ai member, so the members of both Ai and B may appear repeatedly in B’.
2. For each Ai, we need to filter B with tB to obtain B’. If B is sorted by tB in advance, the binary search can be used to speed up. Moreover, we need to add the tA attribute of Ai to facilitate subsequent calculations (since lA and lB are always the same and iA is a fixed value for Ai, there is no need to add these two attributes). The calculation of B’ can be changed as follows:
B’=Ai.conj(B.select@b(tB>=tA-60 && tB<=tA+60).select(lA==lB).derive(Ai.tA))
3. Group B’ by iB to get a group of B’j. Since all members of Bj that may collide with the member of Ai are in B’j, the number of collisions between Ai and B’j is equal to that between Ai and Bj.
If the same ‘b’ appears in B’j, it indicates that more than one ‘a’ (Ai member) collide with b. According to rule 2 that only the ‘a’ with the minimum tA should be retained, we can use the following equation
A’j=B’j.group(tB).(~.minp(tA))
to determine the set composed of member pairs consisting of Ai members and B’j members that have collided (it is also the set of the member pairs consisting of Ai members and Bj members that have collided). In this equation, the member of Ai is represented as the field tA, and the member of Bj is represented as the field tB.
4. If A is sorted by iA and tA in advance, and the members of Ai after grouping are also in order by tA, then likewise, the members of B’ will be ordered by tA, and the members of B'j will also in order by tA. In this way, the member with the minimum tA in each grouped subset of B'j.group(tB) will definitely be the first member. Therefore, the calculation of A’j can be simplified as:
A’j=B’j.group@1(tB)
5. According to rule 1 that the number of collisions of each Ai member is counted as once at most, and based on the assumption mentioned at the beginning of this article that the same Ai will not appear twice in A at the same time, we just need to deduplicate tA, that is, the numerator can be calculated as follows:
E=A’.icount(tA)
6. In the formula U=|Ai|+|Bj|-E’ for calculating the denominator,
A’j is the set composed of member pairs consisting of A members and B members, and these member pairs have been identified as having had a collision. Moreover, since the Bj members are already deduplicated (group@1(tB) means taking only one member from each grouped subset), the member of Bj will not appear repeatedly in A’j. Therefore, the members of A’j can correspond to the collided members of Bj one to one, and the equation E’=|A’j| holds.
7. |Ai| and |Bj| can be calculated and saved in advance by grouping A and B by iA and iB respectively. Since the amount of data is not large, the results can all be stored in memory.
Finally, according to the formula E/U, we can get the similarity ‘r’ between Ai and Bj. What remains is the common task of calculating TopN for the similarity results.
8. In the step 1 above, to search the full data of dataset B for each a, there is still a certain amount of computation with binary search (2*log 15 million means about 50 comparisons). Considering both the number of minutes and the number of locations are not large (30-day time span, 1440 minutes per day and 10 locations mean only around 400,000), which can be fully held in memory, we can use aligned sequence to directly locate.
When computing, let
G=B.align@a(30*1440,tB\60+1).(~.align@a(10,lB))
Because there are two dimensions: time and location, we also use a two-layer aligned sequence. First, divide B into 43200 (30*1440) groups by the minute sequence number tB\60+1, and then divide the sub-group members into 10 groups by the location sequence number lB. For the tA of a certain member in Ai, we can use:
G’=G.m(tA\60)(lA) | G(tA\60+1)(lA) | G.m(tA\60+2)(lA)
to quickly and roughly screen out a small superset of B’. The time difference between tB and tA of the member in the superset is no more than 2 minutes (the difference between the minute sequence numbers at which tA and tB are located is not greater than 1), thereby filtering out a large number of B members that are impossible to collide on the time dimension. In the small superset, since there may be a small number of members whose difference between tB and tA is greater than 1 minute, we need to use:
G’.select@b(tB>=tA-60 && tB<=tA+60)
to screen out an exact B’, which can significantly reduce the computing amount.
9. In addition, when calculating U, we need to search for the corresponding |Ai| by iA. If iA is continuous integer, we can also find it directly by location to avoid search action, that is:
nA=A.groups@n(iA;count(1)).(#2)
Now, |Ai|=nA(iA). B can be processed in the same way.
We directly prepare the data that are already converted to sequence number. Assume that the time span is 30 days and the enumeration number of locations is 10, the simulated data script is as follows:
A | |
---|---|
1 | =K=30,nA=K*700000,nB=K*500000,t=K*86400,LN=10,rand@s(1) |
2 | =file("A.ctx").create@yp(#iA,tA,lA).append(nA.new(int(rand(2500000)+1):iA,int(rand(t)):tA,int(rand(LN)+1):lA).sort(iA,tA)) |
3 | =file("B.ctx").create@y(iB,tB,lB).append(nB.new(int(rand(1500000)+1):iB, int(rand(t)):tB, int(rand(LN)+1):lB).sort(tB,iB)) |
In A1, K represents the number of days, nA represents the total data amount of dataset A, nB represents the total data amount of dataset B, t represents the number of seconds of 30 days, and LN represents the enumeration number of locations.
In A2 and A3, the composite tables A.ctx and B.ctx are created respectively, and the data of randomly generated data sets A and B are exported to the two composite tables respectively.
tA (tB) refers to the number of seconds elapsed from the starting time point. For example, if the starting time point is 2023-08-23 00:00:00, then the value corresponding to the time point 2023-08-23 00:01:01 is 61.
In A2, the @p option is used to create the composite table, indicating that the first field ‘iA’ is used as the segmentation key. During parallel computing, the composite table needs to be segmented. Since the records of the same ‘iA’ cannot be assigned to two segments, we use the @p option to ensure this during the segmentation of composite table.
Special attention should be paid to different sort orders when saving A and B. A is sorted by iA and tA (the step 4 of ‘Problem analysis’), while B is sorted by tB and iB (the step 2 of ‘Problem analysis’). In this way, we can read the ordered data directly in subsequent operations.
A | |
---|---|
1 | =K=30,LN=10 |
2 | =A=file("A.ctx").open().cursor() |
3 | =B=file("B.ctx").open().import() |
4 | =nA=A.groups@n(iA;count(1)).(#2) |
5 | =nB=B.groups@n(iB;count(1)).(#2) |
6 | =G=B.align@a(K*1440,tB\60+1).(~.align@a(LN,lB)) |
7 | =A=file("A.ctx").open().cursor() |
8 | =A7.group(iA).(~.news((G.m(tA\60)(lA)|gG(tA\60+1)(lA)|G.m(tA\60+2)(lA)).select@b(tB>=tA-60 && tB<=tA+60);iA,iB,tA,tB)) |
9 | =A8.(~.group@1(iB,tB).group(iB)) |
10 | =A9.conj(~.new(~.iA,~.iB,~.icount(tA)/(nB(iB)+nA(iA)-~.len()):r).top(-10;r)) |
11 | =file("result.csv").export@ct(A10) |
A1: K=30 represents the number of days of the time span to be counted; LN=10 represents the enumeration number of locations;
A4, A5: correspond to the step 7 of ‘Problem analysis’. Group the members by the ID of datasets A and B respectively, and count the number of members of each Ai and Bj, and store the result as sequence;
A6: correspond to the step 8 of ‘Problem analysis’. Align and group the members of dataset B by minute, and then align and group the sub-group members by location to calculate the aligned sequence mentioned above;
A8: correspond to the steps 1, 2 and 8 of ‘Problem analysis’. Divide the dataset A into several groups of Ai by iA, and loop through each group of Ai to obtain the corresponding B’; here we use ‘news’ instead of ‘conj’, which eliminates the derive action, and can obtain the same result. In addition, we also add iA to facilitate subsequent search for |Ai|;
A9: correspond to the steps 3 and 4 of ‘Problem analysis’. The method of calculating A’j:
B’.group(iB).group@1(tB)
can be simplified as:
B’.group@1(iB,tB)
A10: correspond to the steps 6 and 7 of ‘Problem analysis’. The result of deduplicating and counting tA by Aj’ is the molecule E, which is equivalent to the step 5 of ‘Problem analysis’. By adding the previously calculated results |Ai| and |Bi| and then subtracting the number of records in the current group (i.e. E’), we can get the denominator U. Finally, calculate the top 10 records based on the similarity result ‘r’.
Convert the ID, location and time to sequence number (the sequence-numberization of time is to calculate the number of seconds from the start time to the current time). The data structure after conversion is as follows:
Dataset A
Field name | Field meaning | Data type | Sample data |
---|---|---|---|
iA | ID | Int | 2048986 |
lA | Location | Int | 8 |
tA | Time | Int | 628588 |
Dataset B
Field name | Field meaning | Data type | Sample data |
---|---|---|---|
iB | ID | Int | 1050621 |
lB | Location | Int | 8 |
tB | Time | Int | 1802419 |
The creation of data with the above code is based on the premise that the members of all fields are already converted to the above data structure. Therefore, in practice, we need to perform data conversion and organization first, and then restore the data after calculation. For details, refer to the method described in SPL Practice: integerization during data dump. Since the said method is not the focus of this article, we won’t describe it again here.
When the total time span is 30 days (the data volume of data set A is 21 million rows, and that of data set B is 15 million rows), computing in SPL on a single machine (8C64G) takes 161 seconds including exporting all results to CSV file.
In fact, achieving this performance requires using a small number of column-wise computing options of SPL Enterprise Edition. Since the use of such options doesn’t involve principle analysis, we do not describe it in detail in this article.
This article discusses a typical object counting problem, which generally has the following characteristics:
1. Count the number of objects that satisfy a certain condition.
2. The number of objects is very large, but the amount of data involved in each object is not large.
3. The condition is very complex, usually also related to the order, and requires some steps to determine.
Normally, solving such problem needs to sort the data by object. However, since the amount of data involved in this task is very small, the optimization of storage becomes unimportant. The key to solving this task is to provide powerful set-oriented computing ability, especially the ability to compute the ordered set. For example, the data type should be able to support the set of sets so that the grouped subsets can be retained without having to aggregate like SQL. Moreover, the two-layer aligned sequence should be supported, allowing us to access the members of set by location and, the ordered grouping functionality should be provided.
SPL Resource: SPL Official Website | SPL Blog | Download esProc SPL | SPL Source Code