Class: Cod::Beanstalk::Channel

Inherits:
Channel
  • Object
Defined in:
lib/cod/beanstalk/channel.rb

Overview

Note:

Beanstalk channels cannot currently be used in Cod.select. This is due to limitations inherent in the beanstalkd protocol. We’ll probably try to get a patch into beanstalkd to change this.

Note:

If you embed a beanstalk channel into one of your messages, you will get a channel that connects to the same server and the same tube on the other end. This behaviour is useful for Service.

A channel based on a beanstalkd tube. A #put will insert messages into the tube, and a #get will fetch the next message that is pending on the tube.

Defined Under Namespace

Classes: Control

Constant Summary

JOB_PRIORITY = 0

All messages are inserted into the beanstalk queue with this priority.


Methods inherited from Channel

#interact

Constructor Details

#initialize(tube_name, server_url) ⇒ Channel

Returns a new instance of Channel.



# File 'lib/cod/beanstalk/channel.rb', line 26

def initialize(tube_name, server_url)
  super()
  @tube_name, @server_url = tube_name, server_url

  @body_serializer = Cod::SimpleSerializer.new
  @transport = connection(server_url, tube_name)
end

Instance Attribute Details

#server_url ⇒ String (readonly)

Beanstalkd server this channel is connected to

Returns:

  • (String)


# File 'lib/cod/beanstalk/channel.rb', line 24

def server_url
  @server_url
end

#tube_name ⇒ String (readonly)

Which tube this channel is connected to

Returns:

  • (String)


# File 'lib/cod/beanstalk/channel.rb', line 21

def tube_name
  @tube_name
end

Class Method Details

._load(str) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 186

def self._load(str) # :nodoc:
  tube_name, server_url = Marshal.load(str)
  Cod.beanstalk(tube_name, server_url)
end

Instance Method Details

#_dump(level) ⇒ Object

Serialization: dumps the channel as its tube name and server URL, so that it can be rebuilt from those two values when loaded.



# File 'lib/cod/beanstalk/channel.rb', line 181

def _dump(level) # :nodoc:
  Marshal.dump(
    [@tube_name, @server_url])
end
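
The _dump and ._load pair above relies on Ruby's Marshal hooks for custom serialization. The following is a minimal sketch of that round trip, assuming a hypothetical TubeHandle class that stands in for the channel and opens no real connection. Only the two strings travel through Marshal; the copy is built from scratch on load.

```ruby
# Hypothetical stand-in for a channel; this is not the Cod class itself.
class TubeHandle
  attr_reader :tube_name, :server_url

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
    # A real channel would connect here; a plain object marks the spot.
    @transport = Object.new
  end

  # Called by Marshal.dump: keep only what is needed to reconnect.
  def _dump(_level)
    Marshal.dump([@tube_name, @server_url])
  end

  # Called by Marshal.load: build a fresh handle from the two strings.
  def self._load(str)
    tube_name, server_url = Marshal.load(str)
    new(tube_name, server_url)
  end
end

original = TubeHandle.new('jobs', 'localhost:11300')
copy     = Marshal.load(Marshal.dump(original))
```

Because the transport is never dumped, the receiving side ends up with its own connection to the same tube rather than a copy of the sender's socket state.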

#bs_bury(msg_id) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 205

def bs_bury(msg_id)
  # NOTE: Why I need to assign a priority when burying I fail to
  # understand. Like a priority for rapture?
  bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
end

#bs_delete(msg_id) ⇒ Object

Beanstalk commands: deletes the job with the given id from the tube.



# File 'lib/cod/beanstalk/channel.rb', line 193

def bs_delete(msg_id)
  bs_command([:delete, msg_id], :deleted)
end

#bs_release(msg_id) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 197

def bs_release(msg_id)
  bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
end

#bs_release_with_delay(msg_id, seconds) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 201

def bs_release_with_delay(msg_id, seconds)
  bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
end
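
For orientation, the command tuples passed to bs_command above correspond to single text lines in the beanstalkd protocol (delete, release and bury). The sketch below renders such tuples as protocol lines. The command_line helper is hypothetical and only illustrates the wire format; how Cod's transport actually encodes the tuples is not shown on this page.

```ruby
# Hypothetical helper: renders a command tuple as one beanstalkd
# protocol line. It illustrates the format, not Cod's transport.
def command_line(name, *args)
  ([name] + args).join(' ') + "\r\n"
end

priority = 0   # mirrors JOB_PRIORITY
msg_id   = 42  # made-up job id for illustration

delete_line  = command_line(:delete, msg_id)
release_line = command_line(:release, msg_id, priority, 0)
delayed_line = command_line(:release, msg_id, priority, 30)
bury_line    = command_line(:bury, msg_id, priority)
```

The only difference between #bs_release and #bs_release_with_delay is the last field of the release line, the delay in seconds.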

#client(answers_to) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 86

def client(answers_to)
  Service::Client.new(self, answers_to)
end

#close ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 70

def close
  @transport.close
end

#get ⇒ Object

Reads a job from the tube and decodes it as a message.

Returns:

  • (Object)


# File 'lib/cod/beanstalk/channel.rb', line 61

def get
  id, msg = bs_reserve
  
  # We delete the job immediately, since #get should be definitive.
  bs_delete(id)

  deserialize(msg)
end

#initialize_copy(other) ⇒ Object

Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.



# File 'lib/cod/beanstalk/channel.rb', line 37

def initialize_copy(other)
  super(other)
  initialize(other.tube_name, other.server_url)
end
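
The effect of this hook can be tried without a beanstalkd server. The sketch below assumes a hypothetical FakeChannel whose "connection" is a plain object, and shows that #dup re-runs initialize, so the copy shares the tube name and server URL but not the transport.

```ruby
# Hypothetical stand-in; the transport is a plain object, not a socket.
class FakeChannel
  attr_reader :tube_name, :server_url, :transport

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
    @transport = Object.new # stands in for a fresh connection
  end

  # Ruby calls this on the new object during #dup and #clone.
  def initialize_copy(other)
    super(other)
    initialize(other.tube_name, other.server_url)
  end
end

a = FakeChannel.new('jobs', 'localhost:11300')
b = a.dup
```

Without the hook, #dup would copy the instance variables as they are, and both objects would share one transport.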

#put(msg) ⇒ void

This method returns an undefined value.

Puts a job on the tube after serializing.

Parameters:

  • msg (Object)

    message to send



# File 'lib/cod/beanstalk/channel.rb', line 47

def put(msg)
  pri   = JOB_PRIORITY
  delay = 0
  ttr   = 120
  body = @body_serializer.en(msg)
  
  answer, *rest = @transport.interact([:put, pri, delay, ttr, body])
  fail "#put fails, #{answer.inspect}" unless answer == :inserted
end
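
In the beanstalkd protocol, a put is a command line carrying priority, delay, time-to-run and body size, followed by the body itself, and the server answers INSERTED with a job id on success. The sketch below builds such a frame with the same defaults as #put. The put_frame and inserted? helpers are hypothetical and do not reflect how Cod's transport is implemented.

```ruby
# Hypothetical helper: builds the bytes of a beanstalkd put command.
# Defaults mirror the values used in #put above.
def put_frame(body, pri: 0, delay: 0, ttr: 120)
  header = "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n"
  # Work on binary copies so that arbitrary serialized bodies are safe.
  header.b + body.b + "\r\n".b
end

# Hypothetical helper: true if the server reply reports success.
def inserted?(reply)
  reply.start_with?('INSERTED ')
end

frame = put_frame('hello')
```

The size field counts bytes rather than characters, which is why the sketch uses bytesize.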

#service ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 83

def service
  Service.new(self)
end

#to_read_fds ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 75

def to_read_fds
  fail "Cod.select not supported with beanstalkd channels.\n"+
    "To support this, we will have to extend the beanstalkd protocol."
end

#try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object

Like #get, reads the next message from the channel, but reserves the right to put it back. This uses beanstalkd's flow control features to control message flow in the case of exceptions and the like.

If the block given to this method raises an exception, the message is released unless a control command has been given. This means that other workers on the same tube will get the chance to see the message.

If the block is exited without specifying a fate for the message, it is deleted from the tube.

Examples:

All the flow control that beanstalkd allows

channel.try_get { |msg, ctl|
  if msg == 1
    ctl.release # don't handle messages of type 1
  else
    ctl.bury    # for example
  end
}

Exceptions release the message

# Will release the message and allow other connected channels to 
# #get it.
channel.try_get { |msg, ctl|
  fail "No such message handler"
}

Yields:

  • (Object, Cod::Beanstalk::Channel::Control)

Returns:

  • the block's return value

See Also:



# File 'lib/cod/beanstalk/channel.rb', line 125

def try_get 
  fail "No block given to #try_get" unless block_given?
  
  id, msg = bs_reserve
  control = Control.new(id, self)
  
  begin
    retval = yield(deserialize(msg), control)
  rescue Exception
    control.release unless control.command_given?
    raise
  ensure
    control.delete unless control.command_given?
  end
  
  return retval
end
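
The three possible outcomes of #try_get (delete by default, explicit control command, release on exception) can be traced without a server. The following sketch assumes a hypothetical MemoryTube that keeps messages in arrays and follows the same begin/rescue/ensure shape as the method above. As a simplification, a released message is appended to the end of the ready list.

```ruby
# Hypothetical in-memory tube; stands in for beanstalkd, not part of Cod.
class MemoryTube
  # Records the fate chosen for a reserved message.
  class Control
    attr_reader :fate
    def release; @fate = :release; end
    def bury;    @fate = :bury;    end
    def command_given?; !@fate.nil?; end
  end

  attr_reader :ready, :buried

  def initialize(*msgs)
    @ready  = msgs
    @buried = []
  end

  def try_get
    fail 'No block given to #try_get' unless block_given?

    msg = @ready.shift # "reserve": taken off the ready list
    ctl = Control.new
    begin
      result = yield(msg, ctl)
    rescue Exception
      ctl.release unless ctl.command_given?
      raise
    ensure
      # No explicit fate means delete: the message is not put back.
      @ready.push(msg)  if ctl.fate == :release
      @buried.push(msg) if ctl.fate == :bury
    end
    result
  end
end

tube  = MemoryTube.new(:a, :b, :c)
first = tube.try_get { |msg, _ctl| msg }        # :a is deleted
tube.try_get { |_msg, ctl| ctl.bury }           # :b is buried
begin
  tube.try_get { |_msg, _ctl| fail 'no handler' } # :c is released
rescue RuntimeError
  # The exception still reaches the caller after the release.
end
```

After these three calls the ready list holds only :c and the buried list holds only :b, while :a is gone.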