點(diǎn)擊進(jìn)入IT資料庫
前言
經(jīng)過前面對(duì)Spring AOP、事務(wù)的總結(jié),我們已經(jīng)對(duì)它們有了一個(gè)比較感性的認(rèn)知了。今天,我繼續(xù)安利一個(gè)獨(dú)門絕技:Spring 事務(wù)的鉤子函數(shù)。單純的講技術(shù)可能比較枯燥乏味。
接下來,我將以一個(gè)實(shí)際的案例來描述Spring事務(wù)鉤子函數(shù)的正確使用姿勢(shì)。
一、案例背景
拿支付系統(tǒng)相關(guān)的業(yè)務(wù)來舉例。在支付系統(tǒng)中,我們需要記錄每個(gè)賬戶的資金流水(記錄用戶A因?yàn)槟膫€(gè)操作扣了錢,因?yàn)槟膫€(gè)操作加了錢),這樣我們才能對(duì)每個(gè)賬戶的賬做到心中有數(shù),對(duì)于支付系統(tǒng)而言,資金流水的數(shù)據(jù)可謂是最重要的。
因此,為了防止支付系統(tǒng)的老大徇私舞弊,CTO提了一個(gè)流水存檔的需求:要求支付系統(tǒng)對(duì)每個(gè)賬戶的資金流水做一份存檔,要求支付系統(tǒng)在寫流水的時(shí)候,把流水相關(guān)的信息以消息的形式推送到kafka,由存檔系統(tǒng)消費(fèi)這個(gè)消息并落地到庫里(這個(gè)庫只有存檔系統(tǒng)擁有寫權(quán)限)。
整個(gè)需求的流程如下所示:
圖片
整個(gè)需求的流程還是比較簡(jiǎn)單的,考慮到后續(xù)會(huì)有其他事業(yè)部也要進(jìn)行數(shù)據(jù)存檔操作,CTO建議支付系統(tǒng)團(tuán)隊(duì)內(nèi)部開發(fā)一個(gè)二方庫,這個(gè)二方庫的主要功能就是發(fā)送消息到kafka中去。
二、確定方案
既然要求開發(fā)一個(gè)二方庫,因此,我們需要考慮如下幾件事情:
1、技術(shù)棧使用的springboot,因此,這里最好以starter的方式提供
2、二方庫需要發(fā)送消息給kafka,最好是二方庫內(nèi)部基于kafka生產(chǎn)者的api創(chuàng)建生產(chǎn)者,不要使用Spring自帶的kafkaTemplate,因?yàn)榧煞接锌赡芤呀?jīng)使用了kafkaTemplate。不能與集成方造成沖突。
3、減少對(duì)接方的集成難度、學(xué)習(xí)成本,最好是提供一個(gè)簡(jiǎn)單實(shí)用的api,業(yè)務(wù)側(cè)能簡(jiǎn)單上手。
4、發(fā)送消息這個(gè)操作需要支持事務(wù),盡量不影響主業(yè)務(wù)
在上述的幾件事情中,最需要注意的應(yīng)該就是第4點(diǎn):發(fā)送消息這個(gè)操作需要支持事務(wù),盡量不影響主業(yè)務(wù)。這是什么意思呢?首先,盡量不影響主業(yè)務(wù),這個(gè)最簡(jiǎn)單的方式就是使用異步機(jī)制。其次,需要支持事務(wù)是指:假設(shè)我們的api是在事務(wù)方法內(nèi)部調(diào)用的,那么我們需要保證事務(wù)提交后再執(zhí)行這個(gè)api。那么,我們的流水落地api應(yīng)該要有這樣的功能:
圖片
內(nèi)部可以判斷當(dāng)前是否存在事務(wù),如果存在事務(wù),則需要等事務(wù)提交后再異步發(fā)送消息給kafka。如果不存在事務(wù)則直接異步發(fā)送消息給kafka。而且這樣的判斷邏輯得放在二方庫內(nèi)部才行。那現(xiàn)在擺在我們面前的問題就是:我要如何判斷當(dāng)前是否存在事務(wù),以及如何在事務(wù)提交后再觸發(fā)我們自定義的邏輯呢?
三、TransactionSynchronizationManager顯神威
這個(gè)類內(nèi)部所有的變量、方法都是static修飾的,也就是說它其實(shí)是一個(gè)工具類。是一個(gè)事務(wù)同步器。下述是流水落地API的偽代碼,這段代碼就解決了我們上述提到的疑問:
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void sendLog() {
// 判斷當(dāng)前是否存在事務(wù)
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
// 無事務(wù),異步發(fā)送消息給kafka
executor.submit(() -> {
// 發(fā)送消息給kafka
try {
// 發(fā)送消息給kafka
} catch (Exception e) {
// 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
}
});
return;
}
// 有事務(wù),則添加一個(gè)事務(wù)同步器,并重寫afterCompletion方法(此方法在事務(wù)提交后會(huì)做回調(diào))
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事務(wù)提交后,再異步發(fā)送消息給kafka
executor.submit(() -> {
try {
// 發(fā)送消息給kafka
} catch (Exception e) {
// 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
}
});
}
}
});
}
代碼比較簡(jiǎn)單,其主要是TransactionSynchronizationManager
的使用。
3.1、判斷是否存在事務(wù)?TransactionSynchronizationManager.isSynchronizationActive() 方法顯神威
我們先看下這個(gè)方法的源碼:
// TransactionSynchronizationManager.java類內(nèi)部的部分代碼
private static final ThreadLocal
> synchronizations =
new NamedThreadLocal<>(
"Transaction synchronizations");
public static boolean
isSynchronizationActive() {
return (synchronizations.get() != null);
}
很明顯,synchronizations
是一個(gè)線程變量(ThreadLocal)。那它是在什么時(shí)候set進(jìn)去的呢?
這里的話,可以參考下這個(gè)方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization
,其源碼如下所示:
/**
* Activate transaction synchronization for the current thread.
* Called by a transaction manager on transaction begin.
* @throws IllegalStateException if synchronization is already active
*/
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}
由源碼中的注釋也可以知道,它是在事務(wù)管理器開啟事務(wù)時(shí)調(diào)用的。換句話說,只要我們的程序執(zhí)行到帶有事務(wù)特性的方法時(shí),就會(huì)在線程變量中放入一個(gè)LinkedHashSet,用來標(biāo)識(shí)當(dāng)前存在事務(wù)。只要isSynchronizationActive
返回true,則代表當(dāng)前有事務(wù)。
因此,結(jié)合這兩個(gè)方法我們是指能解決我們最開始提出的疑問:要如何判斷當(dāng)前是否存在事務(wù)
3.2、如何在事務(wù)提交后觸發(fā)自定義邏輯?TransactionSynchronizationManager.registerSynchronization()方法顯神威
我們來看下這個(gè)方法的源代碼:
/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
*
Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
這里又使用到了synchronizations
線程變量,我們?cè)谂袛嗍欠翊嬖谑聞?wù)時(shí),就是判斷這個(gè)線程變量?jī)?nèi)部是否有值。那我們現(xiàn)在想在事務(wù)提交后觸發(fā)自定義邏輯和這個(gè)有什么關(guān)系呢?
我們?cè)谏厦鏄?gòu)建流水落地api的偽代碼中有向synchronizations
內(nèi)部添加了一個(gè)TransactionSynchronizationAdapter
,內(nèi)部并重寫了afterCompletion
方法,其代碼如下所示:
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
// 事務(wù)提交后,再異步發(fā)送消息給kafka
executor.submit(() -> {
try {
// 發(fā)送消息給kafka
} catch (Exception e) {
// 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
}
});
}
}
});
我們結(jié)合registerSynchronization
的源碼來看,其實(shí)這段代碼主要就是向線程變量?jī)?nèi)部的LinkedHashSet
添加了一個(gè)對(duì)象而已,但就是這么一個(gè)操作,讓Spring在事務(wù)執(zhí)行的過程中變得“有事情可做”。這是什么意思呢?
是因?yàn)镾pring在執(zhí)行事務(wù)方法時(shí),對(duì)于操作事務(wù)的每一個(gè)階段都有一個(gè)回調(diào)操作,比如:trigger系列的回調(diào)
圖片
invoke系列的回調(diào)
圖片
而我們現(xiàn)在的需求就是在事務(wù)提交后觸發(fā)自定義的函數(shù),那就是在invokeAfterCommit
和invokeAfterCompletion
這兩個(gè)方法來選了。首先,這兩個(gè)方法都會(huì)拿到所有TransactionSynchronization
的集合(其中會(huì)包括我們上述添加的TransactionSynchronizationAdapter
)。
但是要注意一點(diǎn):invokeAfterCommit
只能拿到集合,invokeAfterCompletion
除了集合還有一個(gè)int類型的參數(shù),而這個(gè)int類型的參數(shù)其實(shí)是當(dāng)前事務(wù)的一種狀態(tài)。也就是說,如果我們重寫了invokeAfterCompletion
方法,我們除了能拿到集合外,還能拿到當(dāng)前事務(wù)的狀態(tài)。
因此,此時(shí)我們可以根據(jù)這個(gè)狀態(tài)來做不同的事情,比如:可以在事務(wù)提交時(shí)做自定義處理,也可以在事務(wù)回滾時(shí)做自定義處理等等。
四、總結(jié)
上面有說到,我們判斷當(dāng)前是否存在事務(wù)、添加鉤子函數(shù)都是依賴線程變量的。因此,我們?cè)谑褂眠^程中,一定要避免切換線程。否則會(huì)出現(xiàn)不生效的情況。
IT架構(gòu)師/技術(shù)大咖的交流圈子,為您提供架構(gòu)體系知識(shí)、技術(shù)文章、流行實(shí)踐案例、解決方案等,行業(yè)大咖分享交流/同行經(jīng)驗(yàn)分享互動(dòng),期待你的加入!掃碼即可加入哦,隨著材料不斷增多社群會(huì)不定期漲價(jià)早加入更優(yōu)惠
免責(zé)聲明:
本公眾號(hào)部分分享的資料來自網(wǎng)絡(luò)收集和整理,所有文字和圖片版權(quán)歸屬于原作者所有,且僅代表作者個(gè)人觀點(diǎn),與本公眾號(hào)無關(guān),文章僅供讀者學(xué)習(xí)交流使用,并請(qǐng)自行核實(shí)相關(guān)內(nèi)容,如文章內(nèi)容涉及侵權(quán),請(qǐng)聯(lián)系后臺(tái)管理員刪除。
特別聲明:以上內(nèi)容(如有圖片或視頻亦包括在內(nèi))為自媒體平臺(tái)“網(wǎng)易號(hào)”用戶上傳并發(fā)布,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.