K-Anonymisation on Streaming Data

The text of the exercise is the following:

Imagine a stream of records arriving at a rate of one per second. For simplicity the records contain just one variable, a double. The goal is to generalise this variable to achieve a k-anonymity of 5 i.e., no generalised unique output value can appear fewer than 5 times.

The effectiveness of the solution can be measured by its impact on data distortion and latency, with the aim of minimising both:

- Distortion is a measure of how close the generalised values are to the original ones.
- Latency is a measure of delay between ingesting the input record and publishing the generalised value.

The solution should implement the StreamingKFilter interface that accepts InputRecords and transforms them into generalised OutputRecords. Method calls to the StreamingKFilter are strictly single threaded; as such the solution is expected to be single-threaded only.

The code of the exercise and the tests are here. Under /src/test you will find two test suites for the solution. In order to run them, instantiate your filter in the CandidateFilterFactory.getStreamingKFilter() method. The project source code (solution and tests) can be built using Maven.

Additional info: K-Anonymity

A release of data is said to have the k-anonymity property if the information for each record contained in the release cannot be distinguished from at least k - 1 records whose information also appear in the release.
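For this exercise, where each record carries a single generalised value, the definition boils down to counting how often each distinct output value occurs. A minimal sketch of such a check follows; the class and method names are made up for illustration and are not part of the exercise code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the exercise code.
class KAnonymityCheck {

    // Returns true if every distinct value in `published` occurs at least k times.
    static boolean isKAnonymous(List<Double> published, int k) {
        Map<Double, Integer> counts = new HashMap<>();
        for (Double value : published) {
            counts.merge(value, 1, Integer::sum);
        }
        for (int count : counts.values()) {
            if (count < k) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Double> ok = Arrays.asList(1.5, 1.5, 1.5, 1.5, 1.5);
        List<Double> bad = Arrays.asList(1.5, 1.5, 1.5, 1.5, 1.5, 9.0);
        System.out.println(isKAnonymous(ok, 5));  // true
        System.out.println(isKAnonymous(bad, 5)); // false: 9.0 appears only once
    }
}
```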
My solution

The StreamingKFilter interface contains two methods:

    void processNewRecord(InputRecord input);
    Collection<OutputRecord> returnPublishableRecords();

I implemented processNewRecord as:

    @Override
    public void processNewRecord(InputRecord input) {
        Integer firstElementTime, lastElementTime, elapsedTime;

        this.inputBuffer.add(input);

        // calculating elapsed time between the oldest and the newest buffered record
        firstElementTime = this.inputBuffer.get(0).getTime();
        lastElementTime = this.inputBuffer.get(this.inputBuffer.size() - 1).getTime();
        elapsedTime = lastElementTime - firstElementTime;

        // update the current time
        this.currentTime = input.getTime();

        // if enough records are received then process the block
        if (this.inputBuffer.size() >= this.blockSize)
            generateOutputRecords();

        // if enough time is elapsed then process the block
        if (elapsedTime >= this.maxWaitTime && this.inputBuffer.size() >= this.K) {
            generateOutputRecords();
        }
    }

This function triggers generateOutputRecords (explained later on) in two circumstances:

- when enough input records have been received, controlled by the variable blockSize (the first if statement);
- when enough time has passed, controlled by the variable maxWaitTime, and at least K records, the minimum needed to obtain k-anonymity, have been received (the second if statement).

The input records are stored in the inputBuffer by the add call at the top of the function. The elapsed time is the newest timestamp minus the oldest one, so it grows as records accumulate.

The function returnPublishableRecords is implemented as:

    @Override
    public Collection<OutputRecord> returnPublishableRecords() {
        Collection<OutputRecord> outputs = this.outputBuffer;
        this.outputBuffer = new ArrayList<OutputRecord>();
        return outputs;
    }

This function returns the records stored in the outputBuffer, which is filled by the function generateOutputRecords, and replaces it with an empty buffer so that each record is published only once.
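To get a feel for how the two triggers in processNewRecord bound the waiting time, here is a small simulation. It is only a sketch under several assumptions: records arrive exactly one per second with timestamps 0, 1, 2, ..., the buffer starts empty, elapsed time is the newest timestamp minus the oldest, and the output is published right after the flush. The class and method names are hypothetical.

```java
// Hypothetical helper, not part of the exercise code.
class FlushLatencySketch {

    // Returns the timestamp (in seconds) of the record that triggers the first
    // flush, i.e. how long the oldest buffered record has been waiting.
    // Assumes blockSize >= 1 so that the loop always terminates.
    static int worstCaseWaitSeconds(int blockSize, int maxWaitTime, int k) {
        int bufferSize = 0;
        int firstTime = 0;
        for (int now = 0; ; now++) {
            bufferSize++;                   // the record arriving at time `now`
            int elapsed = now - firstTime;  // newest minus oldest timestamp
            boolean blockFull = bufferSize >= blockSize;
            boolean waitedLongEnough = elapsed >= maxWaitTime && bufferSize >= k;
            if (blockFull || waitedLongEnough) {
                return now;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(worstCaseWaitSeconds(20, 8, 5));  // 8: the time limit fires first
        System.out.println(worstCaseWaitSeconds(20, 60, 5)); // 19: the block fills first
        System.out.println(worstCaseWaitSeconds(20, 2, 5));  // 4: must still wait for k records
    }
}
```

The last case shows that maxWaitTime cannot push the wait below the time needed to collect K records.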
The main function generateOutputRecords is implemented as:

    private void generateOutputRecords() {
        int size, bestSize;
        double distortion, anonymizedValue, bestValue, distortion_min;

        // sort the records so that similar values are next to each other
        this.inputBuffer.sort(Comparator.comparing(InputRecord::getRawValue));

        while (!this.inputBuffer.isEmpty()) {
            // initialize the variables used in the search
            distortion_min = Double.MAX_VALUE;
            bestValue = 0.0;
            bestSize = 0;
            size = this.K - 1;

            do {
                // try a chunk with one more record
                size += 1;

                // if fewer than this.K records would remain, process them all
                if (size > (this.inputBuffer.size() - this.K))
                    size = this.inputBuffer.size();

                // compute the anonymized value and the distortion on the records
                anonymizedValue = getAnonValue(this.inputBuffer.subList(0, size));
                distortion = getDistortion(this.inputBuffer.subList(0, size), anonymizedValue);

                // stop growing once the distortion no longer decreases;
                // the first chunk tried is always accepted
                if (bestSize > 0 && distortion >= distortion_min)
                    break;

                // remember the best chunk so far, so that the chunk that
                // made the distortion worse is not the one that gets published
                distortion_min = distortion;
                bestSize = size;
                bestValue = anonymizedValue;
            } while (size < this.inputBuffer.size());

            // add the best chunk found to the output buffer
            addOutputRecords(this.inputBuffer.subList(0, bestSize), bestValue);

            // continue to process the other records in the input buffer
            this.inputBuffer = this.inputBuffer.subList(bestSize, this.inputBuffer.size());
        }

        // clear the input buffer
        this.inputBuffer = new ArrayList<InputRecord>();
    }

The sort at the top of the function places similar values next to each other. The while loop then cuts the inputBuffer into chunks so as to minimize the distortion of each individual chunk. Each chunk starts with K elements, and one more element is added per iteration for as long as the distortion keeps decreasing. When the distortion stops decreasing, the last chunk that lowered it is removed from the input buffer and its output records are generated with the addOutputRecords function. This process continues until there are no more records in the inputBuffer.
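The chunking idea can also be tried out in isolation on plain doubles. The sketch below is not the code of my solution: it assumes that the generalised value of a chunk is its arithmetic mean and that distortion is the mean absolute difference to that value, which may differ from the function in the measures package. When growing a chunk stops lowering the distortion, it falls back to the last size that did. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone sketch, not part of the exercise code.
class GreedyChunkingSketch {

    // Assumption: the generalised value of a chunk is its arithmetic mean.
    static double anonValue(List<Double> chunk) {
        double sum = 0.0;
        for (double v : chunk) sum += v;
        return sum / chunk.size();
    }

    // Assumption: distortion is the mean absolute difference to the generalised value.
    static double distortion(List<Double> chunk, double anon) {
        double sum = 0.0;
        for (double v : chunk) sum += Math.abs(v - anon);
        return sum / chunk.size();
    }

    // Splits the sorted values into chunks of at least k elements each.
    // Assumes values.size() >= k.
    static List<List<Double>> chunk(List<Double> values, int k) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        List<List<Double>> chunks = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            int remaining = sorted.size() - start;
            int size = k - 1;
            int bestSize = 0;
            double best = Double.MAX_VALUE;
            do {
                size++;
                // Fewer than k values would be left behind: take everything.
                if (size > remaining - k) size = remaining;
                List<Double> candidate = sorted.subList(start, start + size);
                double d = distortion(candidate, anonValue(candidate));
                // The first candidate is always accepted; later ones must improve.
                if (bestSize > 0 && d >= best) break;
                best = d;
                bestSize = size;
            } while (size < remaining);
            chunks.add(new ArrayList<>(sorted.subList(start, start + bestSize)));
            start += bestSize;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(
                9.3, 1.0, 9.1, 1.2, 9.0, 1.4, 9.4, 1.1, 9.2, 1.3);
        for (List<Double> c : chunk(values, 5)) {
            System.out.println(c.size() + " values -> " + anonValue(c));
        }
    }
}
```

With the two well-separated clusters in main, the sketch produces two chunks of five values each rather than one chunk of ten.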
In generateOutputRecords, the size check at the top of the inner loop makes sure that even the last chunk has at least K elements, which preserves the k-anonymity property. (The distortion is computed with the same function used in the measures package.)

Trade-off Between Distortion and Latency

The variable blockSize controls the trade-off between distortion and latency:

- Increasing blockSize decreases the distortion and increases the latency: a larger block offers more nearby values to group together, but records wait longer for the block to fill.
- Decreasing blockSize increases the distortion and decreases the latency.

The variable maxWaitTime caps the latency, if necessary.

You can download my solution here.