Class: Cod::Beanstalk::Channel
- Defined in:
- lib/cod/beanstalk/channel.rb
Overview
A channel based on a beanstalkd tube. A #put inserts a message into the tube, and a #get fetches the next message that is pending on the tube.
Beanstalk channels cannot currently be used in Cod.select because of limitations inherent in the beanstalkd protocol. We will probably try to get a patch into beanstalkd to change this.
If you embed a beanstalk channel in one of your messages, the receiver gets a channel that connects to the same server and the same tube. This behaviour is useful for Service.
Defined Under Namespace
Classes: Control
Constant Summary
- JOB_PRIORITY = 0
  All messages are inserted into the beanstalk queue with this priority.
Instance Attribute Summary
- #server_url ⇒ Object (readonly)
  Beanstalkd server this channel is connected to.
- #tube_name ⇒ Object (readonly)
  Which tube this channel is connected to.
Class Method Summary
Instance Method Summary
- #_dump(level) ⇒ Object
  Serialization.
- #bs_bury(msg_id) ⇒ Object
- #bs_delete(msg_id) ⇒ Object
  Beanstalk commands.
- #bs_release(msg_id) ⇒ Object
- #bs_release_with_delay(msg_id, seconds) ⇒ Object
- #client(answers_to) ⇒ Object
- #close ⇒ Object
- #get ⇒ Object
- #initialize(tube_name, server_url) ⇒ Channel (constructor)
  A new instance of Channel.
- #initialize_copy(other) ⇒ Object
  Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
- #put(msg) ⇒ Object
- #service ⇒ Object
  Service/client.
- #to_read_fds ⇒ Object
- #try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object
  Like #get, reads the next message from the channel but reserves the right to put it back.
Methods inherited from Channel
Constructor Details
#initialize(tube_name, server_url) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/cod/beanstalk/channel.rb', line 24
    def initialize(tube_name, server_url)
      super()
      @tube_name, @server_url = tube_name, server_url
      @body_serializer = Cod::SimpleSerializer.new
      @transport = connection(server_url, tube_name)
    end
Instance Attribute Details
#server_url ⇒ Object (readonly)
Beanstalkd server this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 22
    def server_url
      @server_url
    end
#tube_name ⇒ Object (readonly)
Which tube this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 20
    def tube_name
      @tube_name
    end
Class Method Details
Instance Method Details
#_dump(level) ⇒ Object
Serialization.
    # File 'lib/cod/beanstalk/channel.rb', line 168
    def _dump(level) # :nodoc:
      Marshal.dump([@tube_name, @server_url])
    end
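The `_dump` listing above writes only the tube name and server URL, which is what lets a channel embedded in a message come out on the other side as a channel to the same tube. The following is a minimal sketch of that round trip, assuming messages are serialized with Marshal and that a matching `_load` class method exists (it is not shown on this page). `TubeRef` is a hypothetical stand-in that opens no connection.

```ruby
# Hypothetical stand-in for a channel; it only remembers its address.
class TubeRef
  attr_reader :tube_name, :server_url

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
  end

  # Same shape as the _dump above: only the address is serialized.
  def _dump(level)
    Marshal.dump([@tube_name, @server_url])
  end

  # Assumed counterpart: Marshal hands back the string produced by _dump.
  def self._load(str)
    new(*Marshal.load(str))
  end
end

original = TubeRef.new('jobs', 'localhost:11300')
copy     = Marshal.load(Marshal.dump(original))
```

Because `_load` goes through `new`, the receiving side builds its own object from the address rather than sharing state with the sender.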
#bs_bury(msg_id) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 192
    def bs_bury(msg_id)
      # NOTE: Why I need to assign a priority when burying I fail to
      # understand. Like a priority for rapture?
      bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
    end
#bs_delete(msg_id) ⇒ Object
Beanstalk commands.
    # File 'lib/cod/beanstalk/channel.rb', line 180
    def bs_delete(msg_id)
      bs_command([:delete, msg_id], :deleted)
    end
#bs_release(msg_id) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 184
    def bs_release(msg_id)
      bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
    end
#bs_release_with_delay(msg_id, seconds) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 188
    def bs_release_with_delay(msg_id, seconds)
      bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
    end
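The command arrays passed to `bs_command` line up with the text lines of the beanstalkd protocol (`delete <id>`, `release <id> <pri> <delay>`, `bury <id> <pri>`). The sketch below renders such arrays as protocol lines. How Cod's transport actually encodes them is not shown on this page, so `wire_line` is a hypothetical helper for illustration only.

```ruby
# Hypothetical helper: render a command array as one beanstalkd protocol line.
def wire_line(command)
  command.join(' ') + "\r\n"
end

priority = 0  # mirrors JOB_PRIORITY

delete_line  = wire_line([:delete, 42])
release_line = wire_line([:release, 42, priority, 30])
bury_line    = wire_line([:bury, 42, priority])
```

In the protocol, the priority sent with `bury` and `release` is the new priority assigned to the job, which is why both commands carry one.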
#client(answers_to) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 73
    def client(answers_to)
      Service::Client.new(self, answers_to)
    end
#close ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 59
    def close
      @transport.close
    end
#get ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 50
    def get
      id, msg = bs_reserve

      # We delete the job immediately, since #get should be definitive.
      bs_delete(id)

      deserialize(msg)
    end
#initialize_copy(other) ⇒ Object
Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
    # File 'lib/cod/beanstalk/channel.rb', line 35
    def initialize_copy(other)
      super(other)
      initialize(other.tube_name, other.server_url)
    end
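Re-running `initialize` from `initialize_copy` is what gives the duplicate its own connection instead of a shared one. A minimal sketch of the pattern, with a hypothetical `FakeChannel` whose "connection" is just a fresh object:

```ruby
# Hypothetical stand-in: "connecting" only allocates a placeholder object.
class FakeChannel
  attr_reader :tube_name, :server_url, :transport

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
    @transport = Object.new # stands in for a new server connection
  end

  # Same pattern as above: the copy runs the constructor again.
  def initialize_copy(other)
    super(other)
    initialize(other.tube_name, other.server_url)
  end
end

a = FakeChannel.new('jobs', 'localhost:11300')
b = a.dup
```

Without `initialize_copy`, `dup` would copy the `@transport` reference, and both channels would talk over the same connection.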
#put(msg) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 40
    def put(msg)
      pri = JOB_PRIORITY
      delay = 0
      ttr = 120
      body = @body_serializer.en(msg)

      answer, *rest = @transport.interact([:put, pri, delay, ttr, body])
      fail "#put fails, #{answer.inspect}" unless answer == :inserted
    end
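The four values sent with `:put` correspond to the beanstalkd `put` command: priority, delay in seconds, time-to-run (`ttr`) in seconds, and the body. On the wire the body is announced by its length in bytes and sent on the following line. The sketch below builds that frame. `put_frame` is a hypothetical helper, and the page does not show how Cod's transport produces the frame.

```ruby
# Hypothetical helper: frame a put as the beanstalkd text protocol expects.
# The length field counts bytes, not characters.
def put_frame(body, pri: 0, delay: 0, ttr: 120)
  "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n"
end

frame       = put_frame('hello')
multi_frame = put_frame("\u00e9") # one character, two bytes in UTF-8
```

The `ttr` of 120 means a reserved job that is neither deleted, released, nor buried within about two minutes is released by the server, which is worth keeping in mind for long-running #try_get blocks.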
#service ⇒ Object
Service/client.
    # File 'lib/cod/beanstalk/channel.rb', line 70
    def service
      Service.new(self)
    end
#to_read_fds ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 64
    def to_read_fds
      fail "Cod.select not supported with beanstalkd channels.\n"+
        "To support this, we will have to extend the beanstalkd protocol."
    end
#try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object
Like #get, reads the next message from the channel but reserves the right to put it back. This uses beanstalkd's flow control features to control message flow in the case of exceptions and the like.
If the block given to this method raises an exception, the message is released unless a control command has been given. Other workers on the same tube then get the chance to see the message.
If the block is exited without specifying a fate for the message, it is deleted from the tube.
    # File 'lib/cod/beanstalk/channel.rb', line 112
    def try_get
      fail "No block given to #try_get" unless block_given?

      id, msg = bs_reserve
      control = Control.new(id, self)

      begin
        retval = yield(deserialize(msg), control)
      rescue Exception
        control.release unless control.command_given?
        raise
      ensure
        control.delete unless control.command_given?
      end

      return retval
    end
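The three possible fates of a message (deleted on normal exit, released on exception, or whatever the block explicitly asks for) can be traced without a server. The sketch below copies the control flow of the listing above into a hypothetical in-memory `FakeTube`. `FakeControl` is an assumed shape for Control, based only on the `release`, `delete` and `command_given?` calls visible above; its `bury` method is an assumption modelled on #bs_bury.

```ruby
# Assumed shape of Control: every command marks the message as handled.
class FakeControl
  def initialize(id, tube)
    @id, @tube, @given = id, tube, false
  end

  def command_given?; @given; end
  def delete;  @given = true; @tube.bs_delete(@id);  end
  def release; @given = true; @tube.bs_release(@id); end
  def bury;    @given = true; @tube.bs_bury(@id);    end
end

# Hypothetical in-memory tube that records what happens to each job.
class FakeTube
  attr_reader :log

  def initialize(*messages)
    @ready = messages.each_with_index.map { |m, i| [i + 1, m] }
    @log = []
  end

  def bs_reserve;     @ready.shift;            end
  def bs_delete(id);  @log << [:deleted, id];  end
  def bs_release(id); @log << [:released, id]; end
  def bs_bury(id);    @log << [:buried, id];   end

  # Same control flow as the try_get listing above.
  def try_get
    id, msg = bs_reserve
    control = FakeControl.new(id, self)
    begin
      retval = yield(msg, control)
    rescue Exception
      control.release unless control.command_given?
      raise
    ensure
      control.delete unless control.command_given?
    end
    retval
  end
end

tube = FakeTube.new('a', 'b', 'c')
first = tube.try_get { |msg, _ctl| msg.upcase } # normal exit: deleted
begin
  tube.try_get { |_msg, _ctl| raise 'boom' }    # exception: released
rescue RuntimeError
  # the exception still reaches the caller after the release
end
tube.try_get { |_msg, ctl| ctl.bury }           # explicit command: buried
```

Note that the release in the `rescue` branch also sets the command flag, so the `ensure` branch does not try to delete a message that has just been released.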