## loading packages
using Base.Threads
using MurmurHash3
using Formatting
using Random
using Statistics
const targetFilename = "random_data.bin"
# `stat` reports size 0 for a missing path, which would otherwise look like an empty input.
isfile(targetFilename) || error("input file not found: $targetFilename")
targetFileStat = stat(targetFilename)
# The file is consumed as fixed 32-byte entries. Validate explicitly instead of with
# `@assert`, which is meant for internal invariants and may be disabled.
targetFileStat.size % 32 == 0 ||
    error("size of $targetFilename ($(targetFileStat.size) bytes) is not a multiple of 32")
numEntries = div(targetFileStat.size, 32)
printfmtln("number of entries: {:d}", numEntries)
## generate seeds
# One 32-bit seed per hash function, drawn after re-seeding the global RNG with a
# fixed value so that every run uses the same seeds.
const numHashFuncs = 60
Random.seed!(0x1a2b3c4d)
hashSeeds = let lowest = 0x000fffff, highest = 0x0fffffff
    # Eight-digit hex literals are already UInt32, so no conversion is needed.
    rand(lowest:highest, numHashFuncs)
end
## create a lookup table
# lookup[b] is the number of consecutive zero bits at the low end of the byte value b,
# for b in 1..255 (a zero byte has no entry).
const lookup = UInt64[trailing_zeros(b) for b in 0x01:0xff]
## function declaration
"""
    count_tail_zeros(x::UInt64)

Return the number of consecutive zero bits at the least-significant end of `x`.

Returns 64 when `x` is zero. The result is always an `Int`, regardless of the value
of `x`.
"""
function count_tail_zeros(x::UInt64)
    # Base's `trailing_zeros` works on the integer value directly: no temporary byte
    # array per call and no assumption about the byte order of the host.
    return trailing_zeros(x)
end
"""
    flajolet_martin(startInd::UInt, endInd::UInt, seeds::Array{UInt32, 1})

Compute, for each seed in `seeds`, the largest trailing-zero count observed among the
hashes of the entries `startInd` (zero-based, inclusive) up to `endInd` (exclusive) of
`targetFilename`.

Each entry is 32 bytes long. Every entry is hashed once per seed with `mmhash128_a`, and
the first component of the hash is passed to `count_tail_zeros`.

# Returns
A `Vector{UInt}` with one element per seed; all zeros when the range is empty.

# Throws
- `ArgumentError` if `endInd < startInd`.
- `ErrorException` if fewer bytes than requested could be read from the file.
"""
function flajolet_martin(startInd::UInt, endInd::UInt, seeds::Array{UInt32, 1})
    entrySize = 32
    # The indices are unsigned, so an inverted range would wrap around to a huge length
    # instead of becoming negative.
    endInd >= startInd ||
        throw(ArgumentError("endInd ($endInd) must not be smaller than startInd ($startInd)"))
    startLoc = startInd * entrySize
    # Signed length so that `numBytes - 1` below cannot underflow for an empty range.
    numBytes = Int((endInd - startInd) * entrySize)

    # The do-block form closes the file even if `seek` or `read` throws.
    buffer = open(targetFilename, "r") do inFile
        seek(inFile, startLoc)
        read(inFile, numBytes)
    end
    length(buffer) == numBytes ||
        error("expected $numBytes bytes from $targetFilename, got $(length(buffer))")

    maxR = zeros(UInt, length(seeds))
    # Keep `buffer` rooted while raw pointers into its data are in use.
    GC.@preserve buffer begin
        # `pointer` yields the address of the array's data; `pointer_from_objref` would
        # yield the address of the array object itself.
        dataPtr = pointer(buffer)
        # Pointer offsets are zero-based: entry k starts at byte k * entrySize.
        for offset in 0:entrySize:(numBytes - 1)
            entryPtr = dataPtr + offset
            for j in eachindex(seeds, maxR)
                hashVal = mmhash128_a(entrySize, entryPtr, seeds[j])[1]
                maxR[j] = max(maxR[j], count_tail_zeros(hashVal))
            end
        end
    end
    return maxR
end
# test counting function: 2^16 has exactly 16 trailing zero bits
@assert count_tail_zeros(UInt64(2^16)) == 16 "count_tail_zeros(2^16) should be 16"
## split the task
numTask = 100
# Integer ceiling division: exact for any entry count, unlike rounding a Float64 quotient.
numEntriesPerTask = UInt(cld(numEntries, numTask))
printfmtln("number of entries per task: {:d}", numEntriesPerTask)
"""
    compute_distinct_elements()

Run `flajolet_martin` over the input in `numTask` consecutive chunks of
`numEntriesPerTask` entries each, distributing the chunks with `@threads`.

# Returns
A `numTask × numHashFuncs` `Matrix{UInt}`; row `i` holds the per-seed result for
chunk `i`.
"""
function compute_distinct_elements()
    # run with multiple threads
    println("number of threads:", nthreads())
    result = zeros(UInt, (numTask, numHashFuncs))
    totalEntries = UInt(numEntries)
    perTask = UInt(numEntriesPerTask)
    @threads for i in 1:numTask
        # Clamp both ends. The chunk size is rounded up, so trailing chunks can start
        # past the last entry; with unsigned indices `endInd - startInd` would then
        # wrap around instead of describing an empty range.
        startInd = min(totalEntries, UInt(i - 1) * perTask)
        endInd = min(totalEntries, UInt(i) * perTask)
        # Each iteration writes only to its own row of `result`.
        result[i, :] .= flajolet_martin(startInd, endInd, hashSeeds)
        println("task $i finished")
    end
    return result
end
# Time the full run; `result` has one row per task and one column per hash function.
@elapsed result = compute_distinct_elements()
# Reduce over the task dimension: one maximum per hash function (a 1-row matrix).
maxResult = maximum(result, dims=1)
println(maxResult)
# Split the hash functions into consecutive, equally sized groups, take the median of
# each group, then average the medians.
groups = 6
numElePerGroup = Int(numHashFuncs / groups)
medians = Float64[
    median(maxResult[((g - 1) * numElePerGroup + 1):(g * numElePerGroup)])
    for g in 1:groups
]
meanVal = mean(medians)
println(meanVal)
# The estimate is two raised to the averaged exponent.
numDistinct = 2^meanVal