import
subprocess
import
logging
import
argparse
import
os
from
datetime
import
datetime
# Root logger setup: INFO and above are appended to ./time_sync.log, a path
# that is resolved against the current working directory.
logging.basicConfig(
    filename='./time_sync.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def check_root_privileges():
    """Abort the program unless it is running with root privileges.

    Compares the effective user id returned by ``os.geteuid()`` with 0. When
    they differ, the problem is logged, printed, and the process is
    terminated with exit status 1. When they match, the function simply
    returns.

    Raises:
        SystemExit: With code 1 when the effective user id is not 0.
    """
    if os.geteuid() == 0:
        return

    # Use sys.exit instead of the ``exit`` builtin: the latter is injected by
    # the ``site`` module for interactive sessions and does not exist when
    # the interpreter runs with ``-S`` or inside a frozen/embedded build.
    import sys

    logging.error("此脚本需要root权限运行")
    print("错误: 此脚本需要root权限运行")
    sys.exit(1)
def sync_time(server, timeout=5):
    """Synchronise the system clock from an NTP server by running ``ntpdate``.

    Executes ``ntpdate -u -t <timeout> <server>`` and reports the outcome as
    a dictionary instead of raising.

    Args:
        server: Host name or address handed to ``ntpdate``.
        timeout: Value for ``ntpdate``'s ``-t`` option; converted with
            ``str`` before being passed on the command line.

    Returns:
        A dict with three keys:

        * ``"success"`` - True when ``ntpdate`` exited with status 0.
        * ``"message"`` - the stripped stdout on success, otherwise a
          description of the failure.
        * ``"time_adjustment"`` - the value returned by
          ``parse_time_adjustment`` on success, otherwise None.
    """
    try:
        logging.info(f"正在尝试从 {server} 同步时间...")
        result = subprocess.run(
            ["ntpdate", "-u", "-t", str(timeout), server],
            capture_output=True,
            text=True,
            check=True,
        )
        output = result.stdout.strip()
        logging.info(f"时间同步成功: {output}")
        return {
            "success": True,
            "message": output,
            "time_adjustment": parse_time_adjustment(output),
        }
    except subprocess.CalledProcessError as e:
        # A failing command does not always explain itself on stderr. Fall
        # back to stdout, then to the exit status, so the caller never ends
        # up reporting an empty failure reason.
        error_msg = (
            (e.stderr or "").strip()
            or (e.stdout or "").strip()
            or f"ntpdate 以退出码 {e.returncode} 失败"
        )
        logging.error(f"同步时间失败: {error_msg}")
        return {
            "success": False,
            "message": error_msg,
            "time_adjustment": None,
        }
    except Exception as e:
        # Boundary handler: covers e.g. a missing ``ntpdate`` executable so
        # that the caller always receives a result dict.
        error_msg = str(e)
        logging.error(f"发生未知错误: {error_msg}")
        return {
            "success": False,
            "message": error_msg,
            "time_adjustment": None,
        }
def parse_time_adjustment(output):
    """Extract the clock offset, in seconds, from ``ntpdate`` output.

    Looks for the word ``offset`` followed by a number and a unit that
    starts with ``sec``. This accepts both ``offset -0.003456 sec`` (the
    wording ntpdate is documented to print) and ``offset 0.5 seconds``, and
    tolerates an explicit ``+`` sign and a missing fractional part.

    Args:
        output: Text captured from the ``ntpdate`` command.

    Returns:
        The offset as a float, or None when ``output`` is not a string or
        contains no recognisable offset.
    """
    import re

    # Non-string input (e.g. None) cannot be searched; report "no offset"
    # rather than raising, which is what callers have always received.
    if not isinstance(output, str):
        return None

    match = re.search(r'offset\s+([+-]?\d+(?:\.\d+)?)\s*sec', output)
    if match is None:
        return None
    return float(match.group(1))
def update_hardware_clock():
    """Copy the system time to the hardware clock via ``hwclock --systohc``.

    The outcome is reported as a dictionary instead of raising.

    Returns:
        A dict with two keys:

        * ``"success"`` - True when ``hwclock`` exited with status 0.
        * ``"message"`` - on success the stripped stdout, or a default
          confirmation text when stdout is empty; on failure a description
          of the error.
    """
    try:
        logging.info("正在更新硬件时钟...")
        result = subprocess.run(
            ["hwclock", "--systohc"],
            capture_output=True,
            text=True,
            check=True,
        )
        output = result.stdout.strip()
        logging.info("硬件时钟更新成功")
        return {
            "success": True,
            "message": output if output else "硬件时钟已成功更新",
        }
    except subprocess.CalledProcessError as e:
        # A failing command does not always explain itself on stderr. Fall
        # back to stdout, then to the exit status, so the caller never ends
        # up reporting an empty failure reason.
        error_msg = (
            (e.stderr or "").strip()
            or (e.stdout or "").strip()
            or f"hwclock 以退出码 {e.returncode} 失败"
        )
        logging.error(f"更新硬件时钟失败: {error_msg}")
        return {"success": False, "message": error_msg}
    except Exception as e:
        # Boundary handler: covers e.g. a missing ``hwclock`` executable so
        # that the caller always receives a result dict.
        error_msg = str(e)
        logging.error(f"更新硬件时钟时发生未知错误: {error_msg}")
        return {"success": False, "message": error_msg}
def _build_parser():
    """Create the command-line parser for the time synchronisation script."""
    parser = argparse.ArgumentParser(description="Linux系统时间同步脚本")
    parser.add_argument(
        "-s", "--server", default="ntp.aliyun.com",
        help="指定NTP服务器 (默认: ntp.aliyun.com)",
    )
    parser.add_argument(
        "-t", "--timeout", type=int, default=5,
        help="连接超时时间(秒)(默认: 5)",
    )
    parser.add_argument(
        "--no-hwclock", action="store_true",
        help="不更新硬件时钟",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true",
        help="显示详细输出信息",
    )
    return parser


def main(argv=None):
    """Run the time synchronisation workflow from the command line.

    Parses the arguments, verifies root privileges, synchronises the system
    time and, unless ``--no-hwclock`` is given, updates the hardware clock.
    Progress is printed for the user and start/end times are logged.

    Args:
        argv: Optional list of command-line arguments. When None, argparse
            reads them from ``sys.argv``.

    Returns:
        0 when every requested step succeeded, 1 when the time
        synchronisation or the hardware clock update failed.

    Raises:
        SystemExit: Raised by argparse for ``--help`` or invalid arguments,
            and by ``check_root_privileges`` when not running as root.
    """
    args = _build_parser().parse_args(argv)

    check_root_privileges()

    start_time = datetime.now()
    logging.info(f"时间同步开始于 {start_time}")
    print(f"开始同步时间,服务器: {args.server}")

    exit_code = 0
    sync_result = sync_time(args.server, args.timeout)

    if sync_result["success"]:
        print("系统时间同步成功")
        if args.verbose and sync_result["time_adjustment"] is not None:
            print(f"时间调整: {sync_result['time_adjustment']} 秒")

        if not args.no_hwclock:
            hw_result = update_hardware_clock()
            if hw_result["success"]:
                print("硬件时钟更新成功")
            else:
                print(f"硬件时钟更新失败: {hw_result['message']}")
                exit_code = 1
    else:
        print(f"时间同步失败: {sync_result['message']}")
        exit_code = 1

    end_time = datetime.now()
    logging.info(f"时间同步结束于 {end_time}")
    logging.info(f"执行总时间: {end_time - start_time}")

    return exit_code


if __name__ == "__main__":
    import sys

    # Propagate the outcome as the process exit status so that cron jobs and
    # shell scripts can detect a failed synchronisation.
    sys.exit(main())