|
#Using 64bit R 2.15.3 |
|
#Packages we are going to use |
|
#download the packed data here: https://www.dropbox.com/s/c1rtcryp8p6500w/kms-2013-01-06-s.json.bz2 |
|
library(rjson) |
|
library(data.table) |
|
library(snow) |
|
library(rbenchmark) |
|
|
|
# Set the working directory to where the unpacked file is, or add the path
# to the file name below.
# Only part of the data is read for this test; change n to read more.
eve <- readLines("kms-2013-01-06-s.json", n = 80000)

# json => data.frames (data lines)
# Hold the raw lines as a one-column matrix and wrap every row in its own list.
eve <- matrix(eve, ncol = 1)
eve.list <- apply(eve, 1, list)

# The first and the last line of the file are useless. Only the first one is
# dropped here, because this partial read does not reach the last line:
# eve.list = eve.list[-length(eve.list)]
eve.list <- eve.list[-1]

# Handling json: unwrap each line and parse it with fromJSON().
eve.list <- lapply(eve.list, function(wrapped) {
  fromJSON(unlist(wrapped))
})
|
|
|
#Handling data.frame |
|
# Flatten one parsed record into a one-row matrix.
#
# Args:
#   eve.list: a (possibly nested) list holding a single record.
# Returns:
#   A one-row matrix with one column per distinct flattened field name.
#   When a field name occurs more than once, all of its values are collapsed
#   into a single comma-separated string stored at the first occurrence.
eve.data.frame <- function(eve.list) {
  framo <- unlist(eve.list)
  field.names <- names(framo)
  dups <- unique(field.names[duplicated(field.names)])

  if (length(dups) > 0) {
    for (dup in dups) {
      bol <- field.names %in% dup
      # Write by position, not by name: a character subscript never matches an
      # empty name (""), so a name-based write would append a stray element
      # and the collapsed value would be thrown away below.
      framo[which(bol)[1]] <- paste(framo[bol], collapse = ",")
    }
    # Keep only the first occurrence of every field name.
    framo <- framo[!duplicated(field.names)]
  }

  as.matrix(t(framo))
}
|
|
|
|
|
# This takes a while if you read the whole data.
# Only the first 60000 records are kept, so cut the list down before the
# conversion instead of flattening records that are thrown away afterwards.
# head() also avoids padding the result with NULLs when fewer than 60000
# records are available.
eve.frames <- lapply(head(eve.list, 60000), eve.data.frame)

# One-row matrices (as returned by eve.data.frame) to data.tables
eve.frames2 <- lapply(eve.frames, as.data.table)
|
#unique(sapply(eve.frames2, class)) #data.frame data.tables |
|
|
|
## |
|
#Functions |
|
## |
|
|
|
#Standard FM |
|
|
|
|
|
# Merge a list of tables into one by repeated full outer merges.
#
# The list is cut into consecutive chunks of at most `nparts` elements, every
# chunk is merged into a single table, and the procedure is repeated on the
# chunk results until only one table is left.
#
# Args:
#   data.list: non-empty list of objects accepted by merge().
#   nparts:    chunk size, a single number >= 2 (fractions are dropped).
# Returns:
#   The merged table. A one-element list returns its element unchanged.
fast.merging <- function(data.list, nparts) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  # With nparts < 2 a round never shortens the list, so the loop could not end.
  if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) || nparts < 2) {
    stop("nparts must be a single number >= 2")
  }
  nparts <- floor(nparts)

  # Fold one chunk left to right with the standard merge. Starting the fold at
  # the second element avoids merging the first table with itself.
  merge.chunk <- function(chunk) {
    Reduce(function(part, nxt) merge(part, nxt, all = TRUE, sort = FALSE), chunk)
  }

  while (length(data.list) != 1) { # loop until everything is merged
    n <- length(data.list)
    starts <- seq(1, n, by = nparts)
    ends <- pmin(starts + nparts - 1, n) # the last chunk may be shorter
    data.list <- lapply(seq_along(starts), function(k) {
      merge.chunk(data.list[starts[k]:ends[k]])
    })
  }

  data.list[[1]] # the merged table
}
|
|
|
#MC FM |
|
|
|
|
|
# Multi-core variant of the chunked merge.
#
# The list is cut into consecutive chunks of at most `nparts` elements. While
# there is more than one chunk, the chunks are merged on the cluster with
# parLapply(); the final round (a single chunk) is merged locally. The
# procedure repeats on the chunk results until only one table is left.
#
# Args:
#   data.list: non-empty list of objects accepted by merge().
#   nparts:    chunk size, a single number >= 2 (fractions are dropped).
#   cluster:   cluster object handed to parLapply(); it is not touched when
#              the whole list fits into one chunk.
# Returns:
#   The merged table. A one-element list returns its element unchanged.
mc.fast.merging <- function(data.list, nparts, cluster) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  # With nparts < 2 a round never shortens the list, so the loop could not end.
  if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) || nparts < 2) {
    stop("nparts must be a single number >= 2")
  }
  nparts <- floor(nparts)

  # Fold one chunk left to right with the standard merge. Starting the fold at
  # the second element avoids merging the first table with itself.
  merge.chunk <- function(chunk) {
    Reduce(function(part, nxt) merge(part, nxt, all = TRUE, sort = FALSE), chunk)
  }
  # Detach the worker function from this call frame, so that sending it to the
  # workers does not serialise the frame (and with it the whole data.list).
  environment(merge.chunk) <- globalenv()

  while (length(data.list) != 1) { # loop until everything is merged
    n <- length(data.list)
    starts <- seq(1, n, by = nparts)
    ends <- pmin(starts + nparts - 1, n) # the last chunk may be shorter
    # Each worker only receives the chunks it has to merge.
    chunks <- lapply(seq_along(starts), function(k) data.list[starts[k]:ends[k]])

    if (length(chunks) != 1) {
      data.list <- parLapply(cluster, chunks, merge.chunk)
    } else {
      data.list <- lapply(chunks, merge.chunk)
    }
  }

  data.list[[1]] # the merged table
}
|
|
|
#Here's the lady |
|
|
|
# Multi-core chunked merge that joins on the shared columns explicitly.
#
# Works like the chunked merge above, but every merge() call passes
# by = intersect(names(x), names(y)), i.e. the join columns are always the
# columns the two tables have in common.
#
# Args:
#   data.list: non-empty list of tables (data.tables or data frames).
#   nparts:    chunk size, a single number >= 2 (fractions are dropped).
#   cluster:   cluster object handed to parLapply(); it is not touched when
#              the whole list fits into one chunk.
# Returns:
#   The merged table. A one-element list returns its element unchanged.
LadyMergAlot <- function(data.list, nparts, cluster) {
  if (!is.list(data.list)) stop("data.list isn't a list")
  if (length(data.list) == 0) stop("data.list is empty")
  # With nparts < 2 a round never shortens the list, so the loop could not end.
  if (!is.numeric(nparts) || length(nparts) != 1 || is.na(nparts) || nparts < 2) {
    stop("nparts must be a single number >= 2")
  }
  nparts <- floor(nparts)

  # Cluster workers are separate R sessions; packages attached here are not
  # attached there. Load data.table on the workers so that merge() on a
  # data.table dispatches to the data.table method there as well.
  if (length(data.list) > nparts &&
      any(vapply(data.list, inherits, logical(1), what = "data.table"))) {
    clusterEvalQ(cluster, library(data.table))
  }

  # Fold one chunk left to right. Starting the fold at the second element
  # avoids merging the first table with itself.
  merge.chunk <- function(chunk) {
    Reduce(function(part, nxt) {
      merge(part, nxt,
            by = intersect(names(part), names(nxt)),
            all = TRUE, sort = FALSE)
    }, chunk)
  }
  # Detach the worker function from this call frame, so that sending it to the
  # workers does not serialise the frame (and with it the whole data.list).
  environment(merge.chunk) <- globalenv()

  while (length(data.list) != 1) { # loop until everything is merged
    n <- length(data.list)
    starts <- seq(1, n, by = nparts)
    ends <- pmin(starts + nparts - 1, n) # the last chunk may be shorter
    # Each worker only receives the chunks it has to merge.
    chunks <- lapply(seq_along(starts), function(k) data.list[starts[k]:ends[k]])

    if (length(chunks) != 1) {
      data.list <- parLapply(cluster, chunks, merge.chunk)
    } else {
      data.list <- lapply(chunks, merge.chunk)
    }
  }

  data.list[[1]] # the merged table
}
|
|
|
## |
|
#Here starts the bench. |
|
## |
|
|
|
# Number of frames merged at each benchmark step: sekvenssi is used by the
# one-by-one merge loops, sekvenssi2 by the chunked merge functions.
sekvenssi <- seq(2, 5002, by = 100)
sekvenssi2 <- seq(2, 50002, by = 1000)

# Without data.table: merge the frames one by one, starting over from the
# first frame at every step, and record the elapsed time of each step.
time1 <- rep(NA, length(sekvenssi))
for (i in seq_along(sekvenssi)) {
  complete <- eve.frames[[1]]
  time1[i] <- system.time(
    for (j in 2:sekvenssi[i]) {
      complete <- merge(complete, eve.frames[[j]], all = TRUE, sort = FALSE)
    }
  )[["elapsed"]]
  print(paste(round(time1[i], 2), nrow(complete), i))
}
write(time1, "time1.dat")
|
|
|
# With data.table: the same one-by-one merge loop on the data.table versions
# of the frames, joining on the columns both tables share.
time2 <- rep(NA, length(sekvenssi))
for (i in seq_along(sekvenssi)) {
  complete <- eve.frames2[[1]]
  time2[i] <- system.time(
    for (j in 2:sekvenssi[i]) {
      complete <- merge(
        complete, eve.frames2[[j]],
        by = intersect(names(complete), names(eve.frames2[[j]])),
        all = TRUE, sort = FALSE
      )
    }
  )[["elapsed"]]
  print(paste(round(time2[i], 2), nrow(complete), i))
}
write(time2, "time2.dat")
|
|
|
# FM with data.frames: time fast.merging() with chunks of 10 on a growing
# number of frames. The run is aborted once a step takes over 400 seconds.
time3 <- rep(NA, length(sekvenssi2))
for (i in seq_along(sekvenssi2)) {
  complete <- eve.frames[seq_len(sekvenssi2[i])]
  time3[i] <- system.time(fast.merging(complete, 10))[["elapsed"]]
  print(c(i, time3[i]))
  if (time3[i] > 400) {
    stop()
  }
}
write(time3, "time3.dat")
|
|
|
# MCFM with data.frames: time mc.fast.merging() with chunks of 10 on a
# 4-worker socket cluster. The run is aborted once a step takes over 400
# seconds.
cl <- makeCluster(4, type = "SOCK")
time4 <- rep(NA, length(sekvenssi2))
# tryCatch(..., finally = ) shuts the workers down even when the loop is
# aborted by stop() or by an error inside the merge; otherwise the worker
# processes would be left running.
tryCatch({
  for (i in seq_along(sekvenssi2)) {
    complete <- eve.frames[seq_len(sekvenssi2[i])]
    time4[i] <- system.time(mc.fast.merging(complete, 10, cl))[["elapsed"]]
    print(c(i, time4[i]))
    if (time4[i] > 400) {
      stop("mc.fast.merging step took more than 400 seconds; benchmark aborted")
    }
  }
}, finally = stopCluster(cl))
write(time4, "time4.dat")
|
|
|
# MCFM with data.tables: time LadyMergAlot() with chunks of 10 on a 4-worker
# socket cluster. The run is aborted once a step takes over 400 seconds.
cl <- makeCluster(4, type = "SOCK")
time5 <- rep(NA, length(sekvenssi2))
# tryCatch(..., finally = ) shuts the workers down even when the loop is
# aborted by stop() or by an error inside the merge; otherwise the worker
# processes would be left running.
tryCatch({
  for (i in seq_along(sekvenssi2)) {
    complete <- eve.frames2[seq_len(sekvenssi2[i])]
    time5[i] <- system.time(LadyMergAlot(complete, 10, cl))[["elapsed"]]
    print(c(i, time5[i]))
    if (time5[i] > 400) {
      stop("LadyMergAlot step took more than 400 seconds; benchmark aborted")
    }
  }
}, finally = stopCluster(cl))
write(time5, "time5.dat")
|
|
|
# Read the stored timings (seconds) back from disk.
time1 <- scan("time1.dat")
time2 <- scan("time2.dat")
time3 <- scan("time3.dat")
time4 <- scan("time4.dat")
time5 <- scan("time5.dat")

# Draw all five timing curves, in minutes, into one PNG.
png("mark3.png", width = 960)

plot(
  sekvenssi, time1 / 60,
  type = "l", xlim = c(0, 50000), ylim = c(0, 5),
  ylab = "Elapsed time (minutes)", xlab = "Frames merged"
)
lines(sekvenssi, time2 / 60, col = "blue")
lines(sekvenssi2, time3 / 60, col = "red")
lines(sekvenssi2, time4 / 60, col = "pink")
lines(sekvenssi2, time5 / 60, col = "green")

# NOTE(review): lty has four entries for five labels; this presumably relies
# on legend() recycling it -- confirm.
legend(
  "bottomright",
  c("R standard", "data.table", "FM 1 cores", "FM 4 cores", "The Lady"),
  lty = rep(1, 4),
  col = c("black", "blue", "red", "pink", "green")
)

dev.off()