"""Library to simplify calls to the 'trigger' web services running on mro.mwa128t.org, used to
interrupt current MWA observations as a result of an incoming trigger.
"""
import base64
import json
import sys
import traceback
import logging
logging.basicConfig()
if sys.version_info.major == 3: # Python3
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
else: # Python2
from urllib import urlencode
from urllib2 import urlopen, HTTPError, URLError, Request
DEFAULTLOGGER = logging.getLogger()
DEFAULTLOGGER.level = logging.DEBUG
BASEURL = "http://mro.mwa128t.org/trigger/"
# BASEURL = "http://52.64.91.219/trigger/" # Testing Django service - must be used in 'pretend' mode, as it's using a read-only database connection
[docs]def web_api(
url="",
urldict=None,
postdict=None,
username=None,
password=None,
logger=DEFAULTLOGGER,
):
"""
Given a url, an optional dictionary for URL arguments, and an optional dictionary
containing data to POST, open the appropriate URL, POST data if supplied, and
return the result of the call converted from JSON format to a Python dictionary.
:param url: The full URL to open, minus any trailing ?name=value&name2=value2... arguments
:param urldict: Optional Python dictionary to be URL-encoded and appended as
...?name=value&name2=value2&... data in the URL itself
:param postdict: Python dictionary to be converted to JSON format and POSTed
with the request.
:param logger: If a logger object is passed, log activity to it, otherwise use
the default logger which will suppress all output.
:param username: Optional BASIC auth username
:param password: Optional BASIC auth password
:return: A tuple of (result, header) where result is a Python dict (un-jsoned from the
text), the text itself, or None, and 'header' is the HTTP header object (use
.get_param() to extract values) or None.
"""
if urldict is not None:
urldata = "?" + urlencode(urldict)
else:
urldata = ""
url += urldata
if postdict is not None:
postdata = urlencode(postdict)
if sys.version_info.major > 2:
postdata = postdata.encode("latin-1")
else:
postdata = None
if postdict:
reqtype = "POST"
else:
reqtype = "GET"
logger.debug("Request: %s %s." % (reqtype, url))
if postdict:
logger.debug("Data: %s" % postdict)
try:
if (username is not None) and (password is not None):
if sys.version_info.major > 2:
base64string = base64.b64encode(
("%s:%s" % (username, password)).encode("latin-1")
)
base64string = base64string.decode("latin-1")
postdata = postdata.encode("latin-1")
else:
base64string = base64.b64encode("%s:%s" % (username, password))
req = Request(
url,
postdata,
{
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": "Basic %s" % base64string,
},
)
else:
req = Request(
url,
postdata,
{"Content-Type": "application/json", "Accept": "application/json"},
)
try:
resobj = urlopen(req)
data = resobj.read()
if (sys.version_info.major > 2) and (data is not None):
data = data.decode(resobj.headers.get_content_charset() or "latin-1")
except (ValueError, URLError):
logger.error(
"urlopen failed, or there was an error reading from the opened request object"
)
logger.error(traceback.format_exc())
return None
try:
result = json.loads(data)
except ValueError:
result = data
return result
except HTTPError as error:
logger.error(
"HTTP error from server: code=%d, response:\n %s"
% (error.code, error.read())
)
logger.error("Unable to retrieve %s" % (url))
logger.error(traceback.format_exc())
return None
except URLError as error:
logger.error("URL or network error: %s" % error.reason)
logger.error("Unable to retrieve %s" % (url))
logger.error(traceback.format_exc())
return None
[docs]def busy(project_id=None, obstime=None, logger=DEFAULTLOGGER):
"""
Call with a project_id and a desired observing time. This function will return False if the given project_id
is allowed to override current observations from now for the given length of time, or True if not.
Note that a False result doesn't guarantee a later call to trigger() will succeed, as new observations may have been
added to the schedule in the meantime.
:param project_id: eg 'C001'
:param obstime: eg 1800
:param logger: optional logging.logger object
:return: boolean
"""
urldict = {}
if project_id is not None:
urldict["project_id"] = project_id
else:
logger.error("triggering.trigger() must be passed a valid project_id")
return None
if obstime is not None:
urldict["obstime"] = obstime
result = web_api(url=BASEURL + "busy", urldict=urldict, logger=logger)
return result
[docs]def vcsfree(logger=DEFAULTLOGGER):
"""
This function will return the maximum number of seconds that a VCS trigger will be allowed to request,
given the current free space, and upcoming VCS observations in the schedule.
Note that this doesn't guarantee a later call to trigger() will succeed, as new VCS observations may have been
added to the schedule in the meantime.
:param logger: optional logging.logger object
:return: int
"""
urldict = {}
result = web_api(url=BASEURL + "vcsfree", urldict=urldict, logger=logger)
return result
[docs]def obslist(obstime=None, logger=DEFAULTLOGGER):
"""
Call with a desired observing time. This function will return a list of tuples containing
(starttime, obsname, creator, projectid, mode) for each observation between 'now' and the
given number of seconds in the future.
:param obstime: eg 1800
:param logger: optional logging.logger object
:return: list of (starttime, obsname, creator, projectid, mode) tuples
"""
urldict = {}
if obstime is not None:
urldict["obstime"] = obstime
result = web_api(url=BASEURL + "obslist", urldict=urldict, logger=logger)
return result
[docs]def trigger(
project_id=None,
secure_key=None,
group_id=None,
ra=None,
dec=None,
alt=None,
az=None,
source=None,
subarray_list=None,
freqspecs=None,
creator=None,
obsname=None,
nobs=None,
exptime=None,
calexptime=None,
calibrator=None,
freqres=None,
inttime=None,
avoidsun=None,
vcsmode=None,
buffered=None,
pretend=None,
logger=DEFAULTLOGGER,
):
"""
Call with the parameters that describe the observation/s to schedule, and
those observations will be added to the schedule immediately, starting
'now'.
You can pass more than one position, in any combination of:
- one or more RA/Dec pairs
- one or more alt/az pairs
- one of more source names
Observations will be generated for each position given, in turn (all RA/Dec
first, then all Alt/Az, then all source names) unless the 'subarrays' list
of subarray names is provided, in which case each position will be
allocated to one subarray, and observed simultaneously.
You can also pass, for example, one Alt value and a list of Az values, in
which case the one Alt value will be propagated to the other Az's. For
example, alt=70.0, az=[0,90,180] will give [(70,0), (70,90), (70,180)]. The
same is true for RA/Dec.
You can also pass more than one frequency specifier, in which case
observations will be generated for each choice of frequency, AT each
position.
If the 'avoidsum' parameter is True, then the coordinates of the target and
calibrator are shifted slightly to put the Sun in a beam null. For this to
work, the target coordinates must be RA/Dec values, not Alt/Az.
If 'buffered' is specified, and True, then instead of scheduling new
observations, a voltage buffer dump of all available past data will be
triggered, and voltage capture will continue from 'now' until (nobs *
exptime) seconds into the future. Existing observations in the schedule,
from 'now' until that time, will be truncated or deleted.
The structure returned is a dictionary, containing the following:
- **result['success']** - a boolean, True if the observations were
scheduled successfully, False if there was an error.
- **result['errors']** - a dictionary, containing integer keys from 0-N,
where each value is an error message. Normally empty.
- **result['params']** - a dictionary containing all parameters passed to
the web service, after parsing, and some extra parameters calculated by
the web service (the name of the automatically chosen calibrator, etc).
- **result['clear']** - the commands used to clear the schedule. It
contains the keys/values:
- 'command': The full clear_schedule.py command line
- 'retcode': The integer return code from that command
- 'stderr': The output to STDERR from that command
- 'stdout': The output to STDOUT from that command
- **result['schedule']** - the commands used to schedule the triggered
observations. It contains the keys/values:
- 'command': A string containing all of the single_observation.py
command lines
- 'retcode':The integer return code from the shell spawned to run
those commands
- 'stderr': The output to STDERR from those commands
- 'stdout': The output to STDOUT from those commands
- **result['obsid_list']** - (only if buffered is True) The observation
IDs of all MWA observations covered by the buffer dump and subsequent
voltage capture. Use these observation IDs to download the voltage
capture files, and to determine the telescope setting/s during the time
span including the captured data.
:param project_id: eg 'C001' - project ID for the triggered observations
:param secure_key: password associated with that project_id
:param group_id: optional group ID - the start time of a previously
triggered observation of the same event
:param ra: Either one RA (float, in hours), or a list of RA floats. Eg
12.234, or [11.0, 12.0]
:param dec: Either one Dec (float, in degrees), or a list of Dec floats. Eg
-12.234, or [-26.0, -36.0]
:param alt: Either one Alt (float, in degrees), or a list of Alt floats. Eg
80.0, or [70.0, 90.0]
:param az: Either one Az (float, in degrees), or a list of Az floats. Eg
250.3, or [90.0, 270.0]
:param source: Either one source name string, or a list of source name
strings. Eg 'Sun', or ['Sun', 'Moon']
:param subarray_list: An optional list of subarray names, the same length
as the total number of points (ra/decs, alt/azs, etc)
:param freqspecs: Either one frequency specifier string, or a list of
frequency specifier strings. Eg '145,24', or ['121,24', '145,24']
:param creator: Creator string, eg 'Andrew'
:param obsname: Observation name string, eg 'Fermi Trigger 20180211.1234'
:param nobs: Number of observations to schedule for each position/frequency
combination
:param exptime: Exposure time of each observation scheduled, in seconds
(must be modulo-8 seconds)
:param calexptime: Exposure time of the trailing calibrator observation, if
applicable, in seconds
:param calibrator: None or False for no calibrator observation, a source
name to specify one, or True to have one chosen for you.
:param freqres: Correlator frequency resolution for observations. None to
use whatever the current mode is, for lower latency. Eg 40
:param inttime: Correlator integration time for observations. None to use
whatever the current mode is, for lower latency. Eg 0.5
:param avoidsun: boolean or integer. If True, the coordinates of the target
and calibrator are shifted slightly to put the Sun in a null.
:param vcsmode: boolean. If True, the observations are made in 'Voltage
Capture' mode instead of normal (HW_LFILES) mode.
:param buffered: boolean. If True and vcsmode, trigger a Voltage capture
using the ring buffer.
:param pretend: boolean or integer. If True, the clear_schedule.py and
single_observation.py commands will be generated but NOT run.
:param logger: optional logging.logger object
:return: dictionary structure describing the processing (see above for more
information).
"""
if vcsmode and buffered:
return triggerbuffer(
project_id=project_id,
secure_key=secure_key,
pretend=pretend,
obstime=nobs * exptime,
logger=logger,
)
urldict = {}
postdict = {}
if project_id is not None:
urldict["project_id"] = project_id
else:
logger.error("triggering.trigger() must be passed a valid project_id")
return None
if secure_key is not None:
postdict["secure_key"] = secure_key
else:
logger.error("triggering.trigger() must be passed a valid secure_key")
return None
if group_id is not None:
postdict["group_id"] = group_id
if ra is not None:
postdict["ra"] = ra
if dec is not None:
postdict["dec"] = dec
if alt is not None:
postdict["alt"] = alt
if az is not None:
postdict["az"] = az
if source is not None:
postdict["source"] = source
if subarray_list is not None:
if type(subarray_list) == list:
postdict["subarrays"] = json.dumps(subarray_list)
else:
postdict["subarrays"] = subarray_list
if freqspecs is not None:
if type(freqspecs) == list:
postdict["freqspecs"] = json.dumps(freqspecs)
else:
postdict["freqspecs"] = freqspecs
if creator is not None:
postdict["creator"] = creator
if obsname is not None:
urldict["obsname"] = obsname
if nobs is not None:
postdict["nobs"] = nobs
if exptime is not None:
postdict["exptime"] = exptime
if calexptime is not None:
postdict["calexptime"] = calexptime
if (freqres is not None) and (inttime is not None):
postdict["freqres"] = freqres
postdict["inttime"] = inttime
else:
if (freqres is None) != (inttime is None):
logger.error(
"triggering.trigger() must be passed BOTH inttime AND freqres, or neither of them."
)
return None
if calibrator is not None:
postdict["calibrator"] = calibrator
if avoidsun is not None:
postdict["avoidsun"] = avoidsun
if pretend is not None:
postdict["pretend"] = pretend
if vcsmode is not None:
postdict["vcsmode"] = vcsmode
logger.debug("urldict=%s" % urldict)
logger.debug("postdict=%s" % postdict)
if vcsmode:
result = web_api(
url=BASEURL + "triggervcs",
urldict=urldict,
postdict=postdict,
logger=logger,
)
else:
result = web_api(
url=BASEURL + "triggerobs",
urldict=urldict,
postdict=postdict,
logger=logger,
)
return result
[docs]def triggerbuffer(
project_id=None,
secure_key=None,
pretend=None,
start_time=None,
end_time=None,
obstime=None,
logger=DEFAULTLOGGER,
):
"""
Trigger an immediate dump of the memory buffers to disk, using 'start_time'
as the earliest time to capture (zero, or any time earlier than the oldest
time in the buffer, means 'save as much as possible'. If 'end_time' is
specified, save data only up to (but not necessarily including) that time,
or if 'obs_time' is specified, then keep capturing voltages until that many
seconds from 'now'. Existing observations in the schedule, from 'now' until
that time, will be truncated or deleted.
For triggers requiring continued capturing after the trigger, you would
normally pass zero to start_time, and a duration to obstime as a capture
duration, counted from the time that the trigger takes place.
For captures requiring only one or two subfiles at a specific time in the
past, you would pass specific GPS times to start_time and stop_time, and
not use obstime.
Note that if start_time and end_time are both specified, and both zero, no
subfiles are captured, but otherwise the full end-to-end process takes
place as a null operation, including calls to all 24 MWAX servers.
The structure returned is a dictionary, containing the following:
- **result['success']** - a boolean, True if the observations were scheduled
successfully, False if there was an error.
- **result['errors']** - a dictionary, containing integer keys from 0-N,
where each value is an error message. Normally empty.
- **result['params']** - a dictionary containing all parameters passed to the
web service, after parsing, and some extra parameters calculated by the
web service (the name of the automatically chosen calibrator, etc).
- **result['clear']** - the commands used to clear the schedule. It contains
the keys/values:
- 'command': The full clear_schedule.py command line
- 'retcode': The integer return code from that command
- 'stderr': The output to STDERR from that command
- 'stdout': The output to STDOUT from that command
- **result['schedule']** - the commands used to trigger the buffer dump and
add a VOLTAGE_STOP observation. It contains the keys/values:
- 'command': A string containing all of the single_observation.py command
lines
- 'retcode': The integer return code from the shell spawned to run
those commands
- 'stderr': The output to STDERR from those commands
- 'stdout': The output to STDOUT from those commands
- **result['obsid_list']** - The observation IDs of all MWA observations
covered by the buffer dump and subsequent voltage capture. Use these
observation IDs to download the voltage capture files, and to determine
the telescope setting/s during the time span including the captured
data.
:param project_id: eg 'C001' - project ID for the triggered observations
:param secure_key: password associated with that project_id
:param pretend: boolean or integer. If True, the triggervcs command will
NOT be run.
:param logger: optional logging.logger object
:param start_time: Optional earliest time to capture - defaults to zero,
for 'as early as possible'.
:param end_time: Optional end time, in GPS seconds, to capture (can also be
in the past).
:param obstime: Duration of data capture, in seconds, if end_time is not
specified. Counts from 'now', not the time in the past when
data capture started.
:return: dictionary structure describing the processing (see above for more
information).
"""
urldict = {}
postdict = {}
if project_id is not None:
urldict["project_id"] = project_id
else:
logger.error("triggering.trigger() must be passed a valid project_id")
return None
if secure_key is not None:
postdict["secure_key"] = secure_key
else:
logger.error("triggering.trigger() must be passed a valid secure_key")
return None
if pretend is not None:
postdict["pretend"] = pretend
if start_time is not None:
postdict["start_time"] = start_time
else:
postdict["start_time"] = 0
if obstime is not None:
postdict["obstime"] = obstime
if end_time is not None:
postdict["end_time"] = end_time
result = web_api(
url=BASEURL + "triggerbuffer", urldict=urldict, postdict=postdict, logger=logger
)
return result