1、通过配置附加事件处理器,编辑 /environments/dev/common/config/main-local.php、/environments/prod/common/config/main-local.php,如图1
'copyAssetQueue' => [ // 复制资源文件队列 'class' => 'yii/queue/redis/Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:copy:asset', // 队列键前缀 'ttr' => 10 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common/components/queue/CopyAssetEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common/components/queue/CopyAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii/queue/LogBehavior', ], 'uploadAssetQueue' => [ // 上传资源文件队列 'class' => 'yii/queue/redis/Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:upload:asset', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common/components/queue/UploadAssetEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common/components/queue/UploadAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii/queue/LogBehavior', ], 'pubArticleQueue' => [ // 发布文章队列 'class' => 'yii/queue/redis/Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:pub:article', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common/components/queue/PubArticleEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common/components/queue/PubArticleEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii/queue/LogBehavior', ], 'sourceCallbackQueue' => [ // 来源回调队列 'class' => 'yii/queue/redis/Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:source:callback', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common/components/queue/SourceCallbackEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common/components/queue/SourceCallbackEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii/queue/LogBehavior', ],
2、编辑 /qq/rests/qq_cw_app/IndexAction.php,一个推送队列的入口
$data = [ 'channel_id' => 2, // 渠道ID 'channel_code' => 'wx', // 渠道代码,qq:企鹅号;wx:微信公众帐号 'channel_type_id' => 3, // 渠道的类型ID 'channel_type_code' => 'wx', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体 'task_id' => 2, // 任务ID ]; $assets = [ [ 'type' => 'image', 'channel_article_id' => 1, 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/images/1.png', ], [ 'type' => 'video', 'channel_article_id' => 1, 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', ], ]; $assetServiceCopyAssetsAsyncResult = AssetService::copyAssetsAsync($data, $assets); print_r($assetServiceCopyAssetsAsyncResult); exit;
3、编辑 /common/services/AssetService.php,复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
/** * 复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步) * @param array $data 数据 * 格式如下: * [ * 'channel_id' => 1, // 渠道ID * 'channel_code' => 'qq', // 渠道代码,qq:企鹅号;wx:微信公众帐号 * 'channel_type_id' => 1, // 渠道的类型ID * 'channel_type_code' => 'qq_cw', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用 * 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体 * 'task_id' => 1, // 任务ID * ] * * @param array $assets 来源的资源文件的绝对URL * 格式如下: * [ * [ * 'type' => 'image', // 资源文件的类型,image:图片;video:视频 * 'channel_article_id' => 1, // 渠道的文章ID * 'absolute_url' => 'http://localhost/spider/storage/spider/images/1.png', // 来源的资源文件的绝对URL * ], * [ * 'type' => 'video', // 资源文件的类型,image:图片;video:视频 * 'channel_article_id' => 1, // 渠道的文章ID * 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', // 来源的资源文件的绝对URL * ], * ] * * @return array $channelPubApiAssetAbsolutePaths 渠道发布的资源文件的相对路径 * 格式如下: * [ * [ * 'type' => 'image', * 'channel_article_id' => 1, * 'relative_path' => '/2018/09/20/1537439889.2333.1441541478.png', * ], * [ * 'type' => 'video', * 'channel_article_id' => 1, * 'relative_path' => '/2018/09/20/1537439889.2403.62871268.mp4', * ], * ] * * @throws Exception execution failed */ public static function copyAssetsAsync($data, $assets) { $assetData = []; $time = time(); foreach ($assets as $key => $asset) { $assetData[] = [ 'channel_id' => $data['channel_id'], 'channel_code' => $data['channel_code'], 'channel_type_id' => $data['channel_type_id'], 'channel_type_code' => $data['channel_type_code'], 'source' => $data['source'], 'type' => $asset['type'], 'absolute_url' => $asset['absolute_url'], 'relative_path' => '', 'size' => 0, 'task_id' => $data['task_id'], 'channel_article_id' => $asset['channel_article_id'], 'status' => Asset::STATUS_ENABLED, 'is_deleted' => Asset::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => Asset::DELETED_AT_DEFAULT, ]; } // 批量创建资源 $asset = new Asset(); $assetCreateMultipleResult = $asset->createMultiple($assetData); // 将任务发送到队列(复制资源文件队列),通过标准工作人员进行处理 Yii::$app->copyAssetQueue->push(new CopyAssetJob([ 'taskId' => $data['task_id'], ])); }
4、编辑 /common/jobs/CopyAssetJob.php,队列的任务类
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/22 * Time: 17:10 */ namespace common/jobs; use Yii; use common/logics/Asset; use common/services/TaskService; use common/services/AssetService; use yii/web/ServerErrorHttpException; /** * 复制来源的资源文件至渠道发布的资源目录 * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetJob extends Job { public $taskId; /* * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 */ public function execute($queue) { // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($this->taskId); // 基于任务ID查找状态为启用的资源列表 $assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId); if (empty($assetEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); } $source = $taskEnabledItem->source; $assets = []; foreach ($assetEnabledItems as $assetEnabledItem) { $assets[] = [ 'type' => $assetEnabledItem->type, 'absolute_url' => $assetEnabledItem->absolute_url, ]; } // 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步) $assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets); foreach ($assetEnabledItems as $key => $assetEnabledItem) { $assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path']; // 取得文件大小,单位(字节) $assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']); $assetEnabledItems[$key] = $assetEnabledItem; } // 批量更新资源 $assetEnabledItem->updateMultiple($assetEnabledItems); } }
5、编辑 /common/components/queue/CopyAssetEventHandler.php,在配置中所定义的事件处理器,当复制资源文件队列,每次成功执行作业后(afterExec),将调用相应服务进行后续处理,即推送任务至上传资源文件队列;当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/23 * Time: 14:23 */ namespace common/components/queue; use Yii; use common/logics/ChannelAppTask; use common/logics/PubLog; use common/services/PubLogService; use common/services/TaskService; use common/services/SourceCallbackService; use yii/helpers/Json; use yii/base/Component; use yii/queue/ExecEvent; use yii/web/NotFoundHttpException; use yii/web/UnprocessableEntityHttpException; use yii/web/ServerErrorHttpException; use yii/db/Exception; /** * Class CopyAssetEventHandler * @package common/components/queue * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetEventHandler extends Component { /** * @param ExecEvent $event * @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常 * @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常 * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 * @throws Exception execution failed */ public static function afterExec(ExecEvent $event) { $taskId = $event->job->taskId; // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($taskId); // 基于任务ID查找状态为启用的资源列表 $channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId); if (empty($channelAppTaskEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021); } try { // 调用相应服务进行后续处理 $serviceClass = 'common/services//' . str_replace(' ', '', ucwords(str_replace('_', ' ', $taskEnabledItem->channel_type_code))) . 'AssetService'; // 例:common/services/QqCwAssetService $serviceAction = 'copyAssetExecHandler'; $serviceClass::$serviceAction($taskEnabledItem->id); } catch (/Throwable $e) { $pubLogData = []; $time = time(); foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) { $pubLogData[] = [ 'channel_id' => $channelAppTaskEnabledItem['channel_id'], 'channel_code' => $channelAppTaskEnabledItem['channel_code'], 'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'], 'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'], 'task_id' => $channelAppTaskEnabledItem['task_id'], 'channel_app_task_id' => $channelAppTaskEnabledItem['id'], 'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'], 'code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => Json::encode([]), 'status' => PubLog::STATUS_ERROR, 'is_deleted' => PubLog::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => PubLog::DELETED_AT_DEFAULT, ]; } // 发布任务成功后,调用相应服务失败,插入发布日志,将作业推送至来源回调队列(异步) SourceCallbackService::errorAsync($pubLogData); } } /** * @param ExecEvent $event * @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常 * @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常 * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 * @throws Exception execution failed */ public static function afterError(ExecEvent $event) { $taskId = $event->job->taskId; // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($taskId); // 基于任务ID查找状态为启用的资源列表 $channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId); if (empty($channelAppTaskEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021); } $pubLogData = []; $time = time(); foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) { $pubLogData[] = [ 'channel_id' => $channelAppTaskEnabledItem['channel_id'], 'channel_code' => $channelAppTaskEnabledItem['channel_code'], 'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'], 'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'], 'task_id' => $channelAppTaskEnabledItem['task_id'], 'channel_app_task_id' => $channelAppTaskEnabledItem['id'], 'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'], 'code' => $event->error->getCode(), 'message' => $event->error->getMessage(), 'data' => Json::encode([]), 'status' => PubLog::STATUS_ERROR, 'is_deleted' => PubLog::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => PubLog::DELETED_AT_DEFAULT, ]; } // 发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步) SourceCallbackService::errorAsync($pubLogData); } }
6、打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
7、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待
./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
8、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务成功执行后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中1个任务状态为等待,如图2
./yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 ./yii copy-asset-queue/info ./yii upload-asset-queue/info --color=0
2018-10-27 17:23:54 [pid: 5216] - Worker is started 2018-10-27 17:23:54 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 5216) - Started 2018-10-27 17:23:55 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 5216) - Done (0.249 s) 2018-10-27 17:23:55 [pid: 5216] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
9、编辑 /common/jobs/CopyAssetJob.php,队列的任务类,特意抛出一个异常,以测试当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/22 * Time: 17:10 */ namespace common/jobs; use Yii; use common/logics/Asset; use common/services/TaskService; use common/services/AssetService; use yii/web/ServerErrorHttpException; /** * 复制来源的资源文件至渠道发布的资源目录 * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetJob extends Job { public $taskId; /* * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 */ public function execute($queue) { // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($this->taskId); // 基于任务ID查找状态为启用的资源列表 $assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId); throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); if (empty($assetEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); } $source = $taskEnabledItem->source; $assets = []; foreach ($assetEnabledItems as $assetEnabledItem) { $assets[] = [ 'type' => $assetEnabledItem->type, 'absolute_url' => $assetEnabledItem->absolute_url, ]; } // 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步) $assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets); foreach ($assetEnabledItems as $key => $assetEnabledItem) { $assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path']; // 取得文件大小,单位(字节) $assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']); $assetEnabledItems[$key] = $assetEnabledItem; } // 批量更新资源 $assetEnabledItem->updateMultiple($assetEnabledItems); } }
10、清空 Redis,即清空队列中的数据,打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
11、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待,来源回调队列中0个任务状态为等待
./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0 ./yii source-callback-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
12、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务执行失败(因为任务中有抛出未捕获的异常)后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中0个任务状态为等待,来源回调队列中1个任务状态为等待,如图3
./yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 ./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0 ./yii source-callback-queue/info --color=0
2018-10-27 17:37:10 [pid: 32132] - Worker is started 2018-10-27 17:37:11 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 32132) - Started 2018-10-27 17:37:11 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 32132) - Error (0.232 s) > yii/web/ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty 2018-10-27 17:37:11 [pid: 32132] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
13、查看系统日志表的异常信息,即 log 表中的 message,有助于后续开发人员的分析工作,如图4
[1] common/jobs/CopyAssetJob (attempt: 1, PID: 32132) is finished with error: yii/web/ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty in E:/wwwroot/channel-pub-api/common/jobs/CopyAssetJob.php:38 Stack trace: #0 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/Queue.php(214): common/jobs/CopyAssetJob->execute(Object(yii/queue/redis/Queue)) #1 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/cli/Queue.php(162): yii/queue/Queue->handleMessage('1', 'O:24:"common//jo...', '600', '1') #2 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/cli/Command.php(145): yii/queue/cli/Queue->execute('1', 'O:24:"common//jo...', '600', '1', '32132') #3 [internal function]: yii/queue/cli/Command->actionExec('1', '600', '1', '32132') #4 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/InlineAction.php(57): call_user_func_array(Array, Array) #5 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Controller.php(157): yii/base/InlineAction->runWithParams(Array) #6 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Controller.php(148): yii/base/Controller->runAction('exec', Array) #7 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Module.php(528): yii/console/Controller->runAction('exec', Array) #8 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Application.php(180): yii/base/Module->runAction('copy-asset-queu...', Array) #9 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Application.php(147): yii/console/Application->runAction('copy-asset-queu...', Array) #10 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Application.php(386): yii/console/Application->handleRequest(Object(yii/console/Request)) #11 E:/wwwroot/channel-pub-api/yii(23): yii/base/Application->run() #12 {main}.
14、发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步),查看发布日志表中的 message,即 pub_log,此为业务数据,最终会显示给用户,如图5
Based on task ID: 2, find the list of enabled resources is empty
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/250439.html