1、通过配置附加事件处理器,编辑 /environments/dev/common/config/main-local.php、/environments/prod/common/config/main-local.php,如图1

'copyAssetQueue' => [ // 复制资源文件队列
'class' => 'yii/queue/redis/Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:copy:asset', // 队列键前缀
'ttr' => 10 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common/components/queue/CopyAssetEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common/components/queue/CopyAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii/queue/LogBehavior',
],
'uploadAssetQueue' => [ // 上传资源文件队列
'class' => 'yii/queue/redis/Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:upload:asset', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common/components/queue/UploadAssetEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common/components/queue/UploadAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii/queue/LogBehavior',
],
'pubArticleQueue' => [ // 发布文章队列
'class' => 'yii/queue/redis/Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:pub:article', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common/components/queue/PubArticleEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common/components/queue/PubArticleEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii/queue/LogBehavior',
],
'sourceCallbackQueue' => [ // 来源回调队列
'class' => 'yii/queue/redis/Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:source:callback', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common/components/queue/SourceCallbackEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common/components/queue/SourceCallbackEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii/queue/LogBehavior',
],
2、编辑 /qq/rests/qq_cw_app/IndexAction.php,一个推送队列的入口
$data = [
'channel_id' => 2, // 渠道ID
'channel_code' => 'wx', // 渠道代码,qq:企鹅号;wx:微信公众帐号
'channel_type_id' => 3, // 渠道的类型ID
'channel_type_code' => 'wx', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用
'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体
'task_id' => 2, // 任务ID
];
$assets = [
[
'type' => 'image',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/images/1.png',
],
[
'type' => 'video',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4',
],
];
$assetServiceCopyAssetsAsyncResult = AssetService::copyAssetsAsync($data, $assets);
print_r($assetServiceCopyAssetsAsyncResult);
exit;
3、编辑 /common/services/AssetService.php,复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
/**
* 复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
* @param array $data 数据
* 格式如下:
* [
* 'channel_id' => 1, // 渠道ID
* 'channel_code' => 'qq', // 渠道代码,qq:企鹅号;wx:微信公众帐号
* 'channel_type_id' => 1, // 渠道的类型ID
* 'channel_type_code' => 'qq_cw', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用
* 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体
* 'task_id' => 1, // 任务ID
* ]
*
* @param array $assets 来源的资源文件的绝对URL
* 格式如下:
* [
* [
* 'type' => 'image', // 资源文件的类型,image:图片;video:视频
* 'channel_article_id' => 1, // 渠道的文章ID
* 'absolute_url' => 'http://localhost/spider/storage/spider/images/1.png', // 来源的资源文件的绝对URL
* ],
* [
* 'type' => 'video', // 资源文件的类型,image:图片;video:视频
* 'channel_article_id' => 1, // 渠道的文章ID
* 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', // 来源的资源文件的绝对URL
* ],
* ]
*
* @return array $channelPubApiAssetAbsolutePaths 渠道发布的资源文件的相对路径
* 格式如下:
* [
* [
* 'type' => 'image',
* 'channel_article_id' => 1,
* 'relative_path' => '/2018/09/20/1537439889.2333.1441541478.png',
* ],
* [
* 'type' => 'video',
* 'channel_article_id' => 1,
* 'relative_path' => '/2018/09/20/1537439889.2403.62871268.mp4',
* ],
* ]
*
* @throws Exception execution failed
*/
public static function copyAssetsAsync($data, $assets)
{
$assetData = [];
$time = time();
foreach ($assets as $key => $asset) {
$assetData[] = [
'channel_id' => $data['channel_id'],
'channel_code' => $data['channel_code'],
'channel_type_id' => $data['channel_type_id'],
'channel_type_code' => $data['channel_type_code'],
'source' => $data['source'],
'type' => $asset['type'],
'absolute_url' => $asset['absolute_url'],
'relative_path' => '',
'size' => 0,
'task_id' => $data['task_id'],
'channel_article_id' => $asset['channel_article_id'],
'status' => Asset::STATUS_ENABLED,
'is_deleted' => Asset::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => Asset::DELETED_AT_DEFAULT,
];
}
// 批量创建资源
$asset = new Asset();
$assetCreateMultipleResult = $asset->createMultiple($assetData);
// 将任务发送到队列(复制资源文件队列),通过标准工作人员进行处理
Yii::$app->copyAssetQueue->push(new CopyAssetJob([
'taskId' => $data['task_id'],
]));
}
4、编辑 /common/jobs/CopyAssetJob.php,队列的任务类
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common/jobs;
use Yii;
use common/logics/Asset;
use common/services/TaskService;
use common/services/AssetService;
use yii/web/ServerErrorHttpException;
/**
* 复制来源的资源文件至渠道发布的资源目录
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetJob extends Job
{
public $taskId;
/*
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
*/
public function execute($queue)
{
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($this->taskId);
// 基于任务ID查找状态为启用的资源列表
$assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId);
if (empty($assetEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
}
$source = $taskEnabledItem->source;
$assets = [];
foreach ($assetEnabledItems as $assetEnabledItem) {
$assets[] = [
'type' => $assetEnabledItem->type,
'absolute_url' => $assetEnabledItem->absolute_url,
];
}
// 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步)
$assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets);
foreach ($assetEnabledItems as $key => $assetEnabledItem) {
$assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path'];
// 取得文件大小,单位(字节)
$assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']);
$assetEnabledItems[$key] = $assetEnabledItem;
}
// 批量更新资源
$assetEnabledItem->updateMultiple($assetEnabledItems);
}
}
5、编辑 /common/components/queue/CopyAssetEventHandler.php,在配置中所定义的事件处理器,当复制资源文件队列,每次成功执行作业后(afterExec),将调用相应服务进行后续处理,即推送任务至上传资源文件队列;当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/23
* Time: 14:23
*/
namespace common/components/queue;
use Yii;
use common/logics/ChannelAppTask;
use common/logics/PubLog;
use common/services/PubLogService;
use common/services/TaskService;
use common/services/SourceCallbackService;
use yii/helpers/Json;
use yii/base/Component;
use yii/queue/ExecEvent;
use yii/web/NotFoundHttpException;
use yii/web/UnprocessableEntityHttpException;
use yii/web/ServerErrorHttpException;
use yii/db/Exception;
/**
* Class CopyAssetEventHandler
* @package common/components/queue
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetEventHandler extends Component
{
/**
* @param ExecEvent $event
* @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常
* @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
* @throws Exception execution failed
*/
public static function afterExec(ExecEvent $event)
{
$taskId = $event->job->taskId;
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($taskId);
// 基于任务ID查找状态为启用的资源列表
$channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId);
if (empty($channelAppTaskEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021);
}
try {
// 调用相应服务进行后续处理
$serviceClass = 'common/services//' . str_replace(' ', '', ucwords(str_replace('_', ' ', $taskEnabledItem->channel_type_code))) . 'AssetService'; // 例:common/services/QqCwAssetService
$serviceAction = 'copyAssetExecHandler';
$serviceClass::$serviceAction($taskEnabledItem->id);
} catch (/Throwable $e) {
$pubLogData = [];
$time = time();
foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) {
$pubLogData[] = [
'channel_id' => $channelAppTaskEnabledItem['channel_id'],
'channel_code' => $channelAppTaskEnabledItem['channel_code'],
'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'],
'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'],
'task_id' => $channelAppTaskEnabledItem['task_id'],
'channel_app_task_id' => $channelAppTaskEnabledItem['id'],
'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'],
'code' => $e->getCode(),
'message' => $e->getMessage(),
'data' => Json::encode([]),
'status' => PubLog::STATUS_ERROR,
'is_deleted' => PubLog::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => PubLog::DELETED_AT_DEFAULT,
];
}
// 发布任务成功后,调用相应服务失败,插入发布日志,将作业推送至来源回调队列(异步)
SourceCallbackService::errorAsync($pubLogData);
}
}
/**
* @param ExecEvent $event
* @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常
* @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
* @throws Exception execution failed
*/
public static function afterError(ExecEvent $event)
{
$taskId = $event->job->taskId;
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($taskId);
// 基于任务ID查找状态为启用的资源列表
$channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId);
if (empty($channelAppTaskEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021);
}
$pubLogData = [];
$time = time();
foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) {
$pubLogData[] = [
'channel_id' => $channelAppTaskEnabledItem['channel_id'],
'channel_code' => $channelAppTaskEnabledItem['channel_code'],
'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'],
'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'],
'task_id' => $channelAppTaskEnabledItem['task_id'],
'channel_app_task_id' => $channelAppTaskEnabledItem['id'],
'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'],
'code' => $event->error->getCode(),
'message' => $event->error->getMessage(),
'data' => Json::encode([]),
'status' => PubLog::STATUS_ERROR,
'is_deleted' => PubLog::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => PubLog::DELETED_AT_DEFAULT,
];
}
// 发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步)
SourceCallbackService::errorAsync($pubLogData);
}
}
6、打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
7、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待
./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
8、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务成功执行后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中1个任务状态为等待,如图2

./yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 ./yii copy-asset-queue/info ./yii upload-asset-queue/info --color=0
2018-10-27 17:23:54 [pid: 5216] - Worker is started 2018-10-27 17:23:54 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 5216) - Started 2018-10-27 17:23:55 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 5216) - Done (0.249 s) 2018-10-27 17:23:55 [pid: 5216] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
9、编辑 /common/jobs/CopyAssetJob.php,队列的任务类,特意抛出一个异常,以测试当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common/jobs;
use Yii;
use common/logics/Asset;
use common/services/TaskService;
use common/services/AssetService;
use yii/web/ServerErrorHttpException;
/**
* 复制来源的资源文件至渠道发布的资源目录
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetJob extends Job
{
public $taskId;
/*
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
*/
public function execute($queue)
{
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($this->taskId);
// 基于任务ID查找状态为启用的资源列表
$assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId);
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
if (empty($assetEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
}
$source = $taskEnabledItem->source;
$assets = [];
foreach ($assetEnabledItems as $assetEnabledItem) {
$assets[] = [
'type' => $assetEnabledItem->type,
'absolute_url' => $assetEnabledItem->absolute_url,
];
}
// 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步)
$assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets);
foreach ($assetEnabledItems as $key => $assetEnabledItem) {
$assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path'];
// 取得文件大小,单位(字节)
$assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']);
$assetEnabledItems[$key] = $assetEnabledItem;
}
// 批量更新资源
$assetEnabledItem->updateMultiple($assetEnabledItems);
}
}
10、清空 Redis,即清空队列中的数据,打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
11、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待,来源回调队列中0个任务状态为等待
./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0 ./yii source-callback-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
12、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务执行失败(因为任务中有抛出未捕获的异常)后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中0个任务状态为等待,来源回调队列中1个任务状态为等待,如图3

./yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 ./yii copy-asset-queue/info --color=0 ./yii upload-asset-queue/info --color=0 ./yii source-callback-queue/info --color=0
2018-10-27 17:37:10 [pid: 32132] - Worker is started 2018-10-27 17:37:11 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 32132) - Started 2018-10-27 17:37:11 [1] common/jobs/CopyAssetJob (attempt: 1, pid: 32132) - Error (0.232 s) > yii/web/ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty 2018-10-27 17:37:11 [pid: 32132] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
13、查看系统日志表的异常信息,即 log 表中的 message,有助于后续开发人员的分析工作,如图4

[1] common/jobs/CopyAssetJob (attempt: 1, PID: 32132) is finished with error: yii/web/ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty in E:/wwwroot/channel-pub-api/common/jobs/CopyAssetJob.php:38
Stack trace:
#0 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/Queue.php(214): common/jobs/CopyAssetJob->execute(Object(yii/queue/redis/Queue))
#1 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/cli/Queue.php(162): yii/queue/Queue->handleMessage('1', 'O:24:"common//jo...', '600', '1')
#2 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2-queue/src/cli/Command.php(145): yii/queue/cli/Queue->execute('1', 'O:24:"common//jo...', '600', '1', '32132')
#3 [internal function]: yii/queue/cli/Command->actionExec('1', '600', '1', '32132')
#4 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/InlineAction.php(57): call_user_func_array(Array, Array)
#5 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Controller.php(157): yii/base/InlineAction->runWithParams(Array)
#6 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Controller.php(148): yii/base/Controller->runAction('exec', Array)
#7 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Module.php(528): yii/console/Controller->runAction('exec', Array)
#8 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Application.php(180): yii/base/Module->runAction('copy-asset-queu...', Array)
#9 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/console/Application.php(147): yii/console/Application->runAction('copy-asset-queu...', Array)
#10 E:/wwwroot/channel-pub-api/vendor/yiisoft/yii2/base/Application.php(386): yii/console/Application->handleRequest(Object(yii/console/Request))
#11 E:/wwwroot/channel-pub-api/yii(23): yii/base/Application->run()
#12 {main}.
14、发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步),查看发布日志表中的 message,即 pub_log,此为业务数据,最终会显示给用户,如图5

Based on task ID: 2, find the list of enabled resources is empty
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/tech/webdev/181253.html