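Ideal Forum (理想论坛) Notes
A Province-Level Cloud Vision for Traditional Library Services
Every library runs its own library management system. Ordinary universities mostly use Huiwen (汇文), Tuchuang (图创), or ILAS, while the former Project 985 universities more often use foreign systems. The books, readers, and circulation logs these systems manage are all local to one institution, so the data is incomplete and cannot be exchanged between libraries, which creates data silos. For example, there are roughly 5 million bibliographic records for books published in China, while a typical library holds about 1 million titles, so each library's OPAC can find only about 20% of them. More than 100,000 new titles are published each year, but most libraries buy perhaps 20,000, again only about 20%. For a one-click search of which universities hold a given book, Chaoxing's Duxiu (读秀) is a feasible option. However, Chaoxing is a commercial company operating without policy standards or guidance, so it meets some resistance when collecting holdings data from libraries, and the data is not very current.
How can companies with a state-owned background, industry leaders, commercial companies, and member libraries cooperate to jointly build and share a province-level cloud system that manages bibliographic, holdings, reader, and circulation-log data in one place, so that the library sector gets its own industry-wide big data under the Digital China initiative and traditional library services can develop in a data-driven way? What follows are some thoughts on the data governance involved: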
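I. Data Submission
1. Bibliographic data
Cataloging at most libraries is already connected to the CALIS union cataloging system, but that catalog is essentially the union of books that university libraries have already bought, not the full set of books published in China. The China Book Data Service (https://cckb.lib.tsinghua.edu.cn/, referred to below as CCBD), provided by Xinhua Bookstore, Tsinghua University, and Hangzhou Maida (杭州麦达), is a pioneer here and a good demonstration project. Figures quoted from the CCBD website on May 1, 2023: 4,974,962 basic bibliographic records; 2,267,083 acquisitions MARC records; 2,282,090 National Library standard MARC records; 1,328,234 CALIS standard MARC records. It offers comprehensive, authoritative bibliographic data with many descriptive fields and a unique identifier; cover images, tables of contents, and prefaces in particular are things traditional MARC data does not expose. On the unique identifier, CCBD may still need improvement and may need to apply for DOI resolution. Once each library links its local bibliographic records to this dataset, bibliographic data governance is essentially done. The basic flow: each library submits its data in real time, the platform compares it with the CCBD records and stores the CCBDID together with the library's local record number to link the two. The comparison does not need to be 100% accurate, because day-to-day operations still run on the local management system.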
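The comparison step could be sketched roughly as follows. This is only an illustration under assumptions the text does not state: matching is done on a normalized ISBN-13, and `CcbdMatcher`, `NormalizeIsbn`, `Link`, and the sample IDs are all hypothetical names, not part of CCBD or any library system.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Usage with made-up IDs: two local records for the same title, one stored as ISBN-10.
var ccbdIdByIsbn = new Dictionary<string, string> { ["9780306406157"] = "CCBD-0001" };
var localRecords = new[]
{
    ("LOC-42", "978-0-306-40615-7"),
    ("LOC-43", "0-306-40615-2"),
    ("LOC-44", "n/a"),               // no usable ISBN: left unlinked
};
foreach (var (ccbdId, recordNo) in CcbdMatcher.Link(localRecords, ccbdIdByIsbn))
    Console.WriteLine($"{ccbdId} <-> {recordNo}");

public static class CcbdMatcher
{
    private static bool IsAsciiDigit(char c) => c >= '0' && c <= '9';

    // Strip hyphens and spaces, and convert ISBN-10 to ISBN-13, so both sides compare equal.
    public static string? NormalizeIsbn(string raw)
    {
        var s = new string(raw.Where(c => IsAsciiDigit(c) || c == 'X' || c == 'x').ToArray());
        if (s.Length == 13 && s.All(IsAsciiDigit)) return s;
        if (s.Length != 10 || !s.Take(9).All(IsAsciiDigit)) return null;

        var core = "978" + s.Substring(0, 9);
        int sum = 0;
        for (int i = 0; i < 12; i++)
            sum += (core[i] - '0') * (i % 2 == 0 ? 1 : 3);   // ISBN-13 weights 1,3,1,3,...
        return core + (10 - sum % 10) % 10;
    }

    // Returns (CCBDID, local record number) pairs; records without a match are skipped.
    public static List<(string CcbdId, string RecordNo)> Link(
        IEnumerable<(string RecordNo, string Isbn)> localRecords,
        IReadOnlyDictionary<string, string> ccbdIdByIsbn)
    {
        var links = new List<(string CcbdId, string RecordNo)>();
        foreach (var (recordNo, isbn) in localRecords)
        {
            var key = NormalizeIsbn(isbn);
            if (key != null && ccbdIdByIsbn.TryGetValue(key, out var ccbdId))
                links.Add((ccbdId, recordNo));
        }
        return links;
    }
}
```

Unmatched records are simply skipped, which fits the point that the comparison need not be perfect: the local system stays authoritative for daily work.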
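2. Holdings data
Today this kind of data is typically collected by having each library export it from its management system once a year and hand it to a company for processing. The alternative is to scrape it directly, which tends to annoy libraries. Flink CDC can now be used to stream the data to the cloud platform in real time. Most library management systems use Oracle as the back-end database, most commonly version 11g. The basic flow is to upload the bibliographic record number and item barcode through Flink CDC and link them to the CCBDID.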
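Whatever tool delivers the change events, the cloud side has to fold them into a barcode-to-CCBDID linkage. The sketch below shows one way that might look. The event shape (`HoldingChange`, `ChangeOp`) and the `HoldingsIndex` class are assumptions made for illustration; they are not the actual Flink CDC record format.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Usage with made-up data: link a record, add an item, then delete it.
var index = new HoldingsIndex();
index.LinkRecord("Example University", "LOC-42", "CCBD-0001");
index.Apply(new HoldingChange(ChangeOp.Upsert, "Example University", "B000123", "LOC-42"));
Console.WriteLine(index.FindCcbdId("Example University", "B000123") ?? "(not linked)");
index.Apply(new HoldingChange(ChangeOp.Delete, "Example University", "B000123", "LOC-42"));
Console.WriteLine(index.FindCcbdId("Example University", "B000123") ?? "(not linked)");

public enum ChangeOp { Upsert, Delete }

// Hypothetical, simplified change event: one holdings row was inserted/updated or deleted.
public record HoldingChange(ChangeOp Op, string School, string Barcode, string RecordNo);

public sealed class HoldingsIndex
{
    // Record numbers and barcodes are only unique within one school, so the school is part of each key.
    private readonly Dictionary<(string School, string RecordNo), string> _ccbdIdByRecord = new();
    private readonly Dictionary<(string School, string Barcode), string> _recordNoByBarcode = new();

    public void LinkRecord(string school, string recordNo, string ccbdId) =>
        _ccbdIdByRecord[(school, recordNo)] = ccbdId;

    public void Apply(HoldingChange change)
    {
        var key = (change.School, change.Barcode);
        if (change.Op == ChangeOp.Delete) _recordNoByBarcode.Remove(key);
        else _recordNoByBarcode[key] = change.RecordNo;
    }

    // Barcode -> local record number -> CCBDID; null when either link is missing.
    public string? FindCcbdId(string school, string barcode) =>
        _recordNoByBarcode.TryGetValue((school, barcode), out var recordNo)
        && _ccbdIdByRecord.TryGetValue((school, recordNo), out var ccbdId)
            ? ccbdId
            : null;
}
```

A real platform would keep these mappings in a database rather than in memory; the dictionaries only make the two-step lookup explicit.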
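3. Reader data
Reader data needs a unique identifier that stays valid over the long term. ORCID is one route but carries some security risks; the OpenID of a WeChat official account can also do the job, although it is not very international. The basic flow is to bind the reader card number through each university's unified authentication system, with the university name plus the student number identifying a reader uniquely. WeChat is probably installed on close to 100% of readers' phones, supports several ways of authenticating identity, and, most importantly, can send template messages for free, for example loan and return notices or book recommendations. To protect reader privacy, only gender, grade, degree, discipline, and university are collected.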
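A minimal sketch of what such a privacy-limited reader record might look like follows. Hashing the university name and student number into a pseudonymous key is an assumption of this example, not something the text prescribes, and `ReaderProfile`, `ReaderIdentity`, `MakeKey`, and the salt are hypothetical.

```csharp
#nullable enable
using System;
using System.Security.Cryptography;
using System.Text;

// Usage with made-up values.
var profile = new ReaderProfile(
    ReaderKey: ReaderIdentity.MakeKey("Example University", "20190001", salt: "per-deployment-secret"),
    Gender: "F", Grade: "2019", Degree: "Bachelor", Discipline: "Engineering",
    University: "Example University");
Console.WriteLine(profile.ReaderKey.Length);   // 64 hex characters

// Only the five attributes named in the text, plus an opaque key; no name, no card number.
public record ReaderProfile(
    string ReaderKey, string Gender, string Grade, string Degree, string Discipline, string University);

public static class ReaderIdentity
{
    // University + student number identifies a reader; the SHA-256 digest hides the raw number.
    public static string MakeKey(string university, string readerNo, string salt)
    {
        var input = $"{salt}\n{university.Trim()}\n{readerNo.Trim()}";
        return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(input)));
    }
}
```

The same reader always maps to the same key, so loans can still be grouped per reader without the platform storing the student number itself.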
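4. Loan and return data
The basic flow is to monitor the loan_work table with Flink CDC to submit loans and returns in real time, and to monitor LOG_CIR to submit historical loan and return data (using Interlib as the example).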
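II. Applications
1. OPAC
The provincial-center OPAC searches all print and digital books in the CCBD central database. Filtering by university name leads to that library's records and shows the linked local record number and item barcodes. A link then jumps to the library's own OPAC page with the full record, shelving location, and availability status.
2. Acquisitions and recommendations
In my view, more of the purchasing power should go to readers, to get them actively involved: readers recommend purchases, the recommendations are inserted into the local management system, and librarians review them and place orders in the local system, with notifications pushed through the WeChat official account. This detour is needed because the tendering and payment procedures for book purchasing prevent ordering directly on the platform. Ordering directly on the platform and shipping books straight to readers is the direction to push next.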
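3. Statistics
Generate the annual statistical reports that each library must submit to the Library Work Committee (图工委);
Generate data reports directly, using the university student reading survey form as the template;
Display province-wide borrowing data in real time;
4. Data mining
Work with student affairs offices to analyze readers' psychological state;
Work with academic affairs offices to publish reading lists by major;
Work with publishers to suggest publishing topics;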
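III. Code Examples
The code below was written by an individual who is not a professional programmer and was pieced together from various sources; please excuse its low quality!
1. Early code that collected circulation logs through a SIP2 proxy
A proxy for the SIP2 self-service loan/return protocol uploads and pushes loan data in real time. The code has three parts: (1) the SIP2 proxy; (2) extraction of reader and bibliographic data, which is then pushed to the Baidu IoT platform; (3) a subscriber that receives the data from IoT and stores it in a local database. The basic idea: most universities already have self-service loan and return, so a SIP2 proxy can capture each library's loans in real time, and reader and bibliographic details can then be read straight from the database, which is enough to log which kind of reader borrowed which book. Why Baidu's IoT platform? First, it is free, powerful, and stable; annual circulation for the whole province will not exceed 10 million transactions, which is within the free quota. Second, for openness and transparency: collecting circulation logs requires a high degree of trust from the libraries, and when the data is uploaded to a large platform like Baidu, every library can see and manage its own data.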
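To make the capture step concrete, here is a small sketch of how a proxied SIP2 message can be recognized and split into fields. It relies on the SIP2 conventions that message ID 12 is a checkout response and 10 a checkin response, and that variable fields are `|`-delimited with two-character codes (AA patron identifier, AB item identifier, AJ title). The `Sip2` helper class and the sample message are made up for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Usage: a made-up checkout response (fixed header, then |-delimited fields).
var sample = "121NNY20230501    120000AOExampleLib|AA20190001|ABB000123|AJSample Title|\r";
if (Sip2.IsLoanOrReturnResponse(sample))
{
    var fields = Sip2.ParseFields(sample);
    Console.WriteLine($"reader={fields["AA"]} barcode={fields["AB"]}");
}

public static class Sip2
{
    // 12 = checkout response, 10 = checkin response: the two events worth logging.
    public static bool IsLoanOrReturnResponse(string message) =>
        message.StartsWith("12", StringComparison.Ordinal) ||
        message.StartsWith("10", StringComparison.Ordinal);

    // Maps each two-character field code to its value. The first token (fixed header plus
    // the first field) ends up under the message ID, which callers can ignore.
    public static Dictionary<string, string> ParseFields(string message)
    {
        var fields = new Dictionary<string, string>();
        foreach (var part in message.Split('|', StringSplitOptions.RemoveEmptyEntries))
        {
            if (part.Length < 2) continue;          // e.g. the trailing carriage return
            fields[part.Substring(0, 2)] = part.Substring(2);
        }
        return fields;
    }
}
```

Skipping tokens shorter than two characters matters in practice, because a message that ends in `|` followed by a carriage return leaves a one-character token behind.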
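1.1 Forward the TCP port and HTTP POST loan and return events to the handler program
//Proxy main program; port configuration file (data is forwarded to the SIP2 server); TCP forwarding; UDP forwarding; Baidu IoT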
// Proxy main program: reads config.json and starts a TCP and/or UDP forwarder for each "sipProxy" entry.
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using cn.edu.yangtzeu.mqtt;
using System.Text.Json;

namespace NetProxy
{
    internal static class Program
    {
        private static void Main(string[] args)
        {
            try
            {
                var configJson = JsonDocument.Parse(System.IO.File.ReadAllText("config.json"));

                Dictionary<string, ProxyConfig>? configs = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, ProxyConfig>>(configJson.RootElement.GetProperty("sipProxy"));
                if (configs == null)
                {
                    throw new Exception("configs is null");
                }

                var tasks = configs.SelectMany(c => ProxyFromConfig(c.Key, c.Value));
                Task.WhenAll(tasks).Wait();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An error occurred : {ex}");
            }
        }

        private static IEnumerable<Task> ProxyFromConfig(string proxyName, ProxyConfig proxyConfig)
        {
            var forwardPort = proxyConfig.forwardPort;
            var localPort = proxyConfig.localPort;
            var forwardIp = proxyConfig.forwardIp;
            var localIp = proxyConfig.localIp;
            var protocol = proxyConfig.protocol;
            try
            {
                if (forwardIp == null)
                {
                    throw new Exception("forwardIp is null");
                }
                if (!forwardPort.HasValue)
                {
                    throw new Exception("forwardPort is null");
                }
                if (!localPort.HasValue)
                {
                    throw new Exception("localPort is null");
                }
                if (protocol != "udp" && protocol != "tcp" && protocol != "any")
                {
                    throw new Exception($"protocol is not supported {protocol}");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Failed to start {proxyName} : {ex.Message}");
                throw;
            }

            bool protocolHandled = false;
            if (protocol == "udp" || protocol == "any")
            {
                protocolHandled = true;
                Task task;
                try
                {
                    var proxy = new UdpProxy();
                    task = proxy.Start(forwardIp, forwardPort.Value, localPort.Value, localIp);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to start {proxyName} : {ex.Message}");
                    throw;
                }

                yield return task;
            }

            if (protocol == "tcp" || protocol == "any")
            {
                protocolHandled = true;
                Task task;
                try
                {
                    var proxy = new TcpProxy();

                    task = proxy.Start(forwardIp, forwardPort.Value, localPort.Value, localIp);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to start {proxyName} : {ex.Message}");
                    throw;
                }

                yield return task;
            }

            if (!protocolHandled)
            {
                throw new InvalidOperationException($"protocol not supported {protocol}");
            }
        }
    }

    public class ProxyConfig
    {
        public string? protocol { get; set; }
        public ushort? localPort { get; set; }
        public string? localIp { get; set; }
        public string? forwardIp { get; set; }
        public ushort? forwardPort { get; set; }
    }

    internal interface IProxy
    {
        Task Start(string remoteServerHostNameOrAddress, ushort remoteServerPort, ushort localPort, string? localIp = null);
    }
}
config.json (the proxy listens on localPort and forwards to the SIP2 server at forwardIp:forwardPort):
{
  "sipProxy": {
    "http": {
      "localPort": 2023,
      "protocol": "tcp",
      "forwardIp": "10.203.1.237",
      "forwardPort": 2002
    }
  },
  "localWebApi": "http://127.0.0.1:2016/api/readerholding",
  "isCustomized": true
}
// TCP forwarding: relays bytes between the self-service terminal and the SIP2 server,
// and hands loan (12) and return (10) response messages to the upload handler.
#nullable enable
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Mail;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using cn.edu.yangtzeu.mqtt;

namespace NetProxy
{
    internal class TcpProxy : IProxy
    {
        //lagerror
        BdMqtt bdMqtt = new BdMqtt();

        // Handle a captured SIP2 message.
        public void handleMqtt(string msg)
        {
            Console.WriteLine("delegate:" + msg);
            MqttMsg mqttMsg = new MqttMsg();
            mqttMsg.Source = "sip2";
            mqttMsg.Code = 0;
            mqttMsg.Msg = msg;
            // Later changed to upload through the Web API, to reduce connections and traffic.
            // Pass the loan/return message to the Web API.
            bdMqtt.UploadAsync(mqttMsg);
        }

        /// <summary>
        /// Milliseconds
        /// </summary>
        public int ConnectionTimeout { get; set; } = (4 * 60 * 1000);

        public async Task Start(string remoteServerHostNameOrAddress, ushort remoteServerPort, ushort localPort, string? localIp)
        {
            var connections = new ConcurrentBag<TcpConnection>();

            IPAddress localIpAddress = string.IsNullOrEmpty(localIp) ? IPAddress.IPv6Any : IPAddress.Parse(localIp);
            var localServer = new TcpListener(new IPEndPoint(localIpAddress, localPort));
            localServer.Server.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.IPv6Only, false);
            localServer.Start();

            Console.WriteLine($"TCP proxy started [{localIpAddress}]:{localPort} -> [{remoteServerHostNameOrAddress}]:{remoteServerPort}");

            // Background task: every 10 seconds, stop connections idle longer than the timeout (4 minutes by default).
            var _ = Task.Run(async () =>
            {
                while (true)
                {
                    await Task.Delay(TimeSpan.FromSeconds(10)).ConfigureAwait(false);

                    var tempConnections = new List<TcpConnection>(connections.Count);
                    while (connections.TryTake(out var connection))
                    {
                        tempConnections.Add(connection);
                    }

                    foreach (var tcpConnection in tempConnections)
                    {
                        if (tcpConnection.LastActivity + ConnectionTimeout < Environment.TickCount64)
                        {
                            tcpConnection.Stop();
                        }
                        else
                        {
                            connections.Add(tcpConnection);
                        }
                    }
                }
            });

            while (true)
            {
                try
                {
                    var ips = await Dns.GetHostAddressesAsync(remoteServerHostNameOrAddress).ConfigureAwait(false);

                    var tcpConnection = await TcpConnection.AcceptTcpClientAsync(localServer,
                            new IPEndPoint(ips[0], remoteServerPort))
                        .ConfigureAwait(false);
                    // Attach the message handler to this connection.
                    tcpConnection.mqttPush = handleMqtt;
                    tcpConnection.Run();
                    connections.Add(tcpConnection);
                }
                catch (Exception ex)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine(ex);
                    Console.ResetColor();
                }
            }
        }
    }

    internal class TcpConnection
    {
        private readonly TcpClient _localServerConnection;
        private readonly EndPoint? _sourceEndpoint;
        private readonly IPEndPoint _remoteEndpoint;
        private readonly TcpClient _forwardClient;
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly EndPoint? _serverLocalEndpoint;
        private EndPoint? _forwardLocalEndpoint;
        private long _totalBytesForwarded;
        private long _totalBytesResponded;
        public long LastActivity { get; private set; } = Environment.TickCount64;
        // Custom addition: callback invoked for each captured loan/return message.
        public delegate void sipCallback(string msg);
        public sipCallback? mqttPush;

        public static async Task<TcpConnection> AcceptTcpClientAsync(TcpListener tcpListener, IPEndPoint remoteEndpoint)
        {
            var localServerConnection = await tcpListener.AcceptTcpClientAsync().ConfigureAwait(false);
            localServerConnection.NoDelay = true;
            return new TcpConnection(localServerConnection, remoteEndpoint);
        }

        private TcpConnection(TcpClient localServerConnection, IPEndPoint remoteEndpoint)
        {
            _localServerConnection = localServerConnection;
            _remoteEndpoint = remoteEndpoint;

            _forwardClient = new TcpClient {NoDelay = true};

            _sourceEndpoint = _localServerConnection.Client.RemoteEndPoint;
            _serverLocalEndpoint = _localServerConnection.Client.LocalEndPoint;
        }

        public void Run()
        {
            RunInternal(_cancellationTokenSource.Token);
        }

        public void Stop()
        {
            try
            {
                _cancellationTokenSource.Cancel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An exception occurred while closing TcpConnection : {ex}");
            }
        }

        private void RunInternal(CancellationToken cancellationToken)
        {
            Task.Run(async () =>
            {
                try
                {
                    using (_localServerConnection)
                    using (_forwardClient)
                    {
                        await _forwardClient.ConnectAsync(_remoteEndpoint.Address, _remoteEndpoint.Port, cancellationToken).ConfigureAwait(false);
                        _forwardLocalEndpoint = _forwardClient.Client.LocalEndPoint;

                        Console.WriteLine($"Established TCP {_sourceEndpoint} => {_serverLocalEndpoint} => {_forwardLocalEndpoint} => {_remoteEndpoint}");

                        using (var serverStream = _forwardClient.GetStream())
                        using (var clientStream = _localServerConnection.GetStream())
                        // Close both streams when the task is cancelled.
                        using (cancellationToken.Register(() =>
                        {
                            serverStream.Close();
                            clientStream.Close();
                        }, true))
                        {
                            // Wait until either direction finishes.
                            await Task.WhenAny(
                                CopyToAsync(clientStream, serverStream, 81920, Direction.Forward, cancellationToken),
                                CopyToAsync(serverStream, clientStream, 81920, Direction.Responding, cancellationToken)
                            ).ConfigureAwait(false);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"An exception occurred during TCP stream : {ex}");
                }
                finally
                {
                    Console.WriteLine($"Closed TCP {_sourceEndpoint} => {_serverLocalEndpoint} => {_forwardLocalEndpoint} => {_remoteEndpoint}. {_totalBytesForwarded} bytes forwarded, {_totalBytesResponded} bytes responded.");
                }
            });
        }

        private async Task CopyToAsync(Stream source, Stream destination, int bufferSize = 81920, Direction direction = Direction.Unknown, CancellationToken cancellationToken = default)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);

            try
            {
                while (true)
                {
                    int bytesRead = await source.ReadAsync(new Memory<byte>(buffer), cancellationToken).ConfigureAwait(false);
                    if (bytesRead == 0) break;
                    LastActivity = Environment.TickCount64;
                    await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);

                    // Custom addition: inspect the forwarded bytes. (An earlier version published
                    // to MQTT directly from here; it now goes through the mqttPush callback.)
                    string temp = Encoding.UTF8.GetString(buffer, 0, bytesRead);
                    Console.WriteLine(direction.ToString() + "|" + temp);
                    // SIP2 message IDs: 12 = checkout response, 10 = checkin response.
                    // StartsWith avoids the exception Substring(0, 2) throws on chunks shorter than 2 characters.
                    if (temp.StartsWith("12", StringComparison.Ordinal) || temp.StartsWith("10", StringComparison.Ordinal))
                    {
                        mqttPush?.Invoke(temp);
                    }

                    // Count forwarded bytes per direction.
                    switch (direction)
                    {
                        case Direction.Forward:
                            Interlocked.Add(ref _totalBytesForwarded, bytesRead);
                            break;
                        case Direction.Responding:
                            Interlocked.Add(ref _totalBytesResponded, bytesRead);
                            break;
                    }
                }
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    internal enum Direction
    {
        Unknown = 0,
        Forward,
        Responding,
    }
}
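One caveat about the TCP listing: it treats every chunk read from the stream as one whole SIP2 message, but TCP does not guarantee that reads line up with message boundaries. Since SIP2 messages end with a carriage return, a small buffer can reassemble them. The `Sip2Framer` class below is a hypothetical sketch of that idea, not part of the original proxy.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text;

// Usage: one message arrives whole, the next is split across two reads.
var framer = new Sip2Framer();
foreach (var m in framer.Append("12AA1|AB2|\r10AB"))
    Console.WriteLine("complete: " + m);
foreach (var m in framer.Append("3|\r"))
    Console.WriteLine("complete: " + m);

public sealed class Sip2Framer
{
    private readonly StringBuilder _pending = new();

    // Appends a decoded chunk and returns every message completed by it.
    // Text after the last carriage return stays buffered for the next call.
    public IReadOnlyList<string> Append(string chunk)
    {
        _pending.Append(chunk);
        var text = _pending.ToString();
        var complete = new List<string>();

        int start = 0, end;
        while ((end = text.IndexOf('\r', start)) >= 0)
        {
            var message = text.Substring(start, end - start).Trim('\n');
            if (message.Length > 0) complete.Add(message);
            start = end + 1;
        }

        _pending.Clear();
        _pending.Append(text, start, text.Length - start);
        return complete;
    }
}
```

In the proxy, each direction of each connection would need its own framer instance, and only the messages it returns would be checked for the 12 and 10 prefixes.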
// UDP forwarding
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace NetProxy
{
    internal class UdpProxy : IProxy
    {
        /// <summary>
        /// Milliseconds
        /// </summary>
        public int ConnectionTimeout { get; set; } = (4 * 60 * 1000);

        public async Task Start(string remoteServerHostNameOrAddress, ushort remoteServerPort, ushort localPort, string? localIp = null)
        {
            var connections = new ConcurrentDictionary<IPEndPoint, UdpConnection>();

            // TCP will lookup every time while this is only once.
            var ips = await Dns.GetHostAddressesAsync(remoteServerHostNameOrAddress).ConfigureAwait(false);
            var remoteServerEndPoint = new IPEndPoint(ips[0], remoteServerPort);

            var localServer = new UdpClient(AddressFamily.InterNetworkV6);
            localServer.Client.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.IPv6Only, false);
            IPAddress localIpAddress = string.IsNullOrEmpty(localIp) ? IPAddress.IPv6Any : IPAddress.Parse(localIp);
            localServer.Client.Bind(new IPEndPoint(localIpAddress, localPort));

            Console.WriteLine($"UDP proxy started [{localIpAddress}]:{localPort} -> [{remoteServerHostNameOrAddress}]:{remoteServerPort}");

            var _ = Task.Run(async () =>
            {
                while (true)
                {
                    await Task.Delay(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
                    foreach (var connection in connections.ToArray())
                    {
                        if (connection.Value.LastActivity + ConnectionTimeout < Environment.TickCount64)
                        {
                            connections.TryRemove(connection.Key, out UdpConnection? c);
                            connection.Value.Stop();
                        }
                    }
                }
            });

            while (true)
            {
                try
                {
                    var message = await localServer.ReceiveAsync().ConfigureAwait(false);
                    var sourceEndPoint = message.RemoteEndPoint;
                    var client = connections.GetOrAdd(sourceEndPoint,
                        ep =>
                        {
                            var udpConnection = new UdpConnection(localServer, sourceEndPoint, remoteServerEndPoint);
                            udpConnection.Run();
                            return udpConnection;
                        });
                    await client.SendToServerAsync(message.Buffer).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"an exception occurred on receiving a client datagram: {ex}");
                }
            }
        }
    }

    internal class UdpConnection
    {
        private readonly UdpClient _localServer;
        private readonly UdpClient _forwardClient;
        public long LastActivity { get; private set; } = Environment.TickCount64;
        private readonly IPEndPoint _sourceEndpoint;
        private readonly IPEndPoint _remoteEndpoint;
        private readonly EndPoint? _serverLocalEndpoint;
        private EndPoint? _forwardLocalEndpoint;
        private bool _isRunning;
        private long _totalBytesForwarded;
        private long _totalBytesResponded;
        private readonly TaskCompletionSource<bool> _forwardConnectionBindCompleted = new TaskCompletionSource<bool>();

        public UdpConnection(UdpClient localServer, IPEndPoint sourceEndpoint, IPEndPoint remoteEndpoint)
        {
            _localServer = localServer;
            _serverLocalEndpoint = _localServer.Client.LocalEndPoint;

            _isRunning = true;
            _remoteEndpoint = remoteEndpoint;
            _sourceEndpoint = sourceEndpoint;

            _forwardClient = new UdpClient(AddressFamily.InterNetworkV6);
            _forwardClient.Client.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.IPv6Only, false);
        }

        public async Task SendToServerAsync(byte[] message)
        {
            LastActivity = Environment.TickCount64;

            await _forwardConnectionBindCompleted.Task.ConfigureAwait(false);
            var sent = await _forwardClient.SendAsync(message, message.Length, _remoteEndpoint).ConfigureAwait(false);
            Interlocked.Add(ref _totalBytesForwarded, sent);
        }

        public void Run()
        {
            Task.Run(async () =>
            {
                using (_forwardClient)
                {
                    _forwardClient.Client.Bind(new IPEndPoint(IPAddress.Any, 0));
                    _forwardLocalEndpoint = _forwardClient.Client.LocalEndPoint;
                    _forwardConnectionBindCompleted.SetResult(true);
                    Console.WriteLine($"Established UDP {_sourceEndpoint} => {_serverLocalEndpoint} => {_forwardLocalEndpoint} => {_remoteEndpoint}");

                    while (_isRunning)
                    {
                        try
                        {
                            var result = await _forwardClient.ReceiveAsync().ConfigureAwait(false);
                            LastActivity = Environment.TickCount64;
                            var sent = await _localServer.SendAsync(result.Buffer, result.Buffer.Length, _sourceEndpoint).ConfigureAwait(false);
                            Interlocked.Add(ref _totalBytesResponded, sent);
                        }
                        catch (Exception ex)
                        {
                            if (_isRunning)
                            {
                                Console.WriteLine($"An exception occurred while receiving a server datagram : {ex}");
                            }
                        }
                    }
                }
            });
        }

        public void Stop()
        {
            try
            {
                Console.WriteLine($"Closed UDP {_sourceEndpoint} => {_serverLocalEndpoint} => {_forwardLocalEndpoint} => {_remoteEndpoint}. {_totalBytesForwarded} bytes forwarded, {_totalBytesResponded} bytes responded.");
                _isRunning = false;
                _forwardClient.Close();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"An exception occurred while closing UdpConnection : {ex}");
            }
        }
    }
}
// Baidu IoT helper: despite the name, this class now POSTs each SIP2 message to the local Web API.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

using MQTTnet;
using MQTTnet.Client;
using System.Net.Http;
using System.Text.Json;
using System.Net;

namespace cn.edu.yangtzeu.mqtt
{
    public class BdMqtt
    {
        public string? urlStr;
        public bool isCustomized;

        public BdMqtt()
        {
            var configJson = JsonDocument.Parse(System.IO.File.ReadAllText("config.json"));
            urlStr = configJson.RootElement.GetProperty("localWebApi").GetString();
            isCustomized = configJson.RootElement.GetProperty("isCustomized").GetBoolean();
            if (isCustomized == false)
            {
                urlStr = urlStr + "?isCustomized=false";
            }
            else
            {
                urlStr = urlStr + "?isCustomized=true";
            }
        }

        // Fire-and-forget upload: all exceptions are caught and logged here,
        // because callers do not await this method.
        public async void UploadAsync(MqttMsg msg)
        {
            try
            {
                // The using statements dispose the content, client, and response; no explicit Dispose calls are needed.
                using (HttpContent httpContent = new StringContent(System.Text.Json.JsonSerializer.Serialize(msg), System.Text.Encoding.UTF8, "application/json"))
                using (HttpClient httpClient = new HttpClient())
                using (HttpResponseMessage httpRes = await httpClient.PostAsync(urlStr, httpContent))
                {
                    // Await the body instead of blocking on .Result.
                    string result = await httpRes.Content.ReadAsStringAsync();
                    Console.WriteLine("Response from the readerandholding endpoint for the uploaded SIP2 message: {0}, statusCode: {1}", result, httpRes.StatusCode.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("bdmqtt:" + ex.Message);
            }
        }
    }

    public class MqttMsg
    {
        public string? Source { set; get; }
        public string? MsgId { set; get; }
        public int? Code { set; get; }
        public string? Msg { set; get; }
        public dynamic? Data { set; get; }
    }

    public class RetMsg
    {
        public int? ReturnCode { set; get; }
        public string? ReturnMsg { set; get; }
        public dynamic? data { set; get; }
    }
}
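//Screenshot of the program running (image missing)

1.2 After receiving SIP2 loan/return data, look up reader information by reader card number and book information by item barcode. Reader data comes from the campus network and information center; book data comes from Interlib's biblios and holding tables
//Fetching reader and holdings data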
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json.Linq;
using Oracle.ManagedDataAccess.Client;
using readerAndholding.Model;
using readerAndholding.Mqtt;
using System.Collections;
using System.Data;
using System.Text;

namespace readerAndholding.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ReaderHoldingController : ControllerBase
    {
        IConfiguration? _configuration;
        IHttpClientFactory _httpClientFactory;
        MqttHttp _mqttHttp;
        string Save;
        string University;

        public ReaderHoldingController(IConfiguration configuration, IHttpClientFactory httpClientFactory, MqttHttp mqttHttp)
        {
            _configuration = configuration;
            University = _configuration["MqttOption:UniversityName"].ToString();
            Save = _configuration["MqttOption:Save"].ToString();
            _httpClientFactory = httpClientFactory;
            _mqttHttp = mqttHttp;
        }

        /// <summary>
        /// Used to backfill historical loan records
        /// </summary>
        /// <param name="readerNo">Reader card number</param>
        /// <param name="barcode">Item barcode</param>
        /// <param name="timestamp">Loan timestamp (Unix milliseconds)</param>
        /// <returns></returns>
        [HttpGet]
        public async Task<ActionResult> Get(string readerNo, string barcode, Int64 timestamp)
        {
            Msg msg = new Msg();
            // Filter out staff records (their card numbers are shorter than 7 characters)
            if (readerNo.Length < 7)
            {
                msg.code = 1;
                msg.message = "Staff data is filtered out";
                msg.reflection = "ReaderHoldingController.get";
                Logger.Warn(msg.ToString());
                return Ok(msg);
            }
            string retLocal = "{'msg':'send error'}";
            Hashtable ht = new Hashtable();
            try
            {
                // Fetch reader information
                Reader reader = new Reader().getReader(readerNo);
                // Fetch bibliographic information
                Holding holding = new Holding().GetHolding(barcode);
                // Return immediately if the reader or the record was not found
                if (reader.ReaderNo == null || holding.RecordNo == null)
                {
                    msg.code = 2;
                    msg.message = "Reader or bibliographic record not found";
                    msg.reflection = "ReaderHoldingController.get";
                    Logger.Warn(msg.ToString());
                    return Ok(msg);
                }
                ht.Add("reader", reader);
                ht.Add("holding", holding);
                // Push to the local broker first
                retLocal = Newtonsoft.Json.JsonConvert.SerializeObject(ht);
                await MqttPublish.MqttPulishAsync(retLocal, "local/readerAndholding/" + University);
                // Then push to the cloud
                Tsdb tsdb = new Tsdb();
                tsdb.datapoints = new List<DatapointsItem>();
                DatapointsItem datapointsItem = new DatapointsItem();
                Tags tags = new Tags();
                tags.ISBN = holding.ISBN;
                tags.Title = holding.Title;
                tags.PublishDate = holding.PublishDate;
                tags.Publisher = holding.Publisher;
                tags.Price = holding.Price;
                tags.ClassNo = holding.ClassNo;
                tags.Author = holding.Author;
                tags.RecordNo = holding.RecordNo;
                // The CALISID is taken from the MARC record via a third-party library, but most records have not been synchronized
                tags.CalisId = holding.CalisId;
                tags.Save = Save;
                tags.University = University;
                tags.Academy = reader.Academy;
                tags.Major = reader.Major;
                tags.Subject = reader.Subject;
                tags.Grade = reader.Grade;
                tags.Degree = reader.Degree;
                tags.Sex = reader.Sex;
                datapointsItem.field = "ReaderNo";
                datapointsItem.metric = "circulationT";
                datapointsItem.type = "String";
                datapointsItem.value = reader.ReaderNo;
                if (timestamp > 1000)
                {
                    datapointsItem.timestamp = timestamp;
                }
                datapointsItem.tags = tags;
                tsdb.datapoints.Add(datapointsItem);
                string retCloud = Newtonsoft.Json.JsonConvert.SerializeObject(tsdb);
                await _mqttHttp.HttpPublish(retCloud);
            }
            catch (Exception ex)
            {
                msg.code = 3;
                msg.message = ex.Message;
                msg.data = ex.StackTrace;
                msg.reflection = "ReaderHoldingController.get";
                Logger.Error(msg.ToString(), ex);
                return Ok(msg);
            }
            Logger.Info(msg.ToString());
            return Ok(retLocal);
        }

        /// <summary>
        /// Receives the raw message from the SIP2 proxy, looks up reader and bibliographic information, then stores and pushes it
        /// </summary>
        /// <param name="mqttMsg">SIP2 message</param>
        /// <param name="isCustomized">Whether the local reader and bibliographic lookup interfaces are implemented</param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> Post([FromBody] MqttMsg mqttMsg, bool isCustomized)
        {
            Msg msg = new Msg();
            // Without local lookups, forward the raw SIP2 10/12 message to the local broker as is
            if (!isCustomized)
            {
                try
                {
                    // Push to the local broker
                    await MqttPublish.MqttPulishAsync(Newtonsoft.Json.JsonConvert.SerializeObject(mqttMsg), "local/readerAndholding/" + _configuration["MqttOption:UniversityName"].ToString());
                }
                catch (Exception ex)
                {
                    msg.code = 4;
                    msg.message = ex.Message;
                    msg.source = "Error sending MqttMsg to the local broker";
                    msg.data = ex.StackTrace;
                    msg.reflection = "ReaderHoldingController.post";
                    Logger.Error(msg.ToString(), ex);
                    return Ok(msg);
                }
                return Ok(true);
            }
            // With local lookups available, enrich the message and push it to both the local broker and the cloud
            else
            {
                try
                {
                    var temp = mqttMsg.Msg.Split("|");
                    string readerNo = "";
                    string name;
                    string barcode = "";
                    string title;
                    string remark;
                    for (int i = 0; i < temp.Length; i++)
                    {
                        // Skip tokens too short to carry a field code (e.g. the trailing carriage return)
                        if (temp[i].Length < 2) continue;
                        if (temp[i].Substring(0, 2) == "AA")
                        {
                            readerNo = temp[i].Substring(2, temp[i].Length - 2);
                        }
                        else if (temp[i].Substring(0, 2) == "AB")
                        {
                            barcode = temp[i].Substring(2, temp[i].Length - 2);
                        }
                        else if (temp[i].Substring(0, 2) == "AE")
                        {
                            name = temp[i].Substring(2, temp[i].Length - 2);
                        }
                        else if (temp[i].Substring(0, 2) == "AF")
                        {
                            remark = temp[i].Substring(2, temp[i].Length - 2);
                        }
                        else if (temp[i].Substring(0, 2) == "AJ")
                        {
                            title = temp[i].Substring(2, temp[i].Length - 2);
                        }
                    }
                    // Filter out staff records
                    if (readerNo.Length < 7)
                    {
                        msg.code = 1;
                        msg.message = "Staff data is filtered out";
                        msg.reflection = "ReaderHoldingController.post";
                        Logger.Warn(msg.ToString());
                        return Ok(msg);
                    }
                    string retLocal = "{'msg':'send error'}";
                    Hashtable ht = new Hashtable();
                    try
                    {
                        // Fetch reader information
                        Reader reader = new Reader().getReader(readerNo);
                        // Fetch bibliographic information
                        Holding holding = new Holding().GetHolding(barcode);
                        ht.Add("reader", reader);
                        ht.Add("holding", holding);
                        // Return immediately if the reader or the record was not found
                        if (reader.ReaderNo == null || holding.RecordNo == null)
                        {
                            msg.code = 2;
                            msg.message = "Reader or bibliographic record not found";
                            msg.reflection = "ReaderHoldingController.post";
                            Logger.Warn(msg.ToString());
                            return Ok(msg);
                        }
                        mqttMsg.Data = ht;
                        // Push to the local broker first
                        retLocal = Newtonsoft.Json.JsonConvert.SerializeObject(mqttMsg);
                        await MqttPublish.MqttPulishAsync(retLocal, "local/readerAndholding/" + University);
                        // Then push to the cloud
                        Tsdb tsdb = new Tsdb();
                        tsdb.datapoints = new List<DatapointsItem>();
                        DatapointsItem datapointsItem = new DatapointsItem();
                        Tags tags = new Tags();
                        tags.ISBN = holding.ISBN;
                        tags.Title = holding.Title;
                        tags.PublishDate = holding.PublishDate;
                        tags.Publisher = holding.Publisher;
                        tags.Price = holding.Price;
                        tags.ClassNo = holding.ClassNo;
                        tags.Author = holding.Author;
                        tags.RecordNo = holding.RecordNo;
                        // The CALISID is taken from the MARC record via a third-party library, but most records have not been synchronized
                        tags.CalisId = holding.CalisId;
                        tags.Save = Save;
                        tags.University = University;
                        tags.Academy = reader.Academy;
                        tags.Major = reader.Major;
                        tags.Subject = reader.Subject;
                        tags.Grade = reader.Grade;
                        tags.Degree = reader.Degree;
                        tags.Sex = reader.Sex;
                        datapointsItem.field = "ReaderNo";
                        datapointsItem.metric = "circulationT";
                        datapointsItem.type = "String";
                        datapointsItem.value = reader.ReaderNo;
                        datapointsItem.tags = tags;
                        TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
                        datapointsItem.timestamp = Convert.ToInt64(ts.TotalMilliseconds);
                        tsdb.datapoints.Add(datapointsItem);
                        retLocal = Newtonsoft.Json.JsonConvert.SerializeObject(tsdb);
                        await _mqttHttp.HttpPublish(retLocal);
                    }
                    catch (Exception ex)
                    {
                        Logger.Error(msg.ToString(), ex);
                        return NoContent();
                    }
                }
                catch (Exception ex)
                {
                    msg.code = 3;
                    msg.message = ex.Message;
                    msg.data = ex.StackTrace;
                    msg.reflection = "ReaderHoldingController.post";
                    Logger.Error(msg.ToString(), ex);
                    return Ok(msg);
                }
                Logger.Info(msg.ToString());
                return Ok(msg);
            }
        }

        /// <summary>
        /// Backfills the circulation log for one year (Interlib only)
        /// </summary>
        /// <param name="year">Year</param>
        /// <returns></returns>
        [HttpGet]
        [Route("updateYear")]
        public async Task<ActionResult> updateYear(string year)
        {
            Msg msg = new Msg();
            int i = 0;   // rows read
            int j = 0;   // rows submitted successfully
            using (OracleConnection conn = new OracleConnection())
            {
                string oracleConnstr = "Data Source = (DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = 10.203.1.237)(PORT = 1521)))(CONNECT_DATA = (SERVICE_NAME = interlib))); User Id = inter; Password = win ; ";
                conn.ConnectionString = oracleConnstr;
                conn.Open();
                OracleCommand comm = new OracleCommand();
                comm.Connection = conn;
                string commstr;
                commstr = "select data2,data3,to_char(regtime,'yyyy-mm-dd hh24:mi:ss') from log_cir where to_char(regtime, 'yyyy') = :year and logtype = '30001'";
                comm.CommandText = commstr;
                // Bind the :year placeholder
                comm.Parameters.Clear();
                comm.Parameters.Add("year", year);
                OracleDataReader dataReader = comm.ExecuteReader();
                HttpClient hc = _httpClientFactory.CreateClient();
                string readerNo = "";
                string barcode = "";
                Int64 timestamp = 0;
                while (dataReader.Read())
                {
                    try
                    {
                        i++;
                        readerNo = dataReader.GetString(0);
                        Console.WriteLine(i.ToString() + ":" + readerNo);
                        barcode = dataReader.GetString(1);
                        // Convert the loan time to Unix milliseconds
                        timestamp = (Convert.ToDateTime(dataReader.GetString(2)).ToUniversalTime().Ticks - 621355968000000000) / 10000;
                        string url = string.Format("http://127.0.0.1:2016/api/readerholding?readerNo={0}&barcode={1}&timestamp={2}", readerNo, barcode, timestamp);
                        var httpResponseMessage = await hc.GetAsync(url);
                        if (httpResponseMessage.IsSuccessStatusCode)
                        {
                            var tempStr = await httpResponseMessage.Content.ReadAsStringAsync();
                            j++;
                        }
                    }
                    catch (Exception ex)
                    {
                        msg.message = ex.StackTrace;
                        msg.source = "Error while looping over the yearly log";
                        msg.reflection = "ReaderHoldingController:updateyear";
                        msg.code = 1;
                        Logger.Error(msg.ToString(), ex);
                    }
                }
            }
            msg.message = string.Format("i:{0};j:{1}", i, j);
            Logger.Info(msg.ToString());
            return Ok(msg);
        }
    }
}
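The tick arithmetic in `updateYear` turns a loan time into Unix milliseconds by subtracting 621355968000000000, the tick count of 1970-01-01. The sketch below compares it with the built-in `DateTimeOffset.ToUnixTimeMilliseconds`. The `UnixTime` helper is a hypothetical name, and the sample value is explicitly UTC so that the result does not depend on the machine's time zone.

```csharp
using System;

// Usage: 2023-05-01 00:00:00 UTC, converted both ways.
var borrowedAt = new DateTime(2023, 5, 1, 0, 0, 0, DateTimeKind.Utc);
Console.WriteLine(UnixTime.FromTicks(borrowedAt));    // 1682899200000
Console.WriteLine(UnixTime.FromOffset(borrowedAt));   // 1682899200000

public static class UnixTime
{
    private const long EpochTicks = 621355968000000000;   // new DateTime(1970, 1, 1).Ticks

    // The approach used in the listing: raw ticks (100 ns units) relative to the epoch.
    public static long FromTicks(DateTime value) =>
        (value.ToUniversalTime().Ticks - EpochTicks) / TimeSpan.TicksPerMillisecond;

    // The same conversion using the framework's own helper.
    public static long FromOffset(DateTime value) =>
        new DateTimeOffset(value.ToUniversalTime()).ToUnixTimeMilliseconds();
}
```

Note that `Convert.ToDateTime` on a plain string yields a value of unspecified kind, which `ToUniversalTime` treats as local time. The backfill therefore assumes the server's time zone matches the one the loan times were recorded in.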
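//Main program: implements MQTT and WebSocket data forwarding, which can feed a real-time big-screen display on the web. Data is pushed to the Baidu IoT platform over HTTP in order to save MQTT connections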
using MQTTnet.AspNetCore;
using MQTTnet.Server;
using readerAndholding.Mqtt;

var builder = WebApplication.CreateBuilder(args);

var host = Host.CreateDefaultBuilder(Array.Empty<string>())
    .ConfigureWebHostDefaults(
        webBuilder =>
        {
            webBuilder.UseKestrel(
                o =>
                {
                    // MQTT connections over TCP on port 2012.
                    o.ListenAnyIP(2012, v => v.UseMqtt());

                    // Default HTTP pipeline on port 2014 (used for MQTT over WebSockets;
                    // the URI is configured in the Startup class).
                    o.ListenAnyIP(2014);
                });

            webBuilder.UseStartup<Startup>();
        });
// Started without await so the Web API host below can run alongside it.
host.RunConsoleAsync();

// Add services to the container.

builder.Services.AddControllers();
builder.Services.AddHttpClient();
builder.Services.AddSingleton<MqttHttp>();

var app = builder.Build();

// Configure the HTTP request pipeline.
app.UseRouting();

app.UseAuthorization();

app.UseStaticFiles();

app.MapControllers();

app.Run("http://*:2016");
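//Screenshot of the program running (image missing)

1.3 Subscribe to the Baidu IoT data, query the ISBN over the Z39.50 protocol, record the CALISID, store the bibliographic and reader data in the database, and provide MQTT and WebSocket data push
//Main program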
using Microsoft.EntityFrameworkCore;
using Mqtt.Models.Z39;
using MQTTnet.AspNetCore;
using z39web.Models.DB;
using z39web.Models.Mqtt;
using z39web.Models.Z39;

var builder = WebApplication.CreateBuilder(args);

// Add the MQTT service
var host = Host.CreateDefaultBuilder(Array.Empty<string>())
    .ConfigureWebHostDefaults(
        webBuilder =>
        {
            webBuilder.UseKestrel(
                o =>
                {
                    o.ListenAnyIP(2020, v => v.UseMqtt());
                    o.ListenAnyIP(2022);
                });
            webBuilder.UseStartup<Startup>();
        });
host.RunConsoleAsync();

// Add services to the container.
/* MySQL: JSON is inserted directly with raw SQL statements instead
var connStr = builder.Configuration["ConnectionStrings:MysqlConnection"];
builder.Services.AddDbContext<MyDBContext>(options =>
    options.UseMySQL(connStr)
);
*/
// ISBN and CALISID lookup
builder.Services.AddSingleton<IZ39, Z39>();
builder.Services.AddSingleton<IMqttReceive, MqttReceive>();
builder.Services.AddControllers();
builder.Services.AddHttpClient();

var app = builder.Build();

// Configure the HTTP request pipeline.
app.UseRouting();
app.UseAuthorization();
app.UseStaticFiles();
app.MapControllers();
app.Run("http://*:2018");
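//CALIS lookup, comparison, and storage. This uses open-source code from dp2, a member of our chat group, specifically its ready-made Z39.50 search and parsing library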
using System;
using System.Text;
using System.Xml;
using Microsoft.Extensions.Configuration;
using MySql.Data.MySqlClient;
using DigitalPlatform.Marc;
using DigitalPlatform.Net;
using DigitalPlatform.Text;
using DigitalPlatform.Z3950;
using static DigitalPlatform.Z3950.ZClient;
using z39web.Models.DB;
using ConfigurationManager = Microsoft.Extensions.Configuration.ConfigurationManager;

namespace z39web.Models.Z39
{
    public class Z39 : IZ39
    {
        public ZClient _zclient;
        public TargetInfo _targetInfo;
        // MySQL connection string, read from appsettings.json rather than hard-coded.
        private readonly string _mysqlConnection;

        public Z39()
        {
            // Load settings.
            ConfigurationManager configurationManager = new ConfigurationManager();
            configurationManager.AddJsonFile("appsettings.json", true, reloadOnChange: false);
            _mysqlConnection = configurationManager["ConnectionStrings:MysqlConnection"].ToString();

            _targetInfo = new TargetInfo();
            _targetInfo.HostName = configurationManager["Calis:Host"].ToString();
            _targetInfo.Password = configurationManager["Calis:Password"].ToString();
            _targetInfo.UserName = configurationManager["Calis:UserName"].ToString();
            _targetInfo.Port = Convert.ToInt32(configurationManager["Calis:Port"]);
            _targetInfo.DbNames = new string[] { configurationManager["Calis:DbNames"].ToString() };
            _targetInfo.AuthenticationMethod = Convert.ToInt32(configurationManager["Calis:AuthenticationMethod"]);
            // Code page 936 (GBK) is used for both query terms and returned records.
            Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
            _targetInfo.DefaultQueryTermEncoding = Encoding.GetEncoding(936);
            _targetInfo.DefaultRecordsEncoding = Encoding.GetEncoding(936);
            _targetInfo.PresentPerBatchCount = 1;

            // One shared Z39.50 connection for the whole service.
            _zclient = new ZClient();
            InitialResult result = _zclient.TryInitialize(_targetInfo).GetAwaiter().GetResult();
            Console.WriteLine("CALIS Z39.50 initialization result: " + result.ToString());
            if (result.Value == -1)
            {
                Console.WriteLine("Global connection error: {0}", result.ErrorInfo);
            }
        }

        // Interface implementation. typ is "isbn" or "calisId";
        // pass the ISBN without hyphens and the calisId without the CAL_ prefix.
        public Msg Search(string typ, string str)
        {
            // Try the local database first and fall back to CALIS on a miss.
            Msg msg = LocalSearch(typ, str);
            if (msg.Code == 0)
            {
                return msg;
            }
            msg = CalisSearch(typ, str);
            msg.Source = "calis";
            return msg;
        }

        // Local lookup. A non-zero Code means the record was not found locally.
        private Msg LocalSearch(string typ, string str)
        {
            Msg msg = new Msg();
            string commstr;
            Biblios biblios = new Biblios();
            // Reject the request if either argument is missing (the original used &&).
            if (string.IsNullOrWhiteSpace(typ) || string.IsNullOrWhiteSpace(str))
            {
                msg.Code = 1;
                msg.Message = "Query type or value does not meet the requirements";
                return msg;
            }
            if (typ == "calisId")
            {
                commstr = "select * from biblios where calisId=@calisId";
            }
            else if (typ == "isbn")
            {
                commstr = "select * from biblios where isbn=@isbn";
            }
            else
            {
                msg.Code = 1;
                msg.Message = "Query type or value does not meet the requirements";
                return msg;
            }
            // Look up the MARC record in the local database.
            try
            {
                using (MySqlConnection conn = new MySqlConnection(_mysqlConnection))
                using (MySqlCommand comm = new MySqlCommand(commstr, conn))
                {
                    conn.Open();
                    if (typ == "calisId")
                    {
                        comm.Parameters.Add("@calisId", MySqlDbType.VarString).Value = str;
                    }
                    else
                    {
                        comm.Parameters.Add("@isbn", MySqlDbType.VarString).Value = str.Replace("-", "");
                    }
                    using (MySqlDataReader dr = comm.ExecuteReader())
                    {
                        if (dr.Read())
                        {
                            biblios.CalisId = dr["calisId"].ToString();
                            biblios.Isbn = dr["isbn"].ToString();
                            biblios.Title = dr["title"].ToString();
                            biblios.Author = dr["author"].ToString();
                            biblios.Publisher = dr["publisher"].ToString();
                            biblios.PublishDate = dr["publishDate"].ToString();
                            biblios.ClassNo = dr["classNo"].ToString();
                            biblios.Price = dr["price"].ToString();
                            biblios.Marc = Convert.ToBase64String(Encoding.UTF8.GetBytes(dr["marc"].ToString()));
                            msg.Data = biblios;
                        }
                        else
                        {
                            msg.Code = 2;
                            msg.Message = "No record found locally";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                msg.Code = 3;
                msg.Message = ex.Message;
            }
            return msg;
        }

        // CALIS lookup: query CALIS over Z39.50 and insert the result into the local database.
        private Msg CalisSearch(string typ, string str)
        {
            Msg msg = new Msg();
            // Build the query: Bib-1 use attribute 7 is ISBN, 12 is the local (control) number.
            string strQueryString;
            if (typ == "isbn")
            {
                strQueryString = "\"" + str + "\"/1=7";
            }
            else if (typ == "calisId")
            {
                strQueryString = "\"" + str + "\"/1=12";
            }
            else
            {
                msg.Code = 1;
                msg.Message = "Invalid parameters";
                return msg;
            }
            // Search.
            SearchResult search_result = _zclient.Search(
                strQueryString,
                _targetInfo.DefaultQueryTermEncoding,
                _targetInfo.DbNames,
                _targetInfo.PreferredRecordSyntax,
                "default").Result;
            Console.WriteLine("CALIS Z39.50 search result: " + search_result.ToString());
            // Check for a search error before looking at the hit count.
            if (search_result.Value == -1)
            {
                Console.WriteLine("Search error: {0}", search_result.ErrorInfo);
                msg.Code = 2;
                msg.Message = search_result.ErrorInfo;
                return msg;
            }
            if (search_result.ResultCount < 1)
            {
                // Use a non-zero Code so callers do not mistake a miss for success.
                msg.Code = 5;
                msg.Message = "No data retrieved";
                return msg;
            }
            Console.WriteLine("Records hit: {0}", search_result.ResultCount);

            // Fetch the first record only.
            long _resultCount = search_result.ResultCount;
            int _fetched = 0;
            PresentResult present_result = _zclient.Present(
                "default",
                _fetched,
                Math.Min((int)_resultCount - _fetched, 1),
                1,
                "F",
                _targetInfo.PreferredRecordSyntax).Result;
            if (present_result.Value == -1)
            {
                Console.WriteLine("present result: {0}", present_result.ToString());
                msg.Code = 3;
                msg.Message = present_result.ErrorInfo;
                return msg;
            }
            if (present_result.Records.Count != 1)
            {
                msg.Code = 5;
                msg.Message = "No record found in the CALIS bibliographic data";
                return msg;
            }

            // A MARC record came back that the local database lacks: parse it, then insert it.
            Biblios biblios = new Biblios();
            string calisIdValue, isbnValue, priceValue, titleValue;
            string authorValue, publisherValue, publishDateValue, classNoValue;
            string strMarc = "", strError;
            try
            {
                XmlDocument xmlDocument;
                MarcLoader.ConvertIso2709ToMarcString(present_result.Records[0].m_baRecord, Encoding.GetEncoding(936), false, out strMarc, out strError);
                MarcUtil.Marc2Xml(strMarc, "unimarc", out xmlDocument, out strError);
                var nsmgr = new XmlNamespaceManager(xmlDocument.NameTable);
                nsmgr.AddNamespace("unimarc", "http://dp2003.com/UNIMARC");
                // Local helper: inner text of the first matching node, or a fallback value.
                string Text(string xpath, string fallback = "")
                {
                    var node = xmlDocument.SelectSingleNode(xpath, nsmgr);
                    return node == null ? fallback : node.InnerText;
                }
                // Extract the indexed fields.
                calisIdValue = Text("//unimarc:controlfield[@tag='001']");
                isbnValue = Text("//unimarc:datafield[@tag='010']/unimarc:subfield[@code='a']");
                priceValue = Text("//unimarc:datafield[@tag='010']/unimarc:subfield[@code='d']");
                titleValue = Text("//unimarc:datafield[@tag='200']/unimarc:subfield[@code='a']");
                authorValue = Text("//unimarc:datafield[@tag='200']/unimarc:subfield[@code='f']");
                publisherValue = Text("//unimarc:datafield[@tag='210']/unimarc:subfield[@code='c']");
                publishDateValue = Text("//unimarc:datafield[@tag='210']/unimarc:subfield[@code='d']");
                // Because of the download limit, brief records are used; they carry no
                // classification number, so such records are marked "simple".
                classNoValue = Text("//unimarc:datafield[@tag='690']/unimarc:subfield[@code='a']", "simple");
                // Strip the 4-character prefix from field 001, guarding against short values.
                if (calisIdValue.Length > 4)
                {
                    calisIdValue = calisIdValue.Substring(4);
                }
                // Fill the Biblios object returned to the caller.
                biblios.Author = authorValue;
                biblios.Publisher = publisherValue;
                biblios.PublishDate = publishDateValue;
                biblios.Price = priceValue;
                biblios.CalisId = calisIdValue;
                biblios.Title = titleValue;
                biblios.Isbn = isbnValue;
                biblios.ClassNo = classNoValue;
                biblios.Marc = Convert.ToBase64String(Encoding.UTF8.GetBytes(strMarc));
                msg.Data = biblios;
            }
            catch (Exception ex)
            {
                // Do not insert a half-parsed record.
                msg.Code = 100;
                msg.Message = ex.Message;
                msg.Source = "MARC parse error";
                return msg;
            }

            // Insert into the local database.
            try
            {
                using (MySqlConnection conn = new MySqlConnection(_mysqlConnection))
                using (MySqlCommand comm = new MySqlCommand())
                {
                    conn.Open();
                    comm.Connection = conn;
                    comm.CommandText = "insert into biblios (calisId,isbn,title,author,publisher,publishDate,price,classNo,marc) values (@calisId,@isbn,@title,@author,@publisher,@publishDate,@price,@classNo,@marc)";
                    comm.Parameters.Add("@calisId", MySqlDbType.VarString).Value = calisIdValue;
                    comm.Parameters.Add("@isbn", MySqlDbType.VarString).Value = isbnValue.Replace("-", "");
                    comm.Parameters.Add("@title", MySqlDbType.VarString).Value = titleValue;
                    comm.Parameters.Add("@author", MySqlDbType.VarString).Value = authorValue;
                    comm.Parameters.Add("@publisher", MySqlDbType.VarString).Value = publisherValue;
                    comm.Parameters.Add("@publishDate", MySqlDbType.VarString).Value = publishDateValue;
                    comm.Parameters.Add("@price", MySqlDbType.VarString).Value = priceValue;
                    comm.Parameters.Add("@classNo", MySqlDbType.VarString).Value = classNoValue;
                    comm.Parameters.Add("@marc", MySqlDbType.VarString).Value = strMarc;
                    comm.ExecuteNonQuery();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                msg.Code = 4;
                msg.Message = ex.Message;
            }
            return msg;
        }
    }
}
// Screenshot of the program running
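The lookup above only matches when both sides store the ISBN in the same form; the C# code simply strips hyphens. As a minimal sketch of a stricter matching key, the helper below also maps ISBN-10 to its 13-digit form. The class name IsbnKey and the decision to convert ISBN-10 are assumptions for illustration, not part of the original service, and the sketch does not verify the check digit of the input.

```java
import java.util.Optional;

/** Hypothetical helper (not in the original code): builds a hyphen-free 13-digit ISBN key. */
final class IsbnKey {

    /** Returns the 13-digit key, or empty if the input is not shaped like an ISBN-10 or ISBN-13. */
    static Optional<String> normalize(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        // Drop hyphens and whitespace, as the C# code does with Replace("-", "").
        String s = raw.replaceAll("[\\s-]", "").toUpperCase();
        if (s.matches("\\d{13}")) {
            return Optional.of(s);
        }
        if (s.matches("\\d{9}[\\dX]")) {
            // ISBN-10 -> ISBN-13: prefix 978, drop the old check digit, compute a new one.
            String first12 = "978" + s.substring(0, 9);
            return Optional.of(first12 + checkDigit13(first12));
        }
        return Optional.empty();
    }

    /** ISBN-13 check digit: weights 1,3,1,3,... over the first 12 digits. */
    static int checkDigit13(String first12) {
        int sum = 0;
        for (int i = 0; i < 12; i++) {
            int d = first12.charAt(i) - '0';
            sum += (i % 2 == 0) ? d : 3 * d;
        }
        return (10 - sum % 10) % 10;
    }

    public static void main(String[] args) {
        // prints 9780306406157
        System.out.println(normalize("0-306-40615-2").orElse("invalid"));
    }
}
```

Using one canonical key on both the local table and the incoming query avoids misses caused purely by formatting differences.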

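2. Later approach: collecting data based on CCBD data and Flink CDC
Early on, I wanted to use CALIS bibliographic data to standardize bibliographic records, but the work was suspended during implementation because the download quota was exceeded. In addition, extracting reader and bibliographic data required reading the database directly, which raised concerns about libraries' willingness and about security risks. The early work therefore stopped after a few experiments, although the system is still running.
After coming across the CCBD data, and with guidance from experts in the field, I tried collecting data from library management systems with Flink CDC. The aim is to attempt some data integration during the interim period between today's situation, where each library manages its own data with no interconnection, and the full adoption of next-generation library platforms. Managing print books well and making good use of them remains the basic work of a library.
2.1 Flink CDC synchronizes the Oracle data of the library management system. The local side captures data changes from the Oracle redo and archive logs (the counterpart of MySQL's binlog) and uploads them to the cloud platform.
1. Oracle database setup: enable archive log mode and start the listener. The Debezium Oracle connector documentation describes the settings: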
https://debezium.io/documentation/reference/2.2/connectors/oracle.html
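2. Flink CDC captures the changes and uploads them to the cloud platform. The job runs at each library. The data here comes from a test environment: Oracle 19c deployed with Docker, with a backup of the production data restored into it.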
package com.example.sendmessage;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.util.Properties;

public class TEST {
    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.put("database.pdb.name", "inte");
        pro.put("log.mining.strategy", "online_catalog");
        //pro.put("log.mining.continuous.mine","true");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                // NOTE: this host differs from the one in url() below; keep the two consistent.
                .hostname("10.40.6.131")
                .url("jdbc:oracle:thin:@10.40.6.13:1521/inter")
                .port(1521)
                .database("inter")            // database to monitor
                .schemaList("inter")          // schema to monitor
                //.startupOptions(StartupOptions.latest())
                .tableList("inter.loan_work") // table holding the current loans
                .debeziumProperties(pro)
                .username("c##dbzuser")
                .password("this")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
        // Optional: restore from a savepoint.
        // Configuration configuration = new Configuration();
        // configuration.setString("execution.savepoint.path","file:///D:/checkoracle/cd720959c394888c1bc277e310b35b2d/chk-23");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Optional: enable checkpointing so the job can resume where it stopped.
        //env.enableCheckpointing(30000);
        //env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkoracle");

        // Cloud endpoint that receives the change events.
        String URL = "http://reader.yzu.edu.cn/syncdb/api/loan_work";
        // Read the CDC stream, wrap each event, and post it to the cloud platform.
        DataStream<String> mappedStream = processCdcStream(env, sourceFunction);
        mappedStream.addSink(new HttpSink(URL));
        // mappedStream.print();

        // execute() blocks while the streaming job runs.
        env.execute();
    }

    // Map step: wrap each raw change event in an envelope that identifies the sender.
    public static DataStream<String> processCdcStream(StreamExecutionEnvironment env, SourceFunction<String> source) {
        DataStream<String> cdcStream = env.addSource(source);
        DataStream<String> mappedStream = cdcStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // The full Debezium change event as JSON.
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject allJson = new JSONObject();
                allJson.put("id", 2);
                allJson.put("source", jsonObject.toString());
                allJson.put("university", "Yangtze University");
                allJson.put("token", "4-8-2021");
                // The endpoint expects a JSON array of envelopes.
                JSONArray jsonArray = new JSONArray();
                jsonArray.add(allJson);
                return JSON.toJSONString(jsonArray);
            }
        });
        return mappedStream;
    }

    // Sink: POST each envelope to the cloud platform as application/json.
    public static class HttpSink implements SinkFunction<String> {
        private final String URL;

        public HttpSink(String URL) {
            this.URL = URL;
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            HttpPost httpPost = new HttpPost(URL);
            httpPost.setHeader("Content-Type", "application/json");
            httpPost.setEntity(new StringEntity(value, ContentType.APPLICATION_JSON));
            System.out.println(value);

            // try-with-resources closes both the client and the response;
            // the original created a new client per event and never closed it.
            try (CloseableHttpClient httpclient = HttpClients.createDefault();
                 CloseableHttpResponse response = httpclient.execute(httpPost)) {
                HttpEntity responseEntity = response.getEntity();
                if (responseEntity != null) {
                    System.out.println("Response body: " + EntityUtils.toString(responseEntity));
                }
            }
        }
    }
}
pom.xml for the job above:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sendMessage</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sendMessage</name>
    <description>sendMessage</description>
    <properties>
        <java.version>8</java.version>
        <flink-version>1.13.0</flink-version>
    </properties>
    <dependencies>
        <!-- Spring Boot versions are inherited from the parent; Spring Boot 3.x would require Java 17 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.32</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!-- Users of older versions: enable this block and comment out the 2.x dependency above
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-debezium</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
        </dependency>
        <!-- Not used by the Java code above, which posts through Apache HttpClient -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-http_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.1.RELEASE</version>
            </plugin>
        </plugins>
    </build>

</project>
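Each event posted by the job carries a Debezium "op" code: "c" for an insert, "u" for an update, "d" for a delete, and "r" for a row read during the initial snapshot. A minimal sketch of how the receiving side might turn that code into a circulation event follows. The mapping itself is an assumption (that a new row in loan_work is a checkout and a deleted row is a return), and the enum LoanEvent is a hypothetical name; it should be checked against how the management system actually uses the table.

```java
/**
 * Hypothetical mapping from Debezium "op" codes to circulation events.
 * Assumption: loan_work holds one row per item currently on loan.
 */
enum LoanEvent {
    BORROW, UPDATE, RETURN, SNAPSHOT, IGNORED;

    /** op is the value of the "op" field, already extracted from the change-event JSON. */
    static LoanEvent fromOp(String op) {
        if (op == null) {
            return IGNORED;
        }
        switch (op) {
            case "c": return BORROW;   // row created: item checked out (assumed)
            case "u": return UPDATE;   // row updated: for example a renewal (assumed)
            case "d": return RETURN;   // row deleted: item returned (assumed)
            case "r": return SNAPSHOT; // row read during the initial snapshot
            default:  return IGNORED;  // any other code
        }
    }
}
```

Treating snapshot reads separately keeps the first full load of existing loans from being counted as new checkouts.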
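3. Cloud platform: the test code is still being written and is not finished; the next step is to encourage the parties involved to deploy it.
(1) Receive data from each library's management system
(2) Push it to Kafka, where the following functions subscribe and consume
(3) Storage
(4) Linking
(5) Push notifications
(6) Analysis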
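Since the cloud side is unfinished, the following is only a rough sketch of the receive-then-fan-out shape of the steps above, with an in-memory subscriber list standing in for Kafka. All names (CloudPipelineSketch, Upload, subscribe, receive) are hypothetical, and unlike Kafka this version delivers synchronously and keeps nothing after delivery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for the Kafka topic; every name here is hypothetical. */
final class CloudPipelineSketch {

    /** One upload from a library: the sending university and the raw change event. */
    static final class Upload {
        final String university;
        final String payload;

        Upload(String university, String payload) {
            this.university = university;
            this.payload = payload;
        }
    }

    private final List<Consumer<Upload>> subscribers = new ArrayList<>();

    /** Registers a consumer such as storage, linking, push, or analysis. */
    void subscribe(Consumer<Upload> subscriber) {
        subscribers.add(subscriber);
    }

    /** Receives an upload and hands the same event to every subscriber. */
    void receive(Upload upload) {
        for (Consumer<Upload> subscriber : subscribers) {
            subscriber.accept(upload);
        }
    }

    public static void main(String[] args) {
        CloudPipelineSketch pipeline = new CloudPipelineSketch();
        List<Upload> stored = new ArrayList<>();                  // storage
        Map<String, Integer> countsByUniversity = new HashMap<>(); // analysis
        pipeline.subscribe(stored::add);
        pipeline.subscribe(u -> countsByUniversity.merge(u.university, 1, Integer::sum));

        pipeline.receive(new Upload("University A", "{\"op\":\"c\"}"));
        pipeline.receive(new Upload("University A", "{\"op\":\"d\"}"));
        pipeline.receive(new Upload("University B", "{\"op\":\"c\"}"));
        System.out.println(stored.size() + " stored, counts = " + countsByUniversity);
    }
}
```

Putting a topic between the receiving endpoint and the consumers means storage, linking, push, and analysis can be added or restarted independently without touching the upload path.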