In R, parallel programming can be done with, for example, the snow package. To turn a one-core solution into a multicore one, we simply use snow's parLapply function in place of lapply. This change makes our independent loops run faster without any major changes to the script, since parLapply behaves almost exactly like lapply. Let us present the script first; the explanation is given after that. There is a lot more case handling in the multicore version, as we can't call the parallel code path when we only need one core.
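To illustrate how small the lapply-to-parLapply change is, here is a minimal sketch. It uses the parallel package that ships with R, which provides the same makeCluster/parLapply interface as snow, so it runs without installing anything:

```r
library(parallel) #ships with R; mirrors snow's makeCluster/parLapply interface

cl <- makeCluster(2) #two worker processes

serial.result   <- lapply(1:4, function(x) x^2)           #one core
parallel.result <- parLapply(cl, 1:4, function(x) x^2)    #same call, plus the cluster

stopCluster(cl)

identical(serial.result, parallel.result) #TRUE: only the execution differs
```

The only differences at the call site are the extra cluster argument and the need to create and stop the cluster around the call.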
#Multicore (mc) merging requires the snow package, as the multicore package isn't available on Windows.
require(snow)

mc.fast.merging = function(data.list, nparts, cluster){
  if(!is.list(data.list)) stop("data.list isn't a list")
  while(length(data.list) != 1){ #loop until everything is merged
    if(length(data.list) > nparts){
      starts = seq(1, length(data.list), nparts)
      ends = seq(nparts, length(data.list), nparts) #starts and ends have equal length when nparts divides length(data.list)
      if(length(ends) < length(starts)) ends = c(ends, length(data.list)) #making sure things are even
      sections = matrix(c(starts, ends), ncol = 2, byrow = FALSE)
      sections = apply(sections, 1, list)
    }else{
      sections = list(c(1, length(data.list)))
    }
    if(length(sections) != 1){
      #several sections: merge each one on its own worker
      data.list = parLapply(cluster, sections, function(x, data.list){
        if(is.list(x)) x = x[[1]]
        #the standard way starts ->
        part = data.list[[x[1]]]
        if(x[2] > x[1]){ #avoid merging the first frame with itself
          for(i in (x[1] + 1):x[2]){
            part = merge(part, data.list[[i]], all = TRUE, sort = FALSE)
          }
        }
        #<- standard way ends
        return(part)
      }, data.list = data.list)
    }else{
      #only one section left, so merge it serially
      data.list = lapply(sections, function(x, data.list){
        if(is.list(x)) x = x[[1]]
        part = data.list[[x[1]]]
        if(x[2] > x[1]){
          for(i in (x[1] + 1):x[2]){
            part = merge(part, data.list[[i]], all = TRUE, sort = FALSE)
          }
        }
        return(part)
      }, data.list = data.list)
    }
  }
  return(data.list[[1]]) #returning the merged data frame
}
For example, suppose we have 100 frames to merge on a 4-core machine, and we choose a section size of 25 frames (nparts = 25). This means we have 4 sections: section 1 contains frames 1-25, section 2 contains frames 26-50, and so on. Now we tell the first core to merge the frames in section 1, the second core to merge the frames in section 2, and so on. After that is done, only 4 frames remain, which we merge the standard way. With this, fast merging should be at least two times faster when using 2 cores. Let us see if that is the case; unfortunately I only have 4 cores.
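To make the splitting concrete, the section boundaries for this example can be computed directly with the same seq calls the function uses:

```r
n.frames = 100
nparts = 25 #section size: frames per section

starts = seq(1, n.frames, nparts)      #1 26 51 76
ends   = seq(nparts, n.frames, nparts) #25 50 75 100

sections = matrix(c(starts, ends), ncol = 2, byrow = FALSE)
sections #each row is one section: (1,25), (26,50), (51,75), (76,100)
```

When n.frames does not divide evenly by nparts, ends comes up one element short and the function pads it with length(data.list), so the last section is simply smaller than the rest.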
#The frames.
eve.frames = lapply(eve.list, eve.data.frame)[1:40000]
#eve.frames = eve.frames[1:3001]
sekvenssi = seq(2, 40002, 500)[1:30] #the frame counts to benchmark ("sekvenssi" is Finnish for "sequence")
#sekvenssi = seq(1, 4001, 100)

#Single-core baseline.
time1 = rep(NA, length(sekvenssi))
for(i in seq(sekvenssi)){
  complete = eve.frames[1:sekvenssi[i]]
  time1[i] = system.time(fast.merging(complete, 25))[3] #elapsed time in seconds
  print(paste(i, round(time1[i], 2)))
}
write(time1, "time1.dat")

#Two cores.
library(snow)
cl = makeCluster(2, type = "SOCK")
time2 = rep(NA, length(sekvenssi))
for(i in seq(sekvenssi)){
  complete = eve.frames[1:sekvenssi[i]]
  time2[i] = system.time(mc.fast.merging(complete, 25, cl))[3]
  print(i)
}
stopCluster(cl)
write(time2, "time2.dat")

#Three cores.
cl = makeCluster(3, type = "SOCK")
time3 = rep(NA, length(sekvenssi))
for(i in seq(sekvenssi)){
  complete = eve.frames[1:sekvenssi[i]]
  time3[i] = system.time(mc.fast.merging(complete, 25, cl))[3]
  print(i)
}
stopCluster(cl)
write(time3, "time3.dat")

#Four cores.
cl = makeCluster(4, type = "SOCK")
time4 = rep(NA, length(sekvenssi))
for(i in seq(sekvenssi)){
  complete = eve.frames[1:sekvenssi[i]]
  time4[i] = system.time(mc.fast.merging(complete, 25, cl))[3]
  print(i)
}
stopCluster(cl)
write(time4, "time4.dat")

#Plotting the results.
time1 = scan("time1.dat")
time2 = scan("time2.dat")
time3 = scan("time3.dat")
time4 = scan("time4.dat")
png("mark2.png")
plot(sekvenssi, time1/60, type = "l", ylab = "Elapsed time (minutes)", xlab = "Frames merged")
points(sekvenssi, time2/60, type = "l", col = "blue")
points(sekvenssi, time3/60, type = "l", col = "red")
points(sekvenssi, time4/60, type = "l", col = "pink")
legend("bottomright", c("FM 1 core", "FM 2 cores", "FM 3 cores", "FM 4 cores"),
       lty = c(1, 1, 1, 1), col = c("black", "blue", "red", "pink"))
dev.off()