alsaconnector.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. #!/usr/bin/python
  2. from pyalsa import alsaseq
  3. import time
  4. def reverse(s):
  5. out=""
  6. for i in reversed(range(len(s))):
  7. out+=s[i]
  8. return out
  9. SND_SEQ_PORT_CAP_READ =(1<<0)
  10. SND_SEQ_PORT_CAP_WRITE=(1<<1)
  11. SND_SEQ_PORT_CAP_SYNC_READ=(1<<2)
  12. SND_SEQ_PORT_CAP_SYNC_WRITE=(1<<3)
  13. SND_SEQ_PORT_CAP_DUPLEX=(1<<4)
  14. SND_SEQ_PORT_CAP_SUBS_READ =(1<<5)
  15. SND_SEQ_PORT_CAP_SUBS_WRITE=(1<<6)
  16. SND_SEQ_PORT_CAP_NO_EXPORT=(1<<7)
  17. class AlsaPort:
  18. def __init__(self, clientName, name=None, clientId=None, pid=None):
  19. self.clientName=clientName
  20. self._update(name, clientId, pid)
  21. def update(self):
  22. self.update()
  23. def _update(self, name=None, clientId=None, pid=None):
  24. clientName=self.clientName
  25. if name==None:
  26. x = AlsaConnector.extractFromAlsaCompleteName(clientName)
  27. self.clientName = x[0]
  28. self.name = x[1]
  29. self.clientId = x[2][0]
  30. self.portId = x[2][1]
  31. else:
  32. self.clientName=clientName
  33. self.name=name
  34. self.clientId=clientId
  35. self.portId=pid
  36. self.globalId=(self.clientId, self.portId)
  37. self.globalString=self.clientName+":"+self.name+" "+str(self.clientId)+":"+str(self.portId)
  38. x=AlsaConnector.connector.get_port_info(self.portId, self.clientId)
  39. self.capability=x['capability']
  40. self.type=x['type']
  41. self.isInput()
  42. self.isOutput()
  43. def isInput(self):
  44. return self.capability & (SND_SEQ_PORT_CAP_READ | SND_SEQ_PORT_CAP_DUPLEX)
  45. def isOutput(self):
  46. return self.capability & (SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_DUPLEX)
  47. def __str__(self):
  48. x = " (" +( "In" if self.isInput() else "")
  49. x += ( " Out" if self.isOutput() else "")
  50. return self.globalString+x+")"
  51. class AlsaClient:
  52. def __init__(self, clientName, clientId, portList):
  53. self.clientName=clientName
  54. self.clientId=clientId
  55. self.ports={}
  56. for p in portList:
  57. self.ports[p[1]]=AlsaPort(clientName, p[0], clientId, p[1])
  58. def __str__(self):
  59. return self.clientName+":"+str(self.clientId)
  60. class AlsaConnector:
  61. def __init__(self):
  62. AlsaConnector.connector=None
  63. if AlsaConnector.connector==None:
  64. AlsaConnector.connector=alsaseq.Sequencer()
  65. @staticmethod
  66. def listClients():
  67. t=time.time()
  68. out={}
  69. for item in AlsaConnector.connector.connection_list():
  70. clientName=item[0]
  71. clientId=item[1]
  72. portLists=item[2]
  73. out[clientId]=AlsaClient(clientName, clientId, portLists)
  74. return out
  75. @staticmethod
  76. def listPorts():
  77. out=[]
  78. x=AlsaConnector.listClients()
  79. for i in x:
  80. for j in x[i].ports:
  81. out.append(x[i].ports[j])
  82. return out
  83. @staticmethod
  84. def listPortsString():
  85. out=[]
  86. x=AlsaConnector.listClients()
  87. for i in x:
  88. for j in x[i].ports:
  89. out.append(j.globalString)
  90. return out
  91. @staticmethod
  92. def connect( fro, to):
  93. AlsaConnector.connector.connect_ports(fro, to)
  94. @staticmethod
  95. def disconnect( fro, to):
  96. AlsaConnector.connector.diconnect_ports(fro, to)
  97. @staticmethod
  98. def findMe(cname, pname):
  99. l=AlsaConnector.listPorts()
  100. for xi in reversed(range(len(l))):
  101. x=l[xi]
  102. name=x.globalString
  103. lastSpace=name.rfind(' ')
  104. if lastSpace<0: return None
  105. name=name[:lastSpace]
  106. if name == (cname+":"+pname):
  107. return x
  108. return None
  109. @staticmethod
  110. def getPortFromId(alsaId):
  111. clients=AlsaConnector.listClients()
  112. if alsaId[0] in clients:
  113. if alsaId[1] in clients[alsaId[0]].ports:
  114. return clients[alsaId[0]].ports[alsaId[1]]
  115. return None
  116. @staticmethod
  117. def getPortNameFromId(alsaId):
  118. clients=AlsaConnector.listClients()
  119. if alsaId[0] in clients:
  120. if alsaId[1] in clients[alsaId[0]].ports:
  121. return clients[alsaId[0]].ports[alsaId[1]].globalString
  122. return None
  123. @staticmethod
  124. def portAuto(port):
  125. if isinstance(port, (tuple, list)):
  126. port=AlsaConnector.getPortFromId(port)
  127. elif isinstance(port, str):
  128. port=AlsaConnector.getPortFromId(port)
  129. return port
  130. @staticmethod
  131. def getPortFromName(name):
  132. return extractFromAlsaCompleteName(name)
  133. @staticmethod
  134. def extractFromAlsaCompleteName(name):
  135. clientName=""
  136. portName=""
  137. clientId=0
  138. portId=0
  139. i=len(name)-1
  140. tmp=""
  141. #port id
  142. while name[i]!=':':
  143. tmp+=name[i]
  144. i-=1
  145. i-=1
  146. portId=int(reverse(tmp))
  147. tmp=""
  148. while name[i]!=' ':
  149. tmp+=name[i]
  150. i-=1
  151. clientId=int(reverse(tmp))
  152. i-=1
  153. tmp=""
  154. while name[i]!=':':
  155. tmp+=name[i]
  156. i-=1
  157. portName=reverse(tmp)
  158. clientName=name[:i]
  159. return (clientName, portName, (clientId, portId))
  160. __alsa_connector_singleton=AlsaConnector()