From 442ae97664f87cea3b2fc0036fe95e89a7ed564b Mon Sep 17 00:00:00 2001 From: qieangel2013 <904208360@qq.com> Date: Wed, 14 Sep 2016 16:35:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E5=B8=83=E5=BC=8F=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 分布式文件服务器 --- server/FileDistributedClient.php | 65 ++++++++++++------ server/FileDistributedServer.php | 112 +++++++++++-------------------- 2 files changed, 86 insertions(+), 91 deletions(-) diff --git a/server/FileDistributedClient.php b/server/FileDistributedClient.php index e01d2bf..ab362ad 100644 --- a/server/FileDistributedClient.php +++ b/server/FileDistributedClient.php @@ -33,7 +33,14 @@ public function __construct() public function addServerClient($address) { - $client = new swoole_client(SWOOLE_TCP, SWOOLE_SOCK_ASYNC); + $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); + $client->set(array( + 'open_length_check' => 1, + 'package_length_type' => 'N', + 'package_length_offset' => 0, //第N个字节是包长度的值 + 'package_body_offset' => 4, //第几个字节开始计算长度 + 'package_max_length' => 2000000 //协议最大长度 + )); $client->on('Connect', array( &$this, 'onConnect' @@ -76,17 +83,15 @@ public function onReceive($client, $data) { $remote_info = $this->unpackmes($data); if (is_array($remote_info)) { - foreach ($remote_info as &$val) { - if (isset($val['type'])) { - switch ($val['type']) { + switch ($remote_info['type']) { case 'filemes': - if (file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) { - $strlendata = file_get_contents(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path']))); + if (file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) { + $strlendata = file_get_contents(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path']))); if (strlen($strlendata) > 0) { $datas = array( 'type' => 'filesize', 'data' => array( - 'path' => $val['data']['path'], + 'path' => $remote_info['data']['path'], 'filesize' => strlen($strlendata) ) ); @@ -95,8 +100,24 @@ public function onReceive($client, $data) } break; case 'filesizemes': - if ($client->sendfile(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) { + $client->send(pack('N',$remote_info['data']['filesize'])); + if ($client->sendfile(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) { } + /*$filesizedata=$remote_info['data']['filesize']; + $filepath=str_replace("@", "_", rawurldecode($remote_info['data']['path'])); + swoole_async_read(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])),function($filename,$content) use($client,$filesizedata,$filepath){ + if (empty($content)){ + $data_file = array('type' =>'filepool','data'=>array('id'=>md5($filepath),'pathpre' => $filepath,'filesize'=>$filesizedata,'curszize'=>strlen($content),'content'=>$content,'succ'=>0)); + $client->send($this->packmes($data_file)); + return false; + }else{ + $data_file = array('type' =>'filepool','data'=>array('id'=>md5($filepath),'pathpre' => $filepath,'filesize'=>$filesizedata,'curszize'=>strlen($content),'content'=>$content,'succ'=>1)); + $client->send(pack('N', strlen($content)).$content); + //$client->send($this->packmes($data_file)); + return true; + } + + },8192);*/ break; case 'system': //启动一个进程来处理已存在的图片 $listenpath = LISTENPATH; @@ -143,29 +164,22 @@ public function onReceive($client, $data) $client->send($this->packmes($data)); - sleep(1); - //usleep(300000); }); break; case 'asyncfile': $data_sa = array( 'type' => 'file', 'data' => array( - 'path' => $val['data']['path'] + 'path' => $remote_info['data']['path'] ) ); $client->send($this->packmes($data_sa)); break; default: - break; - + break; - } - } - - } - + } } else { echo date('[ c ]') . '参数不对 \r\n'; @@ -204,6 +218,7 @@ public function onClose($client) */ public function onError($client) { + //echo socket_strerror($client->errCode); $this->removeuser($this->cur_address); $this->del_server[ip2long($this->cur_address)] = $this->cur_address; $this->table->del(ip2long($this->cur_address)); @@ -261,7 +276,7 @@ public function getlistDir($dir) //解包装数据 public function unpackmes($data, $format = '\r\n\r\n', $preformat = '######') { - $pos = strpos($data, $format); + /*$pos = strpos($data, $format); $resultdata = array(); if ($pos !== false) { $tmpdata = explode($format, $data); @@ -288,12 +303,22 @@ public function unpackmes($data, $format = '\r\n\r\n', $preformat = '######') return $resultdata; } else { return $data; + }*/ + + $resultdata=substr($data, 4); + $result=json_decode($resultdata, true); + if(!is_array($result)){ + return $resultdata; + }else{ + return $result; } + } //包装数据 public function packmes($data, $format = '\r\n\r\n', $preformat = '######') { - return $preformat . json_encode($data, true) . $format; + //return $preformat . json_encode($data, true) . $format; + return pack('N', strlen(json_encode($data, true))).json_encode($data, true); } //获取目录文件 public function getlistDirFile($dir) diff --git a/server/FileDistributedServer.php b/server/FileDistributedServer.php index eeafb95..7d9b8f1 100644 --- a/server/FileDistributedServer.php +++ b/server/FileDistributedServer.php @@ -44,6 +44,11 @@ public function __construct() //'task_worker_num' => 8, 'dispatch_mode' => 4, //1: 轮循, 3: 争抢 'daemonize' => true, + 'open_length_check' => true, + 'package_length_type' => 'N', + 'package_length_offset' => 0, //第N个字节是包长度的值 + 'package_body_offset' => 4, //第几个字节开始计算长度 + 'package_max_length' => 2000000, //协议最大长度 'log_file' => ServerLog )); } else { @@ -51,6 +56,11 @@ public function __construct() 'worker_num' => 1, //'task_worker_num' => 8, 'dispatch_mode' => 4, //1: 轮循, 3: 争抢 + 'open_length_check' => true, + 'package_length_type' => 'N', + 'package_length_offset' => 0, //第N个字节是包长度的值 + 'package_body_offset' => 4, //第几个字节开始计算长度 + 'package_max_length' => 2000000, //协议最大长度 'daemonize' => true )); } @@ -174,8 +184,6 @@ public function onStart($serv) ); } $localclient->send(FileDistributedClient::getInstance()->packmes($data)); - //sleep(1); - usleep(300000); } } @@ -264,24 +272,23 @@ public function onReceive($serv, $fd, $from_id, $data) } } } else { - foreach ($remote_info as &$val) { - if (isset($val['type']) && $val['type'] == 'system' && $val['data']['code'] == 10001) { - if ($this->client_a != $val['data']['fd']) { - if (!$this->table->get(ip2long($val['data']['fd']))) { - $client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']); - $this->b_server_pool[ip2long($val['data']['fd'])] = array( - 'fd' => $val['data']['fd'], + if ($remote_info['type'] == 'system' && $remote_info['data']['code'] == 10001) { + if ($this->client_a != $remote_info['data']['fd']) { + if (!$this->table->get(ip2long($remote_info['data']['fd']))) { + $client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']); + $this->b_server_pool[ip2long($remote_info['data']['fd'])] = array( + 'fd' => $remote_info['data']['fd'], 'client' => $client ); - $this->client_a = $val['data']['fd']; + $this->client_a = $remote_info['data']['fd']; } else { if (FileDistributedClient::getInstance()->getkey()) { - $client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']); - $this->b_server_pool[ip2long($val['data']['fd'])] = array( - 'fd' => $val['data']['fd'], + $client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']); + $this->b_server_pool[ip2long($remote_info['data']['fd'])] = array( + 'fd' => $remote_info['data']['fd'], 'client' => $client ); - $this->client_a = $val['data']['fd']; + $this->client_a = $remote_info['data']['fd']; if ($this->localip == FileDistributedClient::getInstance()->getkey()) { FileDistributedClient::getInstance()->delkey(); } @@ -301,29 +308,29 @@ public function onReceive($serv, $fd, $from_id, $data) array_push($this->client_pool_ser, $this->connectioninfo['remote_ip']); } } - echo date('[ c ]') . str_replace("\n", "", var_export($val, true)); + echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)); } else { - if (isset($val['type'])) { - switch ($val['type']) { + switch ($remote_info['type']) { case 'filesize': - if (isset($val['data']['path'])) { + if (isset($remote_info['data']['path'])) { $data_s = array( 'type' => 'filesizemes', 'data' => array( - 'path' => $val['data']['path'] + 'path' => $remote_info['data']['path'], + 'filesize'=>$remote_info['data']['filesize'] ) ); - array_push($this->client_pool_ser_c, $val); + array_push($this->client_pool_ser_c,$remote_info); $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); } break; case 'file': - if (isset($val['data']['path'])) { - if (!file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($val['data']['path'])))) { + if (isset($remote_info['data']['path'])) { + if (!file_exists(LISTENPATH . str_replace("@", "_", rawurldecode($remote_info['data']['path'])))) { $data_s = array( 'type' => 'filemes', 'data' => array( - 'path' => $val['data']['path'] + 'path' => $remote_info['data']['path'] ) ); $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); @@ -331,19 +338,19 @@ public function onReceive($serv, $fd, $from_id, $data) } break; case 'asyncfileclient': - if (isset($val['data']['path'])) { - if (empty($val['data']['pre'])) { + if (isset($remote_info['data']['path'])) { + if (empty($remote_info['data']['pre'])) { $dataas = array( 'type' => 'asyncfile', 'data' => array( - 'path' => rawurlencode('/') . $val['data']['fileex'] + 'path' => rawurlencode('/') . $remote_info['data']['fileex'] ) ); } else { $dataas = array( 'type' => 'asyncfile', 'data' => array( - 'path' => $val['data']['pre'] + 'path' => $remote_info['data']['pre'] ) ); } @@ -352,68 +359,31 @@ public function onReceive($serv, $fd, $from_id, $data) } break; case 'fileclient': - if (empty($val['data']['pre'])) { + if (empty($remote_info['data']['pre'])) { $datas = array( 'type' => 'file', 'data' => array( - 'path' => rawurlencode('/') . $val['data']['fileex'] + 'path' => rawurlencode('/') . $remote_info['data']['fileex'] ) ); } else { $datas = array( 'type' => 'file', 'data' => array( - 'path' => rawurlencode(rawurldecode($val['data']['pre']) . '/' . rawurldecode($val['data']['fileex'])) + 'path' => rawurlencode(rawurldecode($remote_info['data']['pre']) . '/' . rawurldecode($remote_info['data']['fileex'])) ) ); } foreach ($this->b_server_pool as $k => $v) { - $v['client']->send(FileDistributedClient::getInstance()->packmes($datas)); + if($v['fd']!=$this->localip) $v['client']->send(FileDistributedClient::getInstance()->packmes($datas)); } break; - default: + default: break; } - echo date('[ c ]') . str_replace("\n", "", var_export($val, true)); - } else { - if (!$this->tmpdata_flag) { - $tdf = array_shift($this->client_pool_ser_c); - $this->curpath['path'] = LISTENPATH . str_replace("@", "_", rawurldecode($tdf['data']['path'])); - $this->filesizes = $tdf['data']['filesize']; - $this->tmpdata_flag = 1; - } - if (isset($this->curpath['path']) && $this->curpath['path']!=LISTENPATH) { - if (is_dir(dirname($this->curpath['path'])) && is_readable(dirname($this->curpath['path']))) { - } else { - FileDistributedClient::getInstance()->mklistDir(dirname($this->curpath['path'])); - } - if ($this->oldpath != $this->curpath['path']) { - $this->tmpdata .= $val; - if (strlen($this->tmpdata) > $this->filesizes) { - $this->tmpdatas = substr($this->tmpdata, $this->filesizes, strlen($this->tmpdata)); - $this->tmpdata = substr($this->tmpdata, 0, $this->filesizes); - } - } - if (strlen($this->tmpdata) == $this->filesizes) { - - if (file_put_contents($this->curpath['path'], $this->tmpdata)) { - $this->tmpdata = ''; - $this->oldpath = $this->curpath['path']; - - if (strlen($this->tmpdatas) > 0) { - $this->tmpdata = $this->tmpdatas; - $this->tmpdatas = ''; - } - $this->tmpdata_flag = 0; - } - - } - } + echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)); } - - } - } - + } }