com.taobao.metamorphosis.client.producer
类 SimpleMessageProducer

java.lang.Object
  继承者 com.taobao.metamorphosis.client.producer.SimpleMessageProducer
所有已实现的接口:
MessageProducer, Shutdownable, TransactionSession
直接已知子类:
AsyncMetaMessageProducer, OrderedMessageProducer, SimpleXAMessageProducer

public class SimpleMessageProducer
extends Object
implements MessageProducer, TransactionSession

消费生产者实现

作者:
boyan, wuhua

字段摘要
protected static long DEFAULT_OP_TIMEOUT
           
protected  ThreadLocal<com.taobao.metamorphosis.client.producer.SimpleMessageProducer.LastSentInfo> lastSentInfo
          事务相关代码
protected  PartitionSelector partitionSelector
           
protected  ProducerZooKeeper producerZooKeeper
           
protected  RemotingClientWrapper remotingClient
           
protected  String sessionId
           
protected  ThreadLocal<TransactionContext> transactionContext
          与线程关联的事务上下文
protected  long transactionRequestTimeoutInMills
           
protected  int transactionTimeout
           
 
构造方法摘要
SimpleMessageProducer(MetaMessageSessionFactory messageSessionFactory, RemotingClientWrapper remotingClient, PartitionSelector partitionSelector, ProducerZooKeeper producerZooKeeper, String sessionId)
           
 
方法摘要
protected  void beforeSendMessageFirstTime(String serverUrl)
          在第一次发送前开始事务
 void beginTransaction()
          开启一个事务并关联到当前线程
protected  void checkMessage(Message message)
           
protected  void checkState()
           
 void commit()
          提交事务,将事务内发送的消息持久化,此方法仅能在beginTransaction之后调用
 MetaMessageSessionFactory getParent()
           
 PartitionSelector getPartitionSelector()
          返回本生产者的分区选择器
 String getSessionId()
           
protected  TransactionId getTransactionId()
          返回事务id
 int getTransactionTimeout()
          返回当前设置的事务超时时间,默认为0,表示永不超时
protected  BooleanCommand invokeToGroup(String serverUrl, Partition partition, PutCommand putCommand, Message message, long timeout, TimeUnit unit)
           
protected  boolean isInTransaction()
          判断是否处于事务中
 boolean isOrdered()
          已过时。 
protected  void logLastSentInfo(String serverUrl)
          记录上一次投递信息
 void publish(String topic)
          发布topic,以便producer从zookeeper获取broker列表并连接,在发送消息前必须先调用此方法
 void removeContext(TransactionContext ctx)
           
protected  void resetLastSentInfo()
           
 void rollback()
          回滚事务内所发送的任何消息,此方法仅能在beginTransaction之后调用
 SendResult sendMessage(Message message)
          发送消息
 SendResult sendMessage(Message message, long timeout, TimeUnit unit)
          发送消息,如果超出指定的时间内没有返回,则抛出异常
 void sendMessage(Message message, SendMessageCallback cb)
          异步发送消息,在默认时间内(3秒)回调callback,此模式下无法使用事务
 void sendMessage(Message message, SendMessageCallback cb, long time, TimeUnit unit)
          异步发送消息,在指定时间内回调callback,此模式下无法使用事务
protected  SendResult sendMessageToServer(Message message, long timeout, TimeUnit unit)
          正常的消息发送到服务器
 void setDefaultTopic(String topic)
          设置发送消息的默认topic,当发送的message的topic没有找到可用broker和分区的时候,选择这个默认topic指定的broker发送 。
 void setTransactionRequestTimeout(long time, TimeUnit timeUnit)
          Set transaction command request timeout.default is five seconds.
 void setTransactionTimeout(int seconds)
          设置事务超时时间,从事务开始计时,如果超过设定时间还没有提交或者回滚,则服务端将无条件回滚该事务。
 void shutdown()
          关闭生产者,释放资源
 
从类 java.lang.Object 继承的方法
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

字段详细信息

DEFAULT_OP_TIMEOUT

protected static final long DEFAULT_OP_TIMEOUT
另请参见:
常量字段值

remotingClient

protected final RemotingClientWrapper remotingClient

partitionSelector

protected final PartitionSelector partitionSelector

producerZooKeeper

protected final ProducerZooKeeper producerZooKeeper

sessionId

protected final String sessionId

transactionTimeout

protected volatile int transactionTimeout

transactionRequestTimeoutInMills

protected long transactionRequestTimeoutInMills

lastSentInfo

protected final ThreadLocal<com.taobao.metamorphosis.client.producer.SimpleMessageProducer.LastSentInfo> lastSentInfo
事务相关代码


transactionContext

protected final ThreadLocal<TransactionContext> transactionContext
与线程关联的事务上下文

构造方法详细信息

SimpleMessageProducer

public SimpleMessageProducer(MetaMessageSessionFactory messageSessionFactory,
                             RemotingClientWrapper remotingClient,
                             PartitionSelector partitionSelector,
                             ProducerZooKeeper producerZooKeeper,
                             String sessionId)
方法详细信息

setTransactionRequestTimeout

public void setTransactionRequestTimeout(long time,
                                         TimeUnit timeUnit)
从接口 MessageProducer 复制的描述
Set transaction command request timeout.default is five seconds.

指定者:
接口 MessageProducer 中的 setTransactionRequestTimeout

getParent

public MetaMessageSessionFactory getParent()

getPartitionSelector

public PartitionSelector getPartitionSelector()
从接口 MessageProducer 复制的描述
返回本生产者的分区选择器

指定者:
接口 MessageProducer 中的 getPartitionSelector
返回:

isOrdered

@Deprecated
public boolean isOrdered()
已过时。 

从接口 MessageProducer 复制的描述
返回本生产者发送消息是否有序,这里的有序是指发往同一个partition的消息有序。此方法已经废弃,总是返回false

指定者:
接口 MessageProducer 中的 isOrdered
返回:
true表示有序

publish

public void publish(String topic)
从接口 MessageProducer 复制的描述
发布topic,以便producer从zookeeper获取broker列表并连接,在发送消息前必须先调用此方法

指定者:
接口 MessageProducer 中的 publish

setDefaultTopic

public void setDefaultTopic(String topic)
从接口 MessageProducer 复制的描述
设置发送消息的默认topic,当发送的message的topic没有找到可用broker和分区的时候,选择这个默认topic指定的broker发送 。调用本方法会自动publish此topic。

指定者:
接口 MessageProducer 中的 setDefaultTopic

sendMessage

public SendResult sendMessage(Message message,
                              long timeout,
                              TimeUnit unit)
                       throws MetaClientException,
                              InterruptedException
从接口 MessageProducer 复制的描述
发送消息,如果超出指定的时间内没有返回,则抛出异常

指定者:
接口 MessageProducer 中的 sendMessage
参数:
message - 消息对象
timeout - 超时时间
unit - 超时的时间单位
返回:
发送结果
抛出:
MetaClientException - 客户端异常
InterruptedException - 响应中断

sendMessageToServer

protected SendResult sendMessageToServer(Message message,
                                         long timeout,
                                         TimeUnit unit)
                                  throws MetaClientException,
                                         InterruptedException,
                                         MetaOpeartionTimeoutException
正常的消息发送到服务器

抛出:
MetaClientException
InterruptedException
MetaOpeartionTimeoutException

removeContext

public void removeContext(TransactionContext ctx)
指定者:
接口 TransactionSession 中的 removeContext

getSessionId

public String getSessionId()
指定者:
接口 TransactionSession 中的 getSessionId

setTransactionTimeout

public void setTransactionTimeout(int seconds)
                           throws MetaClientException
从接口 MessageProducer 复制的描述
设置事务超时时间,从事务开始计时,如果超过设定时间还没有提交或者回滚,则服务端将无条件回滚该事务。

指定者:
接口 MessageProducer 中的 setTransactionTimeout
参数:
seconds - 事务超时时间,单位:秒
抛出:
MetaClientException
另请参见:
MessageProducer.beginTransaction(), MessageProducer.rollback(), MessageProducer.commit()

getTransactionTimeout

public int getTransactionTimeout()
                          throws MetaClientException
从接口 MessageProducer 复制的描述
返回当前设置的事务超时时间,默认为0,表示永不超时

指定者:
接口 MessageProducer 中的 getTransactionTimeout
返回:
事务超时时间,单位:秒
抛出:
MetaClientException

beginTransaction

public void beginTransaction()
                      throws MetaClientException
开启一个事务并关联到当前线程

指定者:
接口 MessageProducer 中的 beginTransaction
抛出:
MetaClientException - 如果已经处于事务中,则抛出TransactionInProgressException异常

beforeSendMessageFirstTime

protected void beforeSendMessageFirstTime(String serverUrl)
                                   throws MetaClientException,
                                          XAException
在第一次发送前开始事务

参数:
serverUrl -
抛出:
MetaClientException
XAException

logLastSentInfo

protected void logLastSentInfo(String serverUrl)
记录上一次投递信息

参数:
serverUrl -

getTransactionId

protected TransactionId getTransactionId()
                                  throws MetaClientException
返回事务id

返回:
抛出:
MetaClientException

isInTransaction

protected boolean isInTransaction()
判断是否处于事务中

返回:

commit

public void commit()
            throws MetaClientException
提交事务,将事务内发送的消息持久化,此方法仅能在beginTransaction之后调用

指定者:
接口 MessageProducer 中的 commit
抛出:
MetaClientException
另请参见:
beginTransaction()

rollback

public void rollback()
              throws MetaClientException
回滚事务内所发送的任何消息,此方法仅能在beginTransaction之后调用

指定者:
接口 MessageProducer 中的 rollback
抛出:
MetaClientException
另请参见:
beginTransaction()

resetLastSentInfo

protected void resetLastSentInfo()

invokeToGroup

protected BooleanCommand invokeToGroup(String serverUrl,
                                       Partition partition,
                                       PutCommand putCommand,
                                       Message message,
                                       long timeout,
                                       TimeUnit unit)
                                throws InterruptedException,
                                       TimeoutException,
                                       com.taobao.gecko.service.exception.NotifyRemotingException
抛出:
InterruptedException
TimeoutException
com.taobao.gecko.service.exception.NotifyRemotingException

checkState

protected void checkState()

checkMessage

protected void checkMessage(Message message)
                     throws MetaClientException
抛出:
MetaClientException

sendMessage

public void sendMessage(Message message,
                        SendMessageCallback cb,
                        long time,
                        TimeUnit unit)
从接口 MessageProducer 复制的描述
异步发送消息,在指定时间内回调callback,此模式下无法使用事务

指定者:
接口 MessageProducer 中的 sendMessage

sendMessage

public void sendMessage(Message message,
                        SendMessageCallback cb)
从接口 MessageProducer 复制的描述
异步发送消息,在默认时间内(3秒)回调callback,此模式下无法使用事务

指定者:
接口 MessageProducer 中的 sendMessage

sendMessage

public SendResult sendMessage(Message message)
                       throws MetaClientException,
                              InterruptedException
从接口 MessageProducer 复制的描述
发送消息

指定者:
接口 MessageProducer 中的 sendMessage
参数:
message - 消息对象
返回:
发送结果
抛出:
MetaClientException - 客户端异常
InterruptedException - 响应中断

shutdown

public void shutdown()
              throws MetaClientException
从接口 MessageProducer 复制的描述
关闭生产者,释放资源

指定者:
接口 MessageProducer 中的 shutdown
指定者:
接口 Shutdownable 中的 shutdown
抛出:
MetaClientException


Copyright © 2010–2013. All rights reserved.