python - Why does PySpark get stuck after completing a few ShuffleMapTasks?
I use PySpark to run a Python script. When the job reaches "14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)" there is no further activity; it has been stuck there ever since, with no error message.
==========log=============
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as 3689 bytes in 1 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:5 as TID 73 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:5 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:6 as TID 74 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:7 as TID 75 on executor 1: myhost-bigdata-110d12 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:7 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:8 as TID 76 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:8 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:9 as TID 77 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:9 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 73 in 200 ms on myhost-bigdata-110d13 (progress: 0/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 5)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 76 in 218 ms on myhost-bigdata-110d13 (progress: 1/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 8)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 72 in 324 ms on myhost-bigdata-110d12 (progress: 2/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 4)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 69 in 371 ms on myhost-bigdata-110d12 (progress: 3/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 1)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 75 in 367 ms on myhost-bigdata-110d12 (progress: 4/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 7)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 77 in 423 ms on myhost-bigdata-110d14 (progress: 5/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 9)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 71 in 435 ms on myhost-bigdata-110d14 (progress: 6/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 3)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 74 in 510 ms on myhost-bigdata-110d14 (progress: 7/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)
==============work1 log==============
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 59 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 59 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 47 is 962
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 59
14/03/23 21:32:33 INFO executor.Executor: Sending result for 47 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 47
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 44 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 44 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 44
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 82, boot = 3, init = 76, finish = 3
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 65 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 65 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 65
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 68
14/03/23 21:32:33 INFO executor.Executor: Running task ID 68
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 71
14/03/23 21:32:33 INFO executor.Executor: Running task ID 71
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 74
14/03/23 21:32:33 INFO executor.Executor: Running task ID 74
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 77
14/03/23 21:32:33 INFO executor.Executor: Running task ID 77
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00000.bz2:0+14685
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00009.bz2:0+15447
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00006.bz2:0+14924
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00003.bz2:0+15015
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 89, boot = 3, init = 62, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 68 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 68 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 68
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 83, boot = 2, init = 57, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 77 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 77 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 77
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 66, boot = 2, init = 40, finish = 24
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 95, boot = 2, init = 60, finish = 33
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 71 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 71 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 71
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 74 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 74 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 74
=========work2 log ========================
14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 60 is 962
14/03/23 21:32:40 INFO executor.Executor: Sending result for 60 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 60
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 69
14/03/23 21:32:40 INFO executor.Executor: Running task ID 69
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 72
14/03/23 21:32:40 INFO executor.Executor: Running task ID 72
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 75
14/03/23 21:32:40 INFO executor.Executor: Running task ID 75
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00001.bz2:0+14114
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00007.bz2:0+15325
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00004.bz2:0+15169
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished
14/03/23 21:32:40 INFO python.PythonRDD: Times: total = 25, boot = 2, init = 10, finish = 13
14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 72 is 851
14/03/23 21:32:40 INFO executor.Executor: Sending result for 72 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 72
===============work3 log===================
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 55 is 962
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 61 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 55 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 58
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 55
14/03/23 21:32:28 INFO executor.Executor: Sending result for 61 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 61
14/03/23 21:32:28 INFO python.PythonRDD: Times: total = 92, boot = 3, init = 86, finish = 3
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 67 is 968
14/03/23 21:32:28 INFO executor.Executor: Sending result for 67 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 67
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 64 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 64 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 64
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 70
14/03/23 21:32:29 INFO executor.Executor: Running task ID 70
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 73
14/03/23 21:32:29 INFO executor.Executor: Running task ID 73
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 76
14/03/23 21:32:29 INFO executor.Executor: Running task ID 76
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00002.bz2:0+14560
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00008.bz2:0+14842
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00005.bz2:0+14961
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 3, init = 46, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 70 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 70 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 70
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 2, init = 47, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 76 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 76 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 76
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 78, boot = 3, init = 47, finish = 28
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 73 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 73 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 73
=============my code============
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("spark://myhost-bigdata:7077")
conf.setAppName("job")
conf.set("spark.ui.port", 4040)
sc = SparkContext(conf=conf)

types = ["pv", "uv"]
for type in types:
    dm = sc.textFile("/tmp/uv/part-%s-*.gz" % type)
    arrs1 = dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs2 = dm.map(lambda line: (int(line.split("^")[0].split("_")[1]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs3 = dm.map(lambda line: (int(line.split("^")[0].split("_")[2]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
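To make the chained lambdas concrete: each input line evidently has the form "id1_id2_id3^visitor" (inferred from the split("^") and split("_") calls), and each pipeline counts distinct visitors per id at one level. Here is a minimal pure-Python model of the level-0 pipeline; the sample lines are hypothetical, only the line shape is taken from the code above:

# Pure-Python model of dm.map(...).distinct().map(...).countByKey()
# at level 0. Sample lines are hypothetical; only the
# "id1_id2_id3^visitor" shape is inferred from the code above.
lines = ["10_20_30^ua", "10_21_30^ub", "10_20_31^ua"]

pairs = {(int(l.split("^")[0].split("_")[0]), l.split("^")[1]) for l in lines}
uv = {}
for key, _visitor in pairs:       # distinct (id, visitor) pairs
    uv[key] = uv.get(key, 0) + 1  # countByKey over the first element
print(uv)  # {10: 2} -- two distinct visitors for id 10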
=====================my new code ==================
#-*- encoding: utf-8 -*-
import logging
import time

from pyspark import SparkConf, SparkContext

logger = logging.getLogger("endlesscode")
formatter = logging.Formatter('%(name)-12s %(asctime)s %(levelname)-8s %(message)s', '%a, %d %b %Y %H:%M:%S',)
file_handler = logging.FileHandler("y.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)


def getSparkContext(port):
    conf = SparkConf()
    conf.setMaster("spark://my-bigdata-01:7077")
    conf.setAppName("test")
    conf.set("spark.ui.port", port)
    return SparkContext(conf=conf)


def getPv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), int(line.split("^")[1]))).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()


def getUv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), line.split("^")[1])).distinct().map(
        lambda a: (a[0], 1)).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()


def calc(ls):
    sc = None
    try:
        port = ls[0]
        cityname = ls[1]
        calctime = ls[2]
        regdate = calctime[0] + "-" + calctime[1] + "-" + calctime[2]
        lognames = ["pcpv", "wappv", "apppv", "pcuv", "wapuv", "appuv", "hottidall"]
        sc = getSparkContext(port)
        for index, ln in enumerate(lognames):
            log = "/tmp/2014report/%s/%s/%s/%s/%s/part-*.bz2" % (calctime[0], calctime[1], calctime[2], cityname, ln)
            dm = sc.textFile(log)
            logger.debug("type: %s, total count: %s; cityname: %s; first: %s" % (ln, dm.count(), cityname, dm.take(1)))
            if index in (0, 1, 2):
                for i in range(0, 3):
                    result = getPv(dm, i)
                    logger.debug("level: %s, regdate: %s, cityname: %s, result: %s", i, regdate, cityname, result)
            elif index in (3, 4, 5):
                for i in range(0, 3):
                    result = getUv(dm, i)
                    logger.debug("level: %s, regdate: %s, cityname: %s, result: %s", i, regdate, cityname, result)
            elif index == 6:
                pass
    except Exception as e:
        logger.error("error: %s", e)
    finally:
        if sc is not None:  # the context may never have been created
            sc.stop()


def getAllCityName():
    array = ("cc", "hangzhou", "taizhou", "jiaxing")
    return array


if __name__ == '__main__':
    port = 5100
    curtime = time.time()
    citynames = getAllCityName()
    for index1, cityname in enumerate(citynames):
        calc((port + index1, cityname, ["2014", "03", "26"]))
It has 10 partitions (progress: 7/10). If the data is big, the shuffle can get out of hand with only 10 partitions; I've found jobs hang when they have big data and not enough partitions. The hang might be caused by a mental GC pause, or be memory related. You generally want 2 - 4 partitions per CPU, but sometimes you use far more to stop OOMs and crazy GC hangs, up into the 1000s.
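A minimal sketch of raising the partition count in PySpark (this code is mine, not from the original answer; it assumes a PySpark version where textFile accepts minPartitions and RDDs have repartition; the master URL, path, and numbers are illustrative):

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[4]").setAppName("parts"))

# Ask for more input partitions up front. Note that gzip files are not
# splittable, so each .gz file still yields at most one partition;
# minPartitions mainly helps with splittable inputs.
dm = sc.textFile("/tmp/uv/part-uv-*.gz", minPartitions=80)

# For non-splittable inputs, repartition explicitly before the
# shuffle-heavy steps (repartition itself performs one shuffle).
dm = dm.repartition(80)
print(dm.getNumPartitions())  # 80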
Also, countByKey doesn't scale in the number of keys, which can be a problem. Try (Scala code) .map((_, 1)).reduceByKey(_ + _) instead, as sketched in PySpark below.
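Since the question uses PySpark, the same idea in Python might look like this sketch (mine, not the answerer's; the tiny dataset and numPartitions value are illustrative):

from pyspark import SparkContext

sc = SparkContext("local[2]", "uv-sketch")
# Tiny hypothetical dataset in the "id1_id2_id3^visitor" shape.
dm = sc.parallelize(["10_20_30^ua", "10_21_30^ub", "10_20_31^ua"])

# Count distinct visitors per level-0 id with reduceByKey, so the
# per-key sums are built on the executors, instead of countByKey,
# which materializes a dict of every key on the driver.
pairs = dm.map(lambda line: (int(line.split("^")[0].split("_")[0]),
                             line.split("^")[1]))
uv = (pairs.distinct()
           .map(lambda kv: (kv[0], 1))
           .reduceByKey(lambda a, b: a + b, numPartitions=8))
print(uv.collect())  # [(10, 2)] -- two distinct visitors for id 10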