python - How could I set hwm in the push/pull pattern of zmq? -


i have found similar question, zeromq: hwm on push not work, couldn't solve problem.

i want control number of messages push socket queues, doesn't work , still queues 1000 messages.
want know how set hwm of push socket. in advance.

my environment is: libzmq 4.0.4, pyzmq 14.1.0, python 3.3

here's code:

server.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*-  import random  import zmq   class testpush(object):      def __init__(self):         self.ctx = zmq.context()          random.seed()      def run(self):         task_snd = self.ctx.socket(zmq.push)         task_snd.setsockopt(zmq.sndhwm, 10)         task_snd.bind('tcp://*:53000')                  while true:             workload = str(random.randint(1, 100))             task_snd.send(workload.encode('utf-8'))             print('send {0}'.format(workload))   if __name__ == '__main__':     test_push = testpush()     test_push.run() 

client.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*-  import time import random  import zmq   class testpull(object):      def __init__(self):         self.ctx = zmq.context()      def run(self):         task_rcv = self.ctx.socket(zmq.pull)         task_rcv.setsockopt(zmq.rcvhwm, 1)         task_rcv.connect('tcp://localhost:53000')          while true:             msg = task_rcv.recv()             print('receive msg: {0}'.format(msg))              time.sleep(random.randint(2, 3))   if __name__ == '__main__':     test_pull = testpull()     test_pull.run() 

i faced similar problem zeromq when tried set hwm (high water mark) on push , pull socket. even, not working pub , sub socket. explain happened , how got resolved.

i made 2 scripts, first sender push socket , receiver pull socket. set hwm 10 on both sockets. , inside receiver script, put delay of 5 seconds after each message received. ran sender script loop of 100 messages (after keeping receiver running receive).

what had expected:

the receiver queue , sender's queue reach hwm. after that, sender stop sending more messages.

but happened:

the sender sends 100 messages , exited. receiver has kept processing message one-after long time till receives messages.

after research found out reason:

there called kernel socket buffer seats between both sender socket , receiver sockets. whenever, process opens socket, kernel allots memory space tcp sockets, default 128kb. kernel socket buffer applicable sender , receiver socket (so total buffer 128kb + 128kb). message size in bytes (an int characters). thus, total message buffering follows:

total buffer message = sender socket hwm + kernel socket buffer sender socket (128kb) + receiver socket hwm + kernel socket buffer sender socket (128kb)

solution:

now, changed message length little more 1kb. perform test again , found approx 260 messages sent (as expected), after sender stops in interval till receiver receives message , starts again.


additional info

in order push socket keep sending messages when receiver not able receive, can use noblock option in send routine, number of lost messages receiver increase high. so, better option use poll or timeout , call send routine noblock option.

please note may use sndbuff/rcvbuff of zeromq os may not obey (as in case not working).


Comments

Popular posts from this blog

java - WrongTypeOfReturnValue exception thrown when unit testing using mockito -

php - Magento - Deleted Base url key -

android - How to disable Button if EditText is empty ? -