The text of the exercise is the following:
Imagine a stream of records arriving at a rate of one per second. For simplicity the records contain just one variable, a double. The goal is to generalise this variable to achieve a k-anonymity of 5 i.e., no generalised unique output value can appear fewer than 5 times.
The effectiveness of the solution can be measured by its impact on data distortion and latency, with the aim of minimising both.
The solution should implement the StreamingKFilter interface that accepts InputRecords and transforms them into generalised OutputRecords. Method calls to the StreamingKFilter are strictly single threaded; as such the solution is expected to be single-threaded only.
The code of the exercise and the tests are here.
Under /src/test you will find two test suites for the solution. In order to run them, instantiate your filter in the CandidateFilterFactory.getStreamingKFilter() method.
The project source code (solution and tests) can be built using Maven.
Additional info: K-Anonymity
A release of data is said to have the k-anonymity property if the information for each record contained in the release cannot be distinguished from at least k - 1 other records whose information also appears in the release.
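For the single-variable records of this exercise, the definition above reduces to counting how often each generalised value occurs. The following is a minimal sketch of such a check; the class and method names are hypothetical and are not part of the exercise code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the exercise: checks k-anonymity of a release
// in which each record carries a single generalised double value.
class KAnonymityCheck {
    /** Returns true if every distinct value in the release occurs at least k times. */
    static boolean isKAnonymous(List<Double> releasedValues, int k) {
        Map<Double, Integer> counts = new HashMap<>();
        for (Double value : releasedValues) {
            // count the occurrences of each distinct generalised value
            counts.merge(value, 1, Integer::sum);
        }
        for (int count : counts.values()) {
            if (count < k) {
                return false; // this value is shared by fewer than k records
            }
        }
        return true;
    }
}
```

With k = 5, a release of five records that all carry 1.5 passes the check, while replacing one of them with 2.0 fails it, because 1.5 then appears only four times and 2.0 only once.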
My solution
The StreamingKFilter interface contains two methods:
void processNewRecord(InputRecord input);
Collection<OutputRecord> returnPublishableRecords();
I implemented processNewRecord as:
@Override
public void processNewRecord(InputRecord input) {
    this.inputBuffer.add(input);
    // update the current time
    this.currentTime = input.getTime();
    // time elapsed between the oldest and the newest buffered record
    int firstElementTime = this.inputBuffer.get(0).getTime();
    int lastElementTime = this.inputBuffer.get(this.inputBuffer.size() - 1).getTime();
    int elapsedTime = lastElementTime - firstElementTime;
    if (this.inputBuffer.size() >= this.blockSize) {
        // enough records have been received: process the block
        generateOutputRecords();
    } else if (elapsedTime >= this.maxWaitTime && this.inputBuffer.size() >= this.K) {
        // enough time has elapsed and k-anonymity is reachable: process the block
        generateOutputRecords();
    }
}
This function triggers generateOutputRecords (explained later on) in two circumstances:
- when enough input records have been received, controlled by the variable blockSize (the first condition);
- when enough time has passed, controlled by the variable maxWaitTime, and the minimum number of records needed for k-anonymity has been received (the second condition).
The input records are stored in the inputBuffer as soon as they arrive.
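The two triggers can be isolated in a small pure function, which makes them easy to test on their own. This is only a sketch: FlushPolicy and shouldFlush are hypothetical names, and the timestamps are assumed to be integers that grow with time, as getTime() suggests.

```java
// Hypothetical helper, not part of the exercise: decides whether the buffered
// records should be processed now.
class FlushPolicy {
    static boolean shouldFlush(int bufferSize, int firstTime, int lastTime,
                               int blockSize, int maxWaitTime, int k) {
        // trigger 1: the block is full
        boolean blockFull = bufferSize >= blockSize;
        // trigger 2: the oldest record has waited long enough and
        // at least k records are available to keep k-anonymity
        int elapsed = lastTime - firstTime;
        boolean waitedLongEnough = elapsed >= maxWaitTime && bufferSize >= k;
        return blockFull || waitedLongEnough;
    }
}
```

For example, with blockSize = 10, maxWaitTime = 30 and K = 5, a buffer of 6 records spanning 30 time units is flushed, whereas a buffer of 3 records spanning the same time is not, because it could not be made 5-anonymous.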
The function returnPublishableRecords is implemented as:
@Override
public Collection<OutputRecord> returnPublishableRecords() {
    Collection<OutputRecord> outputs = this.outputBuffer;
    this.outputBuffer = new ArrayList<OutputRecord>();
    return outputs;
}
This function returns the records stored in the outputBuffer, filled by the function generateOutputRecords.
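The same hand-over idea, returning the current buffer and replacing it with a fresh one, can be shown in isolation. The DrainableBuffer class below is a hypothetical illustration and not part of the solution.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical illustration of the "swap and return" pattern.
class DrainableBuffer<T> {
    private List<T> pending = new ArrayList<>();

    void add(T item) {
        pending.add(item);
    }

    /** Hands over everything collected so far and starts a fresh, empty buffer. */
    Collection<T> drain() {
        List<T> drained = pending;
        pending = new ArrayList<>();
        return drained;
    }
}
```

Because the caller receives the old list and the buffer keeps a new one, each record is published exactly once, and a second call made before any new records arrive returns an empty collection.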
The main function generateOutputRecords is implemented as:
private void generateOutputRecords() {
    // sort the records so that similar values end up in the same chunk
    this.inputBuffer.sort(Comparator.comparing(InputRecord::getRawValue));
    while (!this.inputBuffer.isEmpty()) {
        // initialize the variables used in the search
        double distortionMin = Double.MAX_VALUE;
        double bestValue = 0;
        int bestSize = 0;
        int size = this.K - 1;
        do {
            // try a chunk with one more record
            size += 1;
            // if fewer than this.K records would be left over then process them all
            if (size > (this.inputBuffer.size() - this.K))
                size = this.inputBuffer.size();
            // compute the anonymized value and the distortion of the candidate chunk
            double anonymizedValue = getAnonValue(this.inputBuffer.subList(0, size));
            double distortion = getDistortion(this.inputBuffer.subList(0, size), anonymizedValue);
            // stop growing the chunk as soon as the distortion no longer decreases
            if (bestSize > 0 && distortion >= distortionMin)
                break;
            // otherwise remember this candidate as the best chunk so far
            distortionMin = distortion;
            bestSize = size;
            bestValue = anonymizedValue;
        } while (size < this.inputBuffer.size());
        // add the records of the best chunk to the output buffer
        addOutputRecords(this.inputBuffer.subList(0, bestSize), bestValue);
        // continue to process the other records in the input buffer
        this.inputBuffer = this.inputBuffer.subList(bestSize, this.inputBuffer.size());
    }
    // replace the (now empty) sublist view with a fresh input buffer
    this.inputBuffer = new ArrayList<InputRecord>();
}
The inputBuffer is first sorted to place similar values together.
In the main while loop, the inputBuffer is split into chunks so as to minimize the distortion of each individual chunk.
Each chunk starts with at least K elements, and one more element is added at a time until the distortion stops decreasing.
When the distortion stops decreasing, the chunk is removed from the input buffer and the output records are generated with the addOutputRecords function.
This process continues until there are no more records in the inputBuffer.
The size check at the top of the inner loop makes sure that even the last chunk has at least K elements, which preserves the k-anonymity property.
(The distortion is computed with the same function used in the measures package.)
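The chunking can also be tried out on plain arrays, independently of the exercise classes. The sketch below rests on two assumptions that the text does not confirm: the anonymized value of a chunk is its mean, and the distortion is the mean absolute difference between the raw values and that mean. GreedyChunker and its methods are hypothetical names, and the sketch emits the last chunk size for which the distortion still improved.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch of the greedy chunking on raw doubles.
class GreedyChunker {
    // Assumption: the anonymized value of a chunk is the mean of its values.
    static double mean(double[] sorted, int from, int to) {
        double sum = 0;
        for (int i = from; i < to; i++) sum += sorted[i];
        return sum / (to - from);
    }

    // Assumption: distortion is the mean absolute difference to the anonymized value.
    static double distortion(double[] sorted, int from, int to, double anon) {
        double sum = 0;
        for (int i = from; i < to; i++) sum += Math.abs(sorted[i] - anon);
        return sum / (to - from);
    }

    /** Returns the generalised values in sorted order; expects values.length >= k. */
    static double[] generalise(double[] values, int k) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        double[] output = new double[sorted.length];
        int start = 0;
        while (start < sorted.length) {
            int remaining = sorted.length - start;
            int size = k - 1;
            int bestSize = 0;
            double bestValue = 0;
            double bestDistortion = Double.MAX_VALUE;
            do {
                size++;
                // never leave fewer than k records for the following chunk
                if (size > remaining - k) size = remaining;
                double anon = mean(sorted, start, start + size);
                double d = distortion(sorted, start, start + size, anon);
                // stop as soon as growing the chunk no longer reduces the distortion
                if (bestSize > 0 && d >= bestDistortion) break;
                bestDistortion = d;
                bestSize = size;
                bestValue = anon;
            } while (size < remaining);
            // every record of the chunk is published with the same value
            Arrays.fill(output, start, start + bestSize, bestValue);
            start += bestSize;
        }
        return output;
    }
}
```

With K = 5, an input made of five values equal to 1.0 and five values equal to 10.0 is split into two chunks of five, so each record keeps its original value and the distortion is zero.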
Trade-off Between Distortion and Latency
The variable blockSize can be used to control the distortion and latency.
- Increasing blockSize decreases the distortion and increases the latency.
- Decreasing blockSize increases the distortion and decreases the latency.
- The variable maxWaitTime caps the latency, if necessary.
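To get a feeling for these parameters, the wait of the oldest record in a block can be estimated by replaying the two triggers on a steady stream. The sketch below assumes exactly one record per second, as in the exercise text, and additionally assumes that timestamps and maxWaitTime are expressed in seconds; LatencyEstimate is a hypothetical name.

```java
// Hypothetical estimate of how long the oldest record of a block waits
// before the block is processed, for a stream of one record per second.
class LatencyEstimate {
    static int oldestRecordWaitSeconds(int blockSize, int maxWaitTime, int k) {
        int size = 0;
        while (true) {
            size++;                 // one more record arrives
            int elapsed = size - 1; // seconds since the oldest buffered record arrived
            // trigger 1: the block is full
            if (size >= blockSize) return elapsed;
            // trigger 2: waited long enough and at least k records are buffered
            if (elapsed >= maxWaitTime && size >= k) return elapsed;
        }
    }
}
```

Under these assumptions, blockSize = 20 with maxWaitTime = 100 gives a wait of 19 seconds, while blockSize = 100 with maxWaitTime = 30 gives 30 seconds: whichever trigger fires first determines the latency, but the wait can never drop below K - 1 seconds unless the block itself is smaller than K.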
You can download my solution here.