#!/usr/bin/python from pyalsa import alsaseq import time def reverse(s): out="" for i in reversed(range(len(s))): out+=s[i] return out SND_SEQ_PORT_CAP_READ =(1<<0) SND_SEQ_PORT_CAP_WRITE=(1<<1) SND_SEQ_PORT_CAP_SYNC_READ=(1<<2) SND_SEQ_PORT_CAP_SYNC_WRITE=(1<<3) SND_SEQ_PORT_CAP_DUPLEX=(1<<4) SND_SEQ_PORT_CAP_SUBS_READ =(1<<5) SND_SEQ_PORT_CAP_SUBS_WRITE=(1<<6) SND_SEQ_PORT_CAP_NO_EXPORT=(1<<7) class AlsaPort: def __init__(self, clientName, name=None, clientId=None, pid=None): self.clientName=clientName self._update(name, clientId, pid) def update(self): self.update() def _update(self, name=None, clientId=None, pid=None): clientName=self.clientName if name==None: x = AlsaConnector.extractFromAlsaCompleteName(clientName) self.clientName = x[0] self.name = x[1] self.clientId = x[2][0] self.portId = x[2][1] else: self.clientName=clientName self.name=name self.clientId=clientId self.portId=pid self.globalId=(self.clientId, self.portId) self.globalString=self.clientName+":"+self.name+" "+str(self.clientId)+":"+str(self.portId) x=AlsaConnector.connector.get_port_info(self.portId, self.clientId) self.capability=x['capability'] self.type=x['type'] self.isInput() self.isOutput() def isInput(self): return self.capability & (SND_SEQ_PORT_CAP_READ | SND_SEQ_PORT_CAP_DUPLEX) def isOutput(self): return self.capability & (SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_DUPLEX) def __str__(self): x = " (" +( "In" if self.isInput() else "") x += ( " Out" if self.isOutput() else "") return self.globalString+x+")" class AlsaClient: def __init__(self, clientName, clientId, portList): self.clientName=clientName self.clientId=clientId self.ports={} for p in portList: self.ports[p[1]]=AlsaPort(clientName, p[0], clientId, p[1]) def __str__(self): return self.clientName+":"+str(self.clientId) class AlsaConnector: def __init__(self): AlsaConnector.connector=None if AlsaConnector.connector==None: AlsaConnector.connector=alsaseq.Sequencer() @staticmethod def listClients(): t=time.time() out={} for item in AlsaConnector.connector.connection_list(): clientName=item[0] clientId=item[1] portLists=item[2] out[clientId]=AlsaClient(clientName, clientId, portLists) return out @staticmethod def listPorts(): out=[] x=AlsaConnector.listClients() for i in x: for j in x[i].ports: out.append(x[i].ports[j]) return out @staticmethod def listPortsString(): out=[] x=AlsaConnector.listClients() for i in x: for j in x[i].ports: out.append(j.globalString) return out @staticmethod def connect( fro, to): AlsaConnector.connector.connect_ports(fro, to) @staticmethod def disconnect( fro, to): AlsaConnector.connector.diconnect_ports(fro, to) @staticmethod def findMe(cname, pname): l=AlsaConnector.listPorts() for xi in reversed(range(len(l))): x=l[xi] name=x.globalString lastSpace=name.rfind(' ') if lastSpace<0: return None name=name[:lastSpace] if name == (cname+":"+pname): return x return None @staticmethod def getPortFromId(alsaId): clients=AlsaConnector.listClients() if alsaId[0] in clients: if alsaId[1] in clients[alsaId[0]].ports: return clients[alsaId[0]].ports[alsaId[1]] return None @staticmethod def getPortNameFromId(alsaId): clients=AlsaConnector.listClients() if alsaId[0] in clients: if alsaId[1] in clients[alsaId[0]].ports: return clients[alsaId[0]].ports[alsaId[1]].globalString return None @staticmethod def portAuto(port): if isinstance(port, (tuple, list)): port=AlsaConnector.getPortFromId(port) elif isinstance(port, str): port=AlsaConnector.getPortFromId(port) return port @staticmethod def getPortFromName(name): return extractFromAlsaCompleteName(name) @staticmethod def extractFromAlsaCompleteName(name): clientName="" portName="" clientId=0 portId=0 i=len(name)-1 tmp="" #port id while name[i]!=':': tmp+=name[i] i-=1 i-=1 portId=int(reverse(tmp)) tmp="" while name[i]!=' ': tmp+=name[i] i-=1 clientId=int(reverse(tmp)) i-=1 tmp="" while name[i]!=':': tmp+=name[i] i-=1 portName=reverse(tmp) clientName=name[:i] return (clientName, portName, (clientId, portId)) __alsa_connector_singleton=AlsaConnector()