Taro Logo

Find Median from Data Stream

Medium
1 views
2 months ago

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values.

  • For example, for arr = [2,3,4], the median is 3.
  • For example, for arr = [2,3], the median is (2 + 3) / 2 = 2.5.

Implement the MedianFinder class:

  • MedianFinder() initializes the MedianFinder object.
  • void addNum(int num) adds the integer num from the data stream to the data structure.
  • double findMedian() returns the median of all elements so far. Answers within 10-5 of the actual answer will be accepted.

Example 1:

Input
["MedianFinder", "addNum", "addNum", "findMedian", "addNum", "findMedian"]
[[], [1], [2], [], [3], []]
Output
[null, null, null, 1.5, null, 2.0]

Explanation
MedianFinder medianFinder = new MedianFinder();
medianFinder.addNum(1);    // arr = [1]
medianFinder.addNum(2);    // arr = [1, 2]
medianFinder.findMedian(); // return 1.5 (i.e., (1 + 2) / 2)
medianFinder.addNum(3);    // arr[1, 2, 3]
medianFinder.findMedian(); // return 2.0

Constraints:

  • -105 <= num <= 105
  • There will be at least one element in the data structure before calling findMedian.
  • At most 5 * 104 calls will be made to addNum and findMedian.

Follow up:

  • If all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?
  • If 99% of all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?
Sample Answer

MedianFinder Implementation

This problem requires implementing a MedianFinder class that can efficiently add numbers from a stream and find the median of the numbers added so far. A naive approach would involve storing all numbers in a list and sorting it every time findMedian is called, which is inefficient.

1. Naive Approach (Brute Force)

The simplest approach is to store the numbers in a list and sort the list every time we need to find the median.

class MedianFinder:
    def __init__(self):
        self.numbers = []

    def addNum(self, num: int) -> None:
        self.numbers.append(num)

    def findMedian(self) -> float:
        self.numbers.sort()
        n = len(self.numbers)
        if n % 2 == 0:
            return (self.numbers[n // 2 - 1] + self.numbers[n // 2]) / 2
        else:
            return self.numbers[n // 2]

2. Optimal Approach (Using Heaps)

To optimize, we can use two heaps: a max-heap to store the smaller half of the numbers and a min-heap to store the larger half. This way, the median can be found in O(1) time. We maintain the invariant that the max-heap has either the same number of elements or one more element than the min-heap.

import heapq

class MedianFinder:
    def __init__(self):
        self.small = []  # max-heap
        self.large = []  # min-heap

    def addNum(self, num: int) -> None:
        # Add to max-heap first
        heapq.heappush(self.small, -num)

        # Ensure every element in small is <= every element in large
        if self.large and -self.small[0] > self.large[0]:
            heapq.heappush(self.large, -heapq.heappop(self.small))

        # Balance the size
        if len(self.small) > len(self.large) + 1:
            heapq.heappush(self.large, -heapq.heappop(self.small))
        elif len(self.large) > len(self.small):
            heapq.heappush(self.small, -heapq.heappop(self.large))

    def findMedian(self) -> float:
        if len(self.small) == len(self.large):
            return (-self.small[0] + self.large[0]) / 2.0
        else:
            return -self.small[0]

3. Big(O) Run-time Analysis

Naive Approach:

  • addNum: O(1) (appending to a list)
  • findMedian: O(n log n) (sorting the list each time)

Optimal Approach (Heaps):

  • addNum: O(log n) (heap push and pop operations)
  • findMedian: O(1) (accessing the top elements of the heaps)

4. Big(O) Space Usage Analysis

Naive Approach:

  • O(n) (storing all numbers in a list)

Optimal Approach (Heaps):

  • O(n) (storing all numbers in two heaps)

5. Edge Cases and Considerations

  1. Empty Stream Initially:
    • The heaps will be empty initially, and the code handles this case correctly.
  2. Large Number of Elements:
    • The heap approach is efficient even for a large number of elements because the heap operations maintain logarithmic time complexity.
  3. Duplicate Elements:
    • The heap approach handles duplicate elements without any issues.
  4. Integer Overflow:
    • When calculating the median of two numbers, potential overflow can be avoided by using floating-point division.
  5. Follow-up Optimization (Numbers in Range [0, 100]):
    • If all numbers are in the range [0, 100], we can use a frequency array (bucket sort) to store the count of each number. This would allow addNum and findMedian to be performed in O(1) and O(100) = O(1) respectively.
  6. Follow-up Optimization (99% in Range [0, 100]):
    • If 99% of numbers are in the range [0, 100], we can use a hybrid approach. Use a frequency array for numbers in the range [0, 100] and the heap approach for numbers outside this range. This combines the benefits of both approaches, providing efficient performance for the common case and handling outliers gracefully.

Code with Follow-up Optimizations

Here's the code with the follow-up optimizations:

import heapq

class MedianFinder:
    def __init__(self):
        self.small = []  # max-heap for values < 0 or > 100
        self.large = []  # min-heap for values < 0 or > 100
        self.counts = [0] * 101  # Frequency array for values [0, 100]
        self.out_of_range_count = 0
        self.in_range_count = 0

    def addNum(self, num: int) -> None:
        if 0 <= num <= 100:
            self.counts[num] += 1
            self.in_range_count += 1
        else:
            self.out_of_range_count += 1
            if num < 0:
                heapq.heappush(self.small, -num)
            else:
                heapq.heappush(self.large, num)

    def findMedian(self) -> float:
        total_count = self.in_range_count + self.out_of_range_count
        if total_count == 0:  # Handle empty stream
            return 0  # Or raise an exception
        
        if total_count % 2 == 0:
            # Even number of elements
            k1 = total_count // 2
            k2 = k1 + 1
            return (self.find_kth(k1) + self.find_kth(k2)) / 2.0
        else:
            # Odd number of elements
            k = (total_count + 1) // 2
            return self.find_kth(k)
    
    def find_kth(self, k):
        # First, adjust k for out-of-range elements in the small heap
        if k <= len(self.small):
            return -heapq.nsmallest(k, self.small)[-1]
        
        k -= len(self.small)

        # Now, find the kth element in the combined [0, 100] range
        count = 0
        for i in range(101):
            count += self.counts[i]
            if count >= k:
                return i

        # Adjust k for [0, 100] elements
        k -= count - self.counts[i]

        # Finally, handle out-of-range elements in the large heap
        return heapq.nsmallest(k, self.large)[-1]

This approach handles the follow-up optimizations by utilizing frequency counts for values within the range [0, 100], and min/max heaps for values outside the range. This hybrid strategy provides an efficient solution, particularly when most of the data lies within the specified range.