python - Why does PySpark get stuck after completing a few ShuffleMapTasks?


I use PySpark to run a Python script. When it reaches "14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)" there is no further reaction; the job stays stuck at that point and there is no error message.

==========log=============

14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as 3689 bytes in 1 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:5 as TID 73 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:5 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:6 as TID 74 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:7 as TID 75 on executor 1: myhost-bigdata-110d12 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:7 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:8 as TID 76 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:8 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:9 as TID 77 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:9 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 73 in 200 ms on myhost-bigdata-110d13 (progress: 0/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 5)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 76 in 218 ms on myhost-bigdata-110d13 (progress: 1/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 8)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 72 in 324 ms on myhost-bigdata-110d12 (progress: 2/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 4)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 69 in 371 ms on myhost-bigdata-110d12 (progress: 3/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 1)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 75 in 367 ms on myhost-bigdata-110d12 (progress: 4/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 7)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 77 in 423 ms on myhost-bigdata-110d14 (progress: 5/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 9)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 71 in 435 ms on myhost-bigdata-110d14 (progress: 6/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 3)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 74 in 510 ms on myhost-bigdata-110d14 (progress: 7/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)

==============work1 log==============

14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 59 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 59 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 47 is 962
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 59
14/03/23 21:32:33 INFO executor.Executor: Sending result for 47 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 47
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 44 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 44 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 44
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 82, boot = 3, init = 76, finish = 3
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 65 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 65 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 65
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 68
14/03/23 21:32:33 INFO executor.Executor: Running task ID 68
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 71
14/03/23 21:32:33 INFO executor.Executor: Running task ID 71
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 74
14/03/23 21:32:33 INFO executor.Executor: Running task ID 74
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 77
14/03/23 21:32:33 INFO executor.Executor: Running task ID 77
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00000.bz2:0+14685
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00009.bz2:0+15447
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00006.bz2:0+14924
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00003.bz2:0+15015
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 89, boot = 3, init = 62, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 68 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 68 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 68
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 83, boot = 2, init = 57, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 77 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 77 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 77
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 66, boot = 2, init = 40, finish = 24
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 95, boot = 2, init = 60, finish = 33
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 71 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 71 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 71
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 74 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 74 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 74

=========work2 log ========================

14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 60 is 962
14/03/23 21:32:40 INFO executor.Executor: Sending result for 60 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 60
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 69
14/03/23 21:32:40 INFO executor.Executor: Running task ID 69
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 72
14/03/23 21:32:40 INFO executor.Executor: Running task ID 72
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 75
14/03/23 21:32:40 INFO executor.Executor: Running task ID 75
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00001.bz2:0+14114
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00007.bz2:0+15325
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00004.bz2:0+15169
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished
14/03/23 21:32:40 INFO python.PythonRDD: Times: total = 25, boot = 2, init = 10, finish = 13
14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 72 is 851
14/03/23 21:32:40 INFO executor.Executor: Sending result for 72 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 72

===============work3 log===================

14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 55 is 962
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 61 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 55 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 58
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 55
14/03/23 21:32:28 INFO executor.Executor: Sending result for 61 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 61
14/03/23 21:32:28 INFO python.PythonRDD: Times: total = 92, boot = 3, init = 86, finish = 3
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 67 is 968
14/03/23 21:32:28 INFO executor.Executor: Sending result for 67 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 67
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 64 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 64 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 64
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 70
14/03/23 21:32:29 INFO executor.Executor: Running task ID 70
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 73
14/03/23 21:32:29 INFO executor.Executor: Running task ID 73
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 76
14/03/23 21:32:29 INFO executor.Executor: Running task ID 76
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00002.bz2:0+14560
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00008.bz2:0+14842
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00005.bz2:0+14961
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 3, init = 46, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 70 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 70 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 70
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 2, init = 47, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 76 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 76 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 76
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 78, boot = 3, init = 47, finish = 28
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 73 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 73 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 73

=============my code============

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("spark://myhost-bigdata:7077")
conf.setAppName("job")
conf.set("spark.ui.port", 4040)
sc = SparkContext(conf=conf)

types = ["pv", "uv"]
for type in types:
    dm = sc.textFile("/tmp/uv/part-%s-*.gz" % type)
    arrs1 = dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs2 = dm.map(lambda line: (int(line.split("^")[0].split("_")[1]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs3 = dm.map(lambda line: (int(line.split("^")[0].split("_")[2]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()

=====================my new code ==================

#-*- encoding: utf-8 -*-
import logging
import time
from pyspark import SparkConf, SparkContext

logger = logging.getLogger("endlesscode")
formatter = logging.Formatter('%(name)-12s %(asctime)s %(levelname)-8s %(message)s', '%a, %d %b %Y %H:%M:%S', )
file_handler = logging.FileHandler("y.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)


def getSparkContext(port):
    conf = SparkConf()
    conf.setMaster("spark://my-bigdata-01:7077")
    conf.setAppName("test")
    conf.set("spark.ui.port", port)
    return SparkContext(conf=conf)


def getPv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), int(line.split("^")[1]))).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()


def getUv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), line.split("^")[1])).distinct().map(
        lambda a: (a[0], 1)).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()


def calc(ls):
    sc = None
    try:
        port = ls[0]
        cityname = ls[1]
        calctime = ls[2]
        regdate = calctime[0] + "-" + calctime[1] + "-" + calctime[2]
        lognames = ["pcpv", "wappv", "apppv", "pcuv", "wapuv", "appuv", "hottidall"]
        sc = getSparkContext(port)
        for index, ln in enumerate(lognames):
            log = "/tmp/2014report/%s/%s/%s/%s/%s/part-*.bz2" % (calctime[0], calctime[1], calctime[2], cityname, ln)
            dm = sc.textFile(log)
            logger.debug("type: %s, total count: %s; cityname: %s; first: %s" % (ln, dm.count(), cityname, dm.take(1)))
            if index in (0, 1, 2):
                for i in range(0, 3):
                    result = getPv(dm, i)
                    logger.debug("level: %s, regdate: %s, cityname: %s, result: %s", i, regdate, cityname, result)
            elif index in (3, 4, 5):
                for i in range(0, 3):
                    result = getUv(dm, i)
                    logger.debug("level: %s, regdate: %s, cityname: %s, result: %s", i, regdate, cityname, result)
            elif index == 6:
                pass
    except Exception as e:
        logger.error("error: %s", e)
    finally:
        if sc is not None:
            sc.stop()


def getAllCityName():
    array = ("cc", "hangzhou", "taizhou", "jiaxing")
    return array


if __name__ == '__main__':
    port = 5100
    curtime = time.time()
    citynames = getAllCityName()
    for index1, cityname in enumerate(citynames):
        calc((port + index1, cityname, ["2014", "03", "26"]))

It says it has 10 partitions (progress: 7/10). If the data is big, the shuffle can get out of hand with just 10 partitions. I've found that jobs hang when you have big data and not enough partitions. The hang might be caused by a mental GC hang, or something memory related. You want 2 - 4 partitions per CPU, but sometimes you need to use many more to stop OOMs and crazy GC hangs, even 1000s. See the sketch below for what that can look like.
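For reference, here is a minimal sketch of what "use more partitions" could look like in the question's PySpark code. It reuses the master URL, input path and "^"-delimited record format from the question, but the app name "partition-test", the simplified counting logic and the figure of 200 partitions are invented for illustration, not taken from the original post.

from pyspark import SparkConf, SparkContext

NUM_PARTITIONS = 200  # invented tuning value: start at 2-4 per CPU core, raise it if OOMs/GC stalls persist

conf = SparkConf().setMaster("spark://myhost-bigdata:7077").setAppName("partition-test")
sc = SparkContext(conf=conf)

# Ask for more input splits up front (only effective if the input format is splittable) ...
dm = sc.textFile("/tmp/uv/part-uv-*.gz", NUM_PARTITIONS)

# ... and give the shuffle an explicit partition count instead of the default,
# as the asker's new code already does with numPartitions=80.
counts = dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), 1)) \
           .reduceByKey(lambda a, b: a + b, numPartitions=NUM_PARTITIONS) \
           .collect()

sc.stop()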

Also, countByKey doesn't scale in the number of keys, which can be a problem. Try (Scala code) .map((_, 1)).reduceByKey(_ + _) instead.
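In PySpark (the language of the question), the equivalent of that Scala snippet, used in place of the countByKey().items() calls from the original code, might look like this sketch (it assumes the dm RDD defined in the question's code):

# Sketch: count distinct (key, value) pairs per key with a distributed reduce
# instead of countByKey(), so only the final (key, count) pairs reach the driver.
arrs1 = (dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), line.split("^")[1]))
           .distinct()
           .map(lambda kv: (kv[0], 1))
           .reduceByKey(lambda a, b: a + b)
           .collect())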

