import
socket
from
urllib.parse
import
urlparse
import
requests
from
xml.etree
import
ElementTree as ET
def find_devices():
    """Discover UPnP MediaRenderer devices on the local network via SSDP.

    Sends a single M-SEARCH multicast request and collects the value of the
    ``LOCATION`` header from every reply, until no datagram has arrived for
    3 seconds.

    Returns:
        list[str]: Unique device-description URLs, in the order in which the
        first reply carrying each URL was received.

    Raises:
        OSError: If the socket cannot be configured, the request cannot be
            sent, or receiving fails for a reason other than the timeout.
    """
    ssdp_request = (
        "M-SEARCH * HTTP/1.1\r\n"
        "HOST: 239.255.255.250:1900\r\n"
        "MAN: \"ssdp:discover\"\r\n"
        "MX: 2\r\n"
        "ST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n"
        "\r\n"
    ).encode()

    # A dict keeps first-seen order while de-duplicating, so the device list
    # (and the menu numbering built from it) is stable between runs.
    locations = {}

    # The context manager closes the socket on every path, including a
    # failure in setsockopt()/sendto() before the receive loop is reached.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.settimeout(3)
        sock.sendto(ssdp_request, ("239.255.255.250", 1900))
        try:
            while True:
                data, _ = sock.recvfrom(4096)
                response = data.decode('utf-8', errors='ignore')
                for line in response.split('\r\n'):
                    # Header names are case-insensitive; split only on the
                    # first colon because the URL itself contains colons.
                    if line.lower().startswith('location:'):
                        locations[line.split(':', 1)[1].strip()] = None
        except socket.timeout:
            # No reply within the timeout window: discovery is finished.
            pass
    return list(locations)
def play(control_url, video_url):
    """Ask a renderer to load a media URL via AVTransport SetAVTransportURI.

    Args:
        control_url: Absolute URL of the device's AVTransport control endpoint.
        video_url: URL of the media the renderer should load.

    Returns:
        bool: True if the device answered with a non-error HTTP status,
        False if the request failed for any reason (the error is printed).
    """
    from xml.sax.saxutils import escape

    soap_action = "urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"
    headers = {
        "Content-Type": 'text/xml; charset="utf-8"',
        "SOAPAction": f'"{soap_action}"',
    }
    # The media URL is XML-escaped: query strings routinely contain '&',
    # which would otherwise make the envelope malformed.
    # NOTE(review): the original body literal was missing from the source;
    # this is the standard SetAVTransportURI envelope. Only this action is
    # sent, so confirm whether target renderers also need a separate Play
    # action to start playback.
    soap_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
        's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">\n'
        '  <s:Body>\n'
        '    <u:SetAVTransportURI '
        'xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">\n'
        '      <InstanceID>0</InstanceID>\n'
        f'      <CurrentURI>{escape(str(video_url))}</CurrentURI>\n'
        '      <CurrentURIMetaData></CurrentURIMetaData>\n'
        '    </u:SetAVTransportURI>\n'
        '  </s:Body>\n'
        '</s:Envelope>\n'
    )
    try:
        # Send bytes so the payload really is UTF-8, as the Content-Type
        # header declares, even when the URL contains non-ASCII characters.
        response = requests.post(
            control_url,
            data=soap_body.encode("utf-8"),
            headers=headers,
            timeout=5,
        )
        response.raise_for_status()
        return True
    except Exception as e:
        # Deliberately broad: callers rely on a boolean result, never a raise.
        print(f"投屏失败: {str(e)}")
        return False
def parse_device(location):
    """Fetch a UPnP device description and extract its AVTransport endpoint.

    Args:
        location: URL of the device description XML (the SSDP ``LOCATION``
            header value).

    Returns:
        dict | None: ``{'name', 'control_url', 'location'}`` for a device
        exposing an AVTransport:1 service; ``control_url`` is None when the
        service declares no control URL. None is returned when the
        description cannot be fetched or parsed, or when it contains no
        device element or no AVTransport:1 service.
    """
    from urllib.parse import urljoin

    try:
        response = requests.get(location, timeout=3)
        response.raise_for_status()
        root = ET.fromstring(response.content)

        device = root.find('.//{urn:schemas-upnp-org:device-1-0}device')
        if device is None:
            return None
        friendly_name = device.findtext(
            '{urn:schemas-upnp-org:device-1-0}friendlyName',
            'Unknown Device',
        )

        service = root.find(
            './/{urn:schemas-upnp-org:device-1-0}service'
            '[{urn:schemas-upnp-org:device-1-0}serviceType'
            '="urn:schemas-upnp-org:service:AVTransport:1"]'
        )
        if service is None:
            return None

        # Relative control URLs are resolved against URLBase when the
        # description declares one, otherwise against the description URL.
        # urljoin also handles absolute controlURLs, paths without a leading
        # slash, and bracketed IPv6 hosts, which plain string concatenation
        # of scheme/host/port got wrong.
        url_base = root.findtext('{urn:schemas-upnp-org:device-1-0}URLBase')
        base_url = (url_base or '').strip() or location

        control_path = service.findtext(
            '{urn:schemas-upnp-org:device-1-0}controlURL'
        )
        control_path = (control_path or '').strip()
        control_url = urljoin(base_url, control_path) if control_path else None

        return {
            'name': friendly_name,
            'control_url': control_url,
            'location': location,
        }
    except Exception as e:
        # Deliberately broad: one unreachable or malformed device must not
        # abort the scan of the remaining ones.
        print(f"Error parsing {location}: {str(e)}")
        return None
def main():
    """Run the interactive flow: scan, list devices, pick one, send a URL.

    Reads the device number and the media URL from standard input and
    reports progress and errors on standard output. Returns None.
    """
    print("正在扫描局域网中的DLNA设备...")
    locations = find_devices()
    if not locations:
        print("未找到任何DLNA设备")
        return

    # Several description URLs can point at the same renderer; keep one
    # entry per control URL, preserving discovery order.
    devices = []
    seen_control_urls = set()
    for loc in locations:
        dev_info = parse_device(loc)
        if not dev_info or dev_info['control_url'] in seen_control_urls:
            continue
        seen_control_urls.add(dev_info['control_url'])
        devices.append(dev_info)

    if not devices:
        print("未找到有效的投屏设备")
        return

    print("\n发现以下设备:")
    for idx, dev in enumerate(devices):
        print(f"[{idx+1}] {dev['name']}")

    try:
        choice = int(input("\n请输入设备编号: ")) - 1
    except ValueError:
        print("?")
        return
    # Explicit range check: a bare devices[choice] would accept 0 and
    # negative numbers as indexes counted from the end of the list.
    if not 0 <= choice < len(devices):
        print("?")
        return

    selected = devices[choice]
    print(f"\n已选择设备:{selected['name']}")
    print(f"控制地址: {selected['control_url']}")

    video_url = input("请输入要投屏的视频URL(支持常见流媒体格式): ").strip()
    if play(selected['control_url'], video_url):
        print(f"已发送投屏请求到 {selected['name']}")
    else:
        print("投屏请求失败")
# Start the interactive tool only when this file is executed as a script,
# so importing the module has no side effects.
if __name__ == "__main__":
    main()