ImMsgHistoryController.class.php
4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
<?php
/**
* Created by PhpStorm.
* User: zhoutao
* Date: 2018/1/18
* Time: 上午10:56
*/
namespace Frontend\Controller\Callback;
use Com\IM\TimRestAPI;
use Common\Service\ImRecordService;
/**
* Class ImMsgHistoryController
* @package Frontend\Controller\Callback
* @property TimRestAPI $imSdk
*/
class ImMsgHistoryController extends AbstractController
{
protected $imSdk = null;
/**
* 获取远端聊天室历史记录
*/
public function Index()
{
set_time_limit(0);
// 获取时间范围
$timeRangeStart = empty($this->callBackData['timeRangeStart']) ?
I('post.timeRangeStart') : $this->callBackData['timeRangeStart'];
$timeRangeEnd = empty($this->callBackData['timeRangeEnd']) ?
I('post.timeRangeEnd') : $this->callBackData['timeRangeEnd'];
if (empty($timeRangeStart) || empty($timeRangeEnd)) {
exit('SUCCESS');
}
// 获取开始时间的小时整点
$startWholePoint = rgmdate($timeRangeStart, 'Y-m-d H:00:00');
$startWholePointTimestamp = rstrtotime($startWholePoint);
// 获取时间段内的每个时间批次
$this->imSdk = new TimRestAPI();
for ($i = $startWholePointTimestamp; $i < $timeRangeEnd; $i += 60 * 60 * 1000) {
// 处理单个时间批次聊天记录
$this->dealWithSingleTimeBatch(rgmdate($i, 'YmdH'));
}
exit('SUCCESS');
}
/**
* 处理单个时间批次聊天记录
* @param string $timeBatch 时间批次 比如: 2018011812
* @return bool
*/
private function dealWithSingleTimeBatch($timeBatch)
{
// 判断是否已经拉取过了
$imRecordServ = new ImRecordService();
$imRecordSingle = $imRecordServ->get_by_conds(['time_batch' => $timeBatch], [], true);
if (!empty($imRecordSingle)) {
return true;
}
$res = $this->imSdk->get_history('Group', $timeBatch);
\Think\Log::record('腾讯云返回(历史记录文件列表):::' . var_export($res, true));
if (!isset($res['File'])) {
return true;
}
// 先检查文件夹是否存在
if (!file_exists(TEMP_PATH)) {
@mkdir(TEMP_PATH);
}
$insertData = [];
foreach ($res['File'] as $key => $file) {
// 下载远端文件
$fileName = TEMP_PATH . $res['MsgTime'] . '-' . $key;
$fileNameAddSuffix = TEMP_PATH . $res['MsgTime'] . '-' . $key . '.gz';
// 如果有文件存在了 说明并发时有别的进程在处理
if (is_file($fileNameAddSuffix)) {
break;
}
file_put_contents($fileNameAddSuffix, file_get_contents($file['URL']));
// 解压
$this->unZip($fileNameAddSuffix);
// 读取文件并写入数据库
foreach (json_decode(file_get_contents($fileName), true)['MsgList'] as $msg) {
foreach ($msg['MsgBody'] as $singleMsg) {
$insertData[] = [
'time_batch' => $timeBatch,
'from_account' => $msg['From_Account'],
'group_id' => $msg['GroupId'],
'msg_timestamp' => $msg['MsgTimestamp'],
'msg_type' => $singleMsg['MsgType'],
'msg_content' => $singleMsg['MsgContent']['Text'],
'msg_seq' => $msg['MsgSeq'],
];
}
// 到数量先写入数据库 防止内存溢出
if (count($insertData) >= 500) {
// 写入数据库
$imRecordServ->insert_all($insertData);
// 清空数组
$insertData = [];
}
};
// 上面写入数据库会有 少于 500 的情况 这里补漏
if (!empty($insertData)) {
// 写入数据库
$imRecordServ->insert_all($insertData);
}
// 删除文件
@unlink($fileNameAddSuffix);
@unlink($fileName);
}
return true;
}
/**
* 解压文件
* @param $filePath
* @return bool
*/
private function unZip($filePath)
{
$bufferSize = 4096;
$outFileName = str_replace('.gz', '', $filePath);
$file = gzopen($filePath, 'rb');
$outFile = fopen($outFileName, 'wb');
while (!gzeof($file)) {
fwrite($outFile, gzread($file, $bufferSize));
}
fclose($outFile);
gzclose($file);
return true;
}
}