123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #!/usr/bin/python
- from pyalsa import alsaseq
- import time
- def reverse(s):
- out=""
- for i in reversed(range(len(s))):
- out+=s[i]
- return out
- SND_SEQ_PORT_CAP_READ =(1<<0)
- SND_SEQ_PORT_CAP_WRITE=(1<<1)
- SND_SEQ_PORT_CAP_SYNC_READ=(1<<2)
- SND_SEQ_PORT_CAP_SYNC_WRITE=(1<<3)
- SND_SEQ_PORT_CAP_DUPLEX=(1<<4)
- SND_SEQ_PORT_CAP_SUBS_READ =(1<<5)
- SND_SEQ_PORT_CAP_SUBS_WRITE=(1<<6)
- SND_SEQ_PORT_CAP_NO_EXPORT=(1<<7)
- class AlsaPort:
- def __init__(self, clientName, name=None, clientId=None, pid=None):
- self.clientName=clientName
- self._update(name, clientId, pid)
-
- def update(self):
- self.update()
-
- def _update(self, name=None, clientId=None, pid=None):
- clientName=self.clientName
- if name==None:
- x = AlsaConnector.extractFromAlsaCompleteName(clientName)
- self.clientName = x[0]
- self.name = x[1]
- self.clientId = x[2][0]
- self.portId = x[2][1]
- else:
- self.clientName=clientName
- self.name=name
- self.clientId=clientId
- self.portId=pid
- self.globalId=(self.clientId, self.portId)
- self.globalString=self.clientName+":"+self.name+" "+str(self.clientId)+":"+str(self.portId)
- x=AlsaConnector.connector.get_port_info(self.portId, self.clientId)
- self.capability=x['capability']
- self.type=x['type']
- self.isInput()
- self.isOutput()
-
- def isInput(self):
- return self.capability & (SND_SEQ_PORT_CAP_READ | SND_SEQ_PORT_CAP_DUPLEX)
-
- def isOutput(self):
- return self.capability & (SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_DUPLEX)
-
- def __str__(self):
- x = " (" +( "In" if self.isInput() else "")
- x += ( " Out" if self.isOutput() else "")
- return self.globalString+x+")"
-
-
- class AlsaClient:
- def __init__(self, clientName, clientId, portList):
- self.clientName=clientName
- self.clientId=clientId
- self.ports={}
- for p in portList:
- self.ports[p[1]]=AlsaPort(clientName, p[0], clientId, p[1])
-
- def __str__(self):
- return self.clientName+":"+str(self.clientId)
-
- class AlsaConnector:
- def __init__(self):
- AlsaConnector.connector=None
- if AlsaConnector.connector==None:
- AlsaConnector.connector=alsaseq.Sequencer()
-
-
- @staticmethod
- def listClients():
- t=time.time()
- out={}
- for item in AlsaConnector.connector.connection_list():
- clientName=item[0]
- clientId=item[1]
- portLists=item[2]
- out[clientId]=AlsaClient(clientName, clientId, portLists)
- return out
-
- @staticmethod
- def listPorts():
- out=[]
- x=AlsaConnector.listClients()
- for i in x:
- for j in x[i].ports:
- out.append(x[i].ports[j])
- return out
-
- @staticmethod
- def listPortsString():
- out=[]
- x=AlsaConnector.listClients()
- for i in x:
- for j in x[i].ports:
- out.append(j.globalString)
- return out
-
- @staticmethod
- def connect( fro, to):
- AlsaConnector.connector.connect_ports(fro, to)
-
- @staticmethod
- def disconnect( fro, to):
- AlsaConnector.connector.diconnect_ports(fro, to)
-
- @staticmethod
- def findMe(cname, pname):
- l=AlsaConnector.listPorts()
- for xi in reversed(range(len(l))):
- x=l[xi]
- name=x.globalString
- lastSpace=name.rfind(' ')
- if lastSpace<0: return None
- name=name[:lastSpace]
- if name == (cname+":"+pname):
- return x
- return None
-
-
- @staticmethod
- def getPortFromId(alsaId):
- clients=AlsaConnector.listClients()
- if alsaId[0] in clients:
- if alsaId[1] in clients[alsaId[0]].ports:
- return clients[alsaId[0]].ports[alsaId[1]]
- return None
-
- @staticmethod
- def getPortNameFromId(alsaId):
- clients=AlsaConnector.listClients()
- if alsaId[0] in clients:
- if alsaId[1] in clients[alsaId[0]].ports:
- return clients[alsaId[0]].ports[alsaId[1]].globalString
- return None
-
- @staticmethod
- def portAuto(port):
- if isinstance(port, (tuple, list)):
- port=AlsaConnector.getPortFromId(port)
- elif isinstance(port, str):
- port=AlsaConnector.getPortFromId(port)
- return port
-
- @staticmethod
- def getPortFromName(name):
- return extractFromAlsaCompleteName(name)
-
- @staticmethod
- def extractFromAlsaCompleteName(name):
- clientName=""
- portName=""
- clientId=0
- portId=0
- i=len(name)-1
- tmp=""
-
- #port id
- while name[i]!=':':
- tmp+=name[i]
- i-=1
- i-=1
- portId=int(reverse(tmp))
-
-
- tmp=""
- while name[i]!=' ':
- tmp+=name[i]
- i-=1
- clientId=int(reverse(tmp))
- i-=1
-
- tmp=""
- while name[i]!=':':
- tmp+=name[i]
- i-=1
- portName=reverse(tmp)
- clientName=name[:i]
- return (clientName, portName, (clientId, portId))
-
-
- __alsa_connector_singleton=AlsaConnector()
|