Top Level Namespace

Includes:
GRPC::Core::TimeConsts

Defined Under Namespace

Modules: Api, Fluent, Nodeapi, Proto, StdoutLogger

Instance Method Summary collapse

Instance Method Details

#do_notifyDemand(name, arg, msg) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/provider-util.rb', line 34

def do_notifyDemand(name,arg,msg)

  ts = Google::Protobuf::Timestamp.new
  p ts
  ts.from_time(Time.now)
  cdata = Api::Content.new(entity: msg)

  req = Api::Demand.new(id: generateIntID(),
                        sender_id: $clientID,
                        target_id: 0,
                        channel_type: 7,
                        demand_name: name,
                        ts: ts,
                        arg_json: arg,
                        mbus_id: 0,
                        cdata: cdata)
  
#  GRPC.logger.info("NotifyDemand Call #{req.inspect}")
  resp = $sxstub.notify_demand(req)
  GRPC.logger.info("NotifyDemand Response #{resp.inspect}")
  
end

#do_notifySupply(name, arg, msg) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/provider-util.rb', line 57

def do_notifySupply(name,arg,msg)

  ts = Google::Protobuf::Timestamp.new

#  $log.puts("Do notify Supply:"+ts.to_s+":"+$clientID.to_s)

  ts.from_time(Time.now)
  cdata = Api::Content.new(entity: msg)

  req = Api::Supply.new(id: generateIntID(),
                        sender_id: $clientID,
                        target_id: 0,
                        channel_type: 7,   # for fluentd
                        supply_name: name,
                        ts: ts,
                        arg_json: arg,
                        mbus_id: 0,
                        cdata: cdata)
  
#  GRPC.logger.info("NotifySupply Call #{req.inspect}")
  resp = $sxstub.notify_supply(req)
  GRPC.logger.info("NotifySupply Response #{resp.inspect}")
  
end

#do_registerNode(stub, name) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin/provider-util.rb', line 106

def do_registerNode(stub,name)
  GRPC.logger.info("Register Node")
  req = Nodeapi::NodeInfo.new(node_name: name,
                                node_type: Nodeapi::NodeType::PROVIDER,
                                server_info: "",
                                node_pbase_version: "0.1.2",
                                with_node_id: -1,
                                cluster_id: 0,
                                area_id: "Default",
                                channelTypes: [7]) #fluentd
  resp = stub.register_node(req)
  GRPC.logger.info("RN:Answer: #{resp.inspect}")

  return resp
end

#generateIntIDObject



122
123
124
# File 'lib/fluent/plugin/provider-util.rb', line 122

def generateIntID()
  return $idgen.next_id
end

#keepAliveObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/provider-util.rb', line 83

def keepAlive()
  while $nodeInfo['secret'] != 0 do
    sleep( $nodeInfo['keepalive_duration'])
    $updateCount += 1
    req = Nodeapi::NodeUpdate.new(node_id: $nodeInfo['node_id'],
                                  secret: $nodeInfo['secret'],
                                  update_count: $updateCount,
                                  node_status: 0,
                                  node_arg: $status)
    resp = $nodestub.keep_alive(req)
    GRPC.logger.info("Response for #{$nodeInfo['node_id']}, #{resp.inspect}")
    
  end
end

#registerServ(nodesv, name) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/plugin/provider-util.rb', line 126

def registerServ(nodesv,name)
  # connect to nodeserv
  $nodestub = Nodeapi::Node::Stub.new(nodesv, :this_channel_is_insecure, timeout: INFINITE_FUTURE)
  GRPC.logger.info(".. connecting insecurely on nodeserv #{nodesv}")
  $nodeInfo =  do_registerNode($nodestub,name)  # got server info
  
  service_epoch = Time.new(2010, 11, 4, 1, 42, 54).strftime('%s%L').to_i
  # may Twitter epoch of snowflake,
  # 1288834974657
  
  $idgen = AnyFlake.new(service_epoch, $nodeInfo.node_id)
  $clientID = generateIntID()
  
  GRPC.logger.info(".. connecting insecurely on synerex server to #{$nodeInfo['server_info']}")
  $sxstub = Api::Synerex::Stub.new($nodeInfo.server_info, :this_channel_is_insecure, timeout: INFINITE_FUTURE)

  startKeepAlive
end

#startKeepAliveObject



100
101
102
# File 'lib/fluent/plugin/provider-util.rb', line 100

def startKeepAlive()
  $threads << Thread.new { keepAlive() }
end