Module: CloudCrowd

Defined in:
lib/cloud-crowd.rb,
lib/cloud_crowd/node.rb,
lib/cloud_crowd/action.rb,
lib/cloud_crowd/models.rb,
lib/cloud_crowd/models.rb,
lib/cloud_crowd/server.rb,
lib/cloud_crowd/worker.rb,
lib/cloud_crowd/helpers.rb,
lib/cloud_crowd/version.rb,
lib/cloud_crowd/inflector.rb,
lib/cloud_crowd/dispatcher.rb,
lib/cloud_crowd/exceptions.rb,
lib/cloud_crowd/models/job.rb,
lib/cloud_crowd/asset_store.rb,
lib/cloud_crowd/command_line.rb,
lib/cloud_crowd/models/work_unit.rb,
lib/cloud_crowd/helpers/resources.rb,
lib/cloud_crowd/models/node_record.rb,
lib/cloud_crowd/asset_store/s3_store.rb,
lib/cloud_crowd/helpers/authorization.rb,
lib/cloud_crowd/models/black_listed_action.rb,
lib/cloud_crowd/asset_store/cloudfiles_store.rb,
lib/cloud_crowd/asset_store/filesystem_store.rb

Defined Under Namespace

Modules: Helpers, Inflector, ModelStatus Classes: Action, AssetStore, BlackListedAction, CommandLine, Dispatcher, Error, Job, Node, NodeRecord, Server, WorkUnit, Worker

Constant Summary collapse

SCHEMA_VERSION =

Increment the schema version when there’s a backwards incompatible change.

5
ROOT =

Root directory of the CloudCrowd gem.

File.expand_path(File.dirname(__FILE__) + '/..')
LOG_PATH =

Default folder to log daemonized servers and nodes into.

'log'
PID_PATH =

Default folder to contain the pids of daemonized servers and nodes.

'tmp/pids'
MIN_RETRIES =

Minimum number of attempts per work unit.

1
PROCESSING =

A Job is processing if its WorkUnits are in the queue to be handled by nodes.

1
SUCCEEDED =

A Job has succeeded if all of its WorkUnits have finished successfully.

2
FAILED =

A Job has failed if even a single one of its WorkUnits has failed (they may be attempted multiple times on failure, however).

3
SPLITTING =

A Job is splitting if it’s in the process of dividing its inputs up into multiple WorkUnits.

4
MERGING =

A Job is merging if it’s busy collecting all of its successful WorkUnits back together into the final result.

5
COMPLETE =

A Job is considered to be complete if it succeeded or if it failed.

[SUCCEEDED, FAILED]
INCOMPLETE =

A Job is considered incomplete if it’s being processed, split up or merged.

[PROCESSING, SPLITTING, MERGING]
DISPLAY_STATUS_MAP =

Mapping of statuses to their display strings.

['unknown', 'processing', 'succeeded', 'failed', 'splitting', 'merging']
MODELS =
[Job, NodeRecord, WorkUnit, BlackListedAction]
VERSION =
'0.7.7'
VERSION_RELEASED =
'2017-05-18'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configObject (readonly)

Returns the value of attribute config.



83
84
85
# File 'lib/cloud-crowd.rb', line 83

def config
  @config
end

.identityObject

Returns the value of attribute identity.



84
85
86
# File 'lib/cloud-crowd.rb', line 84

def identity
  @identity
end

Class Method Details

.action_pathsObject

Retrieve the list of every installed Action for this node or server.



211
212
213
214
215
216
# File 'lib/cloud-crowd.rb', line 211

def action_paths
  default_actions   = config[:disable_default_actions] ? [] : Dir["#{ROOT}/actions/*.rb"]
  installed_actions = Dir["#{@config_path}/actions/*.rb"]
  custom_actions    = CloudCrowd.config[:actions_path] ? Dir["#{CloudCrowd.config[:actions_path]}/*.rb"] : []
  default_actions + installed_actions + custom_actions
end

.actionsObject

CloudCrowd::Actions are requested dynamically by name. Access them through this actions property, which behaves like a hash. At load time, we load all installed Actions and CloudCrowd’s default Actions into it. If you wish to have certain nodes be specialized to only handle certain Actions, then install only those into the actions directory.



196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/cloud-crowd.rb', line 196

def actions
  return @actions if @actions
  @actions = action_paths.inject({}) do |memo, path|
    path = Pathname.new(path)
    require path.relative? ? path.basename : path
    name = path.basename('.*').to_s
    memo[name] = Module.const_get( Inflector.camelize( name ) )
    memo
  end
rescue NameError => e
  adjusted_message = "One of your actions failed to load. Please ensure that the name of your action class can be deduced from the name of the file. ex: 'word_count.rb' => 'WordCount'\n#{e.message}"
  raise NameError.new(adjusted_message, e.name)
end

.central_serverObject

Get a reference to the central server, including authentication if configured.



153
154
155
# File 'lib/cloud-crowd.rb', line 153

def central_server
  @central_server ||= RestClient::Resource.new(CloudCrowd.config[:central_server], CloudCrowd.client_options)
end

.client_optionsObject

The standard RestClient options for the central server talking to nodes, as well as the other way around. There’s a timeout of 5 seconds to open a connection, and a timeout of 30 to finish reading it.



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/cloud-crowd.rb', line 172

def client_options
  return @client_options if @client_options
  @client_options = {
    :timeout => (self.server? ? config[:node_timeout] : config[:server_timeout]) || 30,
    :open_timeout => config[:open_timeout] || 5
  }
  if CloudCrowd.config[:http_authentication]
    @client_options[:user]      = CloudCrowd.config[:login]
    @client_options[:password]  = CloudCrowd.config[:password]
  end
  @client_options
end

.configure(configuration) ⇒ Object

Configure CloudCrowd by passing in the path to config.yml.



87
88
89
90
91
92
93
# File 'lib/cloud-crowd.rb', line 87

def configure(configuration)
  if configuration.kind_of? Hash
    load_configuration(configuration)
  else
    load_configuration_from_path(configuration)
  end
end

.configure_database(configuration, validate_schema = true) ⇒ Object

Configure the CloudCrowd central database (and connect to it), by passing in a path to database.yml. The file should use the standard ActiveRecord connection format.



114
115
116
117
118
119
120
# File 'lib/cloud-crowd.rb', line 114

def configure_database(configuration, validate_schema=true)
  if configuration.kind_of? Hash
    load_database_configuration(configuration, validate_schema)
  else
    load_database_configuration_from_path(configuration, validate_schema)
  end
end

.deferObject

Starts a new thread with a ActiveRecord connection_pool and yields for peforming work inside the blocks



143
144
145
146
147
148
149
# File 'lib/cloud-crowd.rb', line 143

def defer
  Thread.new do
    ActiveRecord::Base.connection_pool.with_connection do
      yield
    end
  end
end

.display_status(status) ⇒ Object

Return the displayable status name of an internal CloudCrowd status number. (See the above constants).



187
188
189
# File 'lib/cloud-crowd.rb', line 187

def display_status(status)
  DISPLAY_STATUS_MAP[status] || 'unknown'
end

.load_configuration(configuration) ⇒ Object

Raises:

  • (ArgumentError)


95
96
97
98
99
100
101
102
103
# File 'lib/cloud-crowd.rb', line 95

def load_configuration(configuration)
  raise ArgumentError unless configuration.kind_of? Hash
  @config = configuration
  @config[:work_unit_retries] ||= MIN_RETRIES
  if @config[:actions_path]
    path = Pathname.new( @config[:actions_path] ).realpath
    $LOAD_PATH.unshift( path ) unless $LOAD_PATH.include?( path )
  end
end

.load_configuration_from_path(config_path) ⇒ Object



105
106
107
108
109
# File 'lib/cloud-crowd.rb', line 105

def load_configuration_from_path(config_path)
  @config_path = File.expand_path(File.dirname(config_path))
  configuration = YAML.load(ERB.new(File.read(config_path)).result)
  load_configuration(configuration)
end

.load_database_configuration(configuration, validate_schema = true) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/cloud-crowd.rb', line 122

def load_database_configuration(configuration, validate_schema=true)
  ActiveRecord::Base.establish_connection(configuration)
  if validate_schema
    begin
      version = ActiveRecord::Base.connection.select_values('select max(version) from schema_migrations').first.to_i
    rescue
      version = 0
    end
    return true if version == SCHEMA_VERSION
    puts "Your database schema is out of date. Please use `crowd load_schema` to update it. This will wipe all the tables, so make sure that your jobs have a chance to finish first.\nexiting..."
    exit
  end
end

.load_database_configuration_from_path(config_path, validate_schema = true) ⇒ Object



136
137
138
139
# File 'lib/cloud-crowd.rb', line 136

def load_database_configuration_from_path(config_path, validate_schema=true)
  configuration = YAML.load(ERB.new(File.read(config_path)).result)
  load_database_configuration(configuration, validate_schema)
end

.log(message) ⇒ Object

Output a message with the current Timestamp prepended. Sinatra will re-direct stdout to a log file located at “log_path”



231
232
233
# File 'lib/cloud-crowd.rb', line 231

def log(message)
  printf("%-20s %s\n", Time.now.strftime("%F-%T:"), message)
end

.log_path(log_file = nil) ⇒ Object

The path that daemonized servers and nodes will log to.



158
159
160
161
# File 'lib/cloud-crowd.rb', line 158

def log_path(log_file=nil)
  @log_path ||= config[:log_path] || LOG_PATH
  log_file ? File.join(@log_path, log_file) : @log_path
end

.node?Boolean

Or is it a node?

Returns:

  • (Boolean)


225
226
227
# File 'lib/cloud-crowd.rb', line 225

def node?
  @identity == :node
end

.pid_path(pid_file = nil) ⇒ Object

The path in which daemonized servers and nodes will store their pids.



164
165
166
167
# File 'lib/cloud-crowd.rb', line 164

def pid_path(pid_file=nil)
  @pid_path ||= config[:pid_path] || PID_PATH
  pid_file ? File.join(@pid_path, pid_file) : @pid_path
end

.server?Boolean

Is this CloudCrowd instance a server? Useful for avoiding loading unneeded code from actions.

Returns:

  • (Boolean)


220
221
222
# File 'lib/cloud-crowd.rb', line 220

def server?
  @identity == :server
end