Skip to content

Commit

Permalink
分布式文件服务器
Browse files Browse the repository at this point in the history
分布式文件服务器
  • Loading branch information
qieangel2013 committed Sep 14, 2016
1 parent ea8a163 commit 442ae97
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 91 deletions.
65 changes: 45 additions & 20 deletions server/FileDistributedClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ public function __construct()

public function addServerClient($address)
{
$client = new swoole_client(SWOOLE_TCP, SWOOLE_SOCK_ASYNC);
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->set(array(
'open_length_check' => 1,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 2000000 //协议最大长度
));
$client->on('Connect', array(
&$this,
'onConnect'
Expand Down Expand Up @@ -76,17 +83,15 @@ public function onReceive($client, $data)
{
$remote_info = $this->unpackmes($data);
if (is_array($remote_info)) {
foreach ($remote_info as &$val) {
if (isset($val['type'])) {
switch ($val['type']) {
switch ($remote_info['type']) {
case 'filemes':
if (file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) {
$strlendata = file_get_contents(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])));
if (file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) {
$strlendata = file_get_contents(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])));
if (strlen($strlendata) > 0) {
$datas = array(
'type' => 'filesize',
'data' => array(
'path' => $val['data']['path'],
'path' => $remote_info['data']['path'],
'filesize' => strlen($strlendata)
)
);
Expand All @@ -95,8 +100,24 @@ public function onReceive($client, $data)
}
break;
case 'filesizemes':
if ($client->sendfile(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) {
$client->send(pack('N',$remote_info['data']['filesize']));
if ($client->sendfile(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) {
}
/*$filesizedata=$remote_info['data']['filesize'];
$filepath=str_replace("@", "_", rawurldecode($remote_info['data']['path']));
swoole_async_read(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])),function($filename,$content) use($client,$filesizedata,$filepath){
if (empty($content)){
$data_file = array('type' =>'filepool','data'=>array('id'=>md5($filepath),'pathpre' => $filepath,'filesize'=>$filesizedata,'curszize'=>strlen($content),'content'=>$content,'succ'=>0));
$client->send($this->packmes($data_file));
return false;
}else{
$data_file = array('type' =>'filepool','data'=>array('id'=>md5($filepath),'pathpre' => $filepath,'filesize'=>$filesizedata,'curszize'=>strlen($content),'content'=>$content,'succ'=>1));
$client->send(pack('N', strlen($content)).$content);
//$client->send($this->packmes($data_file));
return true;
}
},8192);*/
break;
case 'system': //启动一个进程来处理已存在的图片
$listenpath = LISTENPATH;
Expand Down Expand Up @@ -143,29 +164,22 @@ public function onReceive($client, $data)


$client->send($this->packmes($data));
sleep(1);
//usleep(300000);
});
break;
case 'asyncfile':
$data_sa = array(
'type' => 'file',
'data' => array(
'path' => $val['data']['path']
'path' => $remote_info['data']['path']
)
);

$client->send($this->packmes($data_sa));
break;
default:
break;

break;

}
}

}

}

} else {
echo date('[ c ]') . '参数不对 \r\n';
Expand Down Expand Up @@ -204,6 +218,7 @@ public function onClose($client)
*/
public function onError($client)
{
//echo socket_strerror($client->errCode);
$this->removeuser($this->cur_address);
$this->del_server[ip2long($this->cur_address)] = $this->cur_address;
$this->table->del(ip2long($this->cur_address));
Expand Down Expand Up @@ -261,7 +276,7 @@ public function getlistDir($dir)
//解包装数据
public function unpackmes($data, $format = '\r\n\r\n', $preformat = '######')
{
$pos = strpos($data, $format);
/*$pos = strpos($data, $format);
$resultdata = array();
if ($pos !== false) {
$tmpdata = explode($format, $data);
Expand All @@ -288,12 +303,22 @@ public function unpackmes($data, $format = '\r\n\r\n', $preformat = '######')
return $resultdata;
} else {
return $data;
}*/

$resultdata=substr($data, 4);
$result=json_decode($resultdata, true);
if(!is_array($result)){
return $resultdata;
}else{
return $result;
}

}
//包装数据
public function packmes($data, $format = '\r\n\r\n', $preformat = '######')
{
return $preformat . json_encode($data, true) . $format;
//return $preformat . json_encode($data, true) . $format;
return pack('N', strlen(json_encode($data, true))).json_encode($data, true);
}
//获取目录文件
public function getlistDirFile($dir)
Expand Down
112 changes: 41 additions & 71 deletions server/FileDistributedServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,23 @@ public function __construct()
//'task_worker_num' => 8,
'dispatch_mode' => 4, //1: 轮循, 3: 争抢
'daemonize' => true,
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 2000000, //协议最大长度
'log_file' => ServerLog
));
} else {
$server->set(array(
'worker_num' => 1,
//'task_worker_num' => 8,
'dispatch_mode' => 4, //1: 轮循, 3: 争抢
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 2000000, //协议最大长度
'daemonize' => true
));
}
Expand Down Expand Up @@ -174,8 +184,6 @@ public function onStart($serv)
);
}
$localclient->send(FileDistributedClient::getInstance()->packmes($data));
//sleep(1);
usleep(300000);
}

}
Expand Down Expand Up @@ -264,24 +272,23 @@ public function onReceive($serv, $fd, $from_id, $data)
}
}
} else {
foreach ($remote_info as &$val) {
if (isset($val['type']) && $val['type'] == 'system' && $val['data']['code'] == 10001) {
if ($this->client_a != $val['data']['fd']) {
if (!$this->table->get(ip2long($val['data']['fd']))) {
$client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']);
$this->b_server_pool[ip2long($val['data']['fd'])] = array(
'fd' => $val['data']['fd'],
if ($remote_info['type'] == 'system' && $remote_info['data']['code'] == 10001) {
if ($this->client_a != $remote_info['data']['fd']) {
if (!$this->table->get(ip2long($remote_info['data']['fd']))) {
$client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']);
$this->b_server_pool[ip2long($remote_info['data']['fd'])] = array(
'fd' => $remote_info['data']['fd'],
'client' => $client
);
$this->client_a = $val['data']['fd'];
$this->client_a = $remote_info['data']['fd'];
} else {
if (FileDistributedClient::getInstance()->getkey()) {
$client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']);
$this->b_server_pool[ip2long($val['data']['fd'])] = array(
'fd' => $val['data']['fd'],
$client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']);
$this->b_server_pool[ip2long($remote_info['data']['fd'])] = array(
'fd' => $remote_info['data']['fd'],
'client' => $client
);
$this->client_a = $val['data']['fd'];
$this->client_a = $remote_info['data']['fd'];
if ($this->localip == FileDistributedClient::getInstance()->getkey()) {
FileDistributedClient::getInstance()->delkey();
}
Expand All @@ -301,49 +308,49 @@ public function onReceive($serv, $fd, $from_id, $data)
array_push($this->client_pool_ser, $this->connectioninfo['remote_ip']);
}
}
echo date('[ c ]') . str_replace("\n", "", var_export($val, true));
echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true));
} else {
if (isset($val['type'])) {
switch ($val['type']) {
switch ($remote_info['type']) {
case 'filesize':
if (isset($val['data']['path'])) {
if (isset($remote_info['data']['path'])) {
$data_s = array(
'type' => 'filesizemes',
'data' => array(
'path' => $val['data']['path']
'path' => $remote_info['data']['path'],
'filesize'=>$remote_info['data']['filesize']
)
);
array_push($this->client_pool_ser_c, $val);
array_push($this->client_pool_ser_c,$remote_info);
$serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s));
}
break;
case 'file':
if (isset($val['data']['path'])) {
if (!file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) {
if (isset($remote_info['data']['path'])) {
if (!file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) {
$data_s = array(
'type' => 'filemes',
'data' => array(
'path' => $val['data']['path']
'path' => $remote_info['data']['path']
)
);
$serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s));
}
}
break;
case 'asyncfileclient':
if (isset($val['data']['path'])) {
if (empty($val['data']['pre'])) {
if (isset($remote_info['data']['path'])) {
if (empty($remote_info['data']['pre'])) {
$dataas = array(
'type' => 'asyncfile',
'data' => array(
'path' => rawurlencode('/') . $val['data']['fileex']
'path' => rawurlencode('/') . $remote_info['data']['fileex']
)
);
} else {
$dataas = array(
'type' => 'asyncfile',
'data' => array(
'path' => $val['data']['pre']
'path' => $remote_info['data']['pre']
)
);
}
Expand All @@ -352,68 +359,31 @@ public function onReceive($serv, $fd, $from_id, $data)
}
break;
case 'fileclient':
if (empty($val['data']['pre'])) {
if (empty($remote_info['data']['pre'])) {
$datas = array(
'type' => 'file',
'data' => array(
'path' => rawurlencode('/') . $val['data']['fileex']
'path' => rawurlencode('/') . $remote_info['data']['fileex']
)
);
} else {
$datas = array(
'type' => 'file',
'data' => array(
'path' => rawurlencode(rawurldecode($val['data']['pre']) . '/' . rawurldecode($val['data']['fileex']))
'path' => rawurlencode(rawurldecode($remote_info['data']['pre']) . '/' . rawurldecode($remote_info['data']['fileex']))
)
);
}
foreach ($this->b_server_pool as $k => $v) {
$v['client']->send(FileDistributedClient::getInstance()->packmes($datas));
if($v['fd']!=$this->localip) $v['client']->send(FileDistributedClient::getInstance()->packmes($datas));
}
break;
default:
default:
break;
}
echo date('[ c ]') . str_replace("\n", "", var_export($val, true));
} else {
if (!$this->tmpdata_flag) {
$tdf = array_shift($this->client_pool_ser_c);
$this->curpath['path'] = LISTENPATH . str_replace("@", "_", rawurldecode($tdf['data']['path']));
$this->filesizes = $tdf['data']['filesize'];
$this->tmpdata_flag = 1;
}
if (isset($this->curpath['path']) && $this->curpath['path']!=LISTENPATH) {
if (is_dir(dirname($this->curpath['path'])) && is_readable(dirname($this->curpath['path']))) {
} else {
FileDistributedClient::getInstance()->mklistDir(dirname($this->curpath['path']));
}
if ($this->oldpath != $this->curpath['path']) {
$this->tmpdata .= $val;
if (strlen($this->tmpdata) > $this->filesizes) {
$this->tmpdatas = substr($this->tmpdata, $this->filesizes, strlen($this->tmpdata));
$this->tmpdata = substr($this->tmpdata, 0, $this->filesizes);
}
}
if (strlen($this->tmpdata) == $this->filesizes) {

if (file_put_contents($this->curpath['path'], $this->tmpdata)) {
$this->tmpdata = '';
$this->oldpath = $this->curpath['path'];

if (strlen($this->tmpdatas) > 0) {
$this->tmpdata = $this->tmpdatas;
$this->tmpdatas = '';
}
$this->tmpdata_flag = 0;
}

}
}
echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true));
}

}
}


}

}
Expand Down

0 comments on commit 442ae97

Please sign in to comment.