網站首頁 編程語言 正文
使用python的subprocess模塊實現對SVN的相關操作。
設置GitSvn類,在該類下自定義執行SVN常規操作的方法。
SVN的常規操作包括:
(1)獲取SVN當前版本,通過方法get_version()實現;
(2)下載SVN指定倉庫,通過方法download()實現,實際是通過調用SVN的命令行操作指令svn checkout實現的下載整個倉庫功能;
(3)獲取SVN某個倉庫下的所有文件列表,通過方法search_dir()實現,實際是通過調用SVN的命令行操作指令svn list實現的獲取倉庫文件列表功能;
(4)在SVN指定位置創建新的文件夾,通過方法mkdir_command()實現,實際是通過調用SVN的命令行操作指令svn mkdir實現的創建文件夾或者目錄功能;
(5)將本地倉庫文件添加到SVN倉庫,通過方法upload_file()實現,實際是通過調用SVN的命令行操作指令svn add實現的添加文件功能;
(6)將本地倉庫已添加的文件提交SVN指定倉庫,通過方法commit_command()實現,實際是通過調用SVN的命令行操作指令svn commit實現的提交文件功能;
(7)刪除SVN倉庫的目錄,通過方法delete_url()實現,實際是通過調用SVN的命令行操作指令svn delete實現的刪除目錄功能;
(8)鎖定SVN倉庫的文件,通過方法lock_file()實現,實際是通過調用SVN的命令行操作指令svn lock實現的鎖定文件功能;
(9)將SVN倉庫的文件解除鎖定,通過方法unlock_file()實現,實際是通過調用SVN的命令行操作指令svn unlock實現的解除文件鎖定功能;
(10)查看SVN倉庫文件當前狀態,通過方法check_status()實現,實際是通過調用SVN的命令行操作指令svn status實現的查看文件狀態功能;
(11)更新SVN倉庫的某個文件,通過方法update_file()實現,實際是通過調用SVN的命令行操作指令svn up實現的更新文件功能;
GitSvn類定義如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Logintern09
import os
import time
import subprocess
from log_manage import logger
rq = time.strftime('%Y%m%d', time.localtime(time.time()))
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files")
tempfile = os.path.join(file_path, rq + '.log')
class GitSvn(object):
def __init__(self, bill_addr):
self.bill_addr = bill_addr
def get_version(self):
cmd = "svn info %s" % self.bill_addr
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
logger.info(result)
error = error.decode(encoding="gbk")
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def download(self, dir_path):
with open(tempfile, "r") as f:
result = f.readlines()
for line in result:
if line.startswith("Revision"):
laster_version = int(line.split(":")[-1].strip())
logger.info(line)
break
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version)
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")):
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
cmd = "svn cleanup"
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
def search_dir(self):
cmd = "svn list %s" % self.bill_addr
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def mkdir_command(self, id_num):
cmd = "svn mkdir %s" % id_num
logger.info(cmd)
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def upload_file(self, file_path):
cmd = "svn add %s --force" % file_path
root_path, file_name = os.path.split(file_path)
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=root_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def commit_command(self, file):
# 執行了鎖定的用戶執行了提交操作(提交操作將自動解鎖)
if os.path.isfile(file):
file_path, file_name = os.path.split(file)
else:
file_path = file
cmd = "svn commit %s -m 'update_files'" % file
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
elif error.startswith("svn: E730053:"):
res = "數據上傳失敗,請重新提交數據!!!"
raise SystemError(res)
else:
res = "數據上傳失敗,請重新提交數據!!!"
raise SystemError(res)
def delete_url(self, url):
cmd = "svn delete %s" % url
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def lock_file(self, file_name):
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn lock %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
elif error.startswith("svn: warning: W160042:"):
res = "系統資源已被其他用戶鎖定,請稍后重試!"
raise SystemError(res)
def unlock_file(self, file_name):
# 不使用--force 參數 可以解鎖被自己鎖定的文集 即普通的release lock
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn unlock %s --force" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
def check_status(self, file_name):
# ?:不在svn的控制中;M:內容被修改;C:發生沖突;A:預定加入到版本庫;K:被鎖定
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn status -v %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
# 獲取狀態結果解析
status_list = result.split(" ")
status_flag_list = []
for i in status_list:
if i.isalpha():
status_flag_list.append(i)
else:
break
if result.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
elif "C" in status_flag_list:
return "C"
elif "K" in status_flag_list:
return "K"
def update_file(self, file_name):
cur_path = os.path.dirname(os.path.realpath(__file__))
id_path = os.path.join(cur_path, default_sys_name)
file_path = os.path.join(id_path, file_name)
cmd = "svn up %s" % file_path
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
with open(tempfile, "w") as f:
f.write(result)
with open(tempfile, "r") as f:
result = f.readlines()
count = -1
for line in result:
count += 1
if (line.strip() != "") and (count == 1):
if line.startswith("C"):
res = "更新系統資源時發現存在沖突,請稍后重試!"
raise SystemError(res)
if error.strip() != "":
if error.startswith("svn: E170013:"):
res = "網絡異常,暫時連接不上系統,請檢查網絡或其他配置!"
raise SystemError(res)
elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")):
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
cmd = "svn cleanup"
logger.info(cmd)
output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=file_path)
(result, error) = output.communicate()
result = result.decode(encoding="gbk")
error = error.decode(encoding="gbk")
logger.info(result)
logger.error(error)
上述類GitSvn的應用實例如下:
if __name__ == '__main__':
default_url = svn:*** # 用戶本地SVN客戶端的URL地址
git_class = GitSvn(default_url)
git_class.get_version()
git_class.download()
# 驗證查看目錄文件列表功能
git_class.search_dir()
# 驗證刪除目錄功能
cur_path = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(cur_path, default_sys_name)
id_path = os.path.join(file_path, 'SCR202202100002')
git_class.delete_url(id_path)
# 驗證文件加鎖功能
git_class.lock_file(file_name="單號總表.xlsx")
# 驗證文件解鎖功能
git_class.unlock_file(file_name="單號總表.xlsx")
# 檢查文件狀態
git_class.check_status(file_name="單號總表.xlsx")
# 驗證創建目錄功能
git_class.mkdir_command("SCR202203280001")
# 驗證提交文件功能
cur_path = os.path.dirname(os.path.realpath(__file__))
sys_path = os.path.join(cur_path, default_sys_name)
target_dir = os.path.join(sys_path, "SCR202203280001")
git_class.upload_file(target_dir)
git_class.commit_command(target_dir)
原文鏈接:https://blog.csdn.net/Logintern09/article/details/126679743
相關推薦
- 2022-08-01 C++鏈式二叉樹深入分析_C 語言
- 2022-11-28 一文帶你搞懂Golang結構體內存布局_Golang
- 2022-02-17 Centos 安裝docker 警告:正在等候 事務 鎖定 arb/rpm/.rpm.lock 正在
- 2022-12-10 jquery異常問題Uncaught?TypeError:?$(...).on?is?not?a?f
- 2022-07-03 python中dict獲取關鍵字與值的實現_python
- 2022-10-03 Pandas數據集的分塊讀取的實現_python
- 2022-05-18 centos?自動運行python腳本和配置?Python?定時任務_python
- 2022-05-17 Error running ‘myToncat‘: Address localhost:8080 i
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支