Twitter DiscoBot
| This page contains historical information. It is provided for reference but does not reflect current information or policy. |
This was a demonstration and learning exercise to help generate some excitement for the Space Automation project.
What It Does
There is a multi-colored, spinning disco-light-thingy sitting on top of the network rack. Every time the string 'i3detroit' gets mentioned on Twitter, the thingy lights up and spins for 30 seconds.
How it works
A Python script on the Debian server talks to one of these things (an Ocean Controls KTA-223 relay box) over RS485. Relay 0 on the I/O box controls power to the disco-light-thingy.
twitter_disco.py
import twitter
import RelayDuino
import time
# Poll Twitter for new mentions of the search term and run the disco light
# once per mention.
# NOTE(review): the indentation of this listing was lost on the wiki page; the
# nesting below (one light show per new status, then a pause before the next
# poll) is reconstructed from the page description -- confirm against the
# deployed script.
SEARCH_TERM = 'i3detroit'

# Bound to a separate name so the imported `twitter` module is not shadowed.
api = twitter.Api()
relay = RelayDuino.RelayDuino('/dev/ttyUSB0').relay[0]
relay.set('off')

# Remember the newest existing result so only later mentions trigger the light.
last_id = api.Search(SEARCH_TERM).max_id

while True:
    result_set = api.Search(SEARCH_TERM, since_id=last_id)
    print("got %d new statuses" % len(result_set.results))
    for status in result_set.results:
        print("Got a mention, let's party!")
        print(status.from_user_id)
        print(status.text)
        relay.set('on')
        try:
            time.sleep(30)
        finally:
            # Switch the light off even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is never left spinning.
            relay.set('off')
        time.sleep(30)
    last_id = result_set.max_id
    print("last_id == %s" % str(last_id))
    time.sleep(30)
RelayDuino.py
import sys
import time

import serial
class RelayDuinoErr(Exception):
    """Error raised when the relay box cannot be opened or does not answer as expected."""
class RelayState(object):
    """On/off state of a relay.

    ``value`` may be a bool or one of the accepted spellings:

    * on:  'ON', 'On', 'on', 'TRUE', 'True', 'true', '1', 1
    * off: 'OFF', 'Off', 'off', 'FALSE', 'False', 'false', '0', 0

    The normalised state is stored as a bool in ``self.value``.

    Raises:
        ValueError: if ``value`` is not one of the accepted spellings.
    """

    _ON_VALUES = ('ON', 'On', 'on', 'TRUE', 'True', 'true', '1', 1)
    _OFF_VALUES = ('OFF', 'Off', 'off', 'FALSE', 'False', 'false', '0', 0)

    def __init__(self, value):
        if isinstance(value, bool):
            self.value = value
        elif value in self._ON_VALUES:
            self.value = True
        elif value in self._OFF_VALUES:
            self.value = False
        else:
            # Wrap in a 1-tuple so a tuple argument is reported as a
            # ValueError instead of breaking the %-formatting.
            raise ValueError("'%s' is not a valid relay state" % (value,))

    def __str__(self):
        return 'ON' if self.value else 'OFF'

    def __eq__(self, other):
        """Compare against another state or any accepted spelling.

        Values that are not valid relay states compare unequal rather than
        raising.
        """
        try:
            return self.value == RelayState(other).value
        except ValueError:
            return False

    def __ne__(self, other):
        return not self == other

    def __nonzero__(self):
        """Truthiness hook for Python 2."""
        return self.value

    # Python 3 looks up __bool__; without it every instance would be truthy.
    __bool__ = __nonzero__
class Relay(object):
    """One relay output of a relay box.

    ``relayduino`` is the owning object whose ``command()`` method sends a
    command string and returns the reply; ``index`` is the relay number
    placed in the "RS", "ON" and "OFF" commands.
    """

    def __init__(self, relayduino, index):
        self.relayduino = relayduino
        self.index = index

    def get_state(self):
        """Send "RS <index>" and return the reply wrapped in a RelayState."""
        response = self.relayduino.command("RS %d" % self.index)
        return RelayState(response)

    def set_state(self, state):
        """Switch the relay on or off.

        ``state`` is anything RelayState accepts; an invalid value raises
        ValueError before any command is sent.
        """
        # Read .value explicitly so the branch does not depend on which
        # truthiness hook (__nonzero__ / __bool__) the interpreter consults.
        if RelayState(state).value:
            self.relayduino.command("ON %d" % self.index)
        else:
            self.relayduino.command("OFF %d" % self.index)

    # Short aliases and attribute-style access to the same two operations.
    get = get_state
    set = set_state
    state = property(get_state, set_state)

    def toggle(self):
        """Read the current state and switch the relay to the opposite one."""
        self.state = not self.state.value
class DigitalInputState(object):
    """Level of a digital input, stored in ``self.value`` as the int 1 or 0.

    ``value`` may be 1, 0, '1', '0', True, False, another DigitalInputState,
    or (case-insensitively) 'hi'/'high'/'set' for 1 and 'lo'/'low'/'clear'
    for 0.

    Raises:
        ValueError: if ``value`` is none of the above.
    """

    _HIGH_WORDS = ('hi', 'high', 'set')
    _LOW_WORDS = ('lo', 'low', 'clear')

    def __init__(self, value):
        if isinstance(value, DigitalInputState):
            # Copy directly; int() cannot convert a DigitalInputState.
            self.value = value.value
            return
        if value in (1, 0, '1', '0', True, False):
            self.value = int(value)
            return
        word = value.lower() if isinstance(value, str) else None
        if word in self._HIGH_WORDS:
            self.value = 1
        elif word in self._LOW_WORDS:
            self.value = 0
        else:
            # Wrap in a 1-tuple so a tuple argument still yields ValueError.
            raise ValueError("'%s' is not a valid digital input state" % (value,))

    def __str__(self):
        # 'HI' / 'LO' are themselves accepted by the constructor.
        return 'HI' if self.value else 'LO'

    def __eq__(self, other):
        """Compare against another state or any accepted spelling.

        Values that are not valid input states compare unequal rather than
        raising.
        """
        try:
            return self.value == DigitalInputState(other).value
        except ValueError:
            return False

    def __ne__(self, other):
        return not self == other

    def __nonzero__(self):
        """Truthiness hook for Python 2."""
        return bool(self.value)

    # Python 3 looks up __bool__ (and requires a real bool from it).
    __bool__ = __nonzero__
class DigitalInput(object):
    """Read-only handle for one digital input of a relay box.

    ``relayduino`` is the owning object whose ``command()`` method sends a
    command string and returns the reply; ``index`` is the input number
    placed in the "IS" command.
    """

    def __init__(self, relayduino, index):
        self.index = index
        self.relayduino = relayduino

    def get_state(self):
        """Send "IS <index>" and return the reply as a DigitalInputState."""
        query = "IS %d" % self.index
        return DigitalInputState(self.relayduino.command(query))

    # Attribute-style access and a short alias for the same read.
    state = property(get_state)
    get = get_state
class AnalogInput(object):
    """One analog input of a relay box, read as a raw float.

    ``relayduino`` is the owning object whose ``command()`` method sends a
    command string and returns the reply; ``index`` is the input number
    placed in the command.
    """

    def __init__(self, relayduino, index):
        self.relayduino = relayduino
        self.index = index

    def get_value(self):
        """Query the input and return the reply converted with float()."""
        # NOTE(review): this previously sent "IS", the digital input status
        # query also used by DigitalInput, which looks like a copy-paste
        # slip. "AI" is believed to be the KTA-223 analog-input query --
        # confirm against the device manual.
        response = self.relayduino.command("AI %d" % self.index)
        return float(response)

    def get(self):
        """Short alias for get_value().

        Dispatches through ``self`` so a subclass that overrides
        get_value() is honoured; a class-level ``get = get_value`` would
        stay bound to this base implementation.
        """
        return self.get_value()

    @property
    def value(self):
        """Attribute-style access to get_value(), honouring overrides."""
        return self.get_value()


class FractionalAnalogInput(AnalogInput):
    """Analog input whose reading is the raw value divided by 1024."""

    def get_value(self):
        raw = super(FractionalAnalogInput, self).get_value()
        return raw / 1024.0
class RelayDuino(object):
    """Library with handy functions to connect an Ocean Controls KTA-223 Relay Box"""

    delim = '\x0D'  # carriage return terminates every command and reply
    default_baudrate = 115200

    def __init__(self, comport_name=None, device_address=0, debug=False):
        """Create the handle; the serial port is opened lazily by command().

        Args:
            comport_name: serial port name passed to ``serial.Serial``.
            device_address: number formatted as two digits after '@' in
                every command.
            debug: when true, command() echoes traffic to stderr.
        """
        self.comport_name = comport_name
        self.device_address = device_address
        self.port = None
        self.debug_flag = debug
        # Channel numbers sent to the box are idx + 1, so e.g. relay[0]
        # issues commands for relay number 1.
        self.relay = tuple(Relay(self, idx + 1) for idx in range(8))
        self.input = tuple(DigitalInput(self, idx + 1) for idx in range(4))
        self.analog = tuple(AnalogInput(self, idx + 1) for idx in range(3))
        self.analog_fractional = tuple(FractionalAnalogInput(self, idx + 1) for idx in range(3))

    def autobaud(self):
        """Try a list of baud rates until the box answers the probe.

        Progress is printed to stdout. The port is left open at the first
        rate that works.

        Returns:
            True if some rate worked, False if every rate failed.
        """
        if self.is_open:
            self.port.close()
        for rate in (1200, 2400, 4800, 9600, 14400, 19200, 28800, 38400, 57600, 115200):
            try:
                self._open(rate)
            except RelayDuinoErr as ex:
                print("failed to connect at %d\n %s" % (rate, str(ex)))
                if self.port:
                    self.port.close()
                time.sleep(0.500)
            else:
                print("connected at %d" % rate)
                return True
        return False

    def _open(self, rate=None):
        """Open the serial port at ``rate`` and probe the box with "RS 0".

        Args:
            rate: baud rate; ``default_baudrate`` when None.

        Raises:
            RelayDuinoErr: if the port cannot be opened or used, or if the
                reply to the probe does not start with '#'.
        """
        if rate is None:
            rate = self.default_baudrate
        try:
            self.port = serial.Serial(self.comport_name, baudrate=rate, timeout=0.100, dsrdtr=False, rtscts=False)
            self.port.flushInput()
            # Probe the configured device address (previously hard-coded to
            # 00, which only matched the default address).
            self.port.write(("@%02d RS 0" + self.delim) % self.device_address)
            self.port.setRTS(False)
            response = self.port.readline(None, self.delim)
        except Exception as ex:
            raise RelayDuinoErr("Can't talk to relay box on port '%s' (%s)" % (self.comport_name, str(ex)))
        if not response.startswith('#'):
            raise RelayDuinoErr("Can't talk to relay box on port '%s' response=(%s)(%d) port=<%s>" % (self.comport_name, response, len(response), str(self.port)))

    def command(self, cmd):
        """Send ``cmd`` to the box and return the payload of its reply.

        The port is opened first if necessary. The command is framed as
        "@<2-digit address> <cmd>" followed by ``delim``.

        Returns:
            The reply with its first five characters and its final
            character removed.
        """
        if not self.is_open:
            self._open()
        send_str = ("@%02d %s" + self.delim) % (self.device_address, cmd)
        if self.debug_flag:
            sys.stderr.write(("tx: %s\n") % repr(send_str))
        # RTS is raised for the write and dropped before reading --
        # presumably to switch the RS485 transceiver direction; confirm
        # against the adapter in use.
        self.port.setRTS(True)
        self.port.write(send_str)
        self.port.setRTS(False)
        response = self.port.readline(None, self.delim)
        if self.debug_flag:
            sys.stderr.write("rx: %s\n" % repr(response))
        return response[5:-1]

    def __del__(self):
        # Best-effort cleanup: the port may never have been opened, so any
        # ordinary error from close() is deliberately ignored.
        try:
            self.port.close()
        except Exception:
            pass

    @property
    def is_open(self):
        """True when a port object exists and reports itself open."""
        return self.port is not None and self.port.isOpen()

    def ss(self):
        """Send the "SS" command and return the reply payload."""
        return self.command("SS")
if __name__ == '__main__':
    # Manual smoke test: print every relay, digital input and analog input
    # reading, then switch the first relay on for one second.
    if len(sys.argv) < 2:
        # Without this check a missing argument surfaced as an IndexError.
        sys.stderr.write("usage: %s SERIAL_PORT\n" % sys.argv[0])
        sys.exit(2)
    kta223 = RelayDuino(sys.argv[1], 0)
    #kta223.autobaud()
    #if not kta223.is_open: sys.exit()
    for io_list in (kta223.relay, kta223.input, kta223.analog):
        for n, channel in enumerate(io_list):
            print("%s number %d is %s" % (type(channel), n, channel.get()))
    kta223.relay[0].set('on')
    time.sleep(1)
    kta223.relay[0].set('off')