postgresql/lightdb的notify机制–可靠缓存、MQ消息事务的救星


http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-notify.html

http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-listen.html

https://wiki.postgresql.org/wiki/PgNotificationHelper

https://jdbc.postgresql.org/documentation/head/listennotify.html

https://tapoueh.org/blog/2018/07/postgresql-listen-notify/

在写入clog/xact前那一刻,内核会将通知加入队列。如下:

xact.c

static void
CommitTransaction(void)
{
......
    /*
     * Insert notifications sent by NOTIFY commands into the queue.  This
     * should be late in the pre-commit sequence to minimize time spent
     * holding the notify-insertion lock.  However, this could result in
     * creating a snapshot, so we must do it before serializable cleanup.
     */
    PreCommit_Notify();
......

asyc.c负责notify相关的实现:

/*
 * PreCommit_Notify
 *
 *        This is called at transaction commit, before actually committing to
 *        clog.
 *
 *        If there are pending LISTEN actions, make sure we are listed in the
 *        shared-memory listener array.  This must happen before commit to
 *        ensure we don't miss any notifies from transactions that commit
 *        just after ours.
 *
 *        If there are outbound notify requests in the pendingNotifies list,
 *        add them to the global queue.  We do that before commit so that
 *        we can still throw error if we run out of queue space.
 */
void
PreCommit_Notify(void)
{
    ListCell   *p;

    if (!pendingActions && !pendingNotifies)
        return;                    /* no relevant statements in this xact */

    if (Trace_notify)
        elog(DEBUG1, "PreCommit_Notify");

    /* Preflight for any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenPreCommit();
                    break;
                case LISTEN_UNLISTEN:
                    /* there is no Exec_UnlistenPreCommit() */
                    break;
                case LISTEN_UNLISTEN_ALL:
                    /* there is no Exec_UnlistenAllPreCommit() */
                    break;
            }
        }
    }

    /* Queue any pending notifies (must happen after the above) */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;

        /*
         * Make sure that we have an XID assigned to the current transaction.
         * GetCurrentTransactionId is cheap if we already have an XID, but not
         * so cheap if we don't, and we'd prefer not to do that work while
         * holding NotifyQueueLock.
         */
        (void) GetCurrentTransactionId();

        /*
         * Serialize writers by acquiring a special lock that we hold till
         * after commit.  This ensures that queue entries appear in commit
         * order, and in particular that there are never uncommitted queue
         * entries ahead of committed ones, so an uncommitted transaction
         * can't block delivery of deliverable notifications.
         *
         * We use a heavyweight lock so that it'll automatically be released
         * after either commit or abort.  This also allows deadlocks to be
         * detected, though really a deadlock shouldn't be possible here.
         *
         * The lock is on "database 0", which is pretty ugly but it doesn't
         * seem worth inventing a special locktag category just for this.
         * (Historical note: before PG 9.0, a similar lock on "database 0" was
         * used by the flatfiles mechanism.)
         */
        LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                         AccessExclusiveLock);

        /* Now push the notifications into the queue */
        backendHasSentNotifications = true;

        nextNotify = list_head(pendingNotifies->events);
        while (nextNotify != NULL)
        {
            /*
             * Add the pending notifications to the queue.  We acquire and
             * release NotifyQueueLock once per page, which might be overkill
             * but it does allow readers to get in while we're doing this.
             *
             * A full queue is very uncommon and should really not happen,
             * given that we have so much space available in the SLRU pages.
             * Nevertheless we need to deal with this possibility. Note that
             * when we get here we are in the process of committing our
             * transaction, but we have not yet committed to clog, so at this
             * point in time we can still roll the transaction back.
             */
            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
            asyncQueueFillWarning();
            if (asyncQueueIsFull())
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("too many notifications in the NOTIFY queue")));
            nextNotify = asyncQueueAddEntries(nextNotify);
            LWLockRelease(NotifyQueueLock);
        }
    }
}

调用RecordTransactionCommit()(在此之前,WAL记录已经刷新到pg_wal中)更新事务的提交状态到pg_xact后,会调用AtCommit_Notify发送通知。如下:

    smgrDoPendingDeletes(true);

    AtCommit_Notify();
    AtEOXact_GUC(true, 1);
    AtEOXact_SPI(true);

async.c中:

/*
 * AtCommit_Notify
 *
 *        This is called at transaction commit, after committing to clog.
 *
 *        Update listenChannels and clear transaction-local state.
 */
void
AtCommit_Notify(void)
{
    ListCell   *p;

    /*
     * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     * return as soon as possible
     */
    if (!pendingActions && !pendingNotifies)
        return;

    if (Trace_notify)
        elog(DEBUG1, "AtCommit_Notify");

    /* Perform any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN:
                    Exec_UnlistenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN_ALL:
                    Exec_UnlistenAllCommit();
                    break;
            }
        }
    }

    /* If no longer listening to anything, get out of listener array */
    if (amRegisteredListener && listenChannels == NIL)
        asyncQueueUnregister();

    /* And clean up */
    ClearPendingActionsAndNotifies();
}

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/288653.html

(0)
上一篇 2022年9月10日
下一篇 2022年9月10日

相关推荐

发表回复

登录后才能评论