Sophie

Sophie

distrib > Mageia > 7 > i586 > media > core-release > by-pkgid > b5ea5212b3c734991262ca77a0dfad8b > files > 415

python-qpid-1.37.0-1.mga7.noarch.rpm

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import optparse, os, sys, time
from uuid import uuid4
from qpid.messaging import *
from qpid.log import enable, DEBUG, WARN
from common import *

parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
                               description="reserve a machine")
parser.add_option("-b", "--broker", default="localhost",
                  help="connect to specified BROKER (default %default)")
parser.add_option("-a", "--address", default="reservations",
                  help="address for reservation requests")
parser.add_option("-r", "--release", action="store_true",
                  help="release any machines matching the pattern")
parser.add_option("-s", "--status", action="store_true",
                  help="list machine status")
parser.add_option("-d", "--discover", action="store_true",
                  help="use discovery instead of inventory")
parser.add_option("-o", "--owner", default=os.environ["USER"],
                  help="the holder of the reservation")
parser.add_option("-n", "--number", type=int, default=1,
                  help="the number of machines to reserve")
parser.add_option("-t", "--timeout", type=float, default=10,
                  help="timeout in seconds to wait for resources")
parser.add_option("-v", dest="verbose", action="store_true",
                  help="enable verbose logging")

opts, args = parser.parse_args()

if opts.verbose:
  enable("qpid", DEBUG)
else:
  enable("qpid", WARN)

if args:
  patterns = args
else:
  patterns = ["*"]

conn = Connection.establish(opts.broker)

if opts.release:
  request_type = "release"
  candidate_status = BUSY
  candidate_owner = opts.owner
else:
  request_type = "reserve"
  candidate_status = FREE
  candidate_owner = None

class Requester(Dispatcher):

  def __init__(self):
    self.agents = {}
    self.requests = set()
    self.outstanding = set()

  def agent_status(self, id):
    status, owner = self.agents[id]
    if owner:
      return "%s %s(%s)" % (id, status, owner)
    else:
      return "%s %s" % (id, status)

  def correlation(self, cid):
    self.requests.add(cid)
    self.outstanding.add(cid)

  def ignored(self, msg):
    return msg.properties.get("type") not in ("status", "empty") or \
        msg.correlation_id not in self.requests

  def do_status(self, msg):
    id, status, owner = get_status(msg)
    self.agents[id] = (status, owner)

    if opts.status:
      print self.agent_status(id)

  def do_empty(self, msg):
    print "no matching resources"

  def candidates(self, candidate_status, candidate_owner):
    for id, (status, owner) in self.agents.items():
      if status == candidate_status and owner == candidate_owner:
        yield id

  def dispatch(self, msg):
    result = Dispatcher.dispatch(self, msg)
    count = msg.properties.get("count")
    sequence = msg.properties.get("sequence")
    if count and sequence == count:
      self.outstanding.discard(msg.correlation_id)
    return result

try:
  ssn = conn.session()
  rcv = ssn.receiver(opts.address, capacity=10)
  snd = ssn.sender(opts.address)

  correlation_id = str(uuid4())

  if opts.discover:
    properties = {"type": "discover", "identity": patterns}
    content = None
  else:
    properties = {"type": "query"}
    content = {"identity": patterns}

  snd.send(Message(reply_to = opts.address,
                   correlation_id = correlation_id,
                   properties = properties,
                   content = content))

  req = Requester()
  req.correlation(correlation_id)

  start = time.time()
  ellapsed = 0
  requested = set()
  discovering = opts.discover

  while ellapsed <= opts.timeout and (discovering or req.outstanding):
    try:
      msg = rcv.fetch(opts.timeout - ellapsed)
      ssn.acknowledge(msg)
    except Empty:
      continue
    finally:
      ellapsed = time.time() - start

    req.dispatch(msg)
    if not opts.status:
      if len(requested) < opts.number:
        for cid in req.candidates(candidate_status, candidate_owner):
          if cid in requested: continue
          req_msg = Message(reply_to = opts.address,
                            correlation_id = str(uuid4()),
                            properties = {"type": request_type,
                                          "identity": [cid]},
                            content = {"owner": opts.owner})
          if not requested:
            print "requesting %s:" % request_type,
          print cid,
          sys.stdout.flush()
          req.correlation(req_msg.correlation_id)
          snd.send(req_msg)
          requested.add(cid)
      else:
        discovering = False

  if requested:
    print
    owners = {}
    for id in requested:
      st, ow = req.agents[id]
      if not owners.has_key(ow):
        owners[ow] = []
      owners[ow].append(id)
    keys = list(owners.keys())
    keys.sort()
    for k in keys:
      owners[k].sort()
      v = ", ".join(owners[k])
      if k is None:
        print "free: %s" % v
      else:
        print "owner %s: %s" % (k, v)
  elif req.agents and not opts.status:
    print "no available resources"

  if req.outstanding:
    print "request timed out"
except KeyboardInterrupt:
  pass
finally:
  conn.close()