-
Notifications
You must be signed in to change notification settings - Fork 26
add new interfaces to P2P Mode #330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
sa-buc seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
@@ -232,7 +257,11 @@ public void close() { | |||
} | |||
} | |||
// 退出任务 | |||
abortIfNeed(); | |||
if (isPeer) { | |||
abortIfNeedWhenPeerClosed(); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
executor.resumePeer(executionId, newTraceId); | ||
} | ||
|
||
public ObTableLoadClientStatus sendHeartBeat() throws ObDirectLoadException { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -304,6 +323,25 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect | |||
executor.resume(executionId); | |||
} | |||
|
|||
public void resumePeer(ObDirectLoadStatementExecutionId executionId, ObDirectLoadTraceId newTraceId) throws ObDirectLoadException { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -309,4 +315,303 @@ public static void run() { | |||
|
|||
}; | |||
|
|||
private static class MultiPeerNodeWriteTest { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
connection = buildConnection(1); | ||
statement = buildStatement(connection); | ||
|
||
SendHeartBeat sendHeartBeat = new SendHeartBeat(connection, statement); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不应该在SendHeartBeat里面再build一个新的stmt去发心跳么?
@@ -251,6 +252,24 @@ public void begin() throws ObDirectLoadException { | |||
} | |||
} | |||
|
|||
public ObDirectLoadStatementFuture beginPeerAsync() { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -279,6 +282,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException { | |||
executor.write(bucket); | |||
} | |||
|
|||
public void startHeartBeat() throws ObDirectLoadException { | |||
executor.startHeartBeatInP2PMode(); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -377,6 +389,11 @@ public ObDirectLoadStatement build() throws ObDirectLoadException { | |||
return connection.buildStatement(this); | |||
} | |||
|
|||
public Builder setP2PMode() { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -355,6 +419,10 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException { | |||
} | |||
} | |||
|
|||
public void execAbort() throws ObDirectLoadException { | |||
abort(); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -382,6 +388,11 @@ public ObDirectLoadTraceId getTraceId() { | |||
return traceId; | |||
} | |||
|
|||
public Builder setNodeRole(NodeRole nodeRole) { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -317,7 +336,7 @@ private synchronized void abortIfNeed() { | |||
} | |||
} | |||
|
|||
private ObDirectLoadStatementFuture abort() { | |||
public ObDirectLoadStatementFuture abort() { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
Thread thread = new Thread(NodeWriter); | ||
threads[i] = thread; | ||
} | ||
P2PNodeWriter commitNodeWriter = new P2PNodeWriter(executionIdBytes, writeThreadNum, threads); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -58,13 +58,19 @@ public class ObDirectLoadStatementExecutor { | |||
private long taskId = 0; | |||
private ObAddr svrAddr = null; | |||
private ObDirectLoadException cause = null; // 失败原因 | |||
private NodeRole nodeRole = NodeRole.PRIMARY; |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
compareAndSetState(NONE, LOADING_ONLY, "resume"); | ||
if (NodeRole.P2P == nodeRole) { | ||
compareAndSetState(NONE, LOADING, "resume in P2P mode"); | ||
startHeartBeat(); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
e1d7a8e
to
24c79d5
Compare
@@ -327,6 +331,7 @@ public static final class Builder { | |||
private ObDirectLoadStatementExecutionId executionId = null; | |||
|
|||
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE; | |||
private boolean isP2PMode = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么加到这个位置?
@@ -123,6 +141,16 @@ public synchronized ObDirectLoadStatementFuture begin() { | |||
public synchronized ObDirectLoadStatementFuture commit() { | |||
logger.info("statement call commit"); | |||
ObDirectLoadStatementAsyncPromiseTask task = null; | |||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为啥要套一层这个?
Summary
add new interfaces to support peer node and abort
Solution Description