Skip to content

Commit 7a51187

Browse files
MINOR: Use TransactionOperation enum instead of String for pending state checks (#21218)
### PR Description this PR refactors `TransactionManager#throwIfPendingState` to use a typed `TransactionOperation` enum instead of a raw String. ###Background / Motivation The previous String-based API relies on manually passing operation names, which can make it easier for naming to drift or become inconsistent over time. Using an enum makes the intent more explicit and keeps the operation naming centralized. ### What’s changed - Added a `TransactionOperation` enum with a `displayName` (and `toString()` override) to preserve readable exception messages. - Updated relevant call sites (`beginTransaction`, `prepareTransaction`, `sendOffsetsToTransaction`, `maybeAddPartition`) to pass the corresponding enum value. - Updated `throwIfPendingState` to accept `TransactionOperation` instead of `String`. ### Behavior / Compatibility - No behavioral change is intended. The exception message remains human-readable and consistent via `TransactionOperation#toString()`. ### Testing No additional tests were added since this is a small refactor; existing tests should continue to cover the pending-transition behavior. Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 2bae9c3 commit 7a51187

File tree

1 file changed

+23
-5
lines changed

1 file changed

+23
-5
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,24 @@ private enum Priority {
205205
this.priority = priority;
206206
}
207207
}
208+
209+
private enum TransactionOperation {
210+
SEND("send"),
211+
BEGIN_TRANSACTION("beginTransaction"),
212+
PREPARE_TRANSACTION("prepareTransaction"),
213+
SEND_OFFSETS_TO_TRANSACTION("sendOffsetsToTransaction");
214+
215+
final String displayName;
216+
217+
TransactionOperation(String displayName) {
218+
this.displayName = displayName;
219+
}
220+
221+
@Override
222+
public String toString() {
223+
return displayName;
224+
}
225+
}
208226

209227
public TransactionManager(final LogContext logContext,
210228
final String transactionalId,
@@ -331,7 +349,7 @@ synchronized TransactionalRequestResult initializeTransactions(
331349

332350
public synchronized void beginTransaction() {
333351
ensureTransactional();
334-
throwIfPendingState("beginTransaction");
352+
throwIfPendingState(TransactionOperation.BEGIN_TRANSACTION);
335353
maybeFailWithError();
336354
transitionTo(State.IN_TRANSACTION);
337355
}
@@ -343,7 +361,7 @@ public synchronized void beginTransaction() {
343361
*/
344362
public synchronized void prepareTransaction() {
345363
ensureTransactional();
346-
throwIfPendingState("prepareTransaction");
364+
throwIfPendingState(TransactionOperation.PREPARE_TRANSACTION);
347365
maybeFailWithError();
348366
transitionTo(State.PREPARED_TRANSACTION);
349367
this.preparedTxnState = new ProducerIdAndEpoch(
@@ -406,7 +424,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
406424
public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
407425
final ConsumerGroupMetadata groupMetadata) {
408426
ensureTransactional();
409-
throwIfPendingState("sendOffsetsToTransaction");
427+
throwIfPendingState(TransactionOperation.SEND_OFFSETS_TO_TRANSACTION);
410428
maybeFailWithError();
411429

412430
if (currentState != State.IN_TRANSACTION) {
@@ -438,7 +456,7 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
438456

439457
public synchronized void maybeAddPartition(TopicPartition topicPartition) {
440458
maybeFailWithError();
441-
throwIfPendingState("send");
459+
throwIfPendingState(TransactionOperation.SEND);
442460

443461
if (isTransactional()) {
444462
if (!hasProducerId()) {
@@ -1248,7 +1266,7 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
12481266
return new TxnOffsetCommitHandler(result, builder);
12491267
}
12501268

1251-
private void throwIfPendingState(String operation) {
1269+
private void throwIfPendingState(TransactionOperation operation) {
12521270
if (pendingTransition != null) {
12531271
if (pendingTransition.result.isAcked()) {
12541272
pendingTransition = null;

0 commit comments

Comments
 (0)