SOFAStack 消息队列的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。
您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS 的能力。请避免为每个线程创建一个客户端实例。
在多线程之间共享 Producer 的示例代码如下:
import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;
public class Main {
public static void main(String... args) {
Properties credentials = new Properties();
// 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
// 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");
credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
// 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
.withCredentials(credentials).build();
Properties properties = new Properties();
// 设置用户实例,进入控制台的概览页面查看接入点配置
properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
// 您在控制台创建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
finalProducer producer = accessPoint.createProducer(properties);
producer.start();
//创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
//在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列。
Thread thread = new Thread(newRunnable() {
@Override
public void run() {
try {
Message msg = new Message(//
// Message 所属的 Topic
"TopicTestMQ",
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列的服务器过滤
"TagA",
// Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
anotherThread.start();
// Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
// producer.shutdown();
}
}
文档内容是否对您有帮助?