diff --git a/server/FileDistributedClient.php b/server/FileDistributedClient.php index 9f2d764..7a03572 100644 --- a/server/FileDistributedClient.php +++ b/server/FileDistributedClient.php @@ -21,6 +21,7 @@ class FileDistributedClient private $table; private $cur_address; private $del_server = array(); + private $flagclient; public function __construct() { require_once __DIR__ . '/lib/phpredis.php'; @@ -61,7 +62,7 @@ public function addServerClient($address) public function onConnect($serv) { $localinfo = swoole_get_local_ip(); - $serv->send(json_encode(array( + $serv->send($this->packmes(array( 'type' => 'system', 'data' => array( 'code' => 10001, @@ -73,21 +74,94 @@ public function onConnect($serv) public function onReceive($client, $data) { - $remote_info = json_decode($data, true); - if ($remote_info['type'] == 'filemes') { - $strlendata = file_get_contents(LISTENPATH .$remote_info['data']['path']); - $datas = array( - 'type' => 'filesize', - 'data' => array( - 'path' => $remote_info['data']['path'], - 'filesize' => strlen($strlendata) - ) - ); - $client->send(json_encode($datas, true)); - } else if ($remote_info['type'] == 'filesizemes') { - if ($client->sendfile(LISTENPATH .$remote_info['data']['path'])) { + $remote_info = $this->unpackmes($data); + if (is_array($remote_info)) { + foreach ($remote_info as &$val) { + switch ($val['type']) { + case 'filemes': + $strlendata = file_get_contents(LISTENPATH . '/' . $val['data']['path']); + $datas = array( + 'type' => 'filesize', + 'data' => array( + 'path' => $val['data']['path'], + 'filesize' => strlen($strlendata) + ) + ); + $client->send($this->packmes($datas)); + break; + case 'filesizemes': + if ($client->sendfile(LISTENPATH . '/' . $val['data']['path'])) { + } + break; + case 'system': //启动一个进程来处理已存在的图片 + $listenpath = LISTENPATH; + $this->flagclient = $flagclient = 0; + $process = new swoole_process(function($process) use ($listenpath, $flagclient) + { + if (!$flagclient) { + $filelist = $this->getlistDirFile($listenpath); + if (!empty($filelist)) { + foreach ($filelist as &$v) { + $process->write($v); + } + $flagclient = 1; + } + } + + }); + $process->start(); + swoole_event_add($process->pipe, function($pipe) use ($client, $listenpath, $process) + { + $data_l = $process->read(); + $infofile = pathinfo($data_l); + if ($infofile['dirname'] == $listenpath) { + $data = array( + 'type' => 'asyncfileclient', + 'data' => array( + 'path' => iconv('GB2312', 'UTF-8', $data_l), + 'fileex' => $infofile, + 'pre' => '' + ) + ); + } else { + $data = array( + 'type' => 'asyncfileclient', + 'data' => array( + 'path' => iconv('GB2312', 'UTF-8', $data_l), + 'fileex' => $infofile, + 'pre' => substr($infofile['dirname'], strlen($listenpath), strlen($infofile['dirname'])) + ) + ); + } + + + $client->send($this->packmes($data)); + }); + break; + case 'asyncfile': + $data_sa = array( + 'type' => 'file', + 'data' => array( + 'path' => $val['data']['path'] + ) + ); + + $client->send($this->packmes($data_sa)); + break; + default: + break; + + + } } + + + } else { + echo date('[ c ]') . '参数不对 \r\n'; } + + + } public function onTask($serv, $task_id, $from_id, $data) { @@ -173,6 +247,29 @@ public function getlistDir($dir) } return $dirInfo; } + //解包装数据 + public function unpackmes($data, $format = '\r\n\r\n') + { + $pos = strpos($data, $format); + if ($pos !== false) { + $tmpdata = explode($format, $data); + foreach ($tmpdata as $k => $v) { + if (empty($v)) { + unset($tmpdata[$k]); + } else { + $tmpdata[$k] = json_decode($v, true); + } + } + return $tmpdata; + } else { + return $data; + } + } + //包装数据 + public function packmes($data, $format = '\r\n\r\n') + { + return json_encode($data, true) . $format; + } //获取目录文件 public function getlistDirFile($dir) { diff --git a/server/FileDistributedServer.php b/server/FileDistributedServer.php index a139be6..36881d7 100644 --- a/server/FileDistributedServer.php +++ b/server/FileDistributedServer.php @@ -24,7 +24,11 @@ class FileDistributedServer private $filefd; private $filesizes; private $tmpdata; + private $tmpdatas; private $oldpath; + private $client_pool_ser = array(); + private $client_pool_ser_c = array(); + private $tmpdata_flag; private $wd = array(); public function __construct() { @@ -108,7 +112,8 @@ public function onStart($serv) $wd = inotify_add_watch($this->filefd, $v, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); //IN_MODIFY、IN_ALL_EVENTS、IN_CLOSE_WRITE $this->wd[$wd] = array( 'wd' => $wd, - 'path' => $v + 'path' => $v, + 'pre' => '' ); } } @@ -128,17 +133,32 @@ public function onStart($serv) $wd = inotify_add_watch($fd, $listenpath, IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE); $this->wd[$wd] = array( 'wd' => $wd, - 'path' => $listenpath + 'path' => $listenpath, + 'pre' => $vv['name'] ); } else { $path_listen = $this->wd[$vv['wd']]['path'] . '/' . $vv['name']; - $data = array( - 'type' => 'fileclient', - 'data' => array( - 'path' => iconv('GB2312', 'UTF-8', $path_listen) - ) - ); - $localclient->send(json_encode($data, true)); + $infofile = pathinfo($path_listen); + if (empty($this->wd[$vv['wd']]['pre'])) { + $data = array( + 'type' => 'fileclient', + 'data' => array( + 'path' => iconv('GB2312', 'UTF-8', $path_listen), + 'fileex' => $infofile, + 'pre' => '' + ) + ); + } else { + $data = array( + 'type' => 'fileclient', + 'data' => array( + 'path' => iconv('GB2312', 'UTF-8', $path_listen), + 'fileex' => $infofile, + 'pre' => $this->wd[$vv['wd']]['pre'] + ) + ); + } + $localclient->send(FileDistributedClient::getInstance()->packmes($data)); } } @@ -146,6 +166,7 @@ public function onStart($serv) } }); + } public function onWorkerStart($serv, $worker_id) { @@ -186,16 +207,26 @@ public function onConnect($serv, $fd) } public function onReceive($serv, $fd, $from_id, $data) { - $remote_info = json_decode($data, true); + $remote_info = FileDistributedClient::getInstance()->unpackmes($data); //判断是否为二进制图片流 if (!is_array($remote_info)) { + if (!$this->tmpdata_flag) { + $tdf = array_shift($this->client_pool_ser_c); + $this->curpath['path'] = LISTENPATH . '/' . $tdf['data']['path']; + $this->filesizes = $tdf['data']['filesize']; + $this->tmpdata_flag = 1; + } if (isset($this->curpath['path'])) { if (is_dir(dirname($this->curpath['path'])) && is_readable(dirname($this->curpath['path']))) { } else { FileDistributedClient::getInstance()->mklistDir(dirname($this->curpath['path'])); } if ($this->oldpath != $this->curpath['path']) { - $this->tmpdata .= $data; + $this->tmpdata .= $remote_info; + if (strlen($this->tmpdata) > $this->filesizes) { + $this->tmpdatas = substr($this->tmpdata, $this->filesizes, strlen($this->tmpdata)); + $this->tmpdata = substr($this->tmpdata, 0, $this->filesizes); + } } if (strlen($this->tmpdata) == $this->filesizes) { $infofile = pathinfo($this->curpath['path']); @@ -206,6 +237,12 @@ public function onReceive($serv, $fd, $from_id, $data) if (file_put_contents($this->curpath['path'], $this->tmpdata)) { $this->tmpdata = ''; $this->oldpath = $this->curpath['path']; + + if (strlen($this->tmpdatas) > 0) { + $this->tmpdata = $this->tmpdatas; + $this->tmpdatas = ''; + } + $this->tmpdata_flag = 0; } } else { if (in_array($infofile['extension'], array( @@ -220,108 +257,178 @@ public function onReceive($serv, $fd, $from_id, $data) if (file_put_contents($this->curpath['path'], $this->tmpdata)) { $this->tmpdata = ''; $this->oldpath = $this->curpath['path']; + if (strlen($this->tmpdatas) > 0) { + $this->tmpdata = $this->tmpdatas; + $this->tmpdatas = ''; + + } + $this->tmpdata_flag = 0; } //写入图片流 } } } } } else { - if ($remote_info['type'] == 'system' && $remote_info['data']['code'] == 10001) { - if ($this->client_a != $remote_info['data']['fd']) { - if (!$this->table->get(ip2long($remote_info['data']['fd']))) { - $client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']); - $this->b_server_pool[ip2long($remote_info['data']['fd'])] = array( - 'fd' => $remote_info['data']['fd'], - 'client' => $client - ); - $this->client_a = $remote_info['data']['fd']; - } else { - if (FileDistributedClient::getInstance()->getkey()) { - $client = FileDistributedClient::getInstance()->addServerClient($remote_info['data']['fd']); - $this->b_server_pool[ip2long($remote_info['data']['fd'])] = array( - 'fd' => $remote_info['data']['fd'], + foreach ($remote_info as &$val) { + if ($val['type'] == 'system' && $val['data']['code'] == 10001) { + if ($this->client_a != $val['data']['fd']) { + if (!$this->table->get(ip2long($val['data']['fd']))) { + $client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']); + $this->b_server_pool[ip2long($val['data']['fd'])] = array( + 'fd' => $val['data']['fd'], 'client' => $client ); - $this->client_a = $remote_info['data']['fd']; - if ($this->localip == FileDistributedClient::getInstance()->getkey()) { - FileDistributedClient::getInstance()->delkey(); + $this->client_a = $val['data']['fd']; + } else { + if (FileDistributedClient::getInstance()->getkey()) { + $client = FileDistributedClient::getInstance()->addServerClient($val['data']['fd']); + $this->b_server_pool[ip2long($val['data']['fd'])] = array( + 'fd' => $val['data']['fd'], + 'client' => $client + ); + $this->client_a = $val['data']['fd']; + if ($this->localip == FileDistributedClient::getInstance()->getkey()) { + FileDistributedClient::getInstance()->delkey(); + } } } + } - - } - } else { - switch ($remote_info['type']) { - case 'filesize': - if (isset($remote_info['data']['path'])) { - $data_s = array( - 'type' => 'filesizemes', + if ($this->localip != $this->connectioninfo['remote_ip']) { + if (!in_array($this->connectioninfo['remote_ip'], $this->client_pool_ser)) { + $serv->send($fd, FileDistributedClient::getInstance()->packmes(array( + 'type' => 'system', 'data' => array( - 'path' => $remote_info['data']['path'] + 'code' => 10002, + 'fd' => $this->localip ) - ); - $this->filesizes = $remote_info['data']['filesize']; - $serv->send($fd, json_encode($data_s, true)); + ))); + array_push($this->client_pool_ser, $this->connectioninfo['remote_ip']); } - break; - case 'file': - if (isset($remote_info['data']['path'])) { - if(!file_exists(LISTENPATH .$remote_info['data']['path'])){ - $this->curpath['path'] = LISTENPATH .$remote_info['data']['path']; - $data_s = array( - 'type' => 'filemes', - 'data' => array( - 'path' => $remote_info['data']['path'] - ) - ); - $serv->send($fd, json_encode($data_s, true)); - } - } - break; - case 'fileclient': - $infofile = pathinfo($remote_info['data']['path']); - if ($infofile['basename']) { - $extend = explode(".", $infofile['basename']); - $va = count($extend) - 1; - if (in_array($extend[$va], array( - 'txt', - 'log', - 'jpg', - 'png', - 'jpeg', - 'JPG', - 'JPEG', - 'PNG', - 'bmp' - ))) { - if (isset($this->curpath['path']) && $remote_info['data']['path'] == $this->curpath['path']) { - } else { - $datas = array( - 'type' => 'file', + } + } else { + switch ($val['type']) { + case 'filesize': + if (isset($val['data']['path'])) { + $data_s = array( + 'type' => 'filesizemes', + 'data' => array( + 'path' => $val['data']['path'] + ) + ); + $this->filesizes = $val['data']['filesize']; + array_push($this->client_pool_ser_c, $val); + $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); + } + break; + case 'file': + if (isset($val['data']['path'])) { + if (!file_exists(LISTENPATH . '/' . $val['data']['path'])) { + $this->curpath['path'] = LISTENPATH . '/' . $val['data']['path']; + $data_s = array( + 'type' => 'filemes', 'data' => array( - 'path' => substr($remote_info['data']['path'],strlen(LISTENPATH),strlen($remote_info['data']['path'])) + 'path' => $val['data']['path'] ) ); - foreach ($this->b_server_pool as $k => $v) { - if (file_exists($remote_info['data']['path'])) { - if ($this->localip != $v['fd'] && $this->curpath['path'] != $remote_info['data']['path']) { - if ($v['client']->send(json_encode($datas))) { + $serv->send($fd, FileDistributedClient::getInstance()->packmes($data_s)); + } + } + break; + case 'asyncfileclient': + if (isset($val['data']['path'])) { + $extend = explode(".", $val['data']['fileex']['basename']); + $va = count($extend) - 1; + if (in_array($extend[$va], array( + 'txt', + 'log', + 'jpg', + 'png', + 'jpeg', + 'JPG', + 'JPEG', + 'PNG', + 'bmp' + ))) { + if (empty($val['data']['pre'])) { + $dataas = array( + 'type' => 'asyncfile', + 'data' => array( + 'path' => $val['data']['fileex']['basename'] + ) + ); + } else { + $dataas = array( + 'type' => 'asyncfile', + 'data' => array( + 'path' => $val['data']['pre'] . '/' . $val['data']['fileex']['basename'] + ) + ); + } + + $serv->send($fd, FileDistributedClient::getInstance()->packmes($dataas)); + + } + } + break; + case 'fileclient': + $infofile = pathinfo($val['data']['path']); + if ($infofile['basename']) { + $extend = explode(".", $infofile['basename']); + $va = count($extend) - 1; + if (in_array($extend[$va], array( + 'txt', + 'log', + 'jpg', + 'png', + 'jpeg', + 'JPG', + 'JPEG', + 'PNG', + 'bmp' + ))) { + if (isset($this->curpath['path']) && $val['data']['path'] == $this->curpath['path']) { + } else { + if (empty($val['data']['pre'])) { + $datas = array( + 'type' => 'file', + 'data' => array( + 'path' => $val['data']['fileex']['basename'] + ) + ); + } else { + $datas = array( + 'type' => 'file', + 'data' => array( + 'path' => $val['data']['pre'] . '/' . $val['data']['fileex']['basename'] + ) + ); + } + + foreach ($this->b_server_pool as $k => $v) { + if (file_exists($val['data']['path'])) { + if ($this->localip != $v['fd'] && $this->curpath['path'] != $val['data']['path']) { + if ($v['client']->send(FileDistributedClient::getInstance()->packmes($datas))) { + } } + } } - } } } - } - break; - default: - break; + break; + default: + break; + } } } - echo date('[ c ]') . str_replace("\n", "",var_export($remote_info, true)) .'\r\n'; + + //print_r($remote_info); + echo date('[ c ]') . str_replace("\n", "", var_export($remote_info, true)); } + } /** * 服务器断开连接