Skip to content. | Skip to navigation

Personal tools

Navigation

You are here: Home / Members / jhb / speak_02.py

speak_02.py

by Jörg Baach last modified Jan 20, 2011 04:17 PM
espeak and aplay server that can handle multiple commands

Python Source icon speak_02.py — Python Source, 3 kB (3636 bytes)

File contents

#!/usr/bin/env python

# quick hack for a speak server for navigation commands
# it can handle multiple commands and play sound files
# that are located in directory SOUNDPATH
#
# run: python speak.py [language]
#      language is the name of any voice that espeak understands
#
# use: http://localhost:9999 ...
#      /turn_left  -> triggers espeak
#      /:turn-right.wav -> triggers aplay
#      /:turn-right.wav|at broadway then|:turn-left.wav -> mixed
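#      /CQ -> clears the queue of pending commands, i.e. cuts the talk short
#             (a command that is already playing still finishes)
#      /SHUTDOWN -> stops the server and the talker thread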
#
# each call adds commands to be spoken, unless CQ is sent, which clears
# the queue
#
# needs espeak and aplay available in the cli

from BaseHTTPServer import BaseHTTPRequestHandler
import SocketServer, sys, os, Queue, threading, time
from subprocess import  call
import re

# TCP port the HTTP speak server listens on.
PORT = 9999
# Directory with the sound files handed to aplay; it is resolved relative
# to the directory the server is started from.
SOUNDPATH = 'english/english_male/'

# The espeak voice is the optional first command-line argument; without
# one the server falls back to 'english'.
language = 'english'
if len(sys.argv) >= 2:
    language = sys.argv[1]

class OurServer(SocketServer.TCPServer):
    """TCP server whose request loop ends once ``stopped`` is set."""

    # Class-level TCPServer option (address reuse for the listening socket).
    allow_reuse_address = True
    # Set to a true value (e.g. by a request handler) to leave serve_forever().
    stopped = False

    def serve_forever(self):
        """Handle one request after another until ``stopped`` becomes true.

        The flag is only examined between requests, so the loop returns
        after the request that set it has been handled.
        """
        while True:
            if self.stopped:
                break
            self.handle_request()

class Talker(threading.Thread):
    """Worker thread that runs queued commands strictly one after another.

    Every item taken from ``tqueue`` is handed to ``subprocess.call`` as-is,
    so items are expected to be argument lists such as
    ``['/usr/bin/aplay', 'file.wav']``.
    """

    def __init__(self, tqueue):
        """Remember the queue to consume; the thread is not started here."""
        threading.Thread.__init__(self)
        self.tqueue = tqueue
        # Set to 0/False from outside to make run() return.
        self.do_run = True

    def run(self):
        """Poll the queue every 0.1 s and run each command until stopped."""
        while self.do_run:
            time.sleep(0.1)
            try:
                task = self.tqueue.get_nowait()
            except Queue.Empty:
                continue

            print(str(task))
            try:
                call(task)
            except OSError as err:
                # A command that cannot be started (e.g. the binary is
                # missing) must not kill the thread: later commands would
                # then never be spoken.
                print('could not run %s: %s' % (task, err))
            finally:
                # Always balance the get() above so Queue.join() callers
                # are not blocked by a failed command.
                self.tqueue.task_done()

        print('talker thread stopped')

    def clear(self):
        """Discard every command still waiting in the queue.

        A command that is already running is not interrupted.
        """
        try:
            # Loop until the queue reports Empty rather than on the
            # truthiness of the item, so a falsy item cannot end the
            # draining early or miss its task_done().
            while True:
                self.tqueue.get_nowait()
                self.tqueue.task_done()
        except Queue.Empty:
            pass

    
class TalkHandler(BaseHTTPRequestHandler):
    """HTTP handler that turns the request path into speak/play commands.

    The path (without its leading '/') is split on '|'. Each entry is either
    a control word ('SHUTDOWN', 'CQ'), a sound file name prefixed with ':'
    (played with aplay from SOUNDPATH), or text to be spoken by espeak.
    The response body lists, one per line, what was done for each entry.
    """

    @staticmethod
    def _is_safe_sound(name):
        """Return True if *name* is non-empty and has no '..' path component."""
        if not name:
            return False
        return '..' not in re.split(r'[/\\]', name)

    def do_GET(self):
        """Queue the commands named in the request path and echo them back."""
        path = self.path
        out = []
        talker = self.server.talker
        if path.startswith('/'):
            path = path[1:]

        # Remember a SHUTDOWN entry wherever it appears in the list, not
        # only when it happens to be the last one.
        shutdown = False

        for entry in path.split('|'):
            # Percent-escapes other than %20 are not decoded here; such an
            # entry is replaced by a spoken placeholder.
            if re.search('%(?!20)', entry):
                entry = "something"

            if entry == 'favicon.ico':
                continue
            elif entry == 'SHUTDOWN':
                shutdown = True
                out.append('shuting down')
            elif entry == 'CQ':
                talker.clear()
                out.append('clear the queue')
            elif entry.startswith(':'):
                sound = entry[1:]
                # The name comes straight from the URL: refuse anything
                # that could climb out of SOUNDPATH.
                if not self._is_safe_sound(sound):
                    out.append('rejected ' + entry)
                    continue
                command = ['/usr/bin/aplay', SOUNDPATH + sound]
                out.append(' '.join(command))
                talker.tqueue.put(command)
            else:
                # %20 becomes ", " in the spoken text. The command is an
                # argument list, so the surrounding double quotes are part
                # of the argument itself.
                # NOTE(review): presumably the leading quote also keeps text
                # starting with '-' from being read as an espeak option;
                # confirm before removing the quotes.
                text = '"%s"' % entry.replace("%20", ", ")
                command = ['/usr/bin/espeak', '-v', language, text]
                out.append(' '.join(command))
                talker.tqueue.put(command)

        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.send_header('Cache-Control', 'no-cache, must-revalidate')
        self.end_headers()

        self.wfile.write('\n'.join(out))
        self.wfile.flush()
        self.connection.shutdown(1)

        # Stop only after the response has been sent.
        if shutdown:
            talker.do_run = 0
            self.server.stopped = True


# Bind the listening socket before starting the worker thread: if binding
# fails (e.g. the port is already in use) no non-daemon thread is left
# behind to keep the process alive.
httpd = OurServer(("", PORT), TalkHandler)

tqueue = Queue.Queue(0)
talker = Talker(tqueue)
# Request handlers reach the talker through the server object.
httpd.talker = talker
talker.start()

print("serving at port %s in %s" % (PORT, language))
try:
    httpd.serve_forever()
except KeyboardInterrupt:
    print('shuting down...')
finally:
    # However the loop ended, stop the talker thread and release the
    # listening socket.
    talker.do_run = 0
    httpd.server_close()