Twitter DiscoBot

From i3Detroit
Revision as of 13:29, 22 December 2010 by Kc8nod (Talk | contribs)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

This was a demonstration and learning exercise to help generate some excitement for the Space Automation project.

What It Does

There is a multi-colored, spinning, disco-light-thingy sitting on top of the network rack. Every time the string 'i3detroit' gets mentioned on Twitter, the thingy lights up and spins for 30 seconds.

How it works

A Python script on the Debian server talks to an Ocean Controls KTA-223 relay box (the "i/o box") over RS485. Relay 0 on the i/o box (`relay[0]` in the script below) controls power to the disco-light-thingy.


twitter_disco.py

import twitter
import RelayDuino
import time


# Poll Twitter for new mentions of SEARCH_TERM and run the disco light once
# for every new status found.
SEARCH_TERM = 'i3detroit'
PARTY_SECONDS = 30   # how long the light stays on per mention
REST_SECONDS = 30    # pause after each party before handling the next mention
POLL_SECONDS = 30    # delay between searches

# Bound to its own name so the imported `twitter` module is not shadowed.
api = twitter.Api()
relay = RelayDuino.RelayDuino('/dev/ttyUSB0').relay[0]
relay.set('off')

# Start from the newest existing status so only later mentions trigger the light.
last_id = api.Search(SEARCH_TERM).max_id

while True:

    result_set = api.Search(SEARCH_TERM, since_id=last_id)

    print("got %d new statuses" % len(result_set.results))

    for status in result_set.results:

        print("Got a mention, let's party!")
        print(status.from_user_id)
        print(status.text)

        relay.set('on')
        try:
            time.sleep(PARTY_SECONDS)
        finally:
            # Switch the light off even if the sleep is interrupted (e.g. Ctrl-C),
            # so the relay is never left energised.
            relay.set('off')
        time.sleep(REST_SECONDS)

    last_id = result_set.max_id
    print("last_id == %s" % str(last_id))

    time.sleep(POLL_SECONDS)
    

RelayDuino.py

import sys
import time

import serial


class RelayDuinoErr(Exception):
    """Raised when the relay box cannot be opened or its handshake reply is not understood."""


class RelayState(object):
    """On/off state of a relay that also understands common textual spellings.

    ``value`` may be a bool, another RelayState, or one of the literals listed
    in ``_ON_VALUES`` / ``_OFF_VALUES``.  Anything else raises ValueError.
    """

    _ON_VALUES = ('ON', 'On', 'on', 'TRUE', 'True', 'true', '1', 1)
    _OFF_VALUES = ('OFF', 'Off', 'off', 'FALSE', 'False', 'false', '0', 0)

    def __init__(self, value):
        if isinstance(value, RelayState):
            # Copy directly instead of relying on reflected == comparisons.
            self.value = value.value
        elif isinstance(value, bool):
            self.value = value
        elif value in self._ON_VALUES:
            self.value = True
        elif value in self._OFF_VALUES:
            self.value = False
        else:
            raise ValueError("'%s' is not a valid relay state" % (value,))

    def __str__(self):
        return 'ON' if self.value else 'OFF'

    def __eq__(self, other):
        # Anything that cannot be read as a relay state is simply "not equal";
        # an equality test should never raise.
        try:
            return self.value == RelayState(other).value
        except ValueError:
            return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __nonzero__(self):
        return self.value

    # Python 3 spelling of __nonzero__; without it every instance would be truthy.
    __bool__ = __nonzero__


class Relay(object):
    """A single relay output of a RelayDuino.

    ``relayduino`` must provide ``command(str)``; ``index`` is the relay
    number placed in the command string sent to the device.
    """

    def __init__(self, relayduino, index):
        self.relayduino = relayduino
        self.index = index

    def get_state(self):
        """Send "RS <index>" and return the reply as a RelayState."""
        response = self.relayduino.command("RS %d" % self.index)
        return RelayState(response)

    def set_state(self, state):
        """Switch the relay; ``state`` is anything RelayState accepts."""
        if RelayState(state):
            self.relayduino.command("ON %d" % self.index)
        else:
            self.relayduino.command("OFF %d" % self.index)

    # Short aliases and attribute-style access to the same two operations.
    get = get_state
    set = set_state
    state = property(get_state, set_state)

    def toggle(self):
        """Read the current state from the device and write back its inverse."""
        self.state = not self.state
        


class DigitalInputState(object):
    """Logic level of a digital input, stored in ``value`` as the integer 0 or 1.

    ``value`` may be 1/0, '1'/'0', a bool, another DigitalInputState, or one
    of the words hi/high/set or lo/low/clear (case-insensitive).  Anything
    else raises ValueError.
    """

    _HIGH_WORDS = ('hi', 'high', 'set')
    _LOW_WORDS = ('lo', 'low', 'clear')

    def __init__(self, value):
        if isinstance(value, DigitalInputState):
            # Must be checked first: an instance compares equal to 1 or 0 and
            # would otherwise reach int(value), which raises TypeError.
            self.value = value.value
            return
        if value in (1, 0, '1', '0', True, False):
            self.value = int(value)
            return
        if isinstance(value, str):
            word = value.lower()
            if word in self._HIGH_WORDS:
                self.value = 1
                return
            if word in self._LOW_WORDS:
                self.value = 0
                return
        raise ValueError("'%s' is not a valid digital input state" % (value,))

    def __str__(self):
        # Both spellings are accepted by __init__, so str() round-trips.
        return 'HIGH' if self.value else 'LOW'

    def __eq__(self, other):
        # Operands that are not a recognisable input state compare unequal
        # instead of raising.
        try:
            return self.value == DigitalInputState(other).value
        except ValueError:
            return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __nonzero__(self):
        return self.value

    def __bool__(self):
        # Python 3 requires a real bool here, not the stored int.
        return bool(self.value)
    
    
    
   
class DigitalInput(object):
    """Read-only digital input of a RelayDuino, identified by ``index``."""

    def __init__(self, relayduino, index):
        self.index = index
        self.relayduino = relayduino

    def get_state(self):
        """Send "IS <index>" and wrap the reply in a DigitalInputState."""
        query = "IS %d" % self.index
        reply = self.relayduino.command(query)
        return DigitalInputState(reply)

    # Alias and read-only attribute for the same query.
    state = property(get_state)
    get = get_state

        


class AnalogInput(object):
    """Analog input of a RelayDuino, read as a float."""

    def __init__(self, relayduino, index):
        self.relayduino = relayduino
        self.index = index

    def get_value(self):
        """Query the device for this channel and return the reply as a float.

        Raises ValueError if the reply is not a number.
        """
        # The original sent "IS", the same command DigitalInput uses for the
        # digital inputs.  "AI" is the analog-input query in the KTA-223
        # command set -- NOTE(review): confirm against the device manual.
        response = self.relayduino.command("AI %d" % self.index)
        return float(response)

    # Alias and read-only attribute for the same query.
    get = get_value
    value = property(get_value)

        
class FractionalAnalogInput(AnalogInput):
    """Analog input whose reading is divided by 1024."""

    def get_value(self):
        """Return the parent's reading divided by 1024."""
        raw = super(FractionalAnalogInput, self).get_value()
        return float(raw) / 1024

    # The aliases inherited from AnalogInput were bound to AnalogInput.get_value
    # when that class was created, so without re-binding them here get() and
    # .value would return the unscaled reading.
    get = get_value
    value = property(get_value)

        


    

class RelayDuino(object):
    """Library with handy functions to connect an Ocean Controls KTA-223 Relay Box

    The serial port is opened lazily by the first command().  I/O points are
    exposed as the tuples ``relay`` (8), ``input`` (4), ``analog`` (3) and
    ``analog_fractional`` (3); each element carries a device index that is
    one higher than its tuple position.
    """

    delim = '\x0D'
    default_baudrate = 115200

    def __init__(self, comport_name=None, device_address=0, debug=False):
        """Remember the connection settings and build the I/O accessors.

        comport_name   -- serial device name handed to serial.Serial
        device_address -- address placed in the "@NN" prefix of every command
        debug          -- when true, command() echoes traffic to stderr
        """
        self.comport_name = comport_name
        self.device_address = device_address
        self.port = None
        self.debug_flag = debug

        self.relay = tuple([Relay(self, idx + 1) for idx in range(8)])
        self.input = tuple([DigitalInput(self, idx + 1) for idx in range(4)])
        self.analog = tuple([AnalogInput(self, idx + 1) for idx in range(3)])
        self.analog_fractional = tuple([FractionalAnalogInput(self, idx + 1) for idx in range(3)])

    def autobaud(self):
        """Try a list of baud rates in turn and stop at the first that answers.

        Progress is printed.  If no rate works, the port ends up closed;
        callers can check ``is_open`` afterwards.
        """
        if self.is_open:
            self.port.close()

        for rate in (1200, 2400, 4800, 9600, 14400, 19200, 28800, 38400, 57600, 115200):
            try:
                self._open(rate)
                print("connected at %d" % rate)
                break
            except RelayDuinoErr as ex:
                print("failed to connect at %d\n    %s" % (rate, str(ex)))
                if self.port:
                    self.port.close()

            time.sleep(0.500)

    def _close_port(self):
        """Close the serial port, ignoring any error (best effort)."""
        try:
            self.port.close()
        except Exception:
            # Deliberately swallowed: the port may be None, missing, or
            # already closed, and none of those matter to the caller.
            pass

    def _readline(self):
        """Read up to and including ``delim``, or until a read times out.

        Written with read(1) because the ``readline(size, eol)`` signature the
        original relied on is not available in newer pySerial releases.
        """
        chars = []
        while True:
            char = self.port.read(1)
            if not char:
                break  # timeout: return whatever arrived so far
            chars.append(char)
            if char == self.delim:
                break
        return ''.join(chars)

    def _open(self, rate=None):
        """Open the port at ``rate`` and check that the box answers "@00 RS 0".

        Raises RelayDuinoErr if the port cannot be used or the reply does not
        start with '#'.  On failure the port is closed again so that a later
        command() retries the handshake instead of using a half-opened port.
        """
        if rate is None:
            rate = self.default_baudrate
        try:
            self.port = serial.Serial(self.comport_name, baudrate=rate, timeout=0.100, dsrdtr=False, rtscts=False)
            self.port.flushInput()
            self.port.write("@00 RS 0" + self.delim)
            self.port.setRTS(False)
            response = self._readline()
        except Exception as ex:
            self._close_port()
            raise RelayDuinoErr("Can't talk to relay box on port '%s' (%s)" % (self.comport_name, str(ex)))

        if not response.startswith('#'):
            port_description = str(self.port)  # captured before closing
            self._close_port()
            raise RelayDuinoErr("Can't talk to relay box on port '%s' response=(%s)(%d) port=<%s>" % (self.comport_name, response, len(response), port_description))

    def command(self, cmd):
        """Send ``cmd`` to the device and return the payload of its reply.

        The port is opened first if necessary (which may raise
        RelayDuinoErr).  The return value is the raw reply with its first
        five characters and its last character removed -- presumably the
        address header and the delimiter; confirm against the device protocol.
        """
        if not self.is_open:
            self._open()

        send_str = ("@%02d %s" + self.delim) % (self.device_address, cmd)
        if self.debug_flag:
            sys.stderr.write(("tx: %s\n") % repr(send_str))

        # RTS is raised for the write and dropped again before reading.
        self.port.setRTS(True)
        self.port.write(send_str)

        self.port.setRTS(False)
        response = self._readline()
        if self.debug_flag:
            sys.stderr.write("rx: %s\n" % repr(response))
        return response[5:-1]

    def __del__(self):
        self._close_port()

    @property
    def is_open(self):
        """True when a port object exists and reports itself open."""
        return self.port is not None and self.port.isOpen()

    def ss(self):
        """Send the "SS" command and return its reply payload."""
        return self.command("SS")


        
        
# Demo: print every relay, digital input and analog input, then pulse the
# first relay for one second.  Usage: RelayDuino.py <serial-port>
if __name__ == '__main__':
    import time

    if len(sys.argv) < 2:
        sys.exit("usage: %s <serial-port>" % sys.argv[0])

    kta223 = RelayDuino(sys.argv[1], 0)

    # Optional: probe for the baud rate instead of using the default.
    #kta223.autobaud()
    #if not kta223.is_open: sys.exit()

    for io_list in (kta223.relay, kta223.input, kta223.analog):
        for n, io_point in enumerate(io_list):
            print("%s number %d is %s" % (type(io_point), n, io_point.get()))

    kta223.relay[0].set('on')
    try:
        time.sleep(1)
    finally:
        # Make sure the relay is released even if the sleep is interrupted.
        kta223.relay[0].set('off')